# langfuse

Langfuse Python SDK

## Installation

The SDK was rewritten in v3 and released in June 2025. Refer to the v3 migration guide for instructions on updating your code.

```bash
pip install langfuse
```

## Docs

Please see our docs for detailed information on this SDK.
1""".. include:: ../README.md""" 2 3from langfuse.experiment import Evaluation 4 5from ._client import client as _client_module 6from ._client.attributes import LangfuseOtelSpanAttributes 7from ._client.constants import ObservationTypeLiteral 8from ._client.get_client import get_client 9from ._client.observe import observe 10from ._client.span import ( 11 LangfuseAgent, 12 LangfuseChain, 13 LangfuseEmbedding, 14 LangfuseEvaluator, 15 LangfuseEvent, 16 LangfuseGeneration, 17 LangfuseGuardrail, 18 LangfuseRetriever, 19 LangfuseSpan, 20 LangfuseTool, 21) 22 23Langfuse = _client_module.Langfuse 24 25__all__ = [ 26 "Langfuse", 27 "get_client", 28 "observe", 29 "ObservationTypeLiteral", 30 "LangfuseSpan", 31 "LangfuseGeneration", 32 "LangfuseEvent", 33 "LangfuseOtelSpanAttributes", 34 "LangfuseAgent", 35 "LangfuseTool", 36 "LangfuseChain", 37 "LangfuseEmbedding", 38 "LangfuseEvaluator", 39 "LangfuseRetriever", 40 "LangfuseGuardrail", 41 "Evaluation", 42 "experiment", 43 "api", 44]
````python
class Langfuse:
    """Main client for Langfuse tracing and platform features.

    This class provides an interface for creating and managing traces, spans,
    and generations in Langfuse as well as interacting with the Langfuse API.

    The client features a thread-safe singleton pattern for each unique public API key,
    ensuring consistent trace context propagation across your application. It implements
    efficient batching of spans with configurable flush settings and includes background
    thread management for media uploads and score ingestion.

    Configuration is flexible through either direct parameters or environment variables,
    with graceful fallbacks and runtime configuration updates.

    Attributes:
        api: Synchronous API client for Langfuse backend communication
        async_api: Asynchronous API client for Langfuse backend communication
        _otel_tracer: Internal LangfuseTracer instance managing OpenTelemetry components

    Parameters:
        public_key (Optional[str]): Your Langfuse public API key. Can also be set via LANGFUSE_PUBLIC_KEY environment variable.
        secret_key (Optional[str]): Your Langfuse secret API key. Can also be set via LANGFUSE_SECRET_KEY environment variable.
        host (Optional[str]): The Langfuse API host URL. Defaults to "https://cloud.langfuse.com". Can also be set via LANGFUSE_HOST environment variable.
        timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds.
        httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created.
        debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable.
        tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable.
        flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable.
        flush_interval (Optional[float]): Time in seconds between batch flushes. Defaults to 5 seconds. Can also be set via LANGFUSE_FLUSH_INTERVAL environment variable.
        environment (Optional[str]): Environment name for tracing. Default is 'default'. Can also be set via LANGFUSE_TRACING_ENVIRONMENT environment variable. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'.
        release (Optional[str]): Release version/hash of your application. Used for grouping analytics by release.
        media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable.
        sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable.
        mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API.
        blocked_instrumentation_scopes (Optional[List[str]]): List of instrumentation scope names to block from being exported to Langfuse. Spans from these scopes will be filtered out before being sent to the API. Useful for filtering out spans from specific libraries or frameworks. For exported spans, you can see the instrumentation scope name in the span metadata in Langfuse (`metadata.scope.name`).
        additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and OTLPSpanExporter requests. These headers will be merged with default headers. Note: If httpx_client is provided, additional_headers must be set directly on your custom httpx_client as well.
        tracer_provider (Optional[TracerProvider]): OpenTelemetry TracerProvider to use for Langfuse. This can be useful for keeping Langfuse tracing disconnected from other OpenTelemetry-span-emitting libraries. Note: To track active spans, the context is still shared between TracerProviders. This may lead to broken trace trees.

    Example:
        ```python
        from langfuse import Langfuse

        # Initialize the client (reads from env vars if not provided)
        langfuse = Langfuse(
            public_key="your-public-key",
            secret_key="your-secret-key",
            host="https://cloud.langfuse.com",  # Optional, default shown
        )

        # Create a trace span
        with langfuse.start_as_current_span(name="process-query") as span:
            # Your application code here

            # Create a nested generation span for an LLM call
            with span.start_as_current_generation(
                name="generate-response",
                model="gpt-4",
                input={"query": "Tell me about AI"},
                model_parameters={"temperature": 0.7, "max_tokens": 500}
            ) as generation:
                # Generate response here
                response = "AI is a field of computer science..."

                generation.update(
                    output=response,
                    usage_details={"prompt_tokens": 10, "completion_tokens": 50},
                    cost_details={"total_cost": 0.0023}
                )

                # Score the generation (supports NUMERIC, BOOLEAN, CATEGORICAL)
                generation.score(name="relevance", value=0.95, data_type="NUMERIC")
        ```
    """

    _resources: Optional[LangfuseResourceManager] = None
    _mask: Optional[MaskFunction] = None
    _otel_tracer: otel_trace_api.Tracer

    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        timeout: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        debug: bool = False,
        tracing_enabled: Optional[bool] = True,
        flush_at: Optional[int] = None,
        flush_interval: Optional[float] = None,
        environment: Optional[str] = None,
        release: Optional[str] = None,
        media_upload_thread_count: Optional[int] = None,
        sample_rate: Optional[float] = None,
        mask: Optional[MaskFunction] = None,
        blocked_instrumentation_scopes: Optional[List[str]] = None,
        additional_headers: Optional[Dict[str, str]] = None,
        tracer_provider: Optional[TracerProvider] = None,
    ):
        self._host = host or cast(
            str, os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com")
        )
        self._environment = environment or cast(
            str, os.environ.get(LANGFUSE_TRACING_ENVIRONMENT)
        )
        self._project_id: Optional[str] = None
        sample_rate = sample_rate or float(os.environ.get(LANGFUSE_SAMPLE_RATE, 1.0))
        if not 0.0 <= sample_rate <= 1.0:
            raise ValueError(
                f"Sample rate must be between 0.0 and 1.0, got {sample_rate}"
            )

        timeout = timeout or int(os.environ.get(LANGFUSE_TIMEOUT, 5))

        self._tracing_enabled = (
            tracing_enabled
            and os.environ.get(LANGFUSE_TRACING_ENABLED, "true").lower() != "false"
        )
        if not self._tracing_enabled:
            langfuse_logger.info(
                "Configuration: Langfuse tracing is explicitly disabled. No data will be sent to the Langfuse API."
            )

        debug = (
            debug if debug else (os.getenv(LANGFUSE_DEBUG, "false").lower() == "true")
        )
        if debug:
            logging.basicConfig(
                format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            langfuse_logger.setLevel(logging.DEBUG)

        public_key = public_key or os.environ.get(LANGFUSE_PUBLIC_KEY)
        if public_key is None:
            langfuse_logger.warning(
                "Authentication error: Langfuse client initialized without public_key. Client will be disabled. "
                "Provide a public_key parameter or set LANGFUSE_PUBLIC_KEY environment variable. "
            )
            self._otel_tracer = otel_trace_api.NoOpTracer()
            return

        secret_key = secret_key or os.environ.get(LANGFUSE_SECRET_KEY)
        if secret_key is None:
            langfuse_logger.warning(
                "Authentication error: Langfuse client initialized without secret_key. Client will be disabled. "
                "Provide a secret_key parameter or set LANGFUSE_SECRET_KEY environment variable. "
            )
            self._otel_tracer = otel_trace_api.NoOpTracer()
            return

        if os.environ.get("OTEL_SDK_DISABLED", "false").lower() == "true":
            langfuse_logger.warning(
                "OTEL_SDK_DISABLED is set. Langfuse tracing will be disabled and no traces will appear in the UI."
            )

        # Initialize api and tracer if requirements are met
        self._resources = LangfuseResourceManager(
            public_key=public_key,
            secret_key=secret_key,
            host=self._host,
            timeout=timeout,
            environment=self._environment,
            release=release,
            flush_at=flush_at,
            flush_interval=flush_interval,
            httpx_client=httpx_client,
            media_upload_thread_count=media_upload_thread_count,
            sample_rate=sample_rate,
            mask=mask,
            tracing_enabled=self._tracing_enabled,
            blocked_instrumentation_scopes=blocked_instrumentation_scopes,
            additional_headers=additional_headers,
            tracer_provider=tracer_provider,
        )
        self._mask = self._resources.mask

        self._otel_tracer = (
            self._resources.tracer
            if self._tracing_enabled and self._resources.tracer is not None
            else otel_trace_api.NoOpTracer()
        )
        self.api = self._resources.api
        self.async_api = self._resources.async_api
````
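To make the configuration fallbacks above concrete, here is a sketch that configures the client purely through environment variables and a masking function. The exact call signature of the `MaskFunction` callable is an assumption inferred from the parameter description; consult the docs for the definitive contract, and treat the key values as placeholders.

```python
import os

from langfuse import Langfuse

# Configuration can come from the environment instead of constructor args.
os.environ.setdefault("LANGFUSE_PUBLIC_KEY", "pk-lf-...")   # placeholder
os.environ.setdefault("LANGFUSE_SECRET_KEY", "sk-lf-...")   # placeholder
os.environ.setdefault("LANGFUSE_SAMPLE_RATE", "0.25")       # sample 25% of traces


def mask_sensitive(data, **kwargs):
    # Assumed MaskFunction shape: receives a raw value, returns the masked one.
    if isinstance(data, str) and "password" in data.lower():
        return "[REDACTED]"
    return data


langfuse = Langfuse(
    mask=mask_sensitive,
    environment="staging",  # lowercase, must not start with 'langfuse'
)
```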
````python
    def start_span(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseSpan:
        """Create a new span for tracing a unit of work.

        This method creates a new span but does not set it as the current span in the
        context. To create and use a span within a context, use start_as_current_span().

        The created span will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the span (e.g., function or operation name)
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span

        Returns:
            A LangfuseSpan object that must be ended with .end() when the operation completes

        Example:
            ```python
            span = langfuse.start_span(name="process-data")
            try:
                # Do work
                span.update(output="result")
            finally:
                span.end()
            ```
        """
        return self.start_observation(
            trace_context=trace_context,
            name=name,
            as_type="span",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
        )

    def start_as_current_span(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseSpan]:
        """Create a new span and set it as the current span in a context manager.

        This method creates a new span and sets it as the current span within a context
        manager. Use this method with a 'with' statement to automatically handle span
        lifecycle within a code block.

        The created span will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the span (e.g., function or operation name)
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span
            end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.

        Returns:
            A context manager that yields a LangfuseSpan

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-query") as span:
                # Do work
                result = process_data()
                span.update(output=result)

                # Create a child span automatically
                with span.start_as_current_span(name="sub-operation") as child_span:
                    # Do sub-operation work
                    child_span.update(output="sub-result")
            ```
        """
        return self.start_as_current_observation(
            trace_context=trace_context,
            name=name,
            as_type="span",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            end_on_exit=end_on_exit,
        )
````
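A sketch of connecting a span to an existing trace via `trace_context`, as described in the Args above. The dictionary keys `trace_id` and `parent_span_id` are the ones the implementation reads; here the ID is minted with `create_trace_id()` (defined later in this class) rather than taken from a real upstream service.

```python
from langfuse import Langfuse

langfuse = Langfuse()  # assumes keys are set via environment variables

# e.g. received from an upstream service; here we just mint a valid one
trace_id = Langfuse.create_trace_id(seed="request-42")

span = langfuse.start_span(
    trace_context={"trace_id": trace_id},  # attach to the existing trace
    name="downstream-work",
    input={"step": 1},
)
try:
    span.update(output="done")
finally:
    span.end()  # manually created spans must be ended explicitly
```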
````python
    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["generation"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> LangfuseGeneration: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["span"] = "span",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseSpan: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["agent"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseAgent: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["tool"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseTool: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["chain"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseChain: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["retriever"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseRetriever: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["evaluator"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseEvaluator: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["embedding"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> LangfuseEmbedding: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["guardrail"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseGuardrail: ...

    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: ObservationTypeLiteralNoEvent = "span",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> Union[
        LangfuseSpan,
        LangfuseGeneration,
        LangfuseAgent,
        LangfuseTool,
        LangfuseChain,
        LangfuseRetriever,
        LangfuseEvaluator,
        LangfuseEmbedding,
        LangfuseGuardrail,
    ]:
        """Create a new observation of the specified type.

        This method creates a new observation but does not set it as the current span in the
        context. To create and use an observation within a context, use start_as_current_observation().

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the observation
            as_type: Type of observation to create (defaults to "span")
            input: Input data for the operation
            output: Output data from the operation
            metadata: Additional metadata to associate with the observation
            version: Version identifier for the code or component
            level: Importance level of the observation
            status_message: Optional status message for the observation
            completion_start_time: When the model started generating (for generation types)
            model: Name/identifier of the AI model used (for generation types)
            model_parameters: Parameters used for the model (for generation types)
            usage_details: Token usage information (for generation types)
            cost_details: Cost information (for generation types)
            prompt: Associated prompt template (for generation types)

        Returns:
            An observation object of the appropriate type that must be ended with .end()
        """
        if trace_context:
            trace_id = trace_context.get("trace_id", None)
            parent_span_id = trace_context.get("parent_span_id", None)

            if trace_id:
                remote_parent_span = self._create_remote_parent_span(
                    trace_id=trace_id, parent_span_id=parent_span_id
                )

                with otel_trace_api.use_span(
                    cast(otel_trace_api.Span, remote_parent_span)
                ):
                    otel_span = self._otel_tracer.start_span(name=name)
                    otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True)

                    return self._create_observation_from_otel_span(
                        otel_span=otel_span,
                        as_type=as_type,
                        input=input,
                        output=output,
                        metadata=metadata,
                        version=version,
                        level=level,
                        status_message=status_message,
                        completion_start_time=completion_start_time,
                        model=model,
                        model_parameters=model_parameters,
                        usage_details=usage_details,
                        cost_details=cost_details,
                        prompt=prompt,
                    )

        otel_span = self._otel_tracer.start_span(name=name)

        return self._create_observation_from_otel_span(
            otel_span=otel_span,
            as_type=as_type,
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            usage_details=usage_details,
            cost_details=cost_details,
            prompt=prompt,
        )
````
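The overloads above map each `as_type` literal to a concrete span class. A short sketch of a typed agent/tool/retriever hierarchy follows; the tool names and payloads are illustrative only.

```python
from langfuse import get_client

langfuse = get_client()

with langfuse.start_as_current_observation(name="research-agent", as_type="agent") as agent:
    with langfuse.start_as_current_observation(name="web-search", as_type="tool") as tool:
        tool.update(output={"hits": 3})  # illustrative result

    # start_observation() does not switch context, so end it explicitly
    retriever = langfuse.start_observation(name="vector-lookup", as_type="retriever")
    retriever.update(output=["doc-1", "doc-2"])
    retriever.end()

    agent.update(output="summary of findings")
```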
````python
    def _create_observation_from_otel_span(
        self,
        *,
        otel_span: otel_trace_api.Span,
        as_type: ObservationTypeLiteralNoEvent,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> Union[
        LangfuseSpan,
        LangfuseGeneration,
        LangfuseAgent,
        LangfuseTool,
        LangfuseChain,
        LangfuseRetriever,
        LangfuseEvaluator,
        LangfuseEmbedding,
        LangfuseGuardrail,
    ]:
        """Create the appropriate observation type from an OTEL span."""
        if as_type in get_observation_types_list(ObservationTypeGenerationLike):
            observation_class = self._get_span_class(as_type)
            # Type ignore to prevent overloads of internal _get_span_class function,
            # issue is that LangfuseEvent could be returned and that classes have diff. args
            return observation_class(  # type: ignore[return-value,call-arg]
                otel_span=otel_span,
                langfuse_client=self,
                environment=self._environment,
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
                completion_start_time=completion_start_time,
                model=model,
                model_parameters=model_parameters,
                usage_details=usage_details,
                cost_details=cost_details,
                prompt=prompt,
            )
        else:
            # For other types (e.g. span, guardrail), create appropriate class without generation properties
            observation_class = self._get_span_class(as_type)
            # Type ignore to prevent overloads of internal _get_span_class function,
            # issue is that LangfuseEvent could be returned and that classes have diff. args
            return observation_class(  # type: ignore[return-value,call-arg]
                otel_span=otel_span,
                langfuse_client=self,
                environment=self._environment,
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
            )
````
````python
    def start_generation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> LangfuseGeneration:
        """Create a new generation span for model generations.

        DEPRECATED: This method is deprecated and will be removed in a future version.
        Use start_observation(as_type='generation') instead.

        This method creates a specialized span for tracking model generations.
        It includes additional fields specific to model generations such as model name,
        token usage, and cost details.

        The created generation span will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the generation operation
            input: Input data for the model (e.g., prompts)
            output: Output from the model (e.g., completions)
            metadata: Additional metadata to associate with the generation
            version: Version identifier for the model or component
            level: Importance level of the generation (info, warning, error)
            status_message: Optional status message for the generation
            completion_start_time: When the model started generating the response
            model: Name/identifier of the AI model used (e.g., "gpt-4")
            model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
            usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
            cost_details: Cost information for the model call
            prompt: Associated prompt template from Langfuse prompt management

        Returns:
            A LangfuseGeneration object that must be ended with .end() when complete

        Example:
            ```python
            generation = langfuse.start_generation(
                name="answer-generation",
                model="gpt-4",
                input={"prompt": "Explain quantum computing"},
                model_parameters={"temperature": 0.7}
            )
            try:
                # Call model API
                response = llm.generate(...)

                generation.update(
                    output=response.text,
                    usage_details={
                        "prompt_tokens": response.usage.prompt_tokens,
                        "completion_tokens": response.usage.completion_tokens
                    }
                )
            finally:
                generation.end()
            ```
        """
        warnings.warn(
            "start_generation is deprecated and will be removed in a future version. "
            "Use start_observation(as_type='generation') instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.start_observation(
            trace_context=trace_context,
            name=name,
            as_type="generation",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            usage_details=usage_details,
            cost_details=cost_details,
            prompt=prompt,
        )

    def start_as_current_generation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseGeneration]:
        """Create a new generation span and set it as the current span in a context manager.

        DEPRECATED: This method is deprecated and will be removed in a future version.
        Use start_as_current_observation(as_type='generation') instead.

        This method creates a specialized span for model generations and sets it as the
        current span within a context manager. Use this method with a 'with' statement to
        automatically handle the generation span lifecycle within a code block.

        The created generation span will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the generation operation
            input: Input data for the model (e.g., prompts)
            output: Output from the model (e.g., completions)
            metadata: Additional metadata to associate with the generation
            version: Version identifier for the model or component
            level: Importance level of the generation (info, warning, error)
            status_message: Optional status message for the generation
            completion_start_time: When the model started generating the response
            model: Name/identifier of the AI model used (e.g., "gpt-4")
            model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
            usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
            cost_details: Cost information for the model call
            prompt: Associated prompt template from Langfuse prompt management
            end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.

        Returns:
            A context manager that yields a LangfuseGeneration

        Example:
            ```python
            with langfuse.start_as_current_generation(
                name="answer-generation",
                model="gpt-4",
                input={"prompt": "Explain quantum computing"}
            ) as generation:
                # Call model API
                response = llm.generate(...)

                # Update with results
                generation.update(
                    output=response.text,
                    usage_details={
                        "prompt_tokens": response.usage.prompt_tokens,
                        "completion_tokens": response.usage.completion_tokens
                    }
                )
            ```
        """
        warnings.warn(
            "start_as_current_generation is deprecated and will be removed in a future version. "
            "Use start_as_current_observation(as_type='generation') instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.start_as_current_observation(
            trace_context=trace_context,
            name=name,
            as_type="generation",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            usage_details=usage_details,
            cost_details=cost_details,
            prompt=prompt,
            end_on_exit=end_on_exit,
        )
````
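Since both generation helpers above are deprecated, here is a migration sketch under the assumption that behavior is otherwise identical: swap the method name and pass `as_type="generation"`. The model call is a placeholder.

```python
from langfuse import get_client

langfuse = get_client()

# Before (deprecated, emits a DeprecationWarning):
# with langfuse.start_as_current_generation(name="answer", model="gpt-4") as gen:
#     ...

# After:
with langfuse.start_as_current_observation(
    name="answer",
    as_type="generation",
    model="gpt-4",
    model_parameters={"temperature": 0.7},
) as generation:
    response_text = "..."  # call your LLM here
    generation.update(
        output=response_text,
        usage_details={"prompt_tokens": 12, "completion_tokens": 48},
    )
```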
````python
    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["generation"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseGeneration]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["span"] = "span",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseSpan]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["agent"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseAgent]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["tool"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseTool]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["chain"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseChain]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["retriever"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseRetriever]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["evaluator"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseEvaluator]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["embedding"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseEmbedding]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["guardrail"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseGuardrail]: ...

    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: ObservationTypeLiteralNoEvent = "span",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        end_on_exit: Optional[bool] = None,
    ) -> Union[
        _AgnosticContextManager[LangfuseGeneration],
        _AgnosticContextManager[LangfuseSpan],
        _AgnosticContextManager[LangfuseAgent],
        _AgnosticContextManager[LangfuseTool],
        _AgnosticContextManager[LangfuseChain],
        _AgnosticContextManager[LangfuseRetriever],
        _AgnosticContextManager[LangfuseEvaluator],
        _AgnosticContextManager[LangfuseEmbedding],
        _AgnosticContextManager[LangfuseGuardrail],
    ]:
        """Create a new observation and set it as the current span in a context manager.

        This method creates a new observation of the specified type and sets it as the
        current span within a context manager. Use this method with a 'with' statement to
        automatically handle the observation lifecycle within a code block.

        The created observation will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the observation (e.g., function or operation name)
            as_type: Type of observation to create (defaults to "span")
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the observation
            version: Version identifier for the code or component
            level: Importance level of the observation (info, warning, error)
            status_message: Optional status message for the observation
            end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.

        The following parameters are available when as_type is "generation" or "embedding":
            completion_start_time: When the model started generating the response
            model: Name/identifier of the AI model used (e.g., "gpt-4")
            model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
            usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
            cost_details: Cost information for the model call
            prompt: Associated prompt template from Langfuse prompt management

        Returns:
            A context manager that yields the appropriate observation type based on as_type

        Example:
            ```python
            # Create a span
            with langfuse.start_as_current_observation(name="process-query", as_type="span") as span:
                # Do work
                result = process_data()
                span.update(output=result)

                # Create a child span automatically
                with span.start_as_current_span(name="sub-operation") as child_span:
                    # Do sub-operation work
                    child_span.update(output="sub-result")

            # Create a tool observation
            with langfuse.start_as_current_observation(name="web-search", as_type="tool") as tool:
                # Do tool work
                results = search_web(query)
                tool.update(output=results)

            # Create a generation observation
            with langfuse.start_as_current_observation(
                name="answer-generation",
                as_type="generation",
                model="gpt-4"
            ) as generation:
                # Generate answer
                response = llm.generate(...)
                generation.update(output=response)
            ```
        """
        if as_type in get_observation_types_list(ObservationTypeGenerationLike):
            if trace_context:
                trace_id = trace_context.get("trace_id", None)
                parent_span_id = trace_context.get("parent_span_id", None)

                if trace_id:
                    remote_parent_span = self._create_remote_parent_span(
                        trace_id=trace_id, parent_span_id=parent_span_id
                    )

                    return cast(
                        Union[
                            _AgnosticContextManager[LangfuseGeneration],
                            _AgnosticContextManager[LangfuseEmbedding],
                        ],
                        self._create_span_with_parent_context(
                            as_type=as_type,
                            name=name,
                            remote_parent_span=remote_parent_span,
                            parent=None,
                            end_on_exit=end_on_exit,
                            input=input,
                            output=output,
                            metadata=metadata,
                            version=version,
                            level=level,
                            status_message=status_message,
                            completion_start_time=completion_start_time,
                            model=model,
                            model_parameters=model_parameters,
                            usage_details=usage_details,
                            cost_details=cost_details,
                            prompt=prompt,
                        ),
                    )

            return cast(
                Union[
                    _AgnosticContextManager[LangfuseGeneration],
                    _AgnosticContextManager[LangfuseEmbedding],
                ],
                self._start_as_current_otel_span_with_processed_media(
                    as_type=as_type,
                    name=name,
                    end_on_exit=end_on_exit,
                    input=input,
                    output=output,
                    metadata=metadata,
                    version=version,
                    level=level,
                    status_message=status_message,
                    completion_start_time=completion_start_time,
                    model=model,
                    model_parameters=model_parameters,
                    usage_details=usage_details,
                    cost_details=cost_details,
                    prompt=prompt,
                ),
            )

        if as_type in get_observation_types_list(ObservationTypeSpanLike):
            if trace_context:
                trace_id = trace_context.get("trace_id", None)
                parent_span_id = trace_context.get("parent_span_id", None)

                if trace_id:
                    remote_parent_span = self._create_remote_parent_span(
                        trace_id=trace_id, parent_span_id=parent_span_id
                    )

                    return cast(
                        Union[
                            _AgnosticContextManager[LangfuseSpan],
                            _AgnosticContextManager[LangfuseAgent],
                            _AgnosticContextManager[LangfuseTool],
                            _AgnosticContextManager[LangfuseChain],
                            _AgnosticContextManager[LangfuseRetriever],
                            _AgnosticContextManager[LangfuseEvaluator],
                            _AgnosticContextManager[LangfuseGuardrail],
                        ],
                        self._create_span_with_parent_context(
                            as_type=as_type,
                            name=name,
                            remote_parent_span=remote_parent_span,
                            parent=None,
                            end_on_exit=end_on_exit,
                            input=input,
                            output=output,
                            metadata=metadata,
                            version=version,
                            level=level,
                            status_message=status_message,
                        ),
                    )

            return cast(
                Union[
                    _AgnosticContextManager[LangfuseSpan],
                    _AgnosticContextManager[LangfuseAgent],
                    _AgnosticContextManager[LangfuseTool],
                    _AgnosticContextManager[LangfuseChain],
                    _AgnosticContextManager[LangfuseRetriever],
                    _AgnosticContextManager[LangfuseEvaluator],
                    _AgnosticContextManager[LangfuseGuardrail],
                ],
                self._start_as_current_otel_span_with_processed_media(
                    as_type=as_type,
                    name=name,
                    end_on_exit=end_on_exit,
                    input=input,
                    output=output,
                    metadata=metadata,
                    version=version,
                    level=level,
                    status_message=status_message,
                ),
            )

        # This should never be reached since all valid types are handled above
        langfuse_logger.warning(
            f"Unknown observation type: {as_type}, falling back to span"
        )
        return self._start_as_current_otel_span_with_processed_media(
            as_type="span",
            name=name,
            end_on_exit=end_on_exit,
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
        )

    def _get_span_class(
        self,
        as_type: ObservationTypeLiteral,
    ) -> Union[
        Type[LangfuseAgent],
        Type[LangfuseTool],
        Type[LangfuseChain],
        Type[LangfuseRetriever],
        Type[LangfuseEvaluator],
        Type[LangfuseEmbedding],
        Type[LangfuseGuardrail],
        Type[LangfuseGeneration],
        Type[LangfuseEvent],
        Type[LangfuseSpan],
    ]:
        """Get the appropriate span class based on as_type."""
        normalized_type = as_type.lower()

        if normalized_type == "agent":
            return LangfuseAgent
        elif normalized_type == "tool":
            return LangfuseTool
        elif normalized_type == "chain":
            return LangfuseChain
        elif normalized_type == "retriever":
            return LangfuseRetriever
        elif normalized_type == "evaluator":
            return LangfuseEvaluator
        elif normalized_type == "embedding":
            return LangfuseEmbedding
        elif normalized_type == "guardrail":
            return LangfuseGuardrail
        elif normalized_type == "generation":
            return LangfuseGeneration
        elif normalized_type == "event":
            return LangfuseEvent
        elif normalized_type == "span":
            return LangfuseSpan
        else:
            return LangfuseSpan

    @_agnosticcontextmanager
    def _create_span_with_parent_context(
        self,
        *,
        name: str,
        parent: Optional[otel_trace_api.Span] = None,
        remote_parent_span: Optional[otel_trace_api.Span] = None,
        as_type: ObservationTypeLiteralNoEvent,
        end_on_exit: Optional[bool] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> Any:
        parent_span = parent or cast(otel_trace_api.Span, remote_parent_span)

        with otel_trace_api.use_span(parent_span):
            with self._start_as_current_otel_span_with_processed_media(
                name=name,
                as_type=as_type,
                end_on_exit=end_on_exit,
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
                completion_start_time=completion_start_time,
                model=model,
                model_parameters=model_parameters,
                usage_details=usage_details,
                cost_details=cost_details,
                prompt=prompt,
            ) as langfuse_span:
                if remote_parent_span is not None:
                    langfuse_span._otel_span.set_attribute(
                        LangfuseOtelSpanAttributes.AS_ROOT, True
                    )

                yield langfuse_span
    @_agnosticcontextmanager
    def _start_as_current_otel_span_with_processed_media(
        self,
        *,
        name: str,
        as_type: Optional[ObservationTypeLiteralNoEvent] = None,
        end_on_exit: Optional[bool] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> Any:
        with self._otel_tracer.start_as_current_span(
            name=name,
            end_on_exit=end_on_exit if end_on_exit is not None else True,
        ) as otel_span:
            span_class = self._get_span_class(
                as_type or "generation"
            )  # default was "generation"
            common_args = {
                "otel_span": otel_span,
                "langfuse_client": self,
                "environment": self._environment,
                "input": input,
                "output": output,
                "metadata": metadata,
                "version": version,
                "level": level,
                "status_message": status_message,
            }

            if span_class in [
                LangfuseGeneration,
                LangfuseEmbedding,
            ]:
                common_args.update(
                    {
                        "completion_start_time": completion_start_time,
                        "model": model,
                        "model_parameters": model_parameters,
                        "usage_details": usage_details,
                        "cost_details": cost_details,
                        "prompt": prompt,
                    }
                )
            # For span-like types (span, agent, tool, chain, retriever, evaluator, guardrail), no generation properties needed

            yield span_class(**common_args)  # type: ignore[arg-type]

    def _get_current_otel_span(self) -> Optional[otel_trace_api.Span]:
        current_span = otel_trace_api.get_current_span()

        if current_span is otel_trace_api.INVALID_SPAN:
            langfuse_logger.warning(
                "Context error: No active span in current context. Operations that depend on an active span will be skipped. "
                "Ensure spans are created with start_as_current_span() or that you're operating within an active span context."
            )
            return None

        return current_span

    def update_current_generation(
        self,
        *,
        name: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> None:
        """Update the current active generation span with new information.

        This method updates the current generation span in the active context with
        additional information. It's useful for adding output, usage stats, or other
        details that become available during or after model generation.

        Args:
            name: The generation name
            input: Updated input data for the model
            output: Output from the model (e.g., completions)
            metadata: Additional metadata to associate with the generation
            version: Version identifier for the model or component
            level: Importance level of the generation (info, warning, error)
            status_message: Optional status message for the generation
            completion_start_time: When the model started generating the response
            model: Name/identifier of the AI model used (e.g., "gpt-4")
            model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
            usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
            cost_details: Cost information for the model call
            prompt: Associated prompt template from Langfuse prompt management

        Example:
            ```python
            with langfuse.start_as_current_generation(name="answer-query") as generation:
                # Initial setup and API call
                response = llm.generate(...)

                # Update with results that weren't available at creation time
                langfuse.update_current_generation(
                    output=response.text,
                    usage_details={
                        "prompt_tokens": response.usage.prompt_tokens,
                        "completion_tokens": response.usage.completion_tokens
                    }
                )
            ```
        """
        if not self._tracing_enabled:
            langfuse_logger.debug(
                "Operation skipped: update_current_generation - Tracing is disabled or client is in no-op mode."
            )
            return

        current_otel_span = self._get_current_otel_span()

        if current_otel_span is not None:
            generation = LangfuseGeneration(
                otel_span=current_otel_span, langfuse_client=self
            )

            if name:
                current_otel_span.update_name(name)

            generation.update(
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
                completion_start_time=completion_start_time,
                model=model,
                model_parameters=model_parameters,
                usage_details=usage_details,
                cost_details=cost_details,
                prompt=prompt,
            )
````
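`update_current_generation` is most useful inside decorated code where you never hold a span object directly. A sketch follows, assuming the exported `observe` decorator accepts `as_type="generation"` (as shown in the Langfuse docs); the completion value stands in for a real model call.

```python
from langfuse import get_client, observe

langfuse = get_client()


@observe(as_type="generation")  # the current span inside is a generation
def answer(question: str) -> str:
    completion = "Paris."  # placeholder for a real model call
    langfuse.update_current_generation(
        model="gpt-4",
        output=completion,
        usage_details={"prompt_tokens": 7, "completion_tokens": 2},
    )
    return completion


answer("What is the capital of France?")
```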
````python
    def update_current_span(
        self,
        *,
        name: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> None:
        """Update the current active span with new information.

        This method updates the current span in the active context with
        additional information. It's useful for adding outputs or metadata
        that become available during execution.

        Args:
            name: The span name
            input: Updated input data for the operation
            output: Output data from the operation
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-data") as span:
                # Initial processing
                result = process_first_part()

                # Update with intermediate results
                langfuse.update_current_span(metadata={"intermediate_result": result})

                # Continue processing
                final_result = process_second_part(result)

                # Final update
                langfuse.update_current_span(output=final_result)
            ```
        """
        if not self._tracing_enabled:
            langfuse_logger.debug(
                "Operation skipped: update_current_span - Tracing is disabled or client is in no-op mode."
            )
            return

        current_otel_span = self._get_current_otel_span()

        if current_otel_span is not None:
            span = LangfuseSpan(
                otel_span=current_otel_span,
                langfuse_client=self,
                environment=self._environment,
            )

            if name:
                current_otel_span.update_name(name)

            span.update(
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
            )

    def update_current_trace(
        self,
        *,
        name: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ) -> None:
        """Update the current trace with additional information.

        This method updates the Langfuse trace that the current span belongs to. It's useful for
        adding trace-level metadata like user ID, session ID, or tags that apply to
        the entire Langfuse trace rather than just a single observation.

        Args:
            name: Updated name for the Langfuse trace
            user_id: ID of the user who initiated the Langfuse trace
            session_id: Session identifier for grouping related Langfuse traces
            version: Version identifier for the application or service
            input: Input data for the overall Langfuse trace
            output: Output data from the overall Langfuse trace
            metadata: Additional metadata to associate with the Langfuse trace
            tags: List of tags to categorize the Langfuse trace
            public: Whether the Langfuse trace should be publicly accessible

        Example:
            ```python
            with langfuse.start_as_current_span(name="handle-request") as span:
                # Get user information
                user = authenticate_user(request)

                # Update trace with user context
                langfuse.update_current_trace(
                    user_id=user.id,
                    session_id=request.session_id,
                    tags=["production", "web-app"]
                )

                # Continue processing
                response = process_request(request)

                # Update span with results
                span.update(output=response)
            ```
        """
        if not self._tracing_enabled:
            langfuse_logger.debug(
                "Operation skipped: update_current_trace - Tracing is disabled or client is in no-op mode."
            )
            return

        current_otel_span = self._get_current_otel_span()

        if current_otel_span is not None:
            existing_observation_type = current_otel_span.attributes.get(  # type: ignore[attr-defined]
                LangfuseOtelSpanAttributes.OBSERVATION_TYPE, "span"
            )
            # We need to preserve the class to keep the correct observation type
            span_class = self._get_span_class(existing_observation_type)
            span = span_class(
                otel_span=current_otel_span,
                langfuse_client=self,
                environment=self._environment,
            )

            span.update_trace(
                name=name,
                user_id=user_id,
                session_id=session_id,
                version=version,
                input=input,
                output=output,
                metadata=metadata,
                tags=tags,
                public=public,
            )

    def create_event(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseEvent:
        """Create a new Langfuse observation of type 'EVENT'.

        The created Langfuse Event observation will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the span (e.g., function or operation name)
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span

        Returns:
            The Langfuse Event object

        Example:
            ```python
            event = langfuse.create_event(name="process-event")
            ```
        """
        timestamp = time_ns()

        if trace_context:
            trace_id = trace_context.get("trace_id", None)
            parent_span_id = trace_context.get("parent_span_id", None)

            if trace_id:
                remote_parent_span = self._create_remote_parent_span(
                    trace_id=trace_id, parent_span_id=parent_span_id
                )

                with otel_trace_api.use_span(
                    cast(otel_trace_api.Span, remote_parent_span)
                ):
                    otel_span = self._otel_tracer.start_span(
                        name=name, start_time=timestamp
                    )
                    otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True)

                    return cast(
                        LangfuseEvent,
                        LangfuseEvent(
                            otel_span=otel_span,
                            langfuse_client=self,
                            environment=self._environment,
                            input=input,
                            output=output,
                            metadata=metadata,
                            version=version,
                            level=level,
                            status_message=status_message,
                        ).end(end_time=timestamp),
                    )

        otel_span = self._otel_tracer.start_span(name=name, start_time=timestamp)

        return cast(
            LangfuseEvent,
            LangfuseEvent(
                otel_span=otel_span,
                langfuse_client=self,
                environment=self._environment,
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
            ).end(end_time=timestamp),
        )
````
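Events are effectively zero-duration observations: note in the implementation above that the same timestamp is used as both start and end time. A sketch of recording one inside an active span follows; the event payload is illustrative.

```python
from langfuse import get_client

langfuse = get_client()

with langfuse.start_as_current_span(name="checkout") as span:
    # Record a point-in-time occurrence as a child of the current span
    langfuse.create_event(
        name="cache-miss",
        level="WARNING",
        metadata={"cache": "pricing", "key": "sku-123"},
    )
    span.update(output={"status": "ok"})
```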
'{trace_id}' is not a valid 32 lowercase hex char Langfuse trace id. Ignoring trace ID." 1794 ) 1795 1796 if parent_span_id and not self._is_valid_span_id(parent_span_id): 1797 langfuse_logger.warning( 1798 f"Passed span ID '{parent_span_id}' is not a valid 16 lowercase hex char Langfuse span id. Ignoring parent span ID." 1799 ) 1800 1801 int_trace_id = int(trace_id, 16) 1802 int_parent_span_id = ( 1803 int(parent_span_id, 16) 1804 if parent_span_id 1805 else RandomIdGenerator().generate_span_id() 1806 ) 1807 1808 span_context = otel_trace_api.SpanContext( 1809 trace_id=int_trace_id, 1810 span_id=int_parent_span_id, 1811 trace_flags=otel_trace_api.TraceFlags(0x01), # mark span as sampled 1812 is_remote=False, 1813 ) 1814 1815 return trace.NonRecordingSpan(span_context) 1816 1817 def _is_valid_trace_id(self, trace_id: str) -> bool: 1818 pattern = r"^[0-9a-f]{32}$" 1819 1820 return bool(re.match(pattern, trace_id)) 1821 1822 def _is_valid_span_id(self, span_id: str) -> bool: 1823 pattern = r"^[0-9a-f]{16}$" 1824 1825 return bool(re.match(pattern, span_id)) 1826 1827 def _create_observation_id(self, *, seed: Optional[str] = None) -> str: 1828 """Create a unique observation ID for use with Langfuse. 1829 1830 This method generates a unique observation ID (span ID in OpenTelemetry terms) 1831 for use with various Langfuse APIs. It can either generate a random ID or 1832 create a deterministic ID based on a seed string. 1833 1834 Observation IDs must be 16 lowercase hexadecimal characters, representing 8 bytes. 1835 This method ensures the generated ID meets this requirement. If you need to 1836 correlate an external ID with a Langfuse observation ID, use the external ID as 1837 the seed to get a valid, deterministic observation ID. 1838 1839 Args: 1840 seed: Optional string to use as a seed for deterministic ID generation. 1841 If provided, the same seed will always produce the same ID. 1842 If not provided, a random ID will be generated. 1843 1844 Returns: 1845 A 16-character lowercase hexadecimal string representing the observation ID. 1846 1847 Example: 1848 ```python 1849 # Generate a random observation ID 1850 obs_id = langfuse.create_observation_id() 1851 1852 # Generate a deterministic ID based on a seed 1853 user_obs_id = langfuse.create_observation_id(seed="user-123-feedback") 1854 1855 # Correlate an external item ID with a Langfuse observation ID 1856 item_id = "item-789012" 1857 correlated_obs_id = langfuse.create_observation_id(seed=item_id) 1858 1859 # Use the ID with Langfuse APIs 1860 langfuse.create_score( 1861 name="relevance", 1862 value=0.95, 1863 trace_id=trace_id, 1864 observation_id=obs_id 1865 ) 1866 ``` 1867 """ 1868 if not seed: 1869 span_id_int = RandomIdGenerator().generate_span_id() 1870 1871 return self._format_otel_span_id(span_id_int) 1872 1873 return sha256(seed.encode("utf-8")).digest()[:8].hex() 1874 1875 @staticmethod 1876 def create_trace_id(*, seed: Optional[str] = None) -> str: 1877 """Create a unique trace ID for use with Langfuse. 1878 1879 This method generates a unique trace ID for use with various Langfuse APIs. 1880 It can either generate a random ID or create a deterministic ID based on 1881 a seed string. 1882 1883 Trace IDs must be 32 lowercase hexadecimal characters, representing 16 bytes. 1884 This method ensures the generated ID meets this requirement. If you need to 1885 correlate an external ID with a Langfuse trace ID, use the external ID as the 1886 seed to get a valid, deterministic Langfuse trace ID. 
1887 1888 Args: 1889 seed: Optional string to use as a seed for deterministic ID generation. 1890 If provided, the same seed will always produce the same ID. 1891 If not provided, a random ID will be generated. 1892 1893 Returns: 1894 A 32-character lowercase hexadecimal string representing the Langfuse trace ID. 1895 1896 Example: 1897 ```python 1898 # Generate a random trace ID 1899 trace_id = langfuse.create_trace_id() 1900 1901 # Generate a deterministic ID based on a seed 1902 session_trace_id = langfuse.create_trace_id(seed="session-456") 1903 1904 # Correlate an external ID with a Langfuse trace ID 1905 external_id = "external-system-123456" 1906 correlated_trace_id = langfuse.create_trace_id(seed=external_id) 1907 1908 # Use the ID with trace context 1909 with langfuse.start_as_current_span( 1910 name="process-request", 1911 trace_context={"trace_id": trace_id} 1912 ) as span: 1913 # Operation will be part of the specific trace 1914 pass 1915 ``` 1916 """ 1917 if not seed: 1918 trace_id_int = RandomIdGenerator().generate_trace_id() 1919 1920 return Langfuse._format_otel_trace_id(trace_id_int) 1921 1922 return sha256(seed.encode("utf-8")).digest()[:16].hex() 1923 1924 def _get_otel_trace_id(self, otel_span: otel_trace_api.Span) -> str: 1925 span_context = otel_span.get_span_context() 1926 1927 return self._format_otel_trace_id(span_context.trace_id) 1928 1929 def _get_otel_span_id(self, otel_span: otel_trace_api.Span) -> str: 1930 span_context = otel_span.get_span_context() 1931 1932 return self._format_otel_span_id(span_context.span_id) 1933 1934 @staticmethod 1935 def _format_otel_span_id(span_id_int: int) -> str: 1936 """Format an integer span ID to a 16-character lowercase hex string. 1937 1938 Internal method to convert an OpenTelemetry integer span ID to the standard 1939 W3C Trace Context format (16-character lowercase hex string). 1940 1941 Args: 1942 span_id_int: 64-bit integer representing a span ID 1943 1944 Returns: 1945 A 16-character lowercase hexadecimal string 1946 """ 1947 return format(span_id_int, "016x") 1948 1949 @staticmethod 1950 def _format_otel_trace_id(trace_id_int: int) -> str: 1951 """Format an integer trace ID to a 32-character lowercase hex string. 1952 1953 Internal method to convert an OpenTelemetry integer trace ID to the standard 1954 W3C Trace Context format (32-character lowercase hex string). 1955 1956 Args: 1957 trace_id_int: 128-bit integer representing a trace ID 1958 1959 Returns: 1960 A 32-character lowercase hexadecimal string 1961 """ 1962 return format(trace_id_int, "032x") 1963 1964 @overload 1965 def create_score( 1966 self, 1967 *, 1968 name: str, 1969 value: float, 1970 session_id: Optional[str] = None, 1971 dataset_run_id: Optional[str] = None, 1972 trace_id: Optional[str] = None, 1973 observation_id: Optional[str] = None, 1974 score_id: Optional[str] = None, 1975 data_type: Optional[Literal["NUMERIC", "BOOLEAN"]] = None, 1976 comment: Optional[str] = None, 1977 config_id: Optional[str] = None, 1978 metadata: Optional[Any] = None, 1979 ) -> None: ... 1980 1981 @overload 1982 def create_score( 1983 self, 1984 *, 1985 name: str, 1986 value: str, 1987 session_id: Optional[str] = None, 1988 dataset_run_id: Optional[str] = None, 1989 trace_id: Optional[str] = None, 1990 score_id: Optional[str] = None, 1991 observation_id: Optional[str] = None, 1992 data_type: Optional[Literal["CATEGORICAL"]] = "CATEGORICAL", 1993 comment: Optional[str] = None, 1994 config_id: Optional[str] = None, 1995 metadata: Optional[Any] = None, 1996 ) -> None: ... 
1997 1998 def create_score( 1999 self, 2000 *, 2001 name: str, 2002 value: Union[float, str], 2003 session_id: Optional[str] = None, 2004 dataset_run_id: Optional[str] = None, 2005 trace_id: Optional[str] = None, 2006 observation_id: Optional[str] = None, 2007 score_id: Optional[str] = None, 2008 data_type: Optional[ScoreDataType] = None, 2009 comment: Optional[str] = None, 2010 config_id: Optional[str] = None, 2011 metadata: Optional[Any] = None, 2012 ) -> None: 2013 """Create a score for a specific trace or observation. 2014 2015 This method creates a score for evaluating a Langfuse trace or observation. Scores can be 2016 used to track quality metrics, user feedback, or automated evaluations. 2017 2018 Args: 2019 name: Name of the score (e.g., "relevance", "accuracy") 2020 value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL) 2021 session_id: ID of the Langfuse session to associate the score with 2022 dataset_run_id: ID of the Langfuse dataset run to associate the score with 2023 trace_id: ID of the Langfuse trace to associate the score with 2024 observation_id: Optional ID of the specific observation to score. Trace ID must be provided too. 2025 score_id: Optional custom ID for the score (auto-generated if not provided) 2026 data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL) 2027 comment: Optional comment or explanation for the score 2028 config_id: Optional ID of a score config defined in Langfuse 2029 metadata: Optional metadata to be attached to the score 2030 2031 Example: 2032 ```python 2033 # Create a numeric score for accuracy 2034 langfuse.create_score( 2035 name="accuracy", 2036 value=0.92, 2037 trace_id="abcdef1234567890abcdef1234567890", 2038 data_type="NUMERIC", 2039 comment="High accuracy with minor irrelevant details" 2040 ) 2041 2042 # Create a categorical score for sentiment 2043 langfuse.create_score( 2044 name="sentiment", 2045 value="positive", 2046 trace_id="abcdef1234567890abcdef1234567890", 2047 observation_id="abcdef1234567890", 2048 data_type="CATEGORICAL" 2049 ) 2050 ``` 2051 """ 2052 if not self._tracing_enabled: 2053 return 2054 2055 score_id = score_id or self._create_observation_id() 2056 2057 try: 2058 new_body = ScoreBody( 2059 id=score_id, 2060 sessionId=session_id, 2061 datasetRunId=dataset_run_id, 2062 traceId=trace_id, 2063 observationId=observation_id, 2064 name=name, 2065 value=value, 2066 dataType=data_type, # type: ignore 2067 comment=comment, 2068 configId=config_id, 2069 environment=self._environment, 2070 metadata=metadata, 2071 ) 2072 2073 event = { 2074 "id": self.create_trace_id(), 2075 "type": "score-create", 2076 "timestamp": _get_timestamp(), 2077 "body": new_body, 2078 } 2079 2080 if self._resources is not None: 2081 # Force the score to be in sample if it was for a legacy trace ID, i.e. non-32 hexchar 2082 force_sample = ( 2083 not self._is_valid_trace_id(trace_id) if trace_id else True 2084 ) 2085 2086 self._resources.add_score_task( 2087 event, 2088 force_sample=force_sample, 2089 ) 2090 2091 except Exception as e: 2092 langfuse_logger.exception( 2093 f"Error creating score: Failed to process score event for trace_id={trace_id}, name={name}. Error: {e}" 2094 ) 2095 2096 @overload 2097 def score_current_span( 2098 self, 2099 *, 2100 name: str, 2101 value: float, 2102 score_id: Optional[str] = None, 2103 data_type: Optional[Literal["NUMERIC", "BOOLEAN"]] = None, 2104 comment: Optional[str] = None, 2105 config_id: Optional[str] = None, 2106 ) -> None: ... 
2107 2108 @overload 2109 def score_current_span( 2110 self, 2111 *, 2112 name: str, 2113 value: str, 2114 score_id: Optional[str] = None, 2115 data_type: Optional[Literal["CATEGORICAL"]] = "CATEGORICAL", 2116 comment: Optional[str] = None, 2117 config_id: Optional[str] = None, 2118 ) -> None: ... 2119 2120 def score_current_span( 2121 self, 2122 *, 2123 name: str, 2124 value: Union[float, str], 2125 score_id: Optional[str] = None, 2126 data_type: Optional[ScoreDataType] = None, 2127 comment: Optional[str] = None, 2128 config_id: Optional[str] = None, 2129 ) -> None: 2130 """Create a score for the current active span. 2131 2132 This method scores the currently active span in the context. It's a convenient 2133 way to score the current operation without needing to know its trace and span IDs. 2134 2135 Args: 2136 name: Name of the score (e.g., "relevance", "accuracy") 2137 value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL) 2138 score_id: Optional custom ID for the score (auto-generated if not provided) 2139 data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL) 2140 comment: Optional comment or explanation for the score 2141 config_id: Optional ID of a score config defined in Langfuse 2142 2143 Example: 2144 ```python 2145 with langfuse.start_as_current_generation(name="answer-query") as generation: 2146 # Generate answer 2147 response = generate_answer(...) 2148 generation.update(output=response) 2149 2150 # Score the generation 2151 langfuse.score_current_span( 2152 name="relevance", 2153 value=0.85, 2154 data_type="NUMERIC", 2155 comment="Mostly relevant but contains some tangential information" 2156 ) 2157 ``` 2158 """ 2159 current_span = self._get_current_otel_span() 2160 2161 if current_span is not None: 2162 trace_id = self._get_otel_trace_id(current_span) 2163 observation_id = self._get_otel_span_id(current_span) 2164 2165 langfuse_logger.info( 2166 f"Score: Creating score name='{name}' value={value} for current span ({observation_id}) in trace {trace_id}" 2167 ) 2168 2169 self.create_score( 2170 trace_id=trace_id, 2171 observation_id=observation_id, 2172 name=name, 2173 value=cast(str, value), 2174 score_id=score_id, 2175 data_type=cast(Literal["CATEGORICAL"], data_type), 2176 comment=comment, 2177 config_id=config_id, 2178 ) 2179 2180 @overload 2181 def score_current_trace( 2182 self, 2183 *, 2184 name: str, 2185 value: float, 2186 score_id: Optional[str] = None, 2187 data_type: Optional[Literal["NUMERIC", "BOOLEAN"]] = None, 2188 comment: Optional[str] = None, 2189 config_id: Optional[str] = None, 2190 ) -> None: ... 2191 2192 @overload 2193 def score_current_trace( 2194 self, 2195 *, 2196 name: str, 2197 value: str, 2198 score_id: Optional[str] = None, 2199 data_type: Optional[Literal["CATEGORICAL"]] = "CATEGORICAL", 2200 comment: Optional[str] = None, 2201 config_id: Optional[str] = None, 2202 ) -> None: ... 2203 2204 def score_current_trace( 2205 self, 2206 *, 2207 name: str, 2208 value: Union[float, str], 2209 score_id: Optional[str] = None, 2210 data_type: Optional[ScoreDataType] = None, 2211 comment: Optional[str] = None, 2212 config_id: Optional[str] = None, 2213 ) -> None: 2214 """Create a score for the current trace. 2215 2216 This method scores the trace of the currently active span. Unlike score_current_span, 2217 this method associates the score with the entire trace rather than a specific span. 2218 It's useful for scoring overall performance or quality of the entire operation. 
2219 2220 Args: 2221 name: Name of the score (e.g., "user_satisfaction", "overall_quality") 2222 value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL) 2223 score_id: Optional custom ID for the score (auto-generated if not provided) 2224 data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL) 2225 comment: Optional comment or explanation for the score 2226 config_id: Optional ID of a score config defined in Langfuse 2227 2228 Example: 2229 ```python 2230 with langfuse.start_as_current_span(name="process-user-request") as span: 2231 # Process request 2232 result = process_complete_request() 2233 span.update(output=result) 2234 2235 # Score the overall trace 2236 langfuse.score_current_trace( 2237 name="overall_quality", 2238 value=0.95, 2239 data_type="NUMERIC", 2240 comment="High quality end-to-end response" 2241 ) 2242 ``` 2243 """ 2244 current_span = self._get_current_otel_span() 2245 2246 if current_span is not None: 2247 trace_id = self._get_otel_trace_id(current_span) 2248 2249 langfuse_logger.info( 2250 f"Score: Creating score name='{name}' value={value} for entire trace {trace_id}" 2251 ) 2252 2253 self.create_score( 2254 trace_id=trace_id, 2255 name=name, 2256 value=cast(str, value), 2257 score_id=score_id, 2258 data_type=cast(Literal["CATEGORICAL"], data_type), 2259 comment=comment, 2260 config_id=config_id, 2261 ) 2262 2263 def flush(self) -> None: 2264 """Force flush all pending spans and events to the Langfuse API. 2265 2266 This method manually flushes any pending spans, scores, and other events to the 2267 Langfuse API. It's useful in scenarios where you want to ensure all data is sent 2268 before proceeding, without waiting for the automatic flush interval. 2269 2270 Example: 2271 ```python 2272 # Record some spans and scores 2273 with langfuse.start_as_current_span(name="operation") as span: 2274 # Do work... 2275 pass 2276 2277 # Ensure all data is sent to Langfuse before proceeding 2278 langfuse.flush() 2279 2280 # Continue with other work 2281 ``` 2282 """ 2283 if self._resources is not None: 2284 self._resources.flush() 2285 2286 def shutdown(self) -> None: 2287 """Shut down the Langfuse client and flush all pending data. 2288 2289 This method cleanly shuts down the Langfuse client, ensuring all pending data 2290 is flushed to the API and all background threads are properly terminated. 2291 2292 It's important to call this method when your application is shutting down to 2293 prevent data loss and resource leaks. For most applications, using the client 2294 as a context manager or relying on the automatic shutdown via atexit is sufficient. 2295 2296 Example: 2297 ```python 2298 # Initialize Langfuse 2299 langfuse = Langfuse(public_key="...", secret_key="...") 2300 2301 # Use Langfuse throughout your application 2302 # ... 2303 2304 # When application is shutting down 2305 langfuse.shutdown() 2306 ``` 2307 """ 2308 if self._resources is not None: 2309 self._resources.shutdown() 2310 2311 def get_current_trace_id(self) -> Optional[str]: 2312 """Get the trace ID of the current active span. 2313 2314 This method retrieves the trace ID from the currently active span in the context. 2315 It can be used to get the trace ID for referencing in logs, external systems, 2316 or for creating related operations. 2317 2318 Returns: 2319 The current trace ID as a 32-character lowercase hexadecimal string, 2320 or None if there is no active span. 
2321 2322 Example: 2323 ```python 2324 with langfuse.start_as_current_span(name="process-request") as span: 2325 # Get the current trace ID for reference 2326 trace_id = langfuse.get_current_trace_id() 2327 2328 # Use it for external correlation 2329 log.info(f"Processing request with trace_id: {trace_id}") 2330 2331 # Or pass to another system 2332 external_system.process(data, trace_id=trace_id) 2333 ``` 2334 """ 2335 if not self._tracing_enabled: 2336 langfuse_logger.debug( 2337 "Operation skipped: get_current_trace_id - Tracing is disabled or client is in no-op mode." 2338 ) 2339 return None 2340 2341 current_otel_span = self._get_current_otel_span() 2342 2343 return self._get_otel_trace_id(current_otel_span) if current_otel_span else None 2344 2345 def get_current_observation_id(self) -> Optional[str]: 2346 """Get the observation ID (span ID) of the current active span. 2347 2348 This method retrieves the observation ID from the currently active span in the context. 2349 It can be used to get the observation ID for referencing in logs, external systems, 2350 or for creating scores or other related operations. 2351 2352 Returns: 2353 The current observation ID as a 16-character lowercase hexadecimal string, 2354 or None if there is no active span. 2355 2356 Example: 2357 ```python 2358 with langfuse.start_as_current_span(name="process-user-query") as span: 2359 # Get the current observation ID 2360 observation_id = langfuse.get_current_observation_id() 2361 2362 # Store it for later reference 2363 cache.set(f"query_{query_id}_observation", observation_id) 2364 2365 # Process the query... 2366 ``` 2367 """ 2368 if not self._tracing_enabled: 2369 langfuse_logger.debug( 2370 "Operation skipped: get_current_observation_id - Tracing is disabled or client is in no-op mode." 2371 ) 2372 return None 2373 2374 current_otel_span = self._get_current_otel_span() 2375 2376 return self._get_otel_span_id(current_otel_span) if current_otel_span else None 2377 2378 def _get_project_id(self) -> Optional[str]: 2379 """Fetch and return the current project id. Persisted across requests. Returns None if no project id is found for api keys.""" 2380 if not self._project_id: 2381 proj = self.api.projects.get() 2382 if not proj.data or not proj.data[0].id: 2383 return None 2384 2385 self._project_id = proj.data[0].id 2386 2387 return self._project_id 2388 2389 def get_trace_url(self, *, trace_id: Optional[str] = None) -> Optional[str]: 2390 """Get the URL to view a trace in the Langfuse UI. 2391 2392 This method generates a URL that links directly to a trace in the Langfuse UI. 2393 It's useful for providing links in logs, notifications, or debugging tools. 2394 2395 Args: 2396 trace_id: Optional trace ID to generate a URL for. If not provided, 2397 the trace ID of the current active span will be used. 2398 2399 Returns: 2400 A URL string pointing to the trace in the Langfuse UI, 2401 or None if the project ID couldn't be retrieved or no trace ID is available. 
2402 2403 Example: 2404 ```python 2405 # Get URL for the current trace 2406 with langfuse.start_as_current_span(name="process-request") as span: 2407 trace_url = langfuse.get_trace_url() 2408 log.info(f"Processing trace: {trace_url}") 2409 2410 # Get URL for a specific trace 2411 specific_trace_url = langfuse.get_trace_url(trace_id="1234567890abcdef1234567890abcdef") 2412 send_notification(f"Review needed for trace: {specific_trace_url}") 2413 ``` 2414 """ 2415 project_id = self._get_project_id() 2416 final_trace_id = trace_id or self.get_current_trace_id() 2417 2418 return ( 2419 f"{self._host}/project/{project_id}/traces/{final_trace_id}" 2420 if project_id and final_trace_id 2421 else None 2422 ) 2423 2424 def get_dataset( 2425 self, name: str, *, fetch_items_page_size: Optional[int] = 50 2426 ) -> "DatasetClient": 2427 """Fetch a dataset by its name. 2428 2429 Args: 2430 name (str): The name of the dataset to fetch. 2431 fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50. 2432 2433 Returns: 2434 DatasetClient: The dataset with the given name. 2435 """ 2436 try: 2437 langfuse_logger.debug(f"Getting dataset {name}") 2438 dataset = self.api.datasets.get(dataset_name=name) 2439 2440 dataset_items = [] 2441 page = 1 2442 2443 while True: 2444 new_items = self.api.dataset_items.list( 2445 dataset_name=self._url_encode(name, is_url_param=True), 2446 page=page, 2447 limit=fetch_items_page_size, 2448 ) 2449 dataset_items.extend(new_items.data) 2450 2451 if new_items.meta.total_pages <= page: 2452 break 2453 2454 page += 1 2455 2456 items = [DatasetItemClient(i, langfuse=self) for i in dataset_items] 2457 2458 return DatasetClient(dataset, items=items) 2459 2460 except Error as e: 2461 handle_fern_exception(e) 2462 raise e 2463 2464 def run_experiment( 2465 self, 2466 *, 2467 name: str, 2468 run_name: Optional[str] = None, 2469 description: Optional[str] = None, 2470 data: ExperimentData, 2471 task: TaskFunction, 2472 evaluators: List[EvaluatorFunction] = [], 2473 run_evaluators: List[RunEvaluatorFunction] = [], 2474 max_concurrency: int = 50, 2475 metadata: Optional[Dict[str, Any]] = None, 2476 ) -> ExperimentResult: 2477 """Run an experiment on a dataset with automatic tracing and evaluation. 2478 2479 This method executes a task function on each item in the provided dataset, 2480 automatically traces all executions with Langfuse for observability, runs 2481 item-level and run-level evaluators on the outputs, and returns comprehensive 2482 results with evaluation metrics. 2483 2484 The experiment system provides: 2485 - Automatic tracing of all task executions 2486 - Concurrent processing with configurable limits 2487 - Comprehensive error handling that isolates failures 2488 - Integration with Langfuse datasets for experiment tracking 2489 - Flexible evaluation framework supporting both sync and async evaluators 2490 2491 Args: 2492 name: Human-readable name for the experiment. Used for identification 2493 in the Langfuse UI. 2494 run_name: Optional exact name for the experiment run. If provided, this will be 2495 used as the exact dataset run name if the `data` contains Langfuse dataset items. 2496 If not provided, this will default to the experiment name appended with an ISO timestamp. 2497 description: Optional description explaining the experiment's purpose, 2498 methodology, or expected outcomes. 2499 data: List of data items to process.
Can be either: 2500 - List of dict-like items with 'input', 'expected_output', 'metadata' keys 2501 - List of Langfuse DatasetItem objects from dataset.items 2502 task: Function that processes each data item and returns output. 2503 Must accept 'item' as keyword argument and can return sync or async results. 2504 The task function signature should be: task(*, item, **kwargs) -> Any 2505 evaluators: List of functions to evaluate each item's output individually. 2506 Each evaluator receives input, output, expected_output, and metadata. 2507 Can return single Evaluation dict or list of Evaluation dicts. 2508 run_evaluators: List of functions to evaluate the entire experiment run. 2509 Each run evaluator receives all item_results and can compute aggregate metrics. 2510 Useful for calculating averages, distributions, or cross-item comparisons. 2511 max_concurrency: Maximum number of concurrent task executions (default: 50). 2512 Controls the number of items processed simultaneously. Adjust based on 2513 API rate limits and system resources. 2514 metadata: Optional metadata dictionary to attach to all experiment traces. 2515 This metadata will be included in every trace created during the experiment. 2516 If `data` are Langfuse dataset items, the metadata will be attached to the dataset run, too. 2517 2518 Returns: 2519 ExperimentResult containing: 2520 - run_name: The experiment run name. This is equal to the dataset run name if experiment was on Langfuse dataset. 2521 - item_results: List of results for each processed item with outputs and evaluations 2522 - run_evaluations: List of aggregate evaluation results for the entire run 2523 - dataset_run_id: ID of the dataset run (if using Langfuse datasets) 2524 - dataset_run_url: Direct URL to view results in Langfuse UI (if applicable) 2525 2526 Raises: 2527 ValueError: If required parameters are missing or invalid 2528 Exception: If experiment setup fails (individual item failures are handled gracefully) 2529 2530 Examples: 2531 Basic experiment with local data: 2532 ```python 2533 def summarize_text(*, item, **kwargs): 2534 return f"Summary: {item['input'][:50]}..." 
2535 2536 def length_evaluator(*, input, output, expected_output=None, **kwargs): 2537 return { 2538 "name": "output_length", 2539 "value": len(output), 2540 "comment": f"Output contains {len(output)} characters" 2541 } 2542 2543 result = langfuse.run_experiment( 2544 name="Text Summarization Test", 2545 description="Evaluate summarization quality and length", 2546 data=[ 2547 {"input": "Long article text...", "expected_output": "Expected summary"}, 2548 {"input": "Another article...", "expected_output": "Another summary"} 2549 ], 2550 task=summarize_text, 2551 evaluators=[length_evaluator] 2552 ) 2553 2554 print(f"Processed {len(result.item_results)} items") 2555 for item_result in result.item_results: 2556 print(f"Input: {item_result.item['input']}") 2557 print(f"Output: {item_result.output}") 2558 print(f"Evaluations: {item_result.evaluations}") 2559 ``` 2560 2561 Advanced experiment with async task and multiple evaluators: 2562 ```python 2563 async def llm_task(*, item, **kwargs): 2564 # Simulate async LLM call 2565 response = await openai_client.chat.completions.create( 2566 model="gpt-4", 2567 messages=[{"role": "user", "content": item["input"]}] 2568 ) 2569 return response.choices[0].message.content 2570 2571 def accuracy_evaluator(*, input, output, expected_output=None, **kwargs): 2572 if expected_output and expected_output.lower() in output.lower(): 2573 return {"name": "accuracy", "value": 1.0, "comment": "Correct answer"} 2574 return {"name": "accuracy", "value": 0.0, "comment": "Incorrect answer"} 2575 2576 def toxicity_evaluator(*, input, output, expected_output=None, **kwargs): 2577 # Simulate toxicity check 2578 toxicity_score = check_toxicity(output) # Your toxicity checker 2579 return { 2580 "name": "toxicity", 2581 "value": toxicity_score, 2582 "comment": f"Toxicity level: {'high' if toxicity_score > 0.7 else 'low'}" 2583 } 2584 2585 def average_accuracy(*, item_results, **kwargs): 2586 accuracies = [ 2587 eval.value for result in item_results 2588 for eval in result.evaluations 2589 if eval.name == "accuracy" 2590 ] 2591 return { 2592 "name": "average_accuracy", 2593 "value": sum(accuracies) / len(accuracies) if accuracies else 0, 2594 "comment": f"Average accuracy across {len(accuracies)} items" 2595 } 2596 2597 result = langfuse.run_experiment( 2598 name="LLM Safety and Accuracy Test", 2599 description="Evaluate model accuracy and safety across diverse prompts", 2600 data=test_dataset, # Your dataset items 2601 task=llm_task, 2602 evaluators=[accuracy_evaluator, toxicity_evaluator], 2603 run_evaluators=[average_accuracy], 2604 max_concurrency=5, # Limit concurrent API calls 2605 metadata={"model": "gpt-4", "temperature": 0.7} 2606 ) 2607 ``` 2608 2609 Using with Langfuse datasets: 2610 ```python 2611 # Get dataset from Langfuse 2612 dataset = langfuse.get_dataset("my-eval-dataset") 2613 2614 result = dataset.run_experiment( 2615 name="Production Model Evaluation", 2616 description="Monthly evaluation of production model performance", 2617 task=my_production_task, 2618 evaluators=[accuracy_evaluator, latency_evaluator] 2619 ) 2620 2621 # Results automatically linked to dataset in Langfuse UI 2622 print(f"View results: {result.dataset_run_url}") 2623 ``` 2624 2625 Note: 2626 - Task and evaluator functions can be either synchronous or asynchronous 2627 - Individual item failures are logged but don't stop the experiment 2628 - All executions are automatically traced and visible in Langfuse UI 2629 - When using Langfuse datasets, results are automatically linked for easy
comparison 2630 - This method works in both sync and async contexts (Jupyter notebooks, web apps, etc.) 2631 - Async execution is handled automatically with smart event loop detection 2632 """ 2633 return cast( 2634 ExperimentResult, 2635 run_async_safely( 2636 self._run_experiment_async( 2637 name=name, 2638 run_name=self._create_experiment_run_name( 2639 name=name, run_name=run_name 2640 ), 2641 description=description, 2642 data=data, 2643 task=task, 2644 evaluators=evaluators or [], 2645 run_evaluators=run_evaluators or [], 2646 max_concurrency=max_concurrency, 2647 metadata=metadata or {}, 2648 ), 2649 ), 2650 ) 2651 2652 async def _run_experiment_async( 2653 self, 2654 *, 2655 name: str, 2656 run_name: str, 2657 description: Optional[str], 2658 data: ExperimentData, 2659 task: TaskFunction, 2660 evaluators: List[EvaluatorFunction], 2661 run_evaluators: List[RunEvaluatorFunction], 2662 max_concurrency: int, 2663 metadata: Dict[str, Any], 2664 ) -> ExperimentResult: 2665 langfuse_logger.debug( 2666 f"Starting experiment '{name}' run '{run_name}' with {len(data)} items" 2667 ) 2668 2669 # Set up concurrency control 2670 semaphore = asyncio.Semaphore(max_concurrency) 2671 2672 # Process all items 2673 async def process_item(item: ExperimentItem) -> ExperimentItemResult: 2674 async with semaphore: 2675 return await self._process_experiment_item( 2676 item, task, evaluators, name, run_name, description, metadata 2677 ) 2678 2679 # Run all items concurrently 2680 tasks = [process_item(item) for item in data] 2681 item_results = await asyncio.gather(*tasks, return_exceptions=True) 2682 2683 # Filter out any exceptions and log errors 2684 valid_results: List[ExperimentItemResult] = [] 2685 for i, result in enumerate(item_results): 2686 if isinstance(result, Exception): 2687 langfuse_logger.error(f"Item {i} failed: {result}") 2688 elif isinstance(result, ExperimentItemResult): 2689 valid_results.append(result) # type: ignore 2690 2691 # Run experiment-level evaluators 2692 run_evaluations: List[Evaluation] = [] 2693 for run_evaluator in run_evaluators: 2694 try: 2695 evaluations = await _run_evaluator( 2696 run_evaluator, item_results=valid_results 2697 ) 2698 run_evaluations.extend(evaluations) 2699 except Exception as e: 2700 langfuse_logger.error(f"Run evaluator failed: {e}") 2701 2702 # Generate dataset run URL if applicable 2703 dataset_run_id = valid_results[0].dataset_run_id if valid_results else None 2704 dataset_run_url = None 2705 if dataset_run_id and data: 2706 try: 2707 # Check if the first item has dataset_id (for DatasetItem objects) 2708 first_item = data[0] 2709 dataset_id = None 2710 2711 if hasattr(first_item, "dataset_id"): 2712 dataset_id = getattr(first_item, "dataset_id", None) 2713 2714 if dataset_id: 2715 project_id = self._get_project_id() 2716 2717 if project_id: 2718 dataset_run_url = f"{self._host}/project/{project_id}/datasets/{dataset_id}/runs/{dataset_run_id}" 2719 2720 except Exception: 2721 pass # URL generation is optional 2722 2723 # Store run-level evaluations as scores 2724 for evaluation in run_evaluations: 2725 try: 2726 if dataset_run_id: 2727 self.create_score( 2728 dataset_run_id=dataset_run_id, 2729 name=evaluation.name or "<unknown>", 2730 value=evaluation.value, # type: ignore 2731 comment=evaluation.comment, 2732 metadata=evaluation.metadata, 2733 data_type=evaluation.data_type, # type: ignore 2734 config_id=evaluation.config_id, 2735 ) 2736 2737 except Exception as e: 2738 langfuse_logger.error(f"Failed to store run evaluation: {e}") 2739 2740 # 
Flush scores and traces 2741 self.flush() 2742 2743 return ExperimentResult( 2744 name=name, 2745 run_name=run_name, 2746 description=description, 2747 item_results=valid_results, 2748 run_evaluations=run_evaluations, 2749 dataset_run_id=dataset_run_id, 2750 dataset_run_url=dataset_run_url, 2751 ) 2752 2753 async def _process_experiment_item( 2754 self, 2755 item: ExperimentItem, 2756 task: Callable, 2757 evaluators: List[Callable], 2758 experiment_name: str, 2759 experiment_run_name: str, 2760 experiment_description: Optional[str], 2761 experiment_metadata: Dict[str, Any], 2762 ) -> ExperimentItemResult: 2763 # Execute task with tracing 2764 span_name = "experiment-item-run" 2765 2766 with self.start_as_current_span(name=span_name) as span: 2767 try: 2768 output = await _run_task(task, item) 2769 2770 input_data = ( 2771 item.get("input") 2772 if isinstance(item, dict) 2773 else getattr(item, "input", None) 2774 ) 2775 2776 item_metadata: Dict[str, Any] = {} 2777 2778 if isinstance(item, dict): 2779 item_metadata = item.get("metadata", None) or {} 2780 2781 final_metadata = { 2782 "experiment_name": experiment_name, 2783 "experiment_run_name": experiment_run_name, 2784 **experiment_metadata, 2785 } 2786 2787 if ( 2788 not isinstance(item, dict) 2789 and hasattr(item, "dataset_id") 2790 and hasattr(item, "id") 2791 ): 2792 final_metadata.update( 2793 {"dataset_id": item.dataset_id, "dataset_item_id": item.id} 2794 ) 2795 2796 if isinstance(item_metadata, dict): 2797 final_metadata.update(item_metadata) 2798 2799 span.update( 2800 input=input_data, 2801 output=output, 2802 metadata=final_metadata, 2803 ) 2804 2805 # Get trace ID for linking 2806 trace_id = span.trace_id 2807 dataset_run_id = None 2808 2809 # Link to dataset run if this is a dataset item 2810 if hasattr(item, "id") and hasattr(item, "dataset_id"): 2811 try: 2812 dataset_run_item = self.api.dataset_run_items.create( 2813 request=CreateDatasetRunItemRequest( 2814 runName=experiment_run_name, 2815 runDescription=experiment_description, 2816 metadata=experiment_metadata, 2817 datasetItemId=item.id, # type: ignore 2818 traceId=trace_id, 2819 observationId=span.id, 2820 ) 2821 ) 2822 2823 dataset_run_id = dataset_run_item.dataset_run_id 2824 2825 except Exception as e: 2826 langfuse_logger.error(f"Failed to create dataset run item: {e}") 2827 2828 # Run evaluators 2829 evaluations = [] 2830 2831 for evaluator in evaluators: 2832 try: 2833 expected_output = None 2834 2835 if isinstance(item, dict): 2836 expected_output = item.get("expected_output") 2837 elif hasattr(item, "expected_output"): 2838 expected_output = item.expected_output 2839 2840 eval_metadata: Optional[Dict[str, Any]] = None 2841 2842 if isinstance(item, dict): 2843 eval_metadata = item.get("metadata") 2844 elif hasattr(item, "metadata"): 2845 eval_metadata = item.metadata 2846 2847 eval_results = await _run_evaluator( 2848 evaluator, 2849 input=input_data, 2850 output=output, 2851 expected_output=expected_output, 2852 metadata=eval_metadata, 2853 ) 2854 evaluations.extend(eval_results) 2855 2856 # Store evaluations as scores 2857 for evaluation in eval_results: 2858 self.create_score( 2859 trace_id=trace_id, 2860 name=evaluation.name, 2861 value=evaluation.value, # type: ignore 2862 comment=evaluation.comment, 2863 metadata=evaluation.metadata, 2864 config_id=evaluation.config_id, 2865 data_type=evaluation.data_type, # type: ignore 2866 ) 2867 2868 except Exception as e: 2869 langfuse_logger.error(f"Evaluator failed: {e}") 2870 2871 return ExperimentItemResult( 
2872 item=item, 2873 output=output, 2874 evaluations=evaluations, 2875 trace_id=trace_id, 2876 dataset_run_id=dataset_run_id, 2877 ) 2878 2879 except Exception as e: 2880 span.update( 2881 output=f"Error: {str(e)}", level="ERROR", status_message=str(e) 2882 ) 2883 raise e 2884 2885 def _create_experiment_run_name( 2886 self, *, name: Optional[str] = None, run_name: Optional[str] = None 2887 ) -> str: 2888 if run_name: 2889 return run_name 2890 2891 iso_timestamp = _get_timestamp().isoformat().replace("+00:00", "Z") 2892 2893 return f"{name} - {iso_timestamp}" 2894 2895 def auth_check(self) -> bool: 2896 """Check if the provided credentials (public and secret key) are valid. 2897 2898 Raises: 2899 Exception: If no projects were found for the provided credentials. 2900 2901 Note: 2902 This method is blocking. Using it in production code is discouraged. 2903 """ 2904 try: 2905 projects = self.api.projects.get() 2906 langfuse_logger.debug( 2907 f"Auth check successful, found {len(projects.data)} projects" 2908 ) 2909 if len(projects.data) == 0: 2910 raise Exception( 2911 "Auth check failed, no project found for the keys provided." 2912 ) 2913 return True 2914 2915 except AttributeError as e: 2916 langfuse_logger.warning( 2917 f"Auth check failed: Client not properly initialized. Error: {e}" 2918 ) 2919 return False 2920 2921 except Error as e: 2922 handle_fern_exception(e) 2923 raise e 2924 2925 def create_dataset( 2926 self, 2927 *, 2928 name: str, 2929 description: Optional[str] = None, 2930 metadata: Optional[Any] = None, 2931 ) -> Dataset: 2932 """Create a dataset with the given name on Langfuse. 2933 2934 Args: 2935 name: Name of the dataset to create. 2936 description: Description of the dataset. Defaults to None. 2937 metadata: Additional metadata. Defaults to None. 2938 2939 Returns: 2940 Dataset: The created dataset as returned by the Langfuse API. 2941 """ 2942 try: 2943 body = CreateDatasetRequest( 2944 name=name, description=description, metadata=metadata 2945 ) 2946 langfuse_logger.debug(f"Creating dataset {body}") 2947 2948 return self.api.datasets.create(request=body) 2949 2950 except Error as e: 2951 handle_fern_exception(e) 2952 raise e 2953 2954 def create_dataset_item( 2955 self, 2956 *, 2957 dataset_name: str, 2958 input: Optional[Any] = None, 2959 expected_output: Optional[Any] = None, 2960 metadata: Optional[Any] = None, 2961 source_trace_id: Optional[str] = None, 2962 source_observation_id: Optional[str] = None, 2963 status: Optional[DatasetStatus] = None, 2964 id: Optional[str] = None, 2965 ) -> DatasetItem: 2966 """Create a dataset item. 2967 2968 Upserts if an item with the same id already exists. 2969 2970 Args: 2971 dataset_name: Name of the dataset in which the dataset item should be created. 2972 input: Input data. Defaults to None. Can contain any dict, list or scalar. 2973 expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar. 2974 metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar. 2975 source_trace_id: Id of the source trace. Defaults to None. 2976 source_observation_id: Id of the source observation. Defaults to None. 2977 status: Status of the dataset item. Defaults to ACTIVE for newly created items. 2978 id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets. 2979 2980 Returns: 2981 DatasetItem: The created dataset item as returned by the Langfuse API.
2982 2983 Example: 2984 ```python 2985 from langfuse import Langfuse 2986 2987 langfuse = Langfuse() 2988 2989 # Uploading items to the Langfuse dataset named "capital_cities" 2990 langfuse.create_dataset_item( 2991 dataset_name="capital_cities", 2992 input={"input": {"country": "Italy"}}, 2993 expected_output={"expected_output": "Rome"}, 2994 metadata={"foo": "bar"} 2995 ) 2996 ``` 2997 """ 2998 try: 2999 body = CreateDatasetItemRequest( 3000 datasetName=dataset_name, 3001 input=input, 3002 expectedOutput=expected_output, 3003 metadata=metadata, 3004 sourceTraceId=source_trace_id, 3005 sourceObservationId=source_observation_id, 3006 status=status, 3007 id=id, 3008 ) 3009 langfuse_logger.debug(f"Creating dataset item {body}") 3010 return self.api.dataset_items.create(request=body) 3011 except Error as e: 3012 handle_fern_exception(e) 3013 raise e 3014 3015 def resolve_media_references( 3016 self, 3017 *, 3018 obj: Any, 3019 resolve_with: Literal["base64_data_uri"], 3020 max_depth: int = 10, 3021 content_fetch_timeout_seconds: int = 5, 3022 ) -> Any: 3023 """Replace media reference strings in an object with base64 data URIs. 3024 3025 This method recursively traverses an object (up to max_depth) looking for media reference strings 3026 in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using 3027 the provided Langfuse client and replaces the reference string with a base64 data URI. 3028 3029 If fetching media content fails for a reference string, a warning is logged and the reference 3030 string is left unchanged. 3031 3032 Args: 3033 obj: The object to process. Can be a primitive value, array, or nested object. 3034 If the object has a __dict__ attribute, a dict will be returned instead of the original object type. 3035 resolve_with: The representation of the media content to replace the media reference string with. 3036 Currently only "base64_data_uri" is supported. 3037 max_depth: int: The maximum depth to traverse the object. Default is 10. 3038 content_fetch_timeout_seconds: int: The timeout in seconds for fetching media content. Default is 5. 3039 3040 Returns: 3041 A deep copy of the input object with all media references replaced with base64 data URIs where possible. 3042 If the input object has a __dict__ attribute, a dict will be returned instead of the original object type. 3043 3044 Example: 3045 obj = { 3046 "image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@", 3047 "nested": { 3048 "pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@" 3049 } 3050 } 3051 3052 result = langfuse.resolve_media_references(obj=obj, resolve_with="base64_data_uri") 3053 3054 # Result: 3055 # { 3056 # "image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...", 3057 # "nested": { 3058 # "pdf": "data:application/pdf;base64,JVBERi0xLjcK..." 3059 # } 3060 # } 3061 """ 3062 return LangfuseMedia.resolve_media_references( 3063 langfuse_client=self, 3064 obj=obj, 3065 resolve_with=resolve_with, 3066 max_depth=max_depth, 3067 content_fetch_timeout_seconds=content_fetch_timeout_seconds, 3068 ) 3069 3070 @overload 3071 def get_prompt( 3072 self, 3073 name: str, 3074 *, 3075 version: Optional[int] = None, 3076 label: Optional[str] = None, 3077 type: Literal["chat"], 3078 cache_ttl_seconds: Optional[int] = None, 3079 fallback: Optional[List[ChatMessageDict]] = None, 3080 max_retries: Optional[int] = None, 3081 fetch_timeout_seconds: Optional[int] = None, 3082 ) -> ChatPromptClient: ...
3083 3084 @overload 3085 def get_prompt( 3086 self, 3087 name: str, 3088 *, 3089 version: Optional[int] = None, 3090 label: Optional[str] = None, 3091 type: Literal["text"] = "text", 3092 cache_ttl_seconds: Optional[int] = None, 3093 fallback: Optional[str] = None, 3094 max_retries: Optional[int] = None, 3095 fetch_timeout_seconds: Optional[int] = None, 3096 ) -> TextPromptClient: ... 3097 3098 def get_prompt( 3099 self, 3100 name: str, 3101 *, 3102 version: Optional[int] = None, 3103 label: Optional[str] = None, 3104 type: Literal["chat", "text"] = "text", 3105 cache_ttl_seconds: Optional[int] = None, 3106 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None, 3107 max_retries: Optional[int] = None, 3108 fetch_timeout_seconds: Optional[int] = None, 3109 ) -> PromptClient: 3110 """Get a prompt. 3111 3112 This method attempts to fetch the requested prompt from the local cache. If the prompt is not found 3113 in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again 3114 and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will 3115 return the expired prompt as a fallback. 3116 3117 Args: 3118 name (str): The name of the prompt to retrieve. 3119 3120 Keyword Args: 3121 version (Optional[int]): The version of the prompt to retrieve. If neither version nor label is specified, the version with the `production` label is returned. Specify either version or label, not both. 3122 label: Optional[str]: The label of the prompt to retrieve. If neither version nor label is specified, the version with the `production` label is returned. Specify either version or label, not both. 3123 cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a 3124 keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0. 3125 type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text". 3126 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt content to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None. 3127 max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries have an exponential backoff with a maximum delay of 10 seconds. 3128 fetch_timeout_seconds: Optional[int]: The timeout in seconds for fetching the prompt. Defaults to the SDK-wide timeout, which is 5 seconds by default. 3129 3130 Returns: 3131 The prompt object, retrieved from the cache or fetched directly if not cached or expired, of type: 3132 - TextPromptClient, if type argument is 'text'. 3133 - ChatPromptClient, if type argument is 'chat'. 3134 3135 Raises: 3136 Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an 3137 expired prompt in the cache, in which case it logs a warning and returns the expired prompt. 3138 """ 3139 if self._resources is None: 3140 raise Error( 3141 "SDK is not correctly initialized. Check the init logs for more details."
3142 ) 3143 if version is not None and label is not None: 3144 raise ValueError("Cannot specify both version and label at the same time.") 3145 3146 if not name: 3147 raise ValueError("Prompt name cannot be empty.") 3148 3149 cache_key = PromptCache.generate_cache_key(name, version=version, label=label) 3150 bounded_max_retries = self._get_bounded_max_retries( 3151 max_retries, default_max_retries=2, max_retries_upper_bound=4 3152 ) 3153 3154 langfuse_logger.debug(f"Getting prompt '{cache_key}'") 3155 cached_prompt = self._resources.prompt_cache.get(cache_key) 3156 3157 if cached_prompt is None or cache_ttl_seconds == 0: 3158 langfuse_logger.debug( 3159 f"Prompt '{cache_key}' not found in cache or caching disabled." 3160 ) 3161 try: 3162 return self._fetch_prompt_and_update_cache( 3163 name, 3164 version=version, 3165 label=label, 3166 ttl_seconds=cache_ttl_seconds, 3167 max_retries=bounded_max_retries, 3168 fetch_timeout_seconds=fetch_timeout_seconds, 3169 ) 3170 except Exception as e: 3171 if fallback: 3172 langfuse_logger.warning( 3173 f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}" 3174 ) 3175 3176 fallback_client_args: Dict[str, Any] = { 3177 "name": name, 3178 "prompt": fallback, 3179 "type": type, 3180 "version": version or 0, 3181 "config": {}, 3182 "labels": [label] if label else [], 3183 "tags": [], 3184 } 3185 3186 if type == "text": 3187 return TextPromptClient( 3188 prompt=Prompt_Text(**fallback_client_args), 3189 is_fallback=True, 3190 ) 3191 3192 if type == "chat": 3193 return ChatPromptClient( 3194 prompt=Prompt_Chat(**fallback_client_args), 3195 is_fallback=True, 3196 ) 3197 3198 raise e 3199 3200 if cached_prompt.is_expired(): 3201 langfuse_logger.debug(f"Stale prompt '{cache_key}' found in cache.") 3202 try: 3203 # refresh prompt in background thread, refresh_prompt deduplicates tasks 3204 langfuse_logger.debug(f"Refreshing prompt '{cache_key}' in background.") 3205 3206 def refresh_task() -> None: 3207 self._fetch_prompt_and_update_cache( 3208 name, 3209 version=version, 3210 label=label, 3211 ttl_seconds=cache_ttl_seconds, 3212 max_retries=bounded_max_retries, 3213 fetch_timeout_seconds=fetch_timeout_seconds, 3214 ) 3215 3216 self._resources.prompt_cache.add_refresh_prompt_task( 3217 cache_key, 3218 refresh_task, 3219 ) 3220 langfuse_logger.debug( 3221 f"Returning stale prompt '{cache_key}' from cache." 3222 ) 3223 # return stale prompt 3224 return cached_prompt.value 3225 3226 except Exception as e: 3227 langfuse_logger.warning( 3228 f"Error when refreshing cached prompt '{cache_key}', returning cached version. 
Error: {e}" 3229 ) 3230 # creation of refresh prompt task failed, return stale prompt 3231 return cached_prompt.value 3232 3233 return cached_prompt.value 3234 3235 def _fetch_prompt_and_update_cache( 3236 self, 3237 name: str, 3238 *, 3239 version: Optional[int] = None, 3240 label: Optional[str] = None, 3241 ttl_seconds: Optional[int] = None, 3242 max_retries: int, 3243 fetch_timeout_seconds: Optional[int], 3244 ) -> PromptClient: 3245 cache_key = PromptCache.generate_cache_key(name, version=version, label=label) 3246 langfuse_logger.debug(f"Fetching prompt '{cache_key}' from server...") 3247 3248 try: 3249 3250 @backoff.on_exception( 3251 backoff.constant, Exception, max_tries=max_retries + 1, logger=None 3252 ) 3253 def fetch_prompts() -> Any: 3254 return self.api.prompts.get( 3255 self._url_encode(name), 3256 version=version, 3257 label=label, 3258 request_options={ 3259 "timeout_in_seconds": fetch_timeout_seconds, 3260 } 3261 if fetch_timeout_seconds is not None 3262 else None, 3263 ) 3264 3265 prompt_response = fetch_prompts() 3266 3267 prompt: PromptClient 3268 if prompt_response.type == "chat": 3269 prompt = ChatPromptClient(prompt_response) 3270 else: 3271 prompt = TextPromptClient(prompt_response) 3272 3273 if self._resources is not None: 3274 self._resources.prompt_cache.set(cache_key, prompt, ttl_seconds) 3275 3276 return prompt 3277 3278 except Exception as e: 3279 langfuse_logger.error( 3280 f"Error while fetching prompt '{cache_key}': {str(e)}" 3281 ) 3282 raise e 3283 3284 def _get_bounded_max_retries( 3285 self, 3286 max_retries: Optional[int], 3287 *, 3288 default_max_retries: int = 2, 3289 max_retries_upper_bound: int = 4, 3290 ) -> int: 3291 if max_retries is None: 3292 return default_max_retries 3293 3294 bounded_max_retries = min( 3295 max(max_retries, 0), 3296 max_retries_upper_bound, 3297 ) 3298 3299 return bounded_max_retries 3300 3301 @overload 3302 def create_prompt( 3303 self, 3304 *, 3305 name: str, 3306 prompt: List[Union[ChatMessageDict, ChatMessageWithPlaceholdersDict]], 3307 labels: List[str] = [], 3308 tags: Optional[List[str]] = None, 3309 type: Optional[Literal["chat"]], 3310 config: Optional[Any] = None, 3311 commit_message: Optional[str] = None, 3312 ) -> ChatPromptClient: ... 3313 3314 @overload 3315 def create_prompt( 3316 self, 3317 *, 3318 name: str, 3319 prompt: str, 3320 labels: List[str] = [], 3321 tags: Optional[List[str]] = None, 3322 type: Optional[Literal["text"]] = "text", 3323 config: Optional[Any] = None, 3324 commit_message: Optional[str] = None, 3325 ) -> TextPromptClient: ... 3326 3327 def create_prompt( 3328 self, 3329 *, 3330 name: str, 3331 prompt: Union[ 3332 str, List[Union[ChatMessageDict, ChatMessageWithPlaceholdersDict]] 3333 ], 3334 labels: List[str] = [], 3335 tags: Optional[List[str]] = None, 3336 type: Optional[Literal["chat", "text"]] = "text", 3337 config: Optional[Any] = None, 3338 commit_message: Optional[str] = None, 3339 ) -> PromptClient: 3340 """Create a new prompt in Langfuse. 3341 3342 Keyword Args: 3343 name : The name of the prompt to be created. 3344 prompt : The content of the prompt to be created. 3345 is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead. 3346 labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label. 3347 tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt. 
3348 config: Additional structured data to be saved with the prompt. Defaults to None. 3349 type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text". 3350 commit_message: Optional string describing the change. 3351 3352 Returns: 3353 TextPromptClient: The prompt if type argument is 'text'. 3354 ChatPromptClient: The prompt if type argument is 'chat'. 3355 """ 3356 try: 3357 langfuse_logger.debug(f"Creating prompt {name=}, {labels=}") 3358 3359 if type == "chat": 3360 if not isinstance(prompt, list): 3361 raise ValueError( 3362 "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes." 3363 ) 3364 request: Union[CreatePromptRequest_Chat, CreatePromptRequest_Text] = ( 3365 CreatePromptRequest_Chat( 3366 name=name, 3367 prompt=cast(Any, prompt), 3368 labels=labels, 3369 tags=tags, 3370 config=config or {}, 3371 commitMessage=commit_message, 3372 type="chat", 3373 ) 3374 ) 3375 server_prompt = self.api.prompts.create(request=request) 3376 3377 if self._resources is not None: 3378 self._resources.prompt_cache.invalidate(name) 3379 3380 return ChatPromptClient(prompt=cast(Prompt_Chat, server_prompt)) 3381 3382 if not isinstance(prompt, str): 3383 raise ValueError("For 'text' type, 'prompt' must be a string.") 3384 3385 request = CreatePromptRequest_Text( 3386 name=name, 3387 prompt=prompt, 3388 labels=labels, 3389 tags=tags, 3390 config=config or {}, 3391 commitMessage=commit_message, 3392 type="text", 3393 ) 3394 3395 server_prompt = self.api.prompts.create(request=request) 3396 3397 if self._resources is not None: 3398 self._resources.prompt_cache.invalidate(name) 3399 3400 return TextPromptClient(prompt=cast(Prompt_Text, server_prompt)) 3401 3402 except Error as e: 3403 handle_fern_exception(e) 3404 raise e 3405 3406 def update_prompt( 3407 self, 3408 *, 3409 name: str, 3410 version: int, 3411 new_labels: List[str] = [], 3412 ) -> Any: 3413 """Update an existing prompt version in Langfuse. The Langfuse SDK prompt cache is invalidated for all prompts with the specified name. 3414 3415 Args: 3416 name (str): The name of the prompt to update. 3417 version (int): The version number of the prompt to update. 3418 new_labels (List[str], optional): New labels to assign to the prompt version. Labels are unique across versions. The "latest" label is reserved and managed by Langfuse. Defaults to []. 3419 3420 Returns: 3421 Prompt: The updated prompt from the Langfuse API. 3422 3423 """ 3424 updated_prompt = self.api.prompt_version.update( 3425 name=self._url_encode(name), 3426 version=version, 3427 new_labels=new_labels, 3428 ) 3429 3430 if self._resources is not None: 3431 self._resources.prompt_cache.invalidate(name) 3432 3433 return updated_prompt 3434 3435 def _url_encode(self, url: str, *, is_url_param: Optional[bool] = False) -> str: 3436 # httpx ≥ 0.28 does its own WHATWG-compliant quoting (e.g. encodes bare 3437 # “%”, “?”, “#”, “|”, … in query/path parts). Re-quoting here would 3438 # double-encode, so we skip when the value is about to be sent straight 3439 # to httpx (`is_url_param=True`) and the installed version is ≥ 0.28.
3440 if is_url_param and Version(httpx.__version__) >= Version("0.28.0"): 3441 return url 3442 3443 # urllib.parse.quote does not escape slashes "/" by default; 3444 # we need to add safe="" to force escaping of slashes. 3445 # This is necessary for prompts in prompt folders 3446 return urllib.parse.quote(url, safe="") 3447 3448 def clear_prompt_cache(self) -> None: 3449 """Clear the entire prompt cache, removing all cached prompts. 3450 3451 This method is useful when you want to force a complete refresh of all 3452 cached prompts, for example after major updates or when you need to 3453 ensure the latest versions are fetched from the server. 3454 """ 3455 if self._resources is not None: 3456 self._resources.prompt_cache.clear()
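The prompt-management methods that close the listing above (get_prompt, create_prompt, update_prompt) carry no usage example in their docstrings. A minimal sketch of the fetch-fill-promote cycle, assuming a text prompt named "movie-critic" exists and that the returned TextPromptClient exposes a compile() method that substitutes the double-curly-brace variables (an assumption based on the wider Langfuse prompt docs, not on the listing above):

```python
from langfuse import Langfuse

langfuse = Langfuse()  # credentials read from LANGFUSE_* environment variables

# Fetch the production-labeled version; the fallback is only served if the
# fetch fails and nothing is cached yet. "movie-critic" is a hypothetical name.
prompt = langfuse.get_prompt(
    "movie-critic",
    type="text",
    fallback="Critique the movie {{title}} in one paragraph.",
)

# Assumed API: compile() fills the {{title}} placeholder.
text = prompt.compile(title="Dune: Part Two")

# Promote version 3 to production; per update_prompt's docstring this also
# invalidates the local prompt cache for this name.
langfuse.update_prompt(name="movie-critic", version=3, new_labels=["production"])
```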
Main client for Langfuse tracing and platform features.
This class provides an interface for creating and managing traces, spans, and generations in Langfuse as well as interacting with the Langfuse API.
The client features a thread-safe singleton pattern for each unique public API key, ensuring consistent trace context propagation across your application. It implements efficient batching of spans with configurable flush settings and includes background thread management for media uploads and score ingestion.
Configuration is flexible through either direct parameters or environment variables, with graceful fallbacks and runtime configuration updates.
Attributes:
- api: Synchronous API client for Langfuse backend communication
- async_api: Asynchronous API client for Langfuse backend communication
- _otel_tracer: Internal LangfuseTracer instance managing OpenTelemetry components
Arguments:
- public_key (Optional[str]): Your Langfuse public API key. Can also be set via LANGFUSE_PUBLIC_KEY environment variable.
- secret_key (Optional[str]): Your Langfuse secret API key. Can also be set via LANGFUSE_SECRET_KEY environment variable.
- host (Optional[str]): The Langfuse API host URL. Defaults to "https://cloud.langfuse.com". Can also be set via LANGFUSE_HOST environment variable.
- timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds.
- httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created.
- debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable.
- tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable.
- flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable.
- flush_interval (Optional[float]): Time in seconds between batch flushes. Defaults to 5 seconds. Can also be set via LANGFUSE_FLUSH_INTERVAL environment variable.
- environment (Optional[str]): Environment name for tracing. Default is 'default'. Can also be set via LANGFUSE_TRACING_ENVIRONMENT environment variable. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'.
- release (Optional[str]): Release version/hash of your application. Used for grouping analytics by release.
- media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable.
- sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable.
- mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API.
- blocked_instrumentation_scopes (Optional[List[str]]): List of instrumentation scope names to block from being exported to Langfuse. Spans from these scopes will be filtered out before being sent to the API. Useful for filtering out spans from specific libraries or frameworks. For exported spans, you can see the instrumentation scope name in the span metadata in Langfuse (metadata.scope.name).
- additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and OTLPSpanExporter requests. These headers will be merged with default headers. Note: If httpx_client is provided, additional_headers must be set directly on your custom httpx_client as well.
- tracer_provider (Optional[TracerProvider]): OpenTelemetry TracerProvider to use for Langfuse. Setting this is useful for keeping Langfuse tracing disconnected from other OpenTelemetry-span-emitting libraries. Note: to track active spans, the context is still shared between TracerProviders, which may lead to broken trace trees.
Example:
    from langfuse import Langfuse

    # Initialize the client (reads from env vars if not provided)
    langfuse = Langfuse(
        public_key="your-public-key",
        secret_key="your-secret-key",
        host="https://cloud.langfuse.com",  # Optional, default shown
    )

    # Create a trace span
    with langfuse.start_as_current_span(name="process-query") as span:
        # Your application code here

        # Create a nested generation span for an LLM call
        with span.start_as_current_generation(
            name="generate-response",
            model="gpt-4",
            input={"query": "Tell me about AI"},
            model_parameters={"temperature": 0.7, "max_tokens": 500}
        ) as generation:
            # Generate response here
            response = "AI is a field of computer science..."

            generation.update(
                output=response,
                usage_details={"prompt_tokens": 10, "completion_tokens": 50},
                cost_details={"total_cost": 0.0023}
            )

            # Score the generation (supports NUMERIC, BOOLEAN, CATEGORICAL)
            generation.score(name="relevance", value=0.95, data_type="NUMERIC")
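Because spans are batched according to flush_at / flush_interval, short-lived scripts should flush before exiting. A minimal sketch, assuming the client's flush() and shutdown() lifecycle methods:

    from langfuse import Langfuse

    langfuse = Langfuse()

    with langfuse.start_as_current_span(name="one-off-job") as span:
        span.update(output="done")

    langfuse.flush()     # send any queued spans and scores now
    langfuse.shutdown()  # flush and stop background threads before exit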
194 def __init__( 195 self, 196 *, 197 public_key: Optional[str] = None, 198 secret_key: Optional[str] = None, 199 host: Optional[str] = None, 200 timeout: Optional[int] = None, 201 httpx_client: Optional[httpx.Client] = None, 202 debug: bool = False, 203 tracing_enabled: Optional[bool] = True, 204 flush_at: Optional[int] = None, 205 flush_interval: Optional[float] = None, 206 environment: Optional[str] = None, 207 release: Optional[str] = None, 208 media_upload_thread_count: Optional[int] = None, 209 sample_rate: Optional[float] = None, 210 mask: Optional[MaskFunction] = None, 211 blocked_instrumentation_scopes: Optional[List[str]] = None, 212 additional_headers: Optional[Dict[str, str]] = None, 213 tracer_provider: Optional[TracerProvider] = None, 214 ): 215 self._host = host or cast( 216 str, os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com") 217 ) 218 self._environment = environment or cast( 219 str, os.environ.get(LANGFUSE_TRACING_ENVIRONMENT) 220 ) 221 self._project_id: Optional[str] = None 222 sample_rate = sample_rate or float(os.environ.get(LANGFUSE_SAMPLE_RATE, 1.0)) 223 if not 0.0 <= sample_rate <= 1.0: 224 raise ValueError( 225 f"Sample rate must be between 0.0 and 1.0, got {sample_rate}" 226 ) 227 228 timeout = timeout or int(os.environ.get(LANGFUSE_TIMEOUT, 5)) 229 230 self._tracing_enabled = ( 231 tracing_enabled 232 and os.environ.get(LANGFUSE_TRACING_ENABLED, "true").lower() != "false" 233 ) 234 if not self._tracing_enabled: 235 langfuse_logger.info( 236 "Configuration: Langfuse tracing is explicitly disabled. No data will be sent to the Langfuse API." 237 ) 238 239 debug = ( 240 debug if debug else (os.getenv(LANGFUSE_DEBUG, "false").lower() == "true") 241 ) 242 if debug: 243 logging.basicConfig( 244 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 245 ) 246 langfuse_logger.setLevel(logging.DEBUG) 247 248 public_key = public_key or os.environ.get(LANGFUSE_PUBLIC_KEY) 249 if public_key is None: 250 langfuse_logger.warning( 251 "Authentication error: Langfuse client initialized without public_key. Client will be disabled. " 252 "Provide a public_key parameter or set LANGFUSE_PUBLIC_KEY environment variable. " 253 ) 254 self._otel_tracer = otel_trace_api.NoOpTracer() 255 return 256 257 secret_key = secret_key or os.environ.get(LANGFUSE_SECRET_KEY) 258 if secret_key is None: 259 langfuse_logger.warning( 260 "Authentication error: Langfuse client initialized without secret_key. Client will be disabled. " 261 "Provide a secret_key parameter or set LANGFUSE_SECRET_KEY environment variable. " 262 ) 263 self._otel_tracer = otel_trace_api.NoOpTracer() 264 return 265 266 if os.environ.get("OTEL_SDK_DISABLED", "false").lower() == "true": 267 langfuse_logger.warning( 268 "OTEL_SDK_DISABLED is set. Langfuse tracing will be disabled and no traces will appear in the UI." 
269 ) 270 271 # Initialize api and tracer if requirements are met 272 self._resources = LangfuseResourceManager( 273 public_key=public_key, 274 secret_key=secret_key, 275 host=self._host, 276 timeout=timeout, 277 environment=self._environment, 278 release=release, 279 flush_at=flush_at, 280 flush_interval=flush_interval, 281 httpx_client=httpx_client, 282 media_upload_thread_count=media_upload_thread_count, 283 sample_rate=sample_rate, 284 mask=mask, 285 tracing_enabled=self._tracing_enabled, 286 blocked_instrumentation_scopes=blocked_instrumentation_scopes, 287 additional_headers=additional_headers, 288 tracer_provider=tracer_provider, 289 ) 290 self._mask = self._resources.mask 291 292 self._otel_tracer = ( 293 self._resources.tracer 294 if self._tracing_enabled and self._resources.tracer is not None 295 else otel_trace_api.NoOpTracer() 296 ) 297 self.api = self._resources.api 298 self.async_api = self._resources.async_api
300 def start_span( 301 self, 302 *, 303 trace_context: Optional[TraceContext] = None, 304 name: str, 305 input: Optional[Any] = None, 306 output: Optional[Any] = None, 307 metadata: Optional[Any] = None, 308 version: Optional[str] = None, 309 level: Optional[SpanLevel] = None, 310 status_message: Optional[str] = None, 311 ) -> LangfuseSpan: 312 """Create a new span for tracing a unit of work. 313 314 This method creates a new span but does not set it as the current span in the 315 context. To create and use a span within a context, use start_as_current_span(). 316 317 The created span will be the child of the current span in the context. 318 319 Args: 320 trace_context: Optional context for connecting to an existing trace 321 name: Name of the span (e.g., function or operation name) 322 input: Input data for the operation (can be any JSON-serializable object) 323 output: Output data from the operation (can be any JSON-serializable object) 324 metadata: Additional metadata to associate with the span 325 version: Version identifier for the code or component 326 level: Importance level of the span (info, warning, error) 327 status_message: Optional status message for the span 328 329 Returns: 330 A LangfuseSpan object that must be ended with .end() when the operation completes 331 332 Example: 333 ```python 334 span = langfuse.start_span(name="process-data") 335 try: 336 # Do work 337 span.update(output="result") 338 finally: 339 span.end() 340 ``` 341 """ 342 return self.start_observation( 343 trace_context=trace_context, 344 name=name, 345 as_type="span", 346 input=input, 347 output=output, 348 metadata=metadata, 349 version=version, 350 level=level, 351 status_message=status_message, 352 )
Create a new span for tracing a unit of work.
This method creates a new span but does not set it as the current span in the context. To create and use a span within a context, use start_as_current_span().
The created span will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Returns:
A LangfuseSpan object that must be ended with .end() when the operation completes
Example:
    span = langfuse.start_span(name="process-data")
    try:
        # Do work
        span.update(output="result")
    finally:
        span.end()
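A hedged sketch of connecting a span to an existing trace via trace_context, which accepts trace_id and optionally parent_span_id (the trace ID below is a placeholder):

    span = langfuse.start_span(
        name="downstream-work",
        trace_context={"trace_id": "abcdef1234567890abcdef1234567890"},  # placeholder
    )
    try:
        ...  # do work as part of the existing trace
    finally:
        span.end()  # spans from start_span() must always be ended manually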
354 def start_as_current_span( 355 self, 356 *, 357 trace_context: Optional[TraceContext] = None, 358 name: str, 359 input: Optional[Any] = None, 360 output: Optional[Any] = None, 361 metadata: Optional[Any] = None, 362 version: Optional[str] = None, 363 level: Optional[SpanLevel] = None, 364 status_message: Optional[str] = None, 365 end_on_exit: Optional[bool] = None, 366 ) -> _AgnosticContextManager[LangfuseSpan]: 367 """Create a new span and set it as the current span in a context manager. 368 369 This method creates a new span and sets it as the current span within a context 370 manager. Use this method with a 'with' statement to automatically handle span 371 lifecycle within a code block. 372 373 The created span will be the child of the current span in the context. 374 375 Args: 376 trace_context: Optional context for connecting to an existing trace 377 name: Name of the span (e.g., function or operation name) 378 input: Input data for the operation (can be any JSON-serializable object) 379 output: Output data from the operation (can be any JSON-serializable object) 380 metadata: Additional metadata to associate with the span 381 version: Version identifier for the code or component 382 level: Importance level of the span (info, warning, error) 383 status_message: Optional status message for the span 384 end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks. 385 386 Returns: 387 A context manager that yields a LangfuseSpan 388 389 Example: 390 ```python 391 with langfuse.start_as_current_span(name="process-query") as span: 392 # Do work 393 result = process_data() 394 span.update(output=result) 395 396 # Create a child span automatically 397 with span.start_as_current_span(name="sub-operation") as child_span: 398 # Do sub-operation work 399 child_span.update(output="sub-result") 400 ``` 401 """ 402 return self.start_as_current_observation( 403 trace_context=trace_context, 404 name=name, 405 as_type="span", 406 input=input, 407 output=output, 408 metadata=metadata, 409 version=version, 410 level=level, 411 status_message=status_message, 412 end_on_exit=end_on_exit, 413 )
Create a new span and set it as the current span in a context manager.
This method creates a new span and sets it as the current span within a context manager. Use this method with a 'with' statement to automatically handle span lifecycle within a code block.
The created span will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
- end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.
Returns:
A context manager that yields a LangfuseSpan
Example:
    with langfuse.start_as_current_span(name="process-query") as span:
        # Do work
        result = process_data()
        span.update(output=result)

        # Create a child span automatically
        with span.start_as_current_span(name="sub-operation") as child_span:
            # Do sub-operation work
            child_span.update(output="sub-result")
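When end_on_exit=False, the span outlives the with block and must be ended manually; a minimal sketch:

    with langfuse.start_as_current_span(name="deferred-work", end_on_exit=False) as span:
        pass  # span is current inside the block but stays open afterwards

    # ... later, once the deferred work completes:
    span.update(output="late result")
    span.end()  # required; otherwise the span leaks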
562 def start_observation( 563 self, 564 *, 565 trace_context: Optional[TraceContext] = None, 566 name: str, 567 as_type: ObservationTypeLiteralNoEvent = "span", 568 input: Optional[Any] = None, 569 output: Optional[Any] = None, 570 metadata: Optional[Any] = None, 571 version: Optional[str] = None, 572 level: Optional[SpanLevel] = None, 573 status_message: Optional[str] = None, 574 completion_start_time: Optional[datetime] = None, 575 model: Optional[str] = None, 576 model_parameters: Optional[Dict[str, MapValue]] = None, 577 usage_details: Optional[Dict[str, int]] = None, 578 cost_details: Optional[Dict[str, float]] = None, 579 prompt: Optional[PromptClient] = None, 580 ) -> Union[ 581 LangfuseSpan, 582 LangfuseGeneration, 583 LangfuseAgent, 584 LangfuseTool, 585 LangfuseChain, 586 LangfuseRetriever, 587 LangfuseEvaluator, 588 LangfuseEmbedding, 589 LangfuseGuardrail, 590 ]: 591 """Create a new observation of the specified type. 592 593 This method creates a new observation but does not set it as the current span in the 594 context. To create and use an observation within a context, use start_as_current_observation(). 595 596 Args: 597 trace_context: Optional context for connecting to an existing trace 598 name: Name of the observation 599 as_type: Type of observation to create (defaults to "span") 600 input: Input data for the operation 601 output: Output data from the operation 602 metadata: Additional metadata to associate with the observation 603 version: Version identifier for the code or component 604 level: Importance level of the observation 605 status_message: Optional status message for the observation 606 completion_start_time: When the model started generating (for generation types) 607 model: Name/identifier of the AI model used (for generation types) 608 model_parameters: Parameters used for the model (for generation types) 609 usage_details: Token usage information (for generation types) 610 cost_details: Cost information (for generation types) 611 prompt: Associated prompt template (for generation types) 612 613 Returns: 614 An observation object of the appropriate type that must be ended with .end() 615 """ 616 if trace_context: 617 trace_id = trace_context.get("trace_id", None) 618 parent_span_id = trace_context.get("parent_span_id", None) 619 620 if trace_id: 621 remote_parent_span = self._create_remote_parent_span( 622 trace_id=trace_id, parent_span_id=parent_span_id 623 ) 624 625 with otel_trace_api.use_span( 626 cast(otel_trace_api.Span, remote_parent_span) 627 ): 628 otel_span = self._otel_tracer.start_span(name=name) 629 otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True) 630 631 return self._create_observation_from_otel_span( 632 otel_span=otel_span, 633 as_type=as_type, 634 input=input, 635 output=output, 636 metadata=metadata, 637 version=version, 638 level=level, 639 status_message=status_message, 640 completion_start_time=completion_start_time, 641 model=model, 642 model_parameters=model_parameters, 643 usage_details=usage_details, 644 cost_details=cost_details, 645 prompt=prompt, 646 ) 647 648 otel_span = self._otel_tracer.start_span(name=name) 649 650 return self._create_observation_from_otel_span( 651 otel_span=otel_span, 652 as_type=as_type, 653 input=input, 654 output=output, 655 metadata=metadata, 656 version=version, 657 level=level, 658 status_message=status_message, 659 completion_start_time=completion_start_time, 660 model=model, 661 model_parameters=model_parameters, 662 usage_details=usage_details, 663 cost_details=cost_details, 664 
prompt=prompt, 665 )
Create a new observation of the specified type.
This method creates a new observation but does not set it as the current span in the context. To create and use an observation within a context, use start_as_current_observation().
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the observation
- as_type: Type of observation to create (defaults to "span")
- input: Input data for the operation
- output: Output data from the operation
- metadata: Additional metadata to associate with the observation
- version: Version identifier for the code or component
- level: Importance level of the observation
- status_message: Optional status message for the observation
- completion_start_time: When the model started generating (for generation types)
- model: Name/identifier of the AI model used (for generation types)
- model_parameters: Parameters used for the model (for generation types)
- usage_details: Token usage information (for generation types)
- cost_details: Cost information (for generation types)
- prompt: Associated prompt template (for generation types)
Returns:
An observation object of the appropriate type that must be ended with .end()
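The source docstring carries no example, so here is a hedged sketch creating a retriever-typed observation (query and result values are placeholders):

    retriever = langfuse.start_observation(
        name="vector-search",
        as_type="retriever",
        input={"query": "What is Langfuse?"},
    )
    try:
        retriever.update(output={"documents": ["doc-1", "doc-2"]})
    finally:
        retriever.end()  # observations from start_observation() must be ended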
737 def start_generation( 738 self, 739 *, 740 trace_context: Optional[TraceContext] = None, 741 name: str, 742 input: Optional[Any] = None, 743 output: Optional[Any] = None, 744 metadata: Optional[Any] = None, 745 version: Optional[str] = None, 746 level: Optional[SpanLevel] = None, 747 status_message: Optional[str] = None, 748 completion_start_time: Optional[datetime] = None, 749 model: Optional[str] = None, 750 model_parameters: Optional[Dict[str, MapValue]] = None, 751 usage_details: Optional[Dict[str, int]] = None, 752 cost_details: Optional[Dict[str, float]] = None, 753 prompt: Optional[PromptClient] = None, 754 ) -> LangfuseGeneration: 755 """Create a new generation span for model generations. 756 757 DEPRECATED: This method is deprecated and will be removed in a future version. 758 Use start_observation(as_type='generation') instead. 759 760 This method creates a specialized span for tracking model generations. 761 It includes additional fields specific to model generations such as model name, 762 token usage, and cost details. 763 764 The created generation span will be the child of the current span in the context. 765 766 Args: 767 trace_context: Optional context for connecting to an existing trace 768 name: Name of the generation operation 769 input: Input data for the model (e.g., prompts) 770 output: Output from the model (e.g., completions) 771 metadata: Additional metadata to associate with the generation 772 version: Version identifier for the model or component 773 level: Importance level of the generation (info, warning, error) 774 status_message: Optional status message for the generation 775 completion_start_time: When the model started generating the response 776 model: Name/identifier of the AI model used (e.g., "gpt-4") 777 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 778 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 779 cost_details: Cost information for the model call 780 prompt: Associated prompt template from Langfuse prompt management 781 782 Returns: 783 A LangfuseGeneration object that must be ended with .end() when complete 784 785 Example: 786 ```python 787 generation = langfuse.start_generation( 788 name="answer-generation", 789 model="gpt-4", 790 input={"prompt": "Explain quantum computing"}, 791 model_parameters={"temperature": 0.7} 792 ) 793 try: 794 # Call model API 795 response = llm.generate(...) 796 797 generation.update( 798 output=response.text, 799 usage_details={ 800 "prompt_tokens": response.usage.prompt_tokens, 801 "completion_tokens": response.usage.completion_tokens 802 } 803 ) 804 finally: 805 generation.end() 806 ``` 807 """ 808 warnings.warn( 809 "start_generation is deprecated and will be removed in a future version. " 810 "Use start_observation(as_type='generation') instead.", 811 DeprecationWarning, 812 stacklevel=2, 813 ) 814 return self.start_observation( 815 trace_context=trace_context, 816 name=name, 817 as_type="generation", 818 input=input, 819 output=output, 820 metadata=metadata, 821 version=version, 822 level=level, 823 status_message=status_message, 824 completion_start_time=completion_start_time, 825 model=model, 826 model_parameters=model_parameters, 827 usage_details=usage_details, 828 cost_details=cost_details, 829 prompt=prompt, 830 )
Create a new generation span for model generations.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_observation(as_type='generation') instead.
This method creates a specialized span for tracking model generations. It includes additional fields specific to model generations such as model name, token usage, and cost details.
The created generation span will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the generation operation
- input: Input data for the model (e.g., prompts)
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Returns:
A LangfuseGeneration object that must be ended with .end() when complete
Example:
    generation = langfuse.start_generation(
        name="answer-generation",
        model="gpt-4",
        input={"prompt": "Explain quantum computing"},
        model_parameters={"temperature": 0.7}
    )
    try:
        # Call model API
        response = llm.generate(...)

        generation.update(
            output=response.text,
            usage_details={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        )
    finally:
        generation.end()
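Since start_generation is deprecated, the equivalent non-deprecated call goes through start_observation with as_type="generation":

    generation = langfuse.start_observation(
        name="answer-generation",
        as_type="generation",
        model="gpt-4",
        input={"prompt": "Explain quantum computing"},
        model_parameters={"temperature": 0.7},
    )
    try:
        response = llm.generate(...)  # placeholder model call, as above
        generation.update(output=response.text)
    finally:
        generation.end()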
832 def start_as_current_generation( 833 self, 834 *, 835 trace_context: Optional[TraceContext] = None, 836 name: str, 837 input: Optional[Any] = None, 838 output: Optional[Any] = None, 839 metadata: Optional[Any] = None, 840 version: Optional[str] = None, 841 level: Optional[SpanLevel] = None, 842 status_message: Optional[str] = None, 843 completion_start_time: Optional[datetime] = None, 844 model: Optional[str] = None, 845 model_parameters: Optional[Dict[str, MapValue]] = None, 846 usage_details: Optional[Dict[str, int]] = None, 847 cost_details: Optional[Dict[str, float]] = None, 848 prompt: Optional[PromptClient] = None, 849 end_on_exit: Optional[bool] = None, 850 ) -> _AgnosticContextManager[LangfuseGeneration]: 851 """Create a new generation span and set it as the current span in a context manager. 852 853 DEPRECATED: This method is deprecated and will be removed in a future version. 854 Use start_as_current_observation(as_type='generation') instead. 855 856 This method creates a specialized span for model generations and sets it as the 857 current span within a context manager. Use this method with a 'with' statement to 858 automatically handle the generation span lifecycle within a code block. 859 860 The created generation span will be the child of the current span in the context. 861 862 Args: 863 trace_context: Optional context for connecting to an existing trace 864 name: Name of the generation operation 865 input: Input data for the model (e.g., prompts) 866 output: Output from the model (e.g., completions) 867 metadata: Additional metadata to associate with the generation 868 version: Version identifier for the model or component 869 level: Importance level of the generation (info, warning, error) 870 status_message: Optional status message for the generation 871 completion_start_time: When the model started generating the response 872 model: Name/identifier of the AI model used (e.g., "gpt-4") 873 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 874 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 875 cost_details: Cost information for the model call 876 prompt: Associated prompt template from Langfuse prompt management 877 end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks. 878 879 Returns: 880 A context manager that yields a LangfuseGeneration 881 882 Example: 883 ```python 884 with langfuse.start_as_current_generation( 885 name="answer-generation", 886 model="gpt-4", 887 input={"prompt": "Explain quantum computing"} 888 ) as generation: 889 # Call model API 890 response = llm.generate(...) 891 892 # Update with results 893 generation.update( 894 output=response.text, 895 usage_details={ 896 "prompt_tokens": response.usage.prompt_tokens, 897 "completion_tokens": response.usage.completion_tokens 898 } 899 ) 900 ``` 901 """ 902 warnings.warn( 903 "start_as_current_generation is deprecated and will be removed in a future version. 
" 904 "Use start_as_current_observation(as_type='generation') instead.", 905 DeprecationWarning, 906 stacklevel=2, 907 ) 908 return self.start_as_current_observation( 909 trace_context=trace_context, 910 name=name, 911 as_type="generation", 912 input=input, 913 output=output, 914 metadata=metadata, 915 version=version, 916 level=level, 917 status_message=status_message, 918 completion_start_time=completion_start_time, 919 model=model, 920 model_parameters=model_parameters, 921 usage_details=usage_details, 922 cost_details=cost_details, 923 prompt=prompt, 924 end_on_exit=end_on_exit, 925 )
Create a new generation span and set it as the current span in a context manager.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_as_current_observation(as_type='generation') instead.
This method creates a specialized span for model generations and sets it as the current span within a context manager. Use this method with a 'with' statement to automatically handle the generation span lifecycle within a code block.
The created generation span will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the generation operation
- input: Input data for the model (e.g., prompts)
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
- end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.
Returns:
A context manager that yields a LangfuseGeneration
Example:
    with langfuse.start_as_current_generation(
        name="answer-generation",
        model="gpt-4",
        input={"prompt": "Explain quantum computing"}
    ) as generation:
        # Call model API
        response = llm.generate(...)

        # Update with results
        generation.update(
            output=response.text,
            usage_details={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        )
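The equivalent non-deprecated form uses start_as_current_observation with as_type="generation":

    with langfuse.start_as_current_observation(
        name="answer-generation",
        as_type="generation",
        model="gpt-4",
        input={"prompt": "Explain quantum computing"}
    ) as generation:
        response = llm.generate(...)  # placeholder model call, as above
        generation.update(output=response.text)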
1083 def start_as_current_observation( 1084 self, 1085 *, 1086 trace_context: Optional[TraceContext] = None, 1087 name: str, 1088 as_type: ObservationTypeLiteralNoEvent = "span", 1089 input: Optional[Any] = None, 1090 output: Optional[Any] = None, 1091 metadata: Optional[Any] = None, 1092 version: Optional[str] = None, 1093 level: Optional[SpanLevel] = None, 1094 status_message: Optional[str] = None, 1095 completion_start_time: Optional[datetime] = None, 1096 model: Optional[str] = None, 1097 model_parameters: Optional[Dict[str, MapValue]] = None, 1098 usage_details: Optional[Dict[str, int]] = None, 1099 cost_details: Optional[Dict[str, float]] = None, 1100 prompt: Optional[PromptClient] = None, 1101 end_on_exit: Optional[bool] = None, 1102 ) -> Union[ 1103 _AgnosticContextManager[LangfuseGeneration], 1104 _AgnosticContextManager[LangfuseSpan], 1105 _AgnosticContextManager[LangfuseAgent], 1106 _AgnosticContextManager[LangfuseTool], 1107 _AgnosticContextManager[LangfuseChain], 1108 _AgnosticContextManager[LangfuseRetriever], 1109 _AgnosticContextManager[LangfuseEvaluator], 1110 _AgnosticContextManager[LangfuseEmbedding], 1111 _AgnosticContextManager[LangfuseGuardrail], 1112 ]: 1113 """Create a new observation and set it as the current span in a context manager. 1114 1115 This method creates a new observation of the specified type and sets it as the 1116 current span within a context manager. Use this method with a 'with' statement to 1117 automatically handle the observation lifecycle within a code block. 1118 1119 The created observation will be the child of the current span in the context. 1120 1121 Args: 1122 trace_context: Optional context for connecting to an existing trace 1123 name: Name of the observation (e.g., function or operation name) 1124 as_type: Type of observation to create (defaults to "span") 1125 input: Input data for the operation (can be any JSON-serializable object) 1126 output: Output data from the operation (can be any JSON-serializable object) 1127 metadata: Additional metadata to associate with the observation 1128 version: Version identifier for the code or component 1129 level: Importance level of the observation (info, warning, error) 1130 status_message: Optional status message for the observation 1131 end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks. 1132 1133 The following parameters are available when as_type is: "generation" or "embedding". 
1134 completion_start_time: When the model started generating the response 1135 model: Name/identifier of the AI model used (e.g., "gpt-4") 1136 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1137 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1138 cost_details: Cost information for the model call 1139 prompt: Associated prompt template from Langfuse prompt management 1140 1141 Returns: 1142 A context manager that yields the appropriate observation type based on as_type 1143 1144 Example: 1145 ```python 1146 # Create a span 1147 with langfuse.start_as_current_observation(name="process-query", as_type="span") as span: 1148 # Do work 1149 result = process_data() 1150 span.update(output=result) 1151 1152 # Create a child span automatically 1153 with span.start_as_current_span(name="sub-operation") as child_span: 1154 # Do sub-operation work 1155 child_span.update(output="sub-result") 1156 1157 # Create a tool observation 1158 with langfuse.start_as_current_observation(name="web-search", as_type="tool") as tool: 1159 # Do tool work 1160 results = search_web(query) 1161 tool.update(output=results) 1162 1163 # Create a generation observation 1164 with langfuse.start_as_current_observation( 1165 name="answer-generation", 1166 as_type="generation", 1167 model="gpt-4" 1168 ) as generation: 1169 # Generate answer 1170 response = llm.generate(...) 1171 generation.update(output=response) 1172 ``` 1173 """ 1174 if as_type in get_observation_types_list(ObservationTypeGenerationLike): 1175 if trace_context: 1176 trace_id = trace_context.get("trace_id", None) 1177 parent_span_id = trace_context.get("parent_span_id", None) 1178 1179 if trace_id: 1180 remote_parent_span = self._create_remote_parent_span( 1181 trace_id=trace_id, parent_span_id=parent_span_id 1182 ) 1183 1184 return cast( 1185 Union[ 1186 _AgnosticContextManager[LangfuseGeneration], 1187 _AgnosticContextManager[LangfuseEmbedding], 1188 ], 1189 self._create_span_with_parent_context( 1190 as_type=as_type, 1191 name=name, 1192 remote_parent_span=remote_parent_span, 1193 parent=None, 1194 end_on_exit=end_on_exit, 1195 input=input, 1196 output=output, 1197 metadata=metadata, 1198 version=version, 1199 level=level, 1200 status_message=status_message, 1201 completion_start_time=completion_start_time, 1202 model=model, 1203 model_parameters=model_parameters, 1204 usage_details=usage_details, 1205 cost_details=cost_details, 1206 prompt=prompt, 1207 ), 1208 ) 1209 1210 return cast( 1211 Union[ 1212 _AgnosticContextManager[LangfuseGeneration], 1213 _AgnosticContextManager[LangfuseEmbedding], 1214 ], 1215 self._start_as_current_otel_span_with_processed_media( 1216 as_type=as_type, 1217 name=name, 1218 end_on_exit=end_on_exit, 1219 input=input, 1220 output=output, 1221 metadata=metadata, 1222 version=version, 1223 level=level, 1224 status_message=status_message, 1225 completion_start_time=completion_start_time, 1226 model=model, 1227 model_parameters=model_parameters, 1228 usage_details=usage_details, 1229 cost_details=cost_details, 1230 prompt=prompt, 1231 ), 1232 ) 1233 1234 if as_type in get_observation_types_list(ObservationTypeSpanLike): 1235 if trace_context: 1236 trace_id = trace_context.get("trace_id", None) 1237 parent_span_id = trace_context.get("parent_span_id", None) 1238 1239 if trace_id: 1240 remote_parent_span = self._create_remote_parent_span( 1241 trace_id=trace_id, parent_span_id=parent_span_id 1242 ) 1243 1244 return cast( 1245 Union[ 1246 
_AgnosticContextManager[LangfuseSpan], 1247 _AgnosticContextManager[LangfuseAgent], 1248 _AgnosticContextManager[LangfuseTool], 1249 _AgnosticContextManager[LangfuseChain], 1250 _AgnosticContextManager[LangfuseRetriever], 1251 _AgnosticContextManager[LangfuseEvaluator], 1252 _AgnosticContextManager[LangfuseGuardrail], 1253 ], 1254 self._create_span_with_parent_context( 1255 as_type=as_type, 1256 name=name, 1257 remote_parent_span=remote_parent_span, 1258 parent=None, 1259 end_on_exit=end_on_exit, 1260 input=input, 1261 output=output, 1262 metadata=metadata, 1263 version=version, 1264 level=level, 1265 status_message=status_message, 1266 ), 1267 ) 1268 1269 return cast( 1270 Union[ 1271 _AgnosticContextManager[LangfuseSpan], 1272 _AgnosticContextManager[LangfuseAgent], 1273 _AgnosticContextManager[LangfuseTool], 1274 _AgnosticContextManager[LangfuseChain], 1275 _AgnosticContextManager[LangfuseRetriever], 1276 _AgnosticContextManager[LangfuseEvaluator], 1277 _AgnosticContextManager[LangfuseGuardrail], 1278 ], 1279 self._start_as_current_otel_span_with_processed_media( 1280 as_type=as_type, 1281 name=name, 1282 end_on_exit=end_on_exit, 1283 input=input, 1284 output=output, 1285 metadata=metadata, 1286 version=version, 1287 level=level, 1288 status_message=status_message, 1289 ), 1290 ) 1291 1292 # This should never be reached since all valid types are handled above 1293 langfuse_logger.warning( 1294 f"Unknown observation type: {as_type}, falling back to span" 1295 ) 1296 return self._start_as_current_otel_span_with_processed_media( 1297 as_type="span", 1298 name=name, 1299 end_on_exit=end_on_exit, 1300 input=input, 1301 output=output, 1302 metadata=metadata, 1303 version=version, 1304 level=level, 1305 status_message=status_message, 1306 )
Create a new observation and set it as the current span in a context manager.
This method creates a new observation of the specified type and sets it as the current span within a context manager. Use this method with a 'with' statement to automatically handle the observation lifecycle within a code block.
The created observation will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the observation (e.g., function or operation name)
- as_type: Type of observation to create (defaults to "span")
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the observation
- version: Version identifier for the code or component
- level: Importance level of the observation (info, warning, error)
- status_message: Optional status message for the observation
- end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.
The following parameters are available when as_type is "generation" or "embedding":
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Returns:
A context manager that yields the appropriate observation type based on as_type
Example:
    # Create a span
    with langfuse.start_as_current_observation(name="process-query", as_type="span") as span:
        # Do work
        result = process_data()
        span.update(output=result)

        # Create a child span automatically
        with span.start_as_current_span(name="sub-operation") as child_span:
            # Do sub-operation work
            child_span.update(output="sub-result")

    # Create a tool observation
    with langfuse.start_as_current_observation(name="web-search", as_type="tool") as tool:
        # Do tool work
        results = search_web(query)
        tool.update(output=results)

    # Create a generation observation
    with langfuse.start_as_current_observation(
        name="answer-generation",
        as_type="generation",
        model="gpt-4"
    ) as generation:
        # Generate answer
        response = llm.generate(...)
        generation.update(output=response)
1467 def update_current_generation( 1468 self, 1469 *, 1470 name: Optional[str] = None, 1471 input: Optional[Any] = None, 1472 output: Optional[Any] = None, 1473 metadata: Optional[Any] = None, 1474 version: Optional[str] = None, 1475 level: Optional[SpanLevel] = None, 1476 status_message: Optional[str] = None, 1477 completion_start_time: Optional[datetime] = None, 1478 model: Optional[str] = None, 1479 model_parameters: Optional[Dict[str, MapValue]] = None, 1480 usage_details: Optional[Dict[str, int]] = None, 1481 cost_details: Optional[Dict[str, float]] = None, 1482 prompt: Optional[PromptClient] = None, 1483 ) -> None: 1484 """Update the current active generation span with new information. 1485 1486 This method updates the current generation span in the active context with 1487 additional information. It's useful for adding output, usage stats, or other 1488 details that become available during or after model generation. 1489 1490 Args: 1491 name: The generation name 1492 input: Updated input data for the model 1493 output: Output from the model (e.g., completions) 1494 metadata: Additional metadata to associate with the generation 1495 version: Version identifier for the model or component 1496 level: Importance level of the generation (info, warning, error) 1497 status_message: Optional status message for the generation 1498 completion_start_time: When the model started generating the response 1499 model: Name/identifier of the AI model used (e.g., "gpt-4") 1500 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1501 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1502 cost_details: Cost information for the model call 1503 prompt: Associated prompt template from Langfuse prompt management 1504 1505 Example: 1506 ```python 1507 with langfuse.start_as_current_generation(name="answer-query") as generation: 1508 # Initial setup and API call 1509 response = llm.generate(...) 1510 1511 # Update with results that weren't available at creation time 1512 langfuse.update_current_generation( 1513 output=response.text, 1514 usage_details={ 1515 "prompt_tokens": response.usage.prompt_tokens, 1516 "completion_tokens": response.usage.completion_tokens 1517 } 1518 ) 1519 ``` 1520 """ 1521 if not self._tracing_enabled: 1522 langfuse_logger.debug( 1523 "Operation skipped: update_current_generation - Tracing is disabled or client is in no-op mode." 1524 ) 1525 return 1526 1527 current_otel_span = self._get_current_otel_span() 1528 1529 if current_otel_span is not None: 1530 generation = LangfuseGeneration( 1531 otel_span=current_otel_span, langfuse_client=self 1532 ) 1533 1534 if name: 1535 current_otel_span.update_name(name) 1536 1537 generation.update( 1538 input=input, 1539 output=output, 1540 metadata=metadata, 1541 version=version, 1542 level=level, 1543 status_message=status_message, 1544 completion_start_time=completion_start_time, 1545 model=model, 1546 model_parameters=model_parameters, 1547 usage_details=usage_details, 1548 cost_details=cost_details, 1549 prompt=prompt, 1550 )
Update the current active generation span with new information.
This method updates the current generation span in the active context with additional information. It's useful for adding output, usage stats, or other details that become available during or after model generation.
Arguments:
- name: The generation name
- input: Updated input data for the model
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Example:
    with langfuse.start_as_current_generation(name="answer-query") as generation:
        # Initial setup and API call
        response = llm.generate(...)

        # Update with results that weren't available at creation time
        langfuse.update_current_generation(
            output=response.text,
            usage_details={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        )
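completion_start_time is most useful with streaming model calls, where time to first token matters. A hedged sketch (llm.stream is a placeholder streaming API):

    from datetime import datetime, timezone

    with langfuse.start_as_current_observation(
        name="stream-answer", as_type="generation"
    ) as generation:
        chunks = []
        for i, chunk in enumerate(llm.stream(...)):  # placeholder streaming API
            if i == 0:
                # Record when the model emitted its first token
                langfuse.update_current_generation(
                    completion_start_time=datetime.now(timezone.utc)
                )
            chunks.append(chunk)
        langfuse.update_current_generation(output="".join(chunks))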
1552 def update_current_span( 1553 self, 1554 *, 1555 name: Optional[str] = None, 1556 input: Optional[Any] = None, 1557 output: Optional[Any] = None, 1558 metadata: Optional[Any] = None, 1559 version: Optional[str] = None, 1560 level: Optional[SpanLevel] = None, 1561 status_message: Optional[str] = None, 1562 ) -> None: 1563 """Update the current active span with new information. 1564 1565 This method updates the current span in the active context with 1566 additional information. It's useful for adding outputs or metadata 1567 that become available during execution. 1568 1569 Args: 1570 name: The span name 1571 input: Updated input data for the operation 1572 output: Output data from the operation 1573 metadata: Additional metadata to associate with the span 1574 version: Version identifier for the code or component 1575 level: Importance level of the span (info, warning, error) 1576 status_message: Optional status message for the span 1577 1578 Example: 1579 ```python 1580 with langfuse.start_as_current_span(name="process-data") as span: 1581 # Initial processing 1582 result = process_first_part() 1583 1584 # Update with intermediate results 1585 langfuse.update_current_span(metadata={"intermediate_result": result}) 1586 1587 # Continue processing 1588 final_result = process_second_part(result) 1589 1590 # Final update 1591 langfuse.update_current_span(output=final_result) 1592 ``` 1593 """ 1594 if not self._tracing_enabled: 1595 langfuse_logger.debug( 1596 "Operation skipped: update_current_span - Tracing is disabled or client is in no-op mode." 1597 ) 1598 return 1599 1600 current_otel_span = self._get_current_otel_span() 1601 1602 if current_otel_span is not None: 1603 span = LangfuseSpan( 1604 otel_span=current_otel_span, 1605 langfuse_client=self, 1606 environment=self._environment, 1607 ) 1608 1609 if name: 1610 current_otel_span.update_name(name) 1611 1612 span.update( 1613 input=input, 1614 output=output, 1615 metadata=metadata, 1616 version=version, 1617 level=level, 1618 status_message=status_message, 1619 )
Update the current active span with new information.
This method updates the current span in the active context with additional information. It's useful for adding outputs or metadata that become available during execution.
Arguments:
- name: The span name
- input: Updated input data for the operation
- output: Output data from the operation
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Example:
    with langfuse.start_as_current_span(name="process-data") as span:
        # Initial processing
        result = process_first_part()

        # Update with intermediate results
        langfuse.update_current_span(metadata={"intermediate_result": result})

        # Continue processing
        final_result = process_second_part(result)

        # Final update
        langfuse.update_current_span(output=final_result)
1621 def update_current_trace( 1622 self, 1623 *, 1624 name: Optional[str] = None, 1625 user_id: Optional[str] = None, 1626 session_id: Optional[str] = None, 1627 version: Optional[str] = None, 1628 input: Optional[Any] = None, 1629 output: Optional[Any] = None, 1630 metadata: Optional[Any] = None, 1631 tags: Optional[List[str]] = None, 1632 public: Optional[bool] = None, 1633 ) -> None: 1634 """Update the current trace with additional information. 1635 1636 This method updates the Langfuse trace that the current span belongs to. It's useful for 1637 adding trace-level metadata like user ID, session ID, or tags that apply to 1638 the entire Langfuse trace rather than just a single observation. 1639 1640 Args: 1641 name: Updated name for the Langfuse trace 1642 user_id: ID of the user who initiated the Langfuse trace 1643 session_id: Session identifier for grouping related Langfuse traces 1644 version: Version identifier for the application or service 1645 input: Input data for the overall Langfuse trace 1646 output: Output data from the overall Langfuse trace 1647 metadata: Additional metadata to associate with the Langfuse trace 1648 tags: List of tags to categorize the Langfuse trace 1649 public: Whether the Langfuse trace should be publicly accessible 1650 1651 Example: 1652 ```python 1653 with langfuse.start_as_current_span(name="handle-request") as span: 1654 # Get user information 1655 user = authenticate_user(request) 1656 1657 # Update trace with user context 1658 langfuse.update_current_trace( 1659 user_id=user.id, 1660 session_id=request.session_id, 1661 tags=["production", "web-app"] 1662 ) 1663 1664 # Continue processing 1665 response = process_request(request) 1666 1667 # Update span with results 1668 span.update(output=response) 1669 ``` 1670 """ 1671 if not self._tracing_enabled: 1672 langfuse_logger.debug( 1673 "Operation skipped: update_current_trace - Tracing is disabled or client is in no-op mode." 1674 ) 1675 return 1676 1677 current_otel_span = self._get_current_otel_span() 1678 1679 if current_otel_span is not None: 1680 existing_observation_type = current_otel_span.attributes.get( # type: ignore[attr-defined] 1681 LangfuseOtelSpanAttributes.OBSERVATION_TYPE, "span" 1682 ) 1683 # We need to preserve the class to keep the correct observation type 1684 span_class = self._get_span_class(existing_observation_type) 1685 span = span_class( 1686 otel_span=current_otel_span, 1687 langfuse_client=self, 1688 environment=self._environment, 1689 ) 1690 1691 span.update_trace( 1692 name=name, 1693 user_id=user_id, 1694 session_id=session_id, 1695 version=version, 1696 input=input, 1697 output=output, 1698 metadata=metadata, 1699 tags=tags, 1700 public=public, 1701 )
Update the current trace with additional information.
This method updates the Langfuse trace that the current span belongs to. It's useful for adding trace-level metadata like user ID, session ID, or tags that apply to the entire Langfuse trace rather than just a single observation.
Arguments:
- name: Updated name for the Langfuse trace
- user_id: ID of the user who initiated the Langfuse trace
- session_id: Session identifier for grouping related Langfuse traces
- version: Version identifier for the application or service
- input: Input data for the overall Langfuse trace
- output: Output data from the overall Langfuse trace
- metadata: Additional metadata to associate with the Langfuse trace
- tags: List of tags to categorize the Langfuse trace
- public: Whether the Langfuse trace should be publicly accessible
Example:
    with langfuse.start_as_current_span(name="handle-request") as span:
        # Get user information
        user = authenticate_user(request)

        # Update trace with user context
        langfuse.update_current_trace(
            user_id=user.id,
            session_id=request.session_id,
            tags=["production", "web-app"]
        )

        # Continue processing
        response = process_request(request)

        # Update span with results
        span.update(output=response)
1703 def create_event( 1704 self, 1705 *, 1706 trace_context: Optional[TraceContext] = None, 1707 name: str, 1708 input: Optional[Any] = None, 1709 output: Optional[Any] = None, 1710 metadata: Optional[Any] = None, 1711 version: Optional[str] = None, 1712 level: Optional[SpanLevel] = None, 1713 status_message: Optional[str] = None, 1714 ) -> LangfuseEvent: 1715 """Create a new Langfuse observation of type 'EVENT'. 1716 1717 The created Langfuse Event observation will be the child of the current span in the context. 1718 1719 Args: 1720 trace_context: Optional context for connecting to an existing trace 1721 name: Name of the span (e.g., function or operation name) 1722 input: Input data for the operation (can be any JSON-serializable object) 1723 output: Output data from the operation (can be any JSON-serializable object) 1724 metadata: Additional metadata to associate with the span 1725 version: Version identifier for the code or component 1726 level: Importance level of the span (info, warning, error) 1727 status_message: Optional status message for the span 1728 1729 Returns: 1730 The Langfuse Event object 1731 1732 Example: 1733 ```python 1734 event = langfuse.create_event(name="process-event") 1735 ``` 1736 """ 1737 timestamp = time_ns() 1738 1739 if trace_context: 1740 trace_id = trace_context.get("trace_id", None) 1741 parent_span_id = trace_context.get("parent_span_id", None) 1742 1743 if trace_id: 1744 remote_parent_span = self._create_remote_parent_span( 1745 trace_id=trace_id, parent_span_id=parent_span_id 1746 ) 1747 1748 with otel_trace_api.use_span( 1749 cast(otel_trace_api.Span, remote_parent_span) 1750 ): 1751 otel_span = self._otel_tracer.start_span( 1752 name=name, start_time=timestamp 1753 ) 1754 otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True) 1755 1756 return cast( 1757 LangfuseEvent, 1758 LangfuseEvent( 1759 otel_span=otel_span, 1760 langfuse_client=self, 1761 environment=self._environment, 1762 input=input, 1763 output=output, 1764 metadata=metadata, 1765 version=version, 1766 level=level, 1767 status_message=status_message, 1768 ).end(end_time=timestamp), 1769 ) 1770 1771 otel_span = self._otel_tracer.start_span(name=name, start_time=timestamp) 1772 1773 return cast( 1774 LangfuseEvent, 1775 LangfuseEvent( 1776 otel_span=otel_span, 1777 langfuse_client=self, 1778 environment=self._environment, 1779 input=input, 1780 output=output, 1781 metadata=metadata, 1782 version=version, 1783 level=level, 1784 status_message=status_message, 1785 ).end(end_time=timestamp), 1786 )
Create a new Langfuse observation of type 'EVENT'.
The created Langfuse Event observation will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Returns:
The Langfuse Event object
Example:
event = langfuse.create_event(name="process-event")
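A slightly fuller sketch with a payload (the field values are placeholders):

    event = langfuse.create_event(
        name="cache-miss",
        input={"key": "user:123"},            # placeholder payload
        metadata={"subsystem": "retrieval"},  # placeholder metadata
    )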
1875 @staticmethod 1876 def create_trace_id(*, seed: Optional[str] = None) -> str: 1877 """Create a unique trace ID for use with Langfuse. 1878 1879 This method generates a unique trace ID for use with various Langfuse APIs. 1880 It can either generate a random ID or create a deterministic ID based on 1881 a seed string. 1882 1883 Trace IDs must be 32 lowercase hexadecimal characters, representing 16 bytes. 1884 This method ensures the generated ID meets this requirement. If you need to 1885 correlate an external ID with a Langfuse trace ID, use the external ID as the 1886 seed to get a valid, deterministic Langfuse trace ID. 1887 1888 Args: 1889 seed: Optional string to use as a seed for deterministic ID generation. 1890 If provided, the same seed will always produce the same ID. 1891 If not provided, a random ID will be generated. 1892 1893 Returns: 1894 A 32-character lowercase hexadecimal string representing the Langfuse trace ID. 1895 1896 Example: 1897 ```python 1898 # Generate a random trace ID 1899 trace_id = langfuse.create_trace_id() 1900 1901 # Generate a deterministic ID based on a seed 1902 session_trace_id = langfuse.create_trace_id(seed="session-456") 1903 1904 # Correlate an external ID with a Langfuse trace ID 1905 external_id = "external-system-123456" 1906 correlated_trace_id = langfuse.create_trace_id(seed=external_id) 1907 1908 # Use the ID with trace context 1909 with langfuse.start_as_current_span( 1910 name="process-request", 1911 trace_context={"trace_id": trace_id} 1912 ) as span: 1913 # Operation will be part of the specific trace 1914 pass 1915 ``` 1916 """ 1917 if not seed: 1918 trace_id_int = RandomIdGenerator().generate_trace_id() 1919 1920 return Langfuse._format_otel_trace_id(trace_id_int) 1921 1922 return sha256(seed.encode("utf-8")).digest()[:16].hex()
Create a unique trace ID for use with Langfuse.
This method generates a unique trace ID for use with various Langfuse APIs. It can either generate a random ID or create a deterministic ID based on a seed string.
Trace IDs must be 32 lowercase hexadecimal characters, representing 16 bytes. This method ensures the generated ID meets this requirement. If you need to correlate an external ID with a Langfuse trace ID, use the external ID as the seed to get a valid, deterministic Langfuse trace ID.
Arguments:
- seed: Optional string to use as a seed for deterministic ID generation. If provided, the same seed will always produce the same ID. If not provided, a random ID will be generated.
Returns:
A 32-character lowercase hexadecimal string representing the Langfuse trace ID.
Example:
# Generate a random trace ID
trace_id = langfuse.create_trace_id()

# Generate a deterministic ID based on a seed
session_trace_id = langfuse.create_trace_id(seed="session-456")

# Correlate an external ID with a Langfuse trace ID
external_id = "external-system-123456"
correlated_trace_id = langfuse.create_trace_id(seed=external_id)

# Use the ID with trace context
with langfuse.start_as_current_span(
    name="process-request",
    trace_context={"trace_id": trace_id}
) as span:
    # Operation will be part of the specific trace
    pass
1998 def create_score( 1999 self, 2000 *, 2001 name: str, 2002 value: Union[float, str], 2003 session_id: Optional[str] = None, 2004 dataset_run_id: Optional[str] = None, 2005 trace_id: Optional[str] = None, 2006 observation_id: Optional[str] = None, 2007 score_id: Optional[str] = None, 2008 data_type: Optional[ScoreDataType] = None, 2009 comment: Optional[str] = None, 2010 config_id: Optional[str] = None, 2011 metadata: Optional[Any] = None, 2012 ) -> None: 2013 """Create a score for a specific trace or observation. 2014 2015 This method creates a score for evaluating a Langfuse trace or observation. Scores can be 2016 used to track quality metrics, user feedback, or automated evaluations. 2017 2018 Args: 2019 name: Name of the score (e.g., "relevance", "accuracy") 2020 value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL) 2021 session_id: ID of the Langfuse session to associate the score with 2022 dataset_run_id: ID of the Langfuse dataset run to associate the score with 2023 trace_id: ID of the Langfuse trace to associate the score with 2024 observation_id: Optional ID of the specific observation to score. Trace ID must be provided too. 2025 score_id: Optional custom ID for the score (auto-generated if not provided) 2026 data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL) 2027 comment: Optional comment or explanation for the score 2028 config_id: Optional ID of a score config defined in Langfuse 2029 metadata: Optional metadata to be attached to the score 2030 2031 Example: 2032 ```python 2033 # Create a numeric score for accuracy 2034 langfuse.create_score( 2035 name="accuracy", 2036 value=0.92, 2037 trace_id="abcdef1234567890abcdef1234567890", 2038 data_type="NUMERIC", 2039 comment="High accuracy with minor irrelevant details" 2040 ) 2041 2042 # Create a categorical score for sentiment 2043 langfuse.create_score( 2044 name="sentiment", 2045 value="positive", 2046 trace_id="abcdef1234567890abcdef1234567890", 2047 observation_id="abcdef1234567890", 2048 data_type="CATEGORICAL" 2049 ) 2050 ``` 2051 """ 2052 if not self._tracing_enabled: 2053 return 2054 2055 score_id = score_id or self._create_observation_id() 2056 2057 try: 2058 new_body = ScoreBody( 2059 id=score_id, 2060 sessionId=session_id, 2061 datasetRunId=dataset_run_id, 2062 traceId=trace_id, 2063 observationId=observation_id, 2064 name=name, 2065 value=value, 2066 dataType=data_type, # type: ignore 2067 comment=comment, 2068 configId=config_id, 2069 environment=self._environment, 2070 metadata=metadata, 2071 ) 2072 2073 event = { 2074 "id": self.create_trace_id(), 2075 "type": "score-create", 2076 "timestamp": _get_timestamp(), 2077 "body": new_body, 2078 } 2079 2080 if self._resources is not None: 2081 # Force the score to be in sample if it was for a legacy trace ID, i.e. non-32 hexchar 2082 force_sample = ( 2083 not self._is_valid_trace_id(trace_id) if trace_id else True 2084 ) 2085 2086 self._resources.add_score_task( 2087 event, 2088 force_sample=force_sample, 2089 ) 2090 2091 except Exception as e: 2092 langfuse_logger.exception( 2093 f"Error creating score: Failed to process score event for trace_id={trace_id}, name={name}. Error: {e}" 2094 )
Create a score for a specific trace or observation.
This method creates a score for evaluating a Langfuse trace or observation. Scores can be used to track quality metrics, user feedback, or automated evaluations.
Arguments:
- name: Name of the score (e.g., "relevance", "accuracy")
- value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
- session_id: ID of the Langfuse session to associate the score with
- dataset_run_id: ID of the Langfuse dataset run to associate the score with
- trace_id: ID of the Langfuse trace to associate the score with
- observation_id: Optional ID of the specific observation to score. Trace ID must be provided too.
- score_id: Optional custom ID for the score (auto-generated if not provided)
- data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
- comment: Optional comment or explanation for the score
- config_id: Optional ID of a score config defined in Langfuse
- metadata: Optional metadata to be attached to the score
Example:
# Create a numeric score for accuracy
langfuse.create_score(
    name="accuracy",
    value=0.92,
    trace_id="abcdef1234567890abcdef1234567890",
    data_type="NUMERIC",
    comment="High accuracy with minor irrelevant details"
)

# Create a categorical score for sentiment
langfuse.create_score(
    name="sentiment",
    value="positive",
    trace_id="abcdef1234567890abcdef1234567890",
    observation_id="abcdef1234567890",
    data_type="CATEGORICAL"
)
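Scores can also target a session or a dataset run instead of a trace, via the session_id and dataset_run_id parameters documented above. A minimal sketch; the session ID is an illustrative placeholder:

# Score an entire session rather than a single trace
langfuse.create_score(
    name="user_feedback",
    value=1.0,
    session_id="session-456",  # illustrative session ID
    data_type="NUMERIC",
    comment="User marked the conversation as helpful",
)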
2120 def score_current_span( 2121 self, 2122 *, 2123 name: str, 2124 value: Union[float, str], 2125 score_id: Optional[str] = None, 2126 data_type: Optional[ScoreDataType] = None, 2127 comment: Optional[str] = None, 2128 config_id: Optional[str] = None, 2129 ) -> None: 2130 """Create a score for the current active span. 2131 2132 This method scores the currently active span in the context. It's a convenient 2133 way to score the current operation without needing to know its trace and span IDs. 2134 2135 Args: 2136 name: Name of the score (e.g., "relevance", "accuracy") 2137 value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL) 2138 score_id: Optional custom ID for the score (auto-generated if not provided) 2139 data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL) 2140 comment: Optional comment or explanation for the score 2141 config_id: Optional ID of a score config defined in Langfuse 2142 2143 Example: 2144 ```python 2145 with langfuse.start_as_current_generation(name="answer-query") as generation: 2146 # Generate answer 2147 response = generate_answer(...) 2148 generation.update(output=response) 2149 2150 # Score the generation 2151 langfuse.score_current_span( 2152 name="relevance", 2153 value=0.85, 2154 data_type="NUMERIC", 2155 comment="Mostly relevant but contains some tangential information" 2156 ) 2157 ``` 2158 """ 2159 current_span = self._get_current_otel_span() 2160 2161 if current_span is not None: 2162 trace_id = self._get_otel_trace_id(current_span) 2163 observation_id = self._get_otel_span_id(current_span) 2164 2165 langfuse_logger.info( 2166 f"Score: Creating score name='{name}' value={value} for current span ({observation_id}) in trace {trace_id}" 2167 ) 2168 2169 self.create_score( 2170 trace_id=trace_id, 2171 observation_id=observation_id, 2172 name=name, 2173 value=cast(str, value), 2174 score_id=score_id, 2175 data_type=cast(Literal["CATEGORICAL"], data_type), 2176 comment=comment, 2177 config_id=config_id, 2178 )
Create a score for the current active span.
This method scores the currently active span in the context. It's a convenient way to score the current operation without needing to know its trace and span IDs.
Arguments:
- name: Name of the score (e.g., "relevance", "accuracy")
- value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
- score_id: Optional custom ID for the score (auto-generated if not provided)
- data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
- comment: Optional comment or explanation for the score
- config_id: Optional ID of a score config defined in Langfuse
Example:
with langfuse.start_as_current_generation(name="answer-query") as generation:
    # Generate answer
    response = generate_answer(...)
    generation.update(output=response)

    # Score the generation
    langfuse.score_current_span(
        name="relevance",
        value=0.85,
        data_type="NUMERIC",
        comment="Mostly relevant but contains some tangential information"
    )
2204 def score_current_trace( 2205 self, 2206 *, 2207 name: str, 2208 value: Union[float, str], 2209 score_id: Optional[str] = None, 2210 data_type: Optional[ScoreDataType] = None, 2211 comment: Optional[str] = None, 2212 config_id: Optional[str] = None, 2213 ) -> None: 2214 """Create a score for the current trace. 2215 2216 This method scores the trace of the currently active span. Unlike score_current_span, 2217 this method associates the score with the entire trace rather than a specific span. 2218 It's useful for scoring overall performance or quality of the entire operation. 2219 2220 Args: 2221 name: Name of the score (e.g., "user_satisfaction", "overall_quality") 2222 value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL) 2223 score_id: Optional custom ID for the score (auto-generated if not provided) 2224 data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL) 2225 comment: Optional comment or explanation for the score 2226 config_id: Optional ID of a score config defined in Langfuse 2227 2228 Example: 2229 ```python 2230 with langfuse.start_as_current_span(name="process-user-request") as span: 2231 # Process request 2232 result = process_complete_request() 2233 span.update(output=result) 2234 2235 # Score the overall trace 2236 langfuse.score_current_trace( 2237 name="overall_quality", 2238 value=0.95, 2239 data_type="NUMERIC", 2240 comment="High quality end-to-end response" 2241 ) 2242 ``` 2243 """ 2244 current_span = self._get_current_otel_span() 2245 2246 if current_span is not None: 2247 trace_id = self._get_otel_trace_id(current_span) 2248 2249 langfuse_logger.info( 2250 f"Score: Creating score name='{name}' value={value} for entire trace {trace_id}" 2251 ) 2252 2253 self.create_score( 2254 trace_id=trace_id, 2255 name=name, 2256 value=cast(str, value), 2257 score_id=score_id, 2258 data_type=cast(Literal["CATEGORICAL"], data_type), 2259 comment=comment, 2260 config_id=config_id, 2261 )
Create a score for the current trace.
This method scores the trace of the currently active span. Unlike score_current_span, this method associates the score with the entire trace rather than a specific span. It's useful for scoring overall performance or quality of the entire operation.
Arguments:
- name: Name of the score (e.g., "user_satisfaction", "overall_quality")
- value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
- score_id: Optional custom ID for the score (auto-generated if not provided)
- data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
- comment: Optional comment or explanation for the score
- config_id: Optional ID of a score config defined in Langfuse
Example:
with langfuse.start_as_current_span(name="process-user-request") as span:
    # Process request
    result = process_complete_request()
    span.update(output=result)

    # Score the overall trace
    langfuse.score_current_trace(
        name="overall_quality",
        value=0.95,
        data_type="NUMERIC",
        comment="High quality end-to-end response"
    )
2263 def flush(self) -> None: 2264 """Force flush all pending spans and events to the Langfuse API. 2265 2266 This method manually flushes any pending spans, scores, and other events to the 2267 Langfuse API. It's useful in scenarios where you want to ensure all data is sent 2268 before proceeding, without waiting for the automatic flush interval. 2269 2270 Example: 2271 ```python 2272 # Record some spans and scores 2273 with langfuse.start_as_current_span(name="operation") as span: 2274 # Do work... 2275 pass 2276 2277 # Ensure all data is sent to Langfuse before proceeding 2278 langfuse.flush() 2279 2280 # Continue with other work 2281 ``` 2282 """ 2283 if self._resources is not None: 2284 self._resources.flush()
Force flush all pending spans and events to the Langfuse API.
This method manually flushes any pending spans, scores, and other events to the Langfuse API. It's useful in scenarios where you want to ensure all data is sent before proceeding, without waiting for the automatic flush interval.
Example:
# Record some spans and scores
with langfuse.start_as_current_span(name="operation") as span:
    # Do work...
    pass

# Ensure all data is sent to Langfuse before proceeding
langfuse.flush()

# Continue with other work
2286 def shutdown(self) -> None: 2287 """Shut down the Langfuse client and flush all pending data. 2288 2289 This method cleanly shuts down the Langfuse client, ensuring all pending data 2290 is flushed to the API and all background threads are properly terminated. 2291 2292 It's important to call this method when your application is shutting down to 2293 prevent data loss and resource leaks. For most applications, using the client 2294 as a context manager or relying on the automatic shutdown via atexit is sufficient. 2295 2296 Example: 2297 ```python 2298 # Initialize Langfuse 2299 langfuse = Langfuse(public_key="...", secret_key="...") 2300 2301 # Use Langfuse throughout your application 2302 # ... 2303 2304 # When application is shutting down 2305 langfuse.shutdown() 2306 ``` 2307 """ 2308 if self._resources is not None: 2309 self._resources.shutdown()
Shut down the Langfuse client and flush all pending data.
This method cleanly shuts down the Langfuse client, ensuring all pending data is flushed to the API and all background threads are properly terminated.
It's important to call this method when your application is shutting down to prevent data loss and resource leaks. For most applications, using the client as a context manager or relying on the automatic shutdown via atexit is sufficient.
Example:
# Initialize Langfuse
langfuse = Langfuse(public_key="...", secret_key="...")

# Use Langfuse throughout your application
# ...

# When application is shutting down
langfuse.shutdown()
2311 def get_current_trace_id(self) -> Optional[str]: 2312 """Get the trace ID of the current active span. 2313 2314 This method retrieves the trace ID from the currently active span in the context. 2315 It can be used to get the trace ID for referencing in logs, external systems, 2316 or for creating related operations. 2317 2318 Returns: 2319 The current trace ID as a 32-character lowercase hexadecimal string, 2320 or None if there is no active span. 2321 2322 Example: 2323 ```python 2324 with langfuse.start_as_current_span(name="process-request") as span: 2325 # Get the current trace ID for reference 2326 trace_id = langfuse.get_current_trace_id() 2327 2328 # Use it for external correlation 2329 log.info(f"Processing request with trace_id: {trace_id}") 2330 2331 # Or pass to another system 2332 external_system.process(data, trace_id=trace_id) 2333 ``` 2334 """ 2335 if not self._tracing_enabled: 2336 langfuse_logger.debug( 2337 "Operation skipped: get_current_trace_id - Tracing is disabled or client is in no-op mode." 2338 ) 2339 return None 2340 2341 current_otel_span = self._get_current_otel_span() 2342 2343 return self._get_otel_trace_id(current_otel_span) if current_otel_span else None
Get the trace ID of the current active span.
This method retrieves the trace ID from the currently active span in the context. It can be used to get the trace ID for referencing in logs, external systems, or for creating related operations.
Returns:
The current trace ID as a 32-character lowercase hexadecimal string, or None if there is no active span.
Example:
with langfuse.start_as_current_span(name="process-request") as span:
    # Get the current trace ID for reference
    trace_id = langfuse.get_current_trace_id()

    # Use it for external correlation
    log.info(f"Processing request with trace_id: {trace_id}")

    # Or pass to another system
    external_system.process(data, trace_id=trace_id)
2345 def get_current_observation_id(self) -> Optional[str]: 2346 """Get the observation ID (span ID) of the current active span. 2347 2348 This method retrieves the observation ID from the currently active span in the context. 2349 It can be used to get the observation ID for referencing in logs, external systems, 2350 or for creating scores or other related operations. 2351 2352 Returns: 2353 The current observation ID as a 16-character lowercase hexadecimal string, 2354 or None if there is no active span. 2355 2356 Example: 2357 ```python 2358 with langfuse.start_as_current_span(name="process-user-query") as span: 2359 # Get the current observation ID 2360 observation_id = langfuse.get_current_observation_id() 2361 2362 # Store it for later reference 2363 cache.set(f"query_{query_id}_observation", observation_id) 2364 2365 # Process the query... 2366 ``` 2367 """ 2368 if not self._tracing_enabled: 2369 langfuse_logger.debug( 2370 "Operation skipped: get_current_observation_id - Tracing is disabled or client is in no-op mode." 2371 ) 2372 return None 2373 2374 current_otel_span = self._get_current_otel_span() 2375 2376 return self._get_otel_span_id(current_otel_span) if current_otel_span else None
Get the observation ID (span ID) of the current active span.
This method retrieves the observation ID from the currently active span in the context. It can be used to get the observation ID for referencing in logs, external systems, or for creating scores or other related operations.
Returns:
The current observation ID as a 16-character lowercase hexadecimal string, or None if there is no active span.
Example:
with langfuse.start_as_current_span(name="process-user-query") as span:
    # Get the current observation ID
    observation_id = langfuse.get_current_observation_id()

    # Store it for later reference
    cache.set(f"query_{query_id}_observation", observation_id)

    # Process the query...
2389 def get_trace_url(self, *, trace_id: Optional[str] = None) -> Optional[str]: 2390 """Get the URL to view a trace in the Langfuse UI. 2391 2392 This method generates a URL that links directly to a trace in the Langfuse UI. 2393 It's useful for providing links in logs, notifications, or debugging tools. 2394 2395 Args: 2396 trace_id: Optional trace ID to generate a URL for. If not provided, 2397 the trace ID of the current active span will be used. 2398 2399 Returns: 2400 A URL string pointing to the trace in the Langfuse UI, 2401 or None if the project ID couldn't be retrieved or no trace ID is available. 2402 2403 Example: 2404 ```python 2405 # Get URL for the current trace 2406 with langfuse.start_as_current_span(name="process-request") as span: 2407 trace_url = langfuse.get_trace_url() 2408 log.info(f"Processing trace: {trace_url}") 2409 2410 # Get URL for a specific trace 2411 specific_trace_url = langfuse.get_trace_url(trace_id="1234567890abcdef1234567890abcdef") 2412 send_notification(f"Review needed for trace: {specific_trace_url}") 2413 ``` 2414 """ 2415 project_id = self._get_project_id() 2416 final_trace_id = trace_id or self.get_current_trace_id() 2417 2418 return ( 2419 f"{self._host}/project/{project_id}/traces/{final_trace_id}" 2420 if project_id and final_trace_id 2421 else None 2422 )
Get the URL to view a trace in the Langfuse UI.
This method generates a URL that links directly to a trace in the Langfuse UI. It's useful for providing links in logs, notifications, or debugging tools.
Arguments:
- trace_id: Optional trace ID to generate a URL for. If not provided, the trace ID of the current active span will be used.
Returns:
A URL string pointing to the trace in the Langfuse UI, or None if the project ID couldn't be retrieved or no trace ID is available.
Example:
# Get URL for the current trace
with langfuse.start_as_current_span(name="process-request") as span:
    trace_url = langfuse.get_trace_url()
    log.info(f"Processing trace: {trace_url}")

# Get URL for a specific trace
specific_trace_url = langfuse.get_trace_url(trace_id="1234567890abcdef1234567890abcdef")
send_notification(f"Review needed for trace: {specific_trace_url}")
2424 def get_dataset( 2425 self, name: str, *, fetch_items_page_size: Optional[int] = 50 2426 ) -> "DatasetClient": 2427 """Fetch a dataset by its name. 2428 2429 Args: 2430 name (str): The name of the dataset to fetch. 2431 fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50. 2432 2433 Returns: 2434 DatasetClient: The dataset with the given name. 2435 """ 2436 try: 2437 langfuse_logger.debug(f"Getting datasets {name}") 2438 dataset = self.api.datasets.get(dataset_name=name) 2439 2440 dataset_items = [] 2441 page = 1 2442 2443 while True: 2444 new_items = self.api.dataset_items.list( 2445 dataset_name=self._url_encode(name, is_url_param=True), 2446 page=page, 2447 limit=fetch_items_page_size, 2448 ) 2449 dataset_items.extend(new_items.data) 2450 2451 if new_items.meta.total_pages <= page: 2452 break 2453 2454 page += 1 2455 2456 items = [DatasetItemClient(i, langfuse=self) for i in dataset_items] 2457 2458 return DatasetClient(dataset, items=items) 2459 2460 except Error as e: 2461 handle_fern_exception(e) 2462 raise e
Fetch a dataset by its name.
Arguments:
- name (str): The name of the dataset to fetch.
- fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.
Returns:
DatasetClient: The dataset with the given name.
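For illustration, a minimal sketch of fetching a dataset and iterating its items; the dataset name is a placeholder, and the input/expected_output fields follow the dataset item schema used elsewhere in this reference:

# Fetch all items of the dataset in pages of 100
dataset = langfuse.get_dataset("capital_cities", fetch_items_page_size=100)

for item in dataset.items:
    print(item.input, item.expected_output)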
2464 def run_experiment( 2465 self, 2466 *, 2467 name: str, 2468 run_name: Optional[str] = None, 2469 description: Optional[str] = None, 2470 data: ExperimentData, 2471 task: TaskFunction, 2472 evaluators: List[EvaluatorFunction] = [], 2473 run_evaluators: List[RunEvaluatorFunction] = [], 2474 max_concurrency: int = 50, 2475 metadata: Optional[Dict[str, Any]] = None, 2476 ) -> ExperimentResult: 2477 """Run an experiment on a dataset with automatic tracing and evaluation. 2478 2479 This method executes a task function on each item in the provided dataset, 2480 automatically traces all executions with Langfuse for observability, runs 2481 item-level and run-level evaluators on the outputs, and returns comprehensive 2482 results with evaluation metrics. 2483 2484 The experiment system provides: 2485 - Automatic tracing of all task executions 2486 - Concurrent processing with configurable limits 2487 - Comprehensive error handling that isolates failures 2488 - Integration with Langfuse datasets for experiment tracking 2489 - Flexible evaluation framework supporting both sync and async evaluators 2490 2491 Args: 2492 name: Human-readable name for the experiment. Used for identification 2493 in the Langfuse UI. 2494 run_name: Optional exact name for the experiment run. If provided, this will be 2495 used as the exact dataset run name if the `data` contains Langfuse dataset items. 2496 If not provided, this will default to the experiment name appended with an ISO timestamp. 2497 description: Optional description explaining the experiment's purpose, 2498 methodology, or expected outcomes. 2499 data: Array of data items to process. Can be either: 2500 - List of dict-like items with 'input', 'expected_output', 'metadata' keys 2501 - List of Langfuse DatasetItem objects from dataset.items 2502 task: Function that processes each data item and returns output. 2503 Must accept 'item' as keyword argument and can return sync or async results. 2504 The task function signature should be: task(*, item, **kwargs) -> Any 2505 evaluators: List of functions to evaluate each item's output individually. 2506 Each evaluator receives input, output, expected_output, and metadata. 2507 Can return single Evaluation dict or list of Evaluation dicts. 2508 run_evaluators: List of functions to evaluate the entire experiment run. 2509 Each run evaluator receives all item_results and can compute aggregate metrics. 2510 Useful for calculating averages, distributions, or cross-item comparisons. 2511 max_concurrency: Maximum number of concurrent task executions (default: 50). 2512 Controls the number of items processed simultaneously. Adjust based on 2513 API rate limits and system resources. 2514 metadata: Optional metadata dictionary to attach to all experiment traces. 2515 This metadata will be included in every trace created during the experiment. 2516 If `data` are Langfuse dataset items, the metadata will be attached to the dataset run, too. 2517 2518 Returns: 2519 ExperimentResult containing: 2520 - run_name: The experiment run name. This is equal to the dataset run name if experiment was on Langfuse dataset. 
2521 - item_results: List of results for each processed item with outputs and evaluations 2522 - run_evaluations: List of aggregate evaluation results for the entire run 2523 - dataset_run_id: ID of the dataset run (if using Langfuse datasets) 2524 - dataset_run_url: Direct URL to view results in Langfuse UI (if applicable) 2525 2526 Raises: 2527 ValueError: If required parameters are missing or invalid 2528 Exception: If experiment setup fails (individual item failures are handled gracefully) 2529 2530 Examples: 2531 Basic experiment with local data: 2532 ```python 2533 def summarize_text(*, item, **kwargs): 2534 return f"Summary: {item['input'][:50]}..." 2535 2536 def length_evaluator(*, input, output, expected_output=None, **kwargs): 2537 return { 2538 "name": "output_length", 2539 "value": len(output), 2540 "comment": f"Output contains {len(output)} characters" 2541 } 2542 2543 result = langfuse.run_experiment( 2544 name="Text Summarization Test", 2545 description="Evaluate summarization quality and length", 2546 data=[ 2547 {"input": "Long article text...", "expected_output": "Expected summary"}, 2548 {"input": "Another article...", "expected_output": "Another summary"} 2549 ], 2550 task=summarize_text, 2551 evaluators=[length_evaluator] 2552 ) 2553 2554 print(f"Processed {len(result.item_results)} items") 2555 for item_result in result.item_results: 2556 print(f"Input: {item_result.item['input']}") 2557 print(f"Output: {item_result.output}") 2558 print(f"Evaluations: {item_result.evaluations}") 2559 ``` 2560 2561 Advanced experiment with async task and multiple evaluators: 2562 ```python 2563 async def llm_task(*, item, **kwargs): 2564 # Simulate async LLM call 2565 response = await openai_client.chat.completions.create( 2566 model="gpt-4", 2567 messages=[{"role": "user", "content": item["input"]}] 2568 ) 2569 return response.choices[0].message.content 2570 2571 def accuracy_evaluator(*, input, output, expected_output=None, **kwargs): 2572 if expected_output and expected_output.lower() in output.lower(): 2573 return {"name": "accuracy", "value": 1.0, "comment": "Correct answer"} 2574 return {"name": "accuracy", "value": 0.0, "comment": "Incorrect answer"} 2575 2576 def toxicity_evaluator(*, input, output, expected_output=None, **kwargs): 2577 # Simulate toxicity check 2578 toxicity_score = check_toxicity(output) # Your toxicity checker 2579 return { 2580 "name": "toxicity", 2581 "value": toxicity_score, 2582 "comment": f"Toxicity level: {'high' if toxicity_score > 0.7 else 'low'}" 2583 } 2584 2585 def average_accuracy(*, item_results, **kwargs): 2586 accuracies = [ 2587 eval.value for result in item_results 2588 for eval in result.evaluations 2589 if eval.name == "accuracy" 2590 ] 2591 return { 2592 "name": "average_accuracy", 2593 "value": sum(accuracies) / len(accuracies) if accuracies else 0, 2594 "comment": f"Average accuracy across {len(accuracies)} items" 2595 } 2596 2597 result = langfuse.run_experiment( 2598 name="LLM Safety and Accuracy Test", 2599 description="Evaluate model accuracy and safety across diverse prompts", 2600 data=test_dataset, # Your dataset items 2601 task=llm_task, 2602 evaluators=[accuracy_evaluator, toxicity_evaluator], 2603 run_evaluators=[average_accuracy], 2604 max_concurrency=5, # Limit concurrent API calls 2605 metadata={"model": "gpt-4", "temperature": 0.7} 2606 ) 2607 ``` 2608 2609 Using with Langfuse datasets: 2610 ```python 2611 # Get dataset from Langfuse 2612 dataset = langfuse.get_dataset("my-eval-dataset") 2613 2614 result = 
dataset.run_experiment( 2615 name="Production Model Evaluation", 2616 description="Monthly evaluation of production model performance", 2617 task=my_production_task, 2618 evaluators=[accuracy_evaluator, latency_evaluator] 2619 ) 2620 2621 # Results automatically linked to dataset in Langfuse UI 2622 print(f"View results: {result['dataset_run_url']}") 2623 ``` 2624 2625 Note: 2626 - Task and evaluator functions can be either synchronous or asynchronous 2627 - Individual item failures are logged but don't stop the experiment 2628 - All executions are automatically traced and visible in Langfuse UI 2629 - When using Langfuse datasets, results are automatically linked for easy comparison 2630 - This method works in both sync and async contexts (Jupyter notebooks, web apps, etc.) 2631 - Async execution is handled automatically with smart event loop detection 2632 """ 2633 return cast( 2634 ExperimentResult, 2635 run_async_safely( 2636 self._run_experiment_async( 2637 name=name, 2638 run_name=self._create_experiment_run_name( 2639 name=name, run_name=run_name 2640 ), 2641 description=description, 2642 data=data, 2643 task=task, 2644 evaluators=evaluators or [], 2645 run_evaluators=run_evaluators or [], 2646 max_concurrency=max_concurrency, 2647 metadata=metadata or {}, 2648 ), 2649 ), 2650 )
Run an experiment on a dataset with automatic tracing and evaluation.
This method executes a task function on each item in the provided dataset, automatically traces all executions with Langfuse for observability, runs item-level and run-level evaluators on the outputs, and returns comprehensive results with evaluation metrics.
The experiment system provides:
- Automatic tracing of all task executions
- Concurrent processing with configurable limits
- Comprehensive error handling that isolates failures
- Integration with Langfuse datasets for experiment tracking
- Flexible evaluation framework supporting both sync and async evaluators
Arguments:
- name: Human-readable name for the experiment. Used for identification in the Langfuse UI.
- run_name: Optional exact name for the experiment run. If provided, this will be used as the exact dataset run name if `data` contains Langfuse dataset items. If not provided, this will default to the experiment name appended with an ISO timestamp.
- description: Optional description explaining the experiment's purpose, methodology, or expected outcomes.
- data: Array of data items to process. Can be either:
- List of dict-like items with 'input', 'expected_output', 'metadata' keys
- List of Langfuse DatasetItem objects from dataset.items
- task: Function that processes each data item and returns output. Must accept 'item' as keyword argument and can return sync or async results. The task function signature should be: task(*, item, **kwargs) -> Any
- evaluators: List of functions to evaluate each item's output individually. Each evaluator receives input, output, expected_output, and metadata. Can return single Evaluation dict or list of Evaluation dicts.
- run_evaluators: List of functions to evaluate the entire experiment run. Each run evaluator receives all item_results and can compute aggregate metrics. Useful for calculating averages, distributions, or cross-item comparisons.
- max_concurrency: Maximum number of concurrent task executions (default: 50). Controls the number of items processed simultaneously. Adjust based on API rate limits and system resources.
- metadata: Optional metadata dictionary to attach to all experiment traces. This metadata will be included in every trace created during the experiment. If `data` are Langfuse dataset items, the metadata will be attached to the dataset run, too.
Returns:
ExperimentResult containing:
- run_name: The experiment run name. This is equal to the dataset run name if the experiment ran on a Langfuse dataset.
- item_results: List of results for each processed item with outputs and evaluations
- run_evaluations: List of aggregate evaluation results for the entire run
- dataset_run_id: ID of the dataset run (if using Langfuse datasets)
- dataset_run_url: Direct URL to view results in Langfuse UI (if applicable)
Raises:
- ValueError: If required parameters are missing or invalid
- Exception: If experiment setup fails (individual item failures are handled gracefully)
Examples:
Basic experiment with local data:
def summarize_text(*, item, **kwargs):
    return f"Summary: {item['input'][:50]}..."

def length_evaluator(*, input, output, expected_output=None, **kwargs):
    return {
        "name": "output_length",
        "value": len(output),
        "comment": f"Output contains {len(output)} characters"
    }

result = langfuse.run_experiment(
    name="Text Summarization Test",
    description="Evaluate summarization quality and length",
    data=[
        {"input": "Long article text...", "expected_output": "Expected summary"},
        {"input": "Another article...", "expected_output": "Another summary"}
    ],
    task=summarize_text,
    evaluators=[length_evaluator]
)

print(f"Processed {len(result.item_results)} items")
for item_result in result.item_results:
    print(f"Input: {item_result.item['input']}")
    print(f"Output: {item_result.output}")
    print(f"Evaluations: {item_result.evaluations}")
Advanced experiment with async task and multiple evaluators:
async def llm_task(*, item, **kwargs):
    # Simulate async LLM call
    response = await openai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": item["input"]}]
    )
    return response.choices[0].message.content

def accuracy_evaluator(*, input, output, expected_output=None, **kwargs):
    if expected_output and expected_output.lower() in output.lower():
        return {"name": "accuracy", "value": 1.0, "comment": "Correct answer"}
    return {"name": "accuracy", "value": 0.0, "comment": "Incorrect answer"}

def toxicity_evaluator(*, input, output, expected_output=None, **kwargs):
    # Simulate toxicity check
    toxicity_score = check_toxicity(output)  # Your toxicity checker
    return {
        "name": "toxicity",
        "value": toxicity_score,
        "comment": f"Toxicity level: {'high' if toxicity_score > 0.7 else 'low'}"
    }

def average_accuracy(*, item_results, **kwargs):
    accuracies = [
        eval.value for result in item_results
        for eval in result.evaluations
        if eval.name == "accuracy"
    ]
    return {
        "name": "average_accuracy",
        "value": sum(accuracies) / len(accuracies) if accuracies else 0,
        "comment": f"Average accuracy across {len(accuracies)} items"
    }

result = langfuse.run_experiment(
    name="LLM Safety and Accuracy Test",
    description="Evaluate model accuracy and safety across diverse prompts",
    data=test_dataset,  # Your dataset items
    task=llm_task,
    evaluators=[accuracy_evaluator, toxicity_evaluator],
    run_evaluators=[average_accuracy],
    max_concurrency=5,  # Limit concurrent API calls
    metadata={"model": "gpt-4", "temperature": 0.7}
)
Using with Langfuse datasets:
# Get dataset from Langfuse
dataset = langfuse.get_dataset("my-eval-dataset")

result = dataset.run_experiment(
    name="Production Model Evaluation",
    description="Monthly evaluation of production model performance",
    task=my_production_task,
    evaluators=[accuracy_evaluator, latency_evaluator]
)

# Results automatically linked to dataset in Langfuse UI
print(f"View results: {result['dataset_run_url']}")
Note:
- Task and evaluator functions can be either synchronous or asynchronous
- Individual item failures are logged but don't stop the experiment
- All executions are automatically traced and visible in Langfuse UI
- When using Langfuse datasets, results are automatically linked for easy comparison
- This method works in both sync and async contexts (Jupyter notebooks, web apps, etc.)
- Async execution is handled automatically with smart event loop detection
2895 def auth_check(self) -> bool: 2896 """Check if the provided credentials (public and secret key) are valid. 2897 2898 Raises: 2899 Exception: If no projects were found for the provided credentials. 2900 2901 Note: 2902 This method is blocking. It is discouraged to use it in production code. 2903 """ 2904 try: 2905 projects = self.api.projects.get() 2906 langfuse_logger.debug( 2907 f"Auth check successful, found {len(projects.data)} projects" 2908 ) 2909 if len(projects.data) == 0: 2910 raise Exception( 2911 "Auth check failed, no project found for the keys provided." 2912 ) 2913 return True 2914 2915 except AttributeError as e: 2916 langfuse_logger.warning( 2917 f"Auth check failed: Client not properly initialized. Error: {e}" 2918 ) 2919 return False 2920 2921 except Error as e: 2922 handle_fern_exception(e) 2923 raise e
Check if the provided credentials (public and secret key) are valid.
Raises:
- Exception: If no projects were found for the provided credentials.
Note:
This method is blocking and should be avoided in production code paths.
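A common pattern is to verify credentials once at startup during development; a minimal sketch:

langfuse = Langfuse(public_key="...", secret_key="...")

# Fail fast if the keys are wrong; avoid this check on hot request paths.
if not langfuse.auth_check():
    raise RuntimeError("Langfuse credentials could not be verified.")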
2925 def create_dataset( 2926 self, 2927 *, 2928 name: str, 2929 description: Optional[str] = None, 2930 metadata: Optional[Any] = None, 2931 ) -> Dataset: 2932 """Create a dataset with the given name on Langfuse. 2933 2934 Args: 2935 name: Name of the dataset to create. 2936 description: Description of the dataset. Defaults to None. 2937 metadata: Additional metadata. Defaults to None. 2938 2939 Returns: 2940 Dataset: The created dataset as returned by the Langfuse API. 2941 """ 2942 try: 2943 body = CreateDatasetRequest( 2944 name=name, description=description, metadata=metadata 2945 ) 2946 langfuse_logger.debug(f"Creating datasets {body}") 2947 2948 return self.api.datasets.create(request=body) 2949 2950 except Error as e: 2951 handle_fern_exception(e) 2952 raise e
Create a dataset with the given name on Langfuse.
Arguments:
- name: Name of the dataset to create.
- description: Description of the dataset. Defaults to None.
- metadata: Additional metadata. Defaults to None.
Returns:
Dataset: The created dataset as returned by the Langfuse API.
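A minimal sketch; the dataset name, description, and metadata are illustrative:

dataset = langfuse.create_dataset(
    name="capital_cities",
    description="Country/capital pairs for QA evaluation",
    metadata={"owner": "eval-team"},  # illustrative metadata
)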
2954 def create_dataset_item( 2955 self, 2956 *, 2957 dataset_name: str, 2958 input: Optional[Any] = None, 2959 expected_output: Optional[Any] = None, 2960 metadata: Optional[Any] = None, 2961 source_trace_id: Optional[str] = None, 2962 source_observation_id: Optional[str] = None, 2963 status: Optional[DatasetStatus] = None, 2964 id: Optional[str] = None, 2965 ) -> DatasetItem: 2966 """Create a dataset item. 2967 2968 Upserts if an item with id already exists. 2969 2970 Args: 2971 dataset_name: Name of the dataset in which the dataset item should be created. 2972 input: Input data. Defaults to None. Can contain any dict, list or scalar. 2973 expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar. 2974 metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar. 2975 source_trace_id: Id of the source trace. Defaults to None. 2976 source_observation_id: Id of the source observation. Defaults to None. 2977 status: Status of the dataset item. Defaults to ACTIVE for newly created items. 2978 id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets. 2979 2980 Returns: 2981 DatasetItem: The created dataset item as returned by the Langfuse API. 2982 2983 Example: 2984 ```python 2985 from langfuse import Langfuse 2986 2987 langfuse = Langfuse() 2988 2989 # Uploading items to the Langfuse dataset named "capital_cities" 2990 langfuse.create_dataset_item( 2991 dataset_name="capital_cities", 2992 input={"input": {"country": "Italy"}}, 2993 expected_output={"expected_output": "Rome"}, 2994 metadata={"foo": "bar"} 2995 ) 2996 ``` 2997 """ 2998 try: 2999 body = CreateDatasetItemRequest( 3000 datasetName=dataset_name, 3001 input=input, 3002 expectedOutput=expected_output, 3003 metadata=metadata, 3004 sourceTraceId=source_trace_id, 3005 sourceObservationId=source_observation_id, 3006 status=status, 3007 id=id, 3008 ) 3009 langfuse_logger.debug(f"Creating dataset item {body}") 3010 return self.api.dataset_items.create(request=body) 3011 except Error as e: 3012 handle_fern_exception(e) 3013 raise e
Create a dataset item.
Upserts if an item with the same id already exists.
Arguments:
- dataset_name: Name of the dataset in which the dataset item should be created.
- input: Input data. Defaults to None. Can contain any dict, list or scalar.
- expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar.
- metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar.
- source_trace_id: Id of the source trace. Defaults to None.
- source_observation_id: Id of the source observation. Defaults to None.
- status: Status of the dataset item. Defaults to ACTIVE for newly created items.
- id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets.
Returns:
DatasetItem: The created dataset item as returned by the Langfuse API.
Example:
from langfuse import Langfuse

langfuse = Langfuse()

# Uploading items to the Langfuse dataset named "capital_cities"
langfuse.create_dataset_item(
    dataset_name="capital_cities",
    input={"input": {"country": "Italy"}},
    expected_output={"expected_output": "Rome"},
    metadata={"foo": "bar"}
)
3015 def resolve_media_references( 3016 self, 3017 *, 3018 obj: Any, 3019 resolve_with: Literal["base64_data_uri"], 3020 max_depth: int = 10, 3021 content_fetch_timeout_seconds: int = 5, 3022 ) -> Any: 3023 """Replace media reference strings in an object with base64 data URIs. 3024 3025 This method recursively traverses an object (up to max_depth) looking for media reference strings 3026 in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using 3027 the provided Langfuse client and replaces the reference string with a base64 data URI. 3028 3029 If fetching media content fails for a reference string, a warning is logged and the reference 3030 string is left unchanged. 3031 3032 Args: 3033 obj: The object to process. Can be a primitive value, array, or nested object. 3034 If the object has a __dict__ attribute, a dict will be returned instead of the original object type. 3035 resolve_with: The representation of the media content to replace the media reference string with. 3036 Currently only "base64_data_uri" is supported. 3037 max_depth: int: The maximum depth to traverse the object. Default is 10. 3038 content_fetch_timeout_seconds: int: The timeout in seconds for fetching media content. Default is 5. 3039 3040 Returns: 3041 A deep copy of the input object with all media references replaced with base64 data URIs where possible. 3042 If the input object has a __dict__ attribute, a dict will be returned instead of the original object type. 3043 3044 Example: 3045 obj = { 3046 "image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@", 3047 "nested": { 3048 "pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@" 3049 } 3050 } 3051 3052 result = await LangfuseMedia.resolve_media_references(obj, langfuse_client) 3053 3054 # Result: 3055 # { 3056 # "image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...", 3057 # "nested": { 3058 # "pdf": "data:application/pdf;base64,JVBERi0xLjcK..." 3059 # } 3060 # } 3061 """ 3062 return LangfuseMedia.resolve_media_references( 3063 langfuse_client=self, 3064 obj=obj, 3065 resolve_with=resolve_with, 3066 max_depth=max_depth, 3067 content_fetch_timeout_seconds=content_fetch_timeout_seconds, 3068 )
Replace media reference strings in an object with base64 data URIs.
This method recursively traverses an object (up to max_depth) looking for media reference strings in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using the provided Langfuse client and replaces the reference string with a base64 data URI.
If fetching media content fails for a reference string, a warning is logged and the reference string is left unchanged.
Arguments:
- obj: The object to process. Can be a primitive value, array, or nested object. If the object has a __dict__ attribute, a dict will be returned instead of the original object type.
- resolve_with: The representation of the media content to replace the media reference string with. Currently only "base64_data_uri" is supported.
- max_depth (int): The maximum depth to traverse the object. Default is 10.
- content_fetch_timeout_seconds (int): The timeout in seconds for fetching media content. Default is 5.
Returns:
A deep copy of the input object with all media references replaced with base64 data URIs where possible. If the input object has a __dict__ attribute, a dict will be returned instead of the original object type.
Example:
obj = {
    "image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
    "nested": {
        "pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@"
    }
}

result = langfuse.resolve_media_references(obj=obj, resolve_with="base64_data_uri")

# Result:
# {
#     "image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...",
#     "nested": {
#         "pdf": "data:application/pdf;base64,JVBERi0xLjcK..."
#     }
# }
3098 def get_prompt( 3099 self, 3100 name: str, 3101 *, 3102 version: Optional[int] = None, 3103 label: Optional[str] = None, 3104 type: Literal["chat", "text"] = "text", 3105 cache_ttl_seconds: Optional[int] = None, 3106 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None, 3107 max_retries: Optional[int] = None, 3108 fetch_timeout_seconds: Optional[int] = None, 3109 ) -> PromptClient: 3110 """Get a prompt. 3111 3112 This method attempts to fetch the requested prompt from the local cache. If the prompt is not found 3113 in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again 3114 and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will 3115 return the expired prompt as a fallback. 3116 3117 Args: 3118 name (str): The name of the prompt to retrieve. 3119 3120 Keyword Args: 3121 version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both. 3122 label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both. 3123 cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a 3124 keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0. 3125 type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text". 3126 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None. 3127 max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries have an exponential backoff with a maximum delay of 10 seconds. 3128 fetch_timeout_seconds: Optional[int]: The timeout in milliseconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 5 seconds per default. 3129 3130 Returns: 3131 The prompt object retrieved from the cache or directly fetched if not cached or expired of type 3132 - TextPromptClient, if type argument is 'text'. 3133 - ChatPromptClient, if type argument is 'chat'. 3134 3135 Raises: 3136 Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an 3137 expired prompt in the cache, in which case it logs a warning and returns the expired prompt. 3138 """ 3139 if self._resources is None: 3140 raise Error( 3141 "SDK is not correctly initialized. Check the init logs for more details." 3142 ) 3143 if version is not None and label is not None: 3144 raise ValueError("Cannot specify both version and label at the same time.") 3145 3146 if not name: 3147 raise ValueError("Prompt name cannot be empty.") 3148 3149 cache_key = PromptCache.generate_cache_key(name, version=version, label=label) 3150 bounded_max_retries = self._get_bounded_max_retries( 3151 max_retries, default_max_retries=2, max_retries_upper_bound=4 3152 ) 3153 3154 langfuse_logger.debug(f"Getting prompt '{cache_key}'") 3155 cached_prompt = self._resources.prompt_cache.get(cache_key) 3156 3157 if cached_prompt is None or cache_ttl_seconds == 0: 3158 langfuse_logger.debug( 3159 f"Prompt '{cache_key}' not found in cache or caching disabled." 
3160 ) 3161 try: 3162 return self._fetch_prompt_and_update_cache( 3163 name, 3164 version=version, 3165 label=label, 3166 ttl_seconds=cache_ttl_seconds, 3167 max_retries=bounded_max_retries, 3168 fetch_timeout_seconds=fetch_timeout_seconds, 3169 ) 3170 except Exception as e: 3171 if fallback: 3172 langfuse_logger.warning( 3173 f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}" 3174 ) 3175 3176 fallback_client_args: Dict[str, Any] = { 3177 "name": name, 3178 "prompt": fallback, 3179 "type": type, 3180 "version": version or 0, 3181 "config": {}, 3182 "labels": [label] if label else [], 3183 "tags": [], 3184 } 3185 3186 if type == "text": 3187 return TextPromptClient( 3188 prompt=Prompt_Text(**fallback_client_args), 3189 is_fallback=True, 3190 ) 3191 3192 if type == "chat": 3193 return ChatPromptClient( 3194 prompt=Prompt_Chat(**fallback_client_args), 3195 is_fallback=True, 3196 ) 3197 3198 raise e 3199 3200 if cached_prompt.is_expired(): 3201 langfuse_logger.debug(f"Stale prompt '{cache_key}' found in cache.") 3202 try: 3203 # refresh prompt in background thread, refresh_prompt deduplicates tasks 3204 langfuse_logger.debug(f"Refreshing prompt '{cache_key}' in background.") 3205 3206 def refresh_task() -> None: 3207 self._fetch_prompt_and_update_cache( 3208 name, 3209 version=version, 3210 label=label, 3211 ttl_seconds=cache_ttl_seconds, 3212 max_retries=bounded_max_retries, 3213 fetch_timeout_seconds=fetch_timeout_seconds, 3214 ) 3215 3216 self._resources.prompt_cache.add_refresh_prompt_task( 3217 cache_key, 3218 refresh_task, 3219 ) 3220 langfuse_logger.debug( 3221 f"Returning stale prompt '{cache_key}' from cache." 3222 ) 3223 # return stale prompt 3224 return cached_prompt.value 3225 3226 except Exception as e: 3227 langfuse_logger.warning( 3228 f"Error when refreshing cached prompt '{cache_key}', returning cached version. Error: {e}" 3229 ) 3230 # creation of refresh prompt task failed, return stale prompt 3231 return cached_prompt.value 3232 3233 return cached_prompt.value
Get a prompt.
This method attempts to fetch the requested prompt from the local cache. If the prompt is not found in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will return the expired prompt as a fallback.
Arguments:
- name (str): The name of the prompt to retrieve.
Keyword Args:
- version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the 'production' label is returned. Specify either version or label, not both.
- label (Optional[str]): The label of the prompt to retrieve. If no label and version is specified, the 'production' label is returned. Specify either version or label, not both.
- cache_ttl_seconds (Optional[int]): Time-to-live in seconds for caching the prompt. Must be specified as a keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0.
- type (Literal["chat", "text"]): The type of the prompt to retrieve. Defaults to "text".
- fallback (Union[Optional[List[ChatMessageDict]], Optional[str]]): The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None.
- max_retries (Optional[int]): The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries have an exponential backoff with a maximum delay of 10 seconds.
- fetch_timeout_seconds (Optional[int]): The timeout in seconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 5 seconds.
Returns:
The prompt object retrieved from the cache, or fetched directly if not cached or expired. Its type is:
- TextPromptClient, if type argument is 'text'.
- ChatPromptClient, if type argument is 'chat'.
Raises:
- Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an expired prompt in the cache, in which case it logs a warning and returns the expired prompt.
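A minimal sketch of the fetch-and-compile flow, assuming a prompt named 'movie-critic' with a {{movie}} variable exists in your project:

# Fetch the production-labeled prompt; the fallback covers the first call
prompt = langfuse.get_prompt(
    "movie-critic",
    cache_ttl_seconds=300,
    fallback="Do you like {{movie}}?",
)

# Compile the template with variables (double curly braces)
text = prompt.compile(movie="Dune 2")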
3327 def create_prompt( 3328 self, 3329 *, 3330 name: str, 3331 prompt: Union[ 3332 str, List[Union[ChatMessageDict, ChatMessageWithPlaceholdersDict]] 3333 ], 3334 labels: List[str] = [], 3335 tags: Optional[List[str]] = None, 3336 type: Optional[Literal["chat", "text"]] = "text", 3337 config: Optional[Any] = None, 3338 commit_message: Optional[str] = None, 3339 ) -> PromptClient: 3340 """Create a new prompt in Langfuse. 3341 3342 Keyword Args: 3343 name : The name of the prompt to be created. 3344 prompt : The content of the prompt to be created. 3345 is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead. 3346 labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label. 3347 tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt. 3348 config: Additional structured data to be saved with the prompt. Defaults to None. 3349 type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text". 3350 commit_message: Optional string describing the change. 3351 3352 Returns: 3353 TextPromptClient: The prompt if type argument is 'text'. 3354 ChatPromptClient: The prompt if type argument is 'chat'. 3355 """ 3356 try: 3357 langfuse_logger.debug(f"Creating prompt {name=}, {labels=}") 3358 3359 if type == "chat": 3360 if not isinstance(prompt, list): 3361 raise ValueError( 3362 "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes." 3363 ) 3364 request: Union[CreatePromptRequest_Chat, CreatePromptRequest_Text] = ( 3365 CreatePromptRequest_Chat( 3366 name=name, 3367 prompt=cast(Any, prompt), 3368 labels=labels, 3369 tags=tags, 3370 config=config or {}, 3371 commitMessage=commit_message, 3372 type="chat", 3373 ) 3374 ) 3375 server_prompt = self.api.prompts.create(request=request) 3376 3377 if self._resources is not None: 3378 self._resources.prompt_cache.invalidate(name) 3379 3380 return ChatPromptClient(prompt=cast(Prompt_Chat, server_prompt)) 3381 3382 if not isinstance(prompt, str): 3383 raise ValueError("For 'text' type, 'prompt' must be a string.") 3384 3385 request = CreatePromptRequest_Text( 3386 name=name, 3387 prompt=prompt, 3388 labels=labels, 3389 tags=tags, 3390 config=config or {}, 3391 commitMessage=commit_message, 3392 type="text", 3393 ) 3394 3395 server_prompt = self.api.prompts.create(request=request) 3396 3397 if self._resources is not None: 3398 self._resources.prompt_cache.invalidate(name) 3399 3400 return TextPromptClient(prompt=cast(Prompt_Text, server_prompt)) 3401 3402 except Error as e: 3403 handle_fern_exception(e) 3404 raise e
Create a new prompt in Langfuse.
Keyword Args:
- name: The name of the prompt to be created.
- prompt: The content of the prompt to be created.
- is_active [DEPRECATED]: A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead.
- labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label.
- tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt.
- config: Additional structured data to be saved with the prompt. Defaults to None.
- type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text".
- commit_message: Optional string describing the change.
Returns:
TextPromptClient: The prompt if type argument is 'text'.
ChatPromptClient: The prompt if type argument is 'chat'.
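A minimal usage sketch (assuming a client configured via environment variables; the prompt names, {{variable}} placeholders, and config values are illustrative):
```python
from langfuse import Langfuse

langfuse = Langfuse()  # reads LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY from the environment

# Text prompt: 'prompt' is a plain string
text_prompt = langfuse.create_prompt(
    name="movie-critic",
    prompt="Review the movie {{movie}} in one paragraph.",
    labels=["production"],  # serve this version by default
    config={"model": "gpt-4", "temperature": 0.7},
    commit_message="initial version",
)

# Chat prompt: 'prompt' is a list of role/content messages
chat_prompt = langfuse.create_prompt(
    name="movie-critic-chat",
    type="chat",
    prompt=[
        {"role": "system", "content": "You are a film critic."},
        {"role": "user", "content": "Review {{movie}}."},
    ],
    labels=["production"],
)
```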
3406 def update_prompt( 3407 self, 3408 *, 3409 name: str, 3410 version: int, 3411 new_labels: List[str] = [], 3412 ) -> Any: 3413 """Update an existing prompt version in Langfuse. The Langfuse SDK prompt cache is invalidated for all prompts with the specified name. 3414 3415 Args: 3416 name (str): The name of the prompt to update. 3417 version (int): The version number of the prompt to update. 3418 new_labels (List[str], optional): New labels to assign to the prompt version. Labels are unique across versions. The "latest" label is reserved and managed by Langfuse. Defaults to []. 3419 3420 Returns: 3421 Prompt: The updated prompt from the Langfuse API. 3422 3423 """ 3424 updated_prompt = self.api.prompt_version.update( 3425 name=self._url_encode(name), 3426 version=version, 3427 new_labels=new_labels, 3428 ) 3429 3430 if self._resources is not None: 3431 self._resources.prompt_cache.invalidate(name) 3432 3433 return updated_prompt
Update an existing prompt version in Langfuse. The Langfuse SDK prompt cache is invalidated for all prompts with the specified name.
Arguments:
- name (str): The name of the prompt to update.
- version (int): The version number of the prompt to update.
- new_labels (List[str], optional): New labels to assign to the prompt version. Labels are unique across versions. The "latest" label is reserved and managed by Langfuse. Defaults to [].
Returns:
Prompt: The updated prompt from the Langfuse API.
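For example, to promote an existing version to the default-served prompt (the name and version number are illustrative):
```python
updated = langfuse.update_prompt(
    name="movie-critic",
    version=3,
    new_labels=["production"],  # "latest" is reserved and managed by Langfuse
)
```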
3448 def clear_prompt_cache(self) -> None: 3449 """Clear the entire prompt cache, removing all cached prompts. 3450 3451 This method is useful when you want to force a complete refresh of all 3452 cached prompts, for example after major updates or when you need to 3453 ensure the latest versions are fetched from the server. 3454 """ 3455 if self._resources is not None: 3456 self._resources.prompt_cache.clear()
Clear the entire prompt cache, removing all cached prompts.
This method is useful when you want to force a complete refresh of all cached prompts, for example after major updates or when you need to ensure the latest versions are fetched from the server.
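For example, after bulk changes made outside the current process you might force fresh fetches:
```python
langfuse.clear_prompt_cache()  # subsequent prompt fetches go to the server
```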
59def get_client(*, public_key: Optional[str] = None) -> Langfuse: 60 """Get or create a Langfuse client instance. 61 62 Returns an existing Langfuse client or creates a new one if none exists. In multi-project setups, 63 providing a public_key is required. Multi-project support is experimental - see Langfuse docs. 64 65 Behavior: 66 - Single project: Returns existing client or creates new one 67 - Multi-project: Requires public_key to return specific client 68 - No public_key in multi-project: Returns disabled client to prevent data leakage 69 70 The function uses a singleton pattern per public_key to conserve resources and maintain state. 71 72 Args: 73 public_key (Optional[str]): Project identifier 74 - With key: Returns client for that project 75 - Without key: Returns single client or disabled client if multiple exist 76 77 Returns: 78 Langfuse: Client instance in one of three states: 79 1. Client for specified public_key 80 2. Default client for single-project setup 81 3. Disabled client when multiple projects exist without key 82 83 Security: 84 Disables tracing when multiple projects exist without explicit key to prevent 85 cross-project data leakage. Multi-project setups are experimental. 86 87 Example: 88 ```python 89 # Single project 90 client = get_client() # Default client 91 92 # In multi-project usage: 93 client_a = get_client(public_key="project_a_key") # Returns project A's client 94 client_b = get_client(public_key="project_b_key") # Returns project B's client 95 96 # Without specific key in multi-project setup: 97 client = get_client() # Returns disabled client for safety 98 ``` 99 """ 100 with LangfuseResourceManager._lock: 101 active_instances = LangfuseResourceManager._instances 102 103 # If no explicit public_key provided, check execution context 104 if not public_key: 105 public_key = _current_public_key.get(None) 106 107 if not public_key: 108 if len(active_instances) == 0: 109 # No clients initialized yet, create default instance 110 return Langfuse() 111 112 if len(active_instances) == 1: 113 # Only one client exists, safe to use without specifying key 114 instance = list(active_instances.values())[0] 115 116 # Initialize with the credentials bound to the instance 117 # This is important if the original instance was instantiated 118 # via constructor arguments 119 return _create_client_from_instance(instance) 120 121 else: 122 # Multiple clients exist but no key specified - disable tracing 123 # to prevent cross-project data leakage 124 langfuse_logger.warning( 125 "No 'langfuse_public_key' passed to decorated function, but multiple langfuse clients are instantiated in current process. Skipping tracing for this function to avoid cross-project leakage." 126 ) 127 return Langfuse( 128 tracing_enabled=False, public_key="fake", secret_key="fake" 129 ) 130 131 else: 132 # Specific key provided, look up existing instance 133 target_instance: Optional[LangfuseResourceManager] = active_instances.get( 134 public_key, None 135 ) 136 137 if target_instance is None: 138 # No instance found with this key - client not initialized properly 139 langfuse_logger.warning( 140 f"No Langfuse client with public key {public_key} has been initialized. Skipping tracing for decorated function." 141 ) 142 return Langfuse( 143 tracing_enabled=False, public_key="fake", secret_key="fake" 144 ) 145 146 # target_instance is guaranteed to be not None at this point 147 return _create_client_from_instance(target_instance, public_key)
Get or create a Langfuse client instance.
Returns an existing Langfuse client or creates a new one if none exists. In multi-project setups, providing a public_key is required. Multi-project support is experimental - see Langfuse docs.
Behavior:
- Single project: Returns existing client or creates new one
- Multi-project: Requires public_key to return specific client
- No public_key in multi-project: Returns disabled client to prevent data leakage
The function uses a singleton pattern per public_key to conserve resources and maintain state.
Arguments:
- public_key (Optional[str]): Project identifier
- With key: Returns client for that project
- Without key: Returns single client or disabled client if multiple exist
Returns:
Langfuse: Client instance in one of three states:
1. Client for specified public_key
2. Default client for single-project setup
3. Disabled client when multiple projects exist without key
Security:
Disables tracing when multiple projects exist without explicit key to prevent cross-project data leakage. Multi-project setups are experimental.
Example:
```python
# Single project
client = get_client()  # Default client

# In multi-project usage:
client_a = get_client(public_key="project_a_key")  # Returns project A's client
client_b = get_client(public_key="project_b_key")  # Returns project B's client

# Without specific key in multi-project setup:
client = get_client()  # Returns disabled client for safety
```
90 def observe( 91 self, 92 func: Optional[F] = None, 93 *, 94 name: Optional[str] = None, 95 as_type: Optional[ObservationTypeLiteralNoEvent] = None, 96 capture_input: Optional[bool] = None, 97 capture_output: Optional[bool] = None, 98 transform_to_string: Optional[Callable[[Iterable], str]] = None, 99 ) -> Union[F, Callable[[F], F]]: 100 """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions. 101 102 This decorator provides seamless integration of Langfuse observability into your codebase. It automatically creates 103 spans or generations around function execution, capturing timing, inputs/outputs, and error states. The decorator 104 intelligently handles both synchronous and asynchronous functions, preserving function signatures and type hints. 105 106 Using OpenTelemetry's distributed tracing system, it maintains proper trace context propagation throughout your application, 107 enabling you to see hierarchical traces of function calls with detailed performance metrics and function-specific details. 108 109 Args: 110 func (Optional[Callable]): The function to decorate. When used with parentheses @observe(), this will be None. 111 name (Optional[str]): Custom name for the created trace or span. If not provided, the function name is used. 112 as_type (Optional[Literal]): Set the observation type. Supported values: 113 "generation", "span", "agent", "tool", "chain", "retriever", "embedding", "evaluator", "guardrail". 114 Observation types are highlighted in the Langfuse UI for filtering and visualization. 115 The types "generation" and "embedding" create a span on which additional attributes such as model metrics 116 can be set. 117 118 Returns: 119 Callable: A wrapped version of the original function that automatically creates and manages Langfuse spans. 120 121 Example: 122 For general function tracing with automatic naming: 123 ```python 124 @observe() 125 def process_user_request(user_id, query): 126 # Function is automatically traced with name "process_user_request" 127 return get_response(query) 128 ``` 129 130 For language model generation tracking: 131 ```python 132 @observe(name="answer-generation", as_type="generation") 133 async def generate_answer(query): 134 # Creates a generation-type span with extended LLM metrics 135 response = await openai.chat.completions.create( 136 model="gpt-4", 137 messages=[{"role": "user", "content": query}] 138 ) 139 return response.choices[0].message.content 140 ``` 141 142 For trace context propagation between functions: 143 ```python 144 @observe() 145 def main_process(): 146 # Parent span is created 147 return sub_process() # Child span automatically connected to parent 148 149 @observe() 150 def sub_process(): 151 # Automatically becomes a child span of main_process 152 return "result" 153 ``` 154 155 Raises: 156 Exception: Propagates any exceptions from the wrapped function after logging them in the trace. 157 158 Notes: 159 - The decorator preserves the original function's signature, docstring, and return type. 160 - Proper parent-child relationships between spans are automatically maintained. 161 - Special keyword arguments can be passed to control tracing: 162 - langfuse_trace_id: Explicitly set the trace ID for this function call 163 - langfuse_parent_observation_id: Explicitly set the parent span ID 164 - langfuse_public_key: Use a specific Langfuse project (when multiple clients exist) 165 - For async functions, the decorator returns an async function wrapper. 166 - For sync functions, the decorator returns a synchronous wrapper. 167 """ 168 valid_types = set(get_observation_types_list(ObservationTypeLiteralNoEvent)) 169 if as_type is not None and as_type not in valid_types: 170 self._log.warning( 171 f"Invalid as_type '{as_type}'. Valid types are: {', '.join(sorted(valid_types))}. Defaulting to 'span'." 172 ) 173 as_type = "span" 174 175 function_io_capture_enabled = os.environ.get( 176 LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED, "True" 177 ).lower() not in ("false", "0") 178 179 should_capture_input = ( 180 capture_input if capture_input is not None else function_io_capture_enabled 181 ) 182 183 should_capture_output = ( 184 capture_output 185 if capture_output is not None 186 else function_io_capture_enabled 187 ) 188 189 def decorator(func: F) -> F: 190 return ( 191 self._async_observe( 192 func, 193 name=name, 194 as_type=as_type, 195 capture_input=should_capture_input, 196 capture_output=should_capture_output, 197 transform_to_string=transform_to_string, 198 ) 199 if asyncio.iscoroutinefunction(func) 200 else self._sync_observe( 201 func, 202 name=name, 203 as_type=as_type, 204 capture_input=should_capture_input, 205 capture_output=should_capture_output, 206 transform_to_string=transform_to_string, 207 ) 208 ) 209 210 """Handle decorator with or without parentheses. 211 212 This logic enables the decorator to work both with and without parentheses: 213 - @observe - Python passes the function directly to the decorator 214 - @observe() - Python calls the decorator first, which must return a function decorator 215 216 When called without arguments (@observe), the func parameter contains the function to decorate, 217 so we directly apply the decorator to it. When called with parentheses (@observe()), 218 func is None, so we return the decorator function itself for Python to apply in the next step. 219 """ 220 if func is None: 221 return decorator 222 else: 223 return decorator(func)
Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
This decorator provides seamless integration of Langfuse observability into your codebase. It automatically creates spans or generations around function execution, capturing timing, inputs/outputs, and error states. The decorator intelligently handles both synchronous and asynchronous functions, preserving function signatures and type hints.
Using OpenTelemetry's distributed tracing system, it maintains proper trace context propagation throughout your application, enabling you to see hierarchical traces of function calls with detailed performance metrics and function-specific details.
Arguments:
- func (Optional[Callable]): The function to decorate. When used with parentheses @observe(), this will be None.
- name (Optional[str]): Custom name for the created trace or span. If not provided, the function name is used.
- as_type (Optional[Literal]): Set the observation type. Supported values: "generation", "span", "agent", "tool", "chain", "retriever", "embedding", "evaluator", "guardrail". Observation types are highlighted in the Langfuse UI for filtering and visualization. The types "generation" and "embedding" create a span on which additional attributes such as model metrics can be set.
Returns:
Callable: A wrapped version of the original function that automatically creates and manages Langfuse spans.
Example:
For general function tracing with automatic naming:
```python
@observe()
def process_user_request(user_id, query):
    # Function is automatically traced with name "process_user_request"
    return get_response(query)
```
For language model generation tracking:
@observe(name="answer-generation", as_type="generation") async def generate_answer(query): # Creates a generation-type span with extended LLM metrics response = await openai.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": query}] ) return response.choices[0].message.content
For trace context propagation between functions:
```python
@observe()
def main_process():
    # Parent span is created
    return sub_process()  # Child span automatically connected to parent

@observe()
def sub_process():
    # Automatically becomes a child span of main_process
    return "result"
```
Raises:
- Exception: Propagates any exceptions from the wrapped function after logging them in the trace.
Notes:
- The decorator preserves the original function's signature, docstring, and return type.
- Proper parent-child relationships between spans are automatically maintained.
- Special keyword arguments can be passed to control tracing (see the sketch after this list):
- langfuse_trace_id: Explicitly set the trace ID for this function call
- langfuse_parent_observation_id: Explicitly set the parent span ID
- langfuse_public_key: Use a specific Langfuse project (when multiple clients exist)
- For async functions, the decorator returns an async function wrapper.
- For sync functions, the decorator returns a synchronous wrapper.
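A sketch of these special keyword arguments in use, assuming process_user_request is decorated with @observe() as in the examples above; the seed and argument values are illustrative, and create_trace_id is the v3 helper for deterministic, spec-compliant trace IDs:
```python
from langfuse import Langfuse

# Derive a deterministic, W3C-format trace ID from a seed value
trace_id = Langfuse.create_trace_id(seed="request-123")

result = process_user_request(
    "user_123",
    "What is Langfuse?",
    langfuse_trace_id=trace_id,  # consumed by the decorator, not passed to the function
)
```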
1145class LangfuseSpan(LangfuseObservationWrapper): 1146 """Standard span implementation for general operations in Langfuse. 1147 1148 This class represents a general-purpose span that can be used to trace 1149 any operation in your application. It extends the base LangfuseObservationWrapper 1150 with specific methods for creating child spans, generations, and updating 1151 span-specific attributes. If possible, use a more specific type for 1152 better observability and insights. 1153 """ 1154 1155 def __init__( 1156 self, 1157 *, 1158 otel_span: otel_trace_api.Span, 1159 langfuse_client: "Langfuse", 1160 input: Optional[Any] = None, 1161 output: Optional[Any] = None, 1162 metadata: Optional[Any] = None, 1163 environment: Optional[str] = None, 1164 version: Optional[str] = None, 1165 level: Optional[SpanLevel] = None, 1166 status_message: Optional[str] = None, 1167 ): 1168 """Initialize a new LangfuseSpan. 1169 1170 Args: 1171 otel_span: The OpenTelemetry span to wrap 1172 langfuse_client: Reference to the parent Langfuse client 1173 input: Input data for the span (any JSON-serializable object) 1174 output: Output data from the span (any JSON-serializable object) 1175 metadata: Additional metadata to associate with the span 1176 environment: The tracing environment 1177 version: Version identifier for the code or component 1178 level: Importance level of the span (info, warning, error) 1179 status_message: Optional status message for the span 1180 """ 1181 super().__init__( 1182 otel_span=otel_span, 1183 as_type="span", 1184 langfuse_client=langfuse_client, 1185 input=input, 1186 output=output, 1187 metadata=metadata, 1188 environment=environment, 1189 version=version, 1190 level=level, 1191 status_message=status_message, 1192 ) 1193 1194 def start_span( 1195 self, 1196 name: str, 1197 input: Optional[Any] = None, 1198 output: Optional[Any] = None, 1199 metadata: Optional[Any] = None, 1200 version: Optional[str] = None, 1201 level: Optional[SpanLevel] = None, 1202 status_message: Optional[str] = None, 1203 ) -> "LangfuseSpan": 1204 """Create a new child span. 1205 1206 This method creates a new child span with this span as the parent. 1207 Unlike start_as_current_span(), this method does not set the new span 1208 as the current span in the context. 1209 1210 Args: 1211 name: Name of the span (e.g., function or operation name) 1212 input: Input data for the operation 1213 output: Output data from the operation 1214 metadata: Additional metadata to associate with the span 1215 version: Version identifier for the code or component 1216 level: Importance level of the span (info, warning, error) 1217 status_message: Optional status message for the span 1218 1219 Returns: 1220 A new LangfuseSpan that must be ended with .end() when complete 1221 1222 Example: 1223 ```python 1224 parent_span = langfuse.start_span(name="process-request") 1225 try: 1226 # Create a child span 1227 child_span = parent_span.start_span(name="validate-input") 1228 try: 1229 # Do validation work 1230 validation_result = validate(request_data) 1231 child_span.update(output=validation_result) 1232 finally: 1233 child_span.end() 1234 1235 # Continue with parent span 1236 result = process_validated_data(validation_result) 1237 parent_span.update(output=result) 1238 finally: 1239 parent_span.end() 1240 ``` 1241 """ 1242 return self.start_observation( 1243 name=name, 1244 as_type="span", 1245 input=input, 1246 output=output, 1247 metadata=metadata, 1248 version=version, 1249 level=level, 1250 status_message=status_message, 1251 ) 1252 1253 def start_as_current_span( 1254 self, 1255 *, 1256 name: str, 1257 input: Optional[Any] = None, 1258 output: Optional[Any] = None, 1259 metadata: Optional[Any] = None, 1260 version: Optional[str] = None, 1261 level: Optional[SpanLevel] = None, 1262 status_message: Optional[str] = None, 1263 ) -> _AgnosticContextManager["LangfuseSpan"]: 1264 """[DEPRECATED] Create a new child span and set it as the current span in a context manager. 1265 1266 DEPRECATED: This method is deprecated and will be removed in a future version. 1267 Use start_as_current_observation(as_type='span') instead. 1268 1269 This method creates a new child span and sets it as the current span within 1270 a context manager. It should be used with a 'with' statement to automatically 1271 manage the span's lifecycle. 1272 1273 Args: 1274 name: Name of the span (e.g., function or operation name) 1275 input: Input data for the operation 1276 output: Output data from the operation 1277 metadata: Additional metadata to associate with the span 1278 version: Version identifier for the code or component 1279 level: Importance level of the span (info, warning, error) 1280 status_message: Optional status message for the span 1281 1282 Returns: 1283 A context manager that yields a new LangfuseSpan 1284 1285 Example: 1286 ```python 1287 with langfuse.start_as_current_span(name="process-request") as parent_span: 1288 # Parent span is active here 1289 1290 # Create a child span with context management 1291 with parent_span.start_as_current_span(name="validate-input") as child_span: 1292 # Child span is active here 1293 validation_result = validate(request_data) 1294 child_span.update(output=validation_result) 1295 1296 # Back to parent span context 1297 result = process_validated_data(validation_result) 1298 parent_span.update(output=result) 1299 ``` 1300 """ 1301 warnings.warn( 1302 "start_as_current_span is deprecated and will be removed in a future version. " 1303 "Use start_as_current_observation(as_type='span') instead.", 1304 DeprecationWarning, 1305 stacklevel=2, 1306 ) 1307 return self.start_as_current_observation( 1308 name=name, 1309 as_type="span", 1310 input=input, 1311 output=output, 1312 metadata=metadata, 1313 version=version, 1314 level=level, 1315 status_message=status_message, 1316 ) 1317 1318 def start_generation( 1319 self, 1320 *, 1321 name: str, 1322 input: Optional[Any] = None, 1323 output: Optional[Any] = None, 1324 metadata: Optional[Any] = None, 1325 version: Optional[str] = None, 1326 level: Optional[SpanLevel] = None, 1327 status_message: Optional[str] = None, 1328 completion_start_time: Optional[datetime] = None, 1329 model: Optional[str] = None, 1330 model_parameters: Optional[Dict[str, MapValue]] = None, 1331 usage_details: Optional[Dict[str, int]] = None, 1332 cost_details: Optional[Dict[str, float]] = None, 1333 prompt: Optional[PromptClient] = None, 1334 ) -> "LangfuseGeneration": 1335 """[DEPRECATED] Create a new child generation span. 1336 1337 DEPRECATED: This method is deprecated and will be removed in a future version. 1338 Use start_observation(as_type='generation') instead. 1339 1340 This method creates a new child generation span with this span as the parent. 1341 Generation spans are specialized for AI/LLM operations and include additional 1342 fields for model information, usage stats, and costs. 1343 1344 Unlike start_as_current_generation(), this method does not set the new span 1345 as the current span in the context. 1346 1347 Args: 1348 name: Name of the generation operation 1349 input: Input data for the model (e.g., prompts) 1350 output: Output from the model (e.g., completions) 1351 metadata: Additional metadata to associate with the generation 1352 version: Version identifier for the model or component 1353 level: Importance level of the generation (info, warning, error) 1354 status_message: Optional status message for the generation 1355 completion_start_time: When the model started generating the response 1356 model: Name/identifier of the AI model used (e.g., "gpt-4") 1357 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1358 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1359 cost_details: Cost information for the model call 1360 prompt: Associated prompt template from Langfuse prompt management 1361 1362 Returns: 1363 A new LangfuseGeneration that must be ended with .end() when complete 1364 1365 Example: 1366 ```python 1367 span = langfuse.start_span(name="process-query") 1368 try: 1369 # Create a generation child span 1370 generation = span.start_generation( 1371 name="generate-answer", 1372 model="gpt-4", 1373 input={"prompt": "Explain quantum computing"} 1374 ) 1375 try: 1376 # Call model API 1377 response = llm.generate(...) 1378 1379 generation.update( 1380 output=response.text, 1381 usage_details={ 1382 "prompt_tokens": response.usage.prompt_tokens, 1383 "completion_tokens": response.usage.completion_tokens 1384 } 1385 ) 1386 finally: 1387 generation.end() 1388 1389 # Continue with parent span 1390 span.update(output={"answer": response.text, "source": "gpt-4"}) 1391 finally: 1392 span.end() 1393 ``` 1394 """ 1395 warnings.warn( 1396 "start_generation is deprecated and will be removed in a future version. " 1397 "Use start_observation(as_type='generation') instead.", 1398 DeprecationWarning, 1399 stacklevel=2, 1400 ) 1401 return self.start_observation( 1402 name=name, 1403 as_type="generation", 1404 input=input, 1405 output=output, 1406 metadata=metadata, 1407 version=version, 1408 level=level, 1409 status_message=status_message, 1410 completion_start_time=completion_start_time, 1411 model=model, 1412 model_parameters=model_parameters, 1413 usage_details=usage_details, 1414 cost_details=cost_details, 1415 prompt=prompt, 1416 ) 1417 1418 def start_as_current_generation( 1419 self, 1420 *, 1421 name: str, 1422 input: Optional[Any] = None, 1423 output: Optional[Any] = None, 1424 metadata: Optional[Any] = None, 1425 version: Optional[str] = None, 1426 level: Optional[SpanLevel] = None, 1427 status_message: Optional[str] = None, 1428 completion_start_time: Optional[datetime] = None, 1429 model: Optional[str] = None, 1430 model_parameters: Optional[Dict[str, MapValue]] = None, 1431 usage_details: Optional[Dict[str, int]] = None, 1432 cost_details: Optional[Dict[str, float]] = None, 1433 prompt: Optional[PromptClient] = None, 1434 ) -> _AgnosticContextManager["LangfuseGeneration"]: 1435 """[DEPRECATED] Create a new child generation span and set it as the current span in a context manager. 1436 1437 DEPRECATED: This method is deprecated and will be removed in a future version. 1438 Use start_as_current_observation(as_type='generation') instead. 1439 1440 This method creates a new child generation span and sets it as the current span 1441 within a context manager. Generation spans are specialized for AI/LLM operations 1442 and include additional fields for model information, usage stats, and costs. 1443 1444 Args: 1445 name: Name of the generation operation 1446 input: Input data for the model (e.g., prompts) 1447 output: Output from the model (e.g., completions) 1448 metadata: Additional metadata to associate with the generation 1449 version: Version identifier for the model or component 1450 level: Importance level of the generation (info, warning, error) 1451 status_message: Optional status message for the generation 1452 completion_start_time: When the model started generating the response 1453 model: Name/identifier of the AI model used (e.g., "gpt-4") 1454 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1455 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1456 cost_details: Cost information for the model call 1457 prompt: Associated prompt template from Langfuse prompt management 1458 1459 Returns: 1460 A context manager that yields a new LangfuseGeneration 1461 1462 Example: 1463 ```python 1464 with langfuse.start_as_current_span(name="process-request") as span: 1465 # Prepare data 1466 query = preprocess_user_query(user_input) 1467 1468 # Create a generation span with context management 1469 with span.start_as_current_generation( 1470 name="generate-answer", 1471 model="gpt-4", 1472 input={"query": query} 1473 ) as generation: 1474 # Generation span is active here 1475 response = llm.generate(query) 1476 1477 # Update with results 1478 generation.update( 1479 output=response.text, 1480 usage_details={ 1481 "prompt_tokens": response.usage.prompt_tokens, 1482 "completion_tokens": response.usage.completion_tokens 1483 } 1484 ) 1485 1486 # Back to parent span context 1487 span.update(output={"answer": response.text, "source": "gpt-4"}) 1488 ``` 1489 """ 1490 warnings.warn( 1491 "start_as_current_generation is deprecated and will be removed in a future version. " 1492 "Use start_as_current_observation(as_type='generation') instead.", 1493 DeprecationWarning, 1494 stacklevel=2, 1495 ) 1496 return self.start_as_current_observation( 1497 name=name, 1498 as_type="generation", 1499 input=input, 1500 output=output, 1501 metadata=metadata, 1502 version=version, 1503 level=level, 1504 status_message=status_message, 1505 completion_start_time=completion_start_time, 1506 model=model, 1507 model_parameters=model_parameters, 1508 usage_details=usage_details, 1509 cost_details=cost_details, 1510 prompt=prompt, 1511 ) 1512 1513 def create_event( 1514 self, 1515 *, 1516 name: str, 1517 input: Optional[Any] = None, 1518 output: Optional[Any] = None, 1519 metadata: Optional[Any] = None, 1520 version: Optional[str] = None, 1521 level: Optional[SpanLevel] = None, 1522 status_message: Optional[str] = None, 1523 ) -> "LangfuseEvent": 1524 """Create a new Langfuse observation of type 'EVENT'. 1525 1526 Args: 1527 name: Name of the span (e.g., function or operation name) 1528 input: Input data for the operation (can be any JSON-serializable object) 1529 output: Output data from the operation (can be any JSON-serializable object) 1530 metadata: Additional metadata to associate with the span 1531 version: Version identifier for the code or component 1532 level: Importance level of the span (info, warning, error) 1533 status_message: Optional status message for the span 1534 1535 Returns: 1536 The LangfuseEvent object 1537 1538 Example: 1539 ```python 1540 event = langfuse.create_event(name="process-event") 1541 ``` 1542 """ 1543 timestamp = time_ns() 1544 1545 with otel_trace_api.use_span(self._otel_span): 1546 new_otel_span = self._langfuse_client._otel_tracer.start_span( 1547 name=name, start_time=timestamp 1548 ) 1549 1550 return cast( 1551 "LangfuseEvent", 1552 LangfuseEvent( 1553 otel_span=new_otel_span, 1554 langfuse_client=self._langfuse_client, 1555 input=input, 1556 output=output, 1557 metadata=metadata, 1558 environment=self._environment, 1559 version=version, 1560 level=level, 1561 status_message=status_message, 1562 ).end(end_time=timestamp), 1563 )
Standard span implementation for general operations in Langfuse.
This class represents a general-purpose span that can be used to trace any operation in your application. It extends the base LangfuseObservationWrapper with specific methods for creating child spans, generations, and updating span-specific attributes. If possible, use a more specific type for better observability and insights.
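For instance, a sketch of preferring a typed observation over a generic span via start_observation (the names and the search_index helper are hypothetical):
```python
root = langfuse.start_span(name="handle-request")
try:
    # A 'retriever' observation is more informative than a generic span here
    retriever = root.start_observation(name="fetch-docs", as_type="retriever")
    try:
        docs = search_index(query)  # hypothetical retrieval helper
        retriever.update(output={"num_docs": len(docs)})
    finally:
        retriever.end()
finally:
    root.end()
```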
1155 def __init__( 1156 self, 1157 *, 1158 otel_span: otel_trace_api.Span, 1159 langfuse_client: "Langfuse", 1160 input: Optional[Any] = None, 1161 output: Optional[Any] = None, 1162 metadata: Optional[Any] = None, 1163 environment: Optional[str] = None, 1164 version: Optional[str] = None, 1165 level: Optional[SpanLevel] = None, 1166 status_message: Optional[str] = None, 1167 ): 1168 """Initialize a new LangfuseSpan. 1169 1170 Args: 1171 otel_span: The OpenTelemetry span to wrap 1172 langfuse_client: Reference to the parent Langfuse client 1173 input: Input data for the span (any JSON-serializable object) 1174 output: Output data from the span (any JSON-serializable object) 1175 metadata: Additional metadata to associate with the span 1176 environment: The tracing environment 1177 version: Version identifier for the code or component 1178 level: Importance level of the span (info, warning, error) 1179 status_message: Optional status message for the span 1180 """ 1181 super().__init__( 1182 otel_span=otel_span, 1183 as_type="span", 1184 langfuse_client=langfuse_client, 1185 input=input, 1186 output=output, 1187 metadata=metadata, 1188 environment=environment, 1189 version=version, 1190 level=level, 1191 status_message=status_message, 1192 )
Initialize a new LangfuseSpan.
Arguments:
- otel_span: The OpenTelemetry span to wrap
- langfuse_client: Reference to the parent Langfuse client
- input: Input data for the span (any JSON-serializable object)
- output: Output data from the span (any JSON-serializable object)
- metadata: Additional metadata to associate with the span
- environment: The tracing environment
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
1194 def start_span( 1195 self, 1196 name: str, 1197 input: Optional[Any] = None, 1198 output: Optional[Any] = None, 1199 metadata: Optional[Any] = None, 1200 version: Optional[str] = None, 1201 level: Optional[SpanLevel] = None, 1202 status_message: Optional[str] = None, 1203 ) -> "LangfuseSpan": 1204 """Create a new child span. 1205 1206 This method creates a new child span with this span as the parent. 1207 Unlike start_as_current_span(), this method does not set the new span 1208 as the current span in the context. 1209 1210 Args: 1211 name: Name of the span (e.g., function or operation name) 1212 input: Input data for the operation 1213 output: Output data from the operation 1214 metadata: Additional metadata to associate with the span 1215 version: Version identifier for the code or component 1216 level: Importance level of the span (info, warning, error) 1217 status_message: Optional status message for the span 1218 1219 Returns: 1220 A new LangfuseSpan that must be ended with .end() when complete 1221 1222 Example: 1223 ```python 1224 parent_span = langfuse.start_span(name="process-request") 1225 try: 1226 # Create a child span 1227 child_span = parent_span.start_span(name="validate-input") 1228 try: 1229 # Do validation work 1230 validation_result = validate(request_data) 1231 child_span.update(output=validation_result) 1232 finally: 1233 child_span.end() 1234 1235 # Continue with parent span 1236 result = process_validated_data(validation_result) 1237 parent_span.update(output=result) 1238 finally: 1239 parent_span.end() 1240 ``` 1241 """ 1242 return self.start_observation( 1243 name=name, 1244 as_type="span", 1245 input=input, 1246 output=output, 1247 metadata=metadata, 1248 version=version, 1249 level=level, 1250 status_message=status_message, 1251 )
Create a new child span.
This method creates a new child span with this span as the parent. Unlike start_as_current_span(), this method does not set the new span as the current span in the context.
Arguments:
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation
- output: Output data from the operation
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Returns:
A new LangfuseSpan that must be ended with .end() when complete
Example:
```python
parent_span = langfuse.start_span(name="process-request")
try:
    # Create a child span
    child_span = parent_span.start_span(name="validate-input")
    try:
        # Do validation work
        validation_result = validate(request_data)
        child_span.update(output=validation_result)
    finally:
        child_span.end()

    # Continue with parent span
    result = process_validated_data(validation_result)
    parent_span.update(output=result)
finally:
    parent_span.end()
```
1253 def start_as_current_span( 1254 self, 1255 *, 1256 name: str, 1257 input: Optional[Any] = None, 1258 output: Optional[Any] = None, 1259 metadata: Optional[Any] = None, 1260 version: Optional[str] = None, 1261 level: Optional[SpanLevel] = None, 1262 status_message: Optional[str] = None, 1263 ) -> _AgnosticContextManager["LangfuseSpan"]: 1264 """[DEPRECATED] Create a new child span and set it as the current span in a context manager. 1265 1266 DEPRECATED: This method is deprecated and will be removed in a future version. 1267 Use start_as_current_observation(as_type='span') instead. 1268 1269 This method creates a new child span and sets it as the current span within 1270 a context manager. It should be used with a 'with' statement to automatically 1271 manage the span's lifecycle. 1272 1273 Args: 1274 name: Name of the span (e.g., function or operation name) 1275 input: Input data for the operation 1276 output: Output data from the operation 1277 metadata: Additional metadata to associate with the span 1278 version: Version identifier for the code or component 1279 level: Importance level of the span (info, warning, error) 1280 status_message: Optional status message for the span 1281 1282 Returns: 1283 A context manager that yields a new LangfuseSpan 1284 1285 Example: 1286 ```python 1287 with langfuse.start_as_current_span(name="process-request") as parent_span: 1288 # Parent span is active here 1289 1290 # Create a child span with context management 1291 with parent_span.start_as_current_span(name="validate-input") as child_span: 1292 # Child span is active here 1293 validation_result = validate(request_data) 1294 child_span.update(output=validation_result) 1295 1296 # Back to parent span context 1297 result = process_validated_data(validation_result) 1298 parent_span.update(output=result) 1299 ``` 1300 """ 1301 warnings.warn( 1302 "start_as_current_span is deprecated and will be removed in a future version. " 1303 "Use start_as_current_observation(as_type='span') instead.", 1304 DeprecationWarning, 1305 stacklevel=2, 1306 ) 1307 return self.start_as_current_observation( 1308 name=name, 1309 as_type="span", 1310 input=input, 1311 output=output, 1312 metadata=metadata, 1313 version=version, 1314 level=level, 1315 status_message=status_message, 1316 )
[DEPRECATED] Create a new child span and set it as the current span in a context manager.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_as_current_observation(as_type='span') instead.
This method creates a new child span and sets it as the current span within a context manager. It should be used with a 'with' statement to automatically manage the span's lifecycle.
Arguments:
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation
- output: Output data from the operation
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Returns:
A context manager that yields a new LangfuseSpan
Example:
with langfuse.start_as_current_span(name="process-request") as parent_span: # Parent span is active here # Create a child span with context management with parent_span.start_as_current_span(name="validate-input") as child_span: # Child span is active here validation_result = validate(request_data) child_span.update(output=validation_result) # Back to parent span context result = process_validated_data(validation_result) parent_span.update(output=result)
1318 def start_generation( 1319 self, 1320 *, 1321 name: str, 1322 input: Optional[Any] = None, 1323 output: Optional[Any] = None, 1324 metadata: Optional[Any] = None, 1325 version: Optional[str] = None, 1326 level: Optional[SpanLevel] = None, 1327 status_message: Optional[str] = None, 1328 completion_start_time: Optional[datetime] = None, 1329 model: Optional[str] = None, 1330 model_parameters: Optional[Dict[str, MapValue]] = None, 1331 usage_details: Optional[Dict[str, int]] = None, 1332 cost_details: Optional[Dict[str, float]] = None, 1333 prompt: Optional[PromptClient] = None, 1334 ) -> "LangfuseGeneration": 1335 """[DEPRECATED] Create a new child generation span. 1336 1337 DEPRECATED: This method is deprecated and will be removed in a future version. 1338 Use start_observation(as_type='generation') instead. 1339 1340 This method creates a new child generation span with this span as the parent. 1341 Generation spans are specialized for AI/LLM operations and include additional 1342 fields for model information, usage stats, and costs. 1343 1344 Unlike start_as_current_generation(), this method does not set the new span 1345 as the current span in the context. 1346 1347 Args: 1348 name: Name of the generation operation 1349 input: Input data for the model (e.g., prompts) 1350 output: Output from the model (e.g., completions) 1351 metadata: Additional metadata to associate with the generation 1352 version: Version identifier for the model or component 1353 level: Importance level of the generation (info, warning, error) 1354 status_message: Optional status message for the generation 1355 completion_start_time: When the model started generating the response 1356 model: Name/identifier of the AI model used (e.g., "gpt-4") 1357 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1358 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1359 cost_details: Cost information for the model call 1360 prompt: Associated prompt template from Langfuse prompt management 1361 1362 Returns: 1363 A new LangfuseGeneration that must be ended with .end() when complete 1364 1365 Example: 1366 ```python 1367 span = langfuse.start_span(name="process-query") 1368 try: 1369 # Create a generation child span 1370 generation = span.start_generation( 1371 name="generate-answer", 1372 model="gpt-4", 1373 input={"prompt": "Explain quantum computing"} 1374 ) 1375 try: 1376 # Call model API 1377 response = llm.generate(...) 1378 1379 generation.update( 1380 output=response.text, 1381 usage_details={ 1382 "prompt_tokens": response.usage.prompt_tokens, 1383 "completion_tokens": response.usage.completion_tokens 1384 } 1385 ) 1386 finally: 1387 generation.end() 1388 1389 # Continue with parent span 1390 span.update(output={"answer": response.text, "source": "gpt-4"}) 1391 finally: 1392 span.end() 1393 ``` 1394 """ 1395 warnings.warn( 1396 "start_generation is deprecated and will be removed in a future version. " 1397 "Use start_observation(as_type='generation') instead.", 1398 DeprecationWarning, 1399 stacklevel=2, 1400 ) 1401 return self.start_observation( 1402 name=name, 1403 as_type="generation", 1404 input=input, 1405 output=output, 1406 metadata=metadata, 1407 version=version, 1408 level=level, 1409 status_message=status_message, 1410 completion_start_time=completion_start_time, 1411 model=model, 1412 model_parameters=model_parameters, 1413 usage_details=usage_details, 1414 cost_details=cost_details, 1415 prompt=prompt, 1416 )
[DEPRECATED] Create a new child generation span.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_observation(as_type='generation') instead.
This method creates a new child generation span with this span as the parent. Generation spans are specialized for AI/LLM operations and include additional fields for model information, usage stats, and costs.
Unlike start_as_current_generation(), this method does not set the new span as the current span in the context.
Arguments:
- name: Name of the generation operation
- input: Input data for the model (e.g., prompts)
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Returns:
A new LangfuseGeneration that must be ended with .end() when complete
Example:
span = langfuse.start_span(name="process-query") try: # Create a generation child span generation = span.start_generation( name="generate-answer", model="gpt-4", input={"prompt": "Explain quantum computing"} ) try: # Call model API response = llm.generate(...) generation.update( output=response.text, usage_details={ "prompt_tokens": response.usage.prompt_tokens, "completion_tokens": response.usage.completion_tokens } ) finally: generation.end() # Continue with parent span span.update(output={"answer": response.text, "source": "gpt-4"}) finally: span.end()
1418 def start_as_current_generation( 1419 self, 1420 *, 1421 name: str, 1422 input: Optional[Any] = None, 1423 output: Optional[Any] = None, 1424 metadata: Optional[Any] = None, 1425 version: Optional[str] = None, 1426 level: Optional[SpanLevel] = None, 1427 status_message: Optional[str] = None, 1428 completion_start_time: Optional[datetime] = None, 1429 model: Optional[str] = None, 1430 model_parameters: Optional[Dict[str, MapValue]] = None, 1431 usage_details: Optional[Dict[str, int]] = None, 1432 cost_details: Optional[Dict[str, float]] = None, 1433 prompt: Optional[PromptClient] = None, 1434 ) -> _AgnosticContextManager["LangfuseGeneration"]: 1435 """[DEPRECATED] Create a new child generation span and set it as the current span in a context manager. 1436 1437 DEPRECATED: This method is deprecated and will be removed in a future version. 1438 Use start_as_current_observation(as_type='generation') instead. 1439 1440 This method creates a new child generation span and sets it as the current span 1441 within a context manager. Generation spans are specialized for AI/LLM operations 1442 and include additional fields for model information, usage stats, and costs. 1443 1444 Args: 1445 name: Name of the generation operation 1446 input: Input data for the model (e.g., prompts) 1447 output: Output from the model (e.g., completions) 1448 metadata: Additional metadata to associate with the generation 1449 version: Version identifier for the model or component 1450 level: Importance level of the generation (info, warning, error) 1451 status_message: Optional status message for the generation 1452 completion_start_time: When the model started generating the response 1453 model: Name/identifier of the AI model used (e.g., "gpt-4") 1454 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1455 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1456 cost_details: Cost information for the model call 1457 prompt: Associated prompt template from Langfuse prompt management 1458 1459 Returns: 1460 A context manager that yields a new LangfuseGeneration 1461 1462 Example: 1463 ```python 1464 with langfuse.start_as_current_span(name="process-request") as span: 1465 # Prepare data 1466 query = preprocess_user_query(user_input) 1467 1468 # Create a generation span with context management 1469 with span.start_as_current_generation( 1470 name="generate-answer", 1471 model="gpt-4", 1472 input={"query": query} 1473 ) as generation: 1474 # Generation span is active here 1475 response = llm.generate(query) 1476 1477 # Update with results 1478 generation.update( 1479 output=response.text, 1480 usage_details={ 1481 "prompt_tokens": response.usage.prompt_tokens, 1482 "completion_tokens": response.usage.completion_tokens 1483 } 1484 ) 1485 1486 # Back to parent span context 1487 span.update(output={"answer": response.text, "source": "gpt-4"}) 1488 ``` 1489 """ 1490 warnings.warn( 1491 "start_as_current_generation is deprecated and will be removed in a future version. " 1492 "Use start_as_current_observation(as_type='generation') instead.", 1493 DeprecationWarning, 1494 stacklevel=2, 1495 ) 1496 return self.start_as_current_observation( 1497 name=name, 1498 as_type="generation", 1499 input=input, 1500 output=output, 1501 metadata=metadata, 1502 version=version, 1503 level=level, 1504 status_message=status_message, 1505 completion_start_time=completion_start_time, 1506 model=model, 1507 model_parameters=model_parameters, 1508 usage_details=usage_details, 1509 cost_details=cost_details, 1510 prompt=prompt, 1511 )
[DEPRECATED] Create a new child generation span and set it as the current span in a context manager.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_as_current_observation(as_type='generation') instead.
This method creates a new child generation span and sets it as the current span within a context manager. Generation spans are specialized for AI/LLM operations and include additional fields for model information, usage stats, and costs.
Arguments:
- name: Name of the generation operation
- input: Input data for the model (e.g., prompts)
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Returns:
A context manager that yields a new LangfuseGeneration
Example:
with langfuse.start_as_current_span(name="process-request") as span: # Prepare data query = preprocess_user_query(user_input) # Create a generation span with context management with span.start_as_current_generation( name="generate-answer", model="gpt-4", input={"query": query} ) as generation: # Generation span is active here response = llm.generate(query) # Update with results generation.update( output=response.text, usage_details={ "prompt_tokens": response.usage.prompt_tokens, "completion_tokens": response.usage.completion_tokens } ) # Back to parent span context span.update(output={"answer": response.text, "source": "gpt-4"})
1513 def create_event( 1514 self, 1515 *, 1516 name: str, 1517 input: Optional[Any] = None, 1518 output: Optional[Any] = None, 1519 metadata: Optional[Any] = None, 1520 version: Optional[str] = None, 1521 level: Optional[SpanLevel] = None, 1522 status_message: Optional[str] = None, 1523 ) -> "LangfuseEvent": 1524 """Create a new Langfuse observation of type 'EVENT'. 1525 1526 Args: 1527 name: Name of the span (e.g., function or operation name) 1528 input: Input data for the operation (can be any JSON-serializable object) 1529 output: Output data from the operation (can be any JSON-serializable object) 1530 metadata: Additional metadata to associate with the span 1531 version: Version identifier for the code or component 1532 level: Importance level of the span (info, warning, error) 1533 status_message: Optional status message for the span 1534 1535 Returns: 1536 The LangfuseEvent object 1537 1538 Example: 1539 ```python 1540 event = langfuse.create_event(name="process-event") 1541 ``` 1542 """ 1543 timestamp = time_ns() 1544 1545 with otel_trace_api.use_span(self._otel_span): 1546 new_otel_span = self._langfuse_client._otel_tracer.start_span( 1547 name=name, start_time=timestamp 1548 ) 1549 1550 return cast( 1551 "LangfuseEvent", 1552 LangfuseEvent( 1553 otel_span=new_otel_span, 1554 langfuse_client=self._langfuse_client, 1555 input=input, 1556 output=output, 1557 metadata=metadata, 1558 environment=self._environment, 1559 version=version, 1560 level=level, 1561 status_message=status_message, 1562 ).end(end_time=timestamp), 1563 )
Create a new Langfuse observation of type 'EVENT'.
Arguments:
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Returns:
The LangfuseEvent object
Example:
```python
event = langfuse.create_event(name="process-event")
```
1566class LangfuseGeneration(LangfuseObservationWrapper): 1567 """Specialized span implementation for AI model generations in Langfuse. 1568 1569 This class represents a generation span specifically designed for tracking 1570 AI/LLM operations. It extends the base LangfuseObservationWrapper with specialized 1571 attributes for model details, token usage, and costs. 1572 """ 1573 1574 def __init__( 1575 self, 1576 *, 1577 otel_span: otel_trace_api.Span, 1578 langfuse_client: "Langfuse", 1579 input: Optional[Any] = None, 1580 output: Optional[Any] = None, 1581 metadata: Optional[Any] = None, 1582 environment: Optional[str] = None, 1583 version: Optional[str] = None, 1584 level: Optional[SpanLevel] = None, 1585 status_message: Optional[str] = None, 1586 completion_start_time: Optional[datetime] = None, 1587 model: Optional[str] = None, 1588 model_parameters: Optional[Dict[str, MapValue]] = None, 1589 usage_details: Optional[Dict[str, int]] = None, 1590 cost_details: Optional[Dict[str, float]] = None, 1591 prompt: Optional[PromptClient] = None, 1592 ): 1593 """Initialize a new LangfuseGeneration span. 1594 1595 Args: 1596 otel_span: The OpenTelemetry span to wrap 1597 langfuse_client: Reference to the parent Langfuse client 1598 input: Input data for the generation (e.g., prompts) 1599 output: Output from the generation (e.g., completions) 1600 metadata: Additional metadata to associate with the generation 1601 environment: The tracing environment 1602 version: Version identifier for the model or component 1603 level: Importance level of the generation (info, warning, error) 1604 status_message: Optional status message for the generation 1605 completion_start_time: When the model started generating the response 1606 model: Name/identifier of the AI model used (e.g., "gpt-4") 1607 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1608 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1609 cost_details: Cost information for the model call 1610 prompt: Associated prompt template from Langfuse prompt management 1611 """ 1612 super().__init__( 1613 as_type="generation", 1614 otel_span=otel_span, 1615 langfuse_client=langfuse_client, 1616 input=input, 1617 output=output, 1618 metadata=metadata, 1619 environment=environment, 1620 version=version, 1621 level=level, 1622 status_message=status_message, 1623 completion_start_time=completion_start_time, 1624 model=model, 1625 model_parameters=model_parameters, 1626 usage_details=usage_details, 1627 cost_details=cost_details, 1628 prompt=prompt, 1629 )
Specialized span implementation for AI model generations in Langfuse.
This class represents a generation span specifically designed for tracking AI/LLM operations. It extends the base LangfuseObservationWrapper with specialized attributes for model details, token usage, and costs.
1574 def __init__( 1575 self, 1576 *, 1577 otel_span: otel_trace_api.Span, 1578 langfuse_client: "Langfuse", 1579 input: Optional[Any] = None, 1580 output: Optional[Any] = None, 1581 metadata: Optional[Any] = None, 1582 environment: Optional[str] = None, 1583 version: Optional[str] = None, 1584 level: Optional[SpanLevel] = None, 1585 status_message: Optional[str] = None, 1586 completion_start_time: Optional[datetime] = None, 1587 model: Optional[str] = None, 1588 model_parameters: Optional[Dict[str, MapValue]] = None, 1589 usage_details: Optional[Dict[str, int]] = None, 1590 cost_details: Optional[Dict[str, float]] = None, 1591 prompt: Optional[PromptClient] = None, 1592 ): 1593 """Initialize a new LangfuseGeneration span. 1594 1595 Args: 1596 otel_span: The OpenTelemetry span to wrap 1597 langfuse_client: Reference to the parent Langfuse client 1598 input: Input data for the generation (e.g., prompts) 1599 output: Output from the generation (e.g., completions) 1600 metadata: Additional metadata to associate with the generation 1601 environment: The tracing environment 1602 version: Version identifier for the model or component 1603 level: Importance level of the generation (info, warning, error) 1604 status_message: Optional status message for the generation 1605 completion_start_time: When the model started generating the response 1606 model: Name/identifier of the AI model used (e.g., "gpt-4") 1607 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1608 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1609 cost_details: Cost information for the model call 1610 prompt: Associated prompt template from Langfuse prompt management 1611 """ 1612 super().__init__( 1613 as_type="generation", 1614 otel_span=otel_span, 1615 langfuse_client=langfuse_client, 1616 input=input, 1617 output=output, 1618 metadata=metadata, 1619 environment=environment, 1620 version=version, 1621 level=level, 1622 status_message=status_message, 1623 completion_start_time=completion_start_time, 1624 model=model, 1625 model_parameters=model_parameters, 1626 usage_details=usage_details, 1627 cost_details=cost_details, 1628 prompt=prompt, 1629 )
Initialize a new LangfuseGeneration span.
Arguments:
- otel_span: The OpenTelemetry span to wrap
- langfuse_client: Reference to the parent Langfuse client
- input: Input data for the generation (e.g., prompts)
- output: Output from the generation (e.g., completions)
- metadata: Additional metadata to associate with the generation
- environment: The tracing environment
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
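In application code this class is usually not constructed directly; generation spans are created through the Langfuse client and updated as results arrive. A minimal sketch, assuming the client's `start_as_current_generation` helper documented elsewhere in this reference (model name, token counts, and cost are illustrative values):

```python
from langfuse import get_client

langfuse = get_client()

# Open a generation span as the current context; the LLM call goes inside.
with langfuse.start_as_current_generation(
    name="summarize",
    model="gpt-4o",  # illustrative model identifier
    input={"prompt": "Summarize the meeting notes."},
    model_parameters={"temperature": 0.2},
) as generation:
    completion = "The meeting covered Q3 goals..."  # replace with a real LLM call
    # Attach output, token usage, and cost so Langfuse can aggregate them.
    generation.update(
        output=completion,
        usage_details={"input": 120, "output": 42},
        cost_details={"total": 0.0031},
    )
```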
```python
class LangfuseEvent(LangfuseObservationWrapper):
    """Specialized span implementation for Langfuse Events."""

    def __init__(
        self,
        *,
        otel_span: otel_trace_api.Span,
        langfuse_client: "Langfuse",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        environment: Optional[str] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ):
        """Initialize a new LangfuseEvent span.

        Args:
            otel_span: The OpenTelemetry span to wrap
            langfuse_client: Reference to the parent Langfuse client
            input: Input data for the event
            output: Output from the event
            metadata: Additional metadata to associate with the event
            environment: The tracing environment
            version: Version identifier for the model or component
            level: Importance level of the event (info, warning, error)
            status_message: Optional status message for the event
        """
        super().__init__(
            otel_span=otel_span,
            as_type="event",
            langfuse_client=langfuse_client,
            input=input,
            output=output,
            metadata=metadata,
            environment=environment,
            version=version,
            level=level,
            status_message=status_message,
        )

    def update(
        self,
        *,
        name: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        **kwargs: Any,
    ) -> "LangfuseEvent":
        """Update is not allowed for LangfuseEvent because events cannot be updated.

        This method logs a warning and returns self without making changes.

        Returns:
            self: Returns the unchanged LangfuseEvent instance
        """
        langfuse_logger.warning(
            "Attempted to update LangfuseEvent observation. Events cannot be updated after creation."
        )
        return self
```
Specialized span implementation for Langfuse Events.
Initialize a new LangfuseEvent span.
Arguments:
- otel_span: The OpenTelemetry span to wrap
- langfuse_client: Reference to the parent Langfuse client
- input: Input data for the event
- output: Output from the event
- metadata: Additional metadata to associate with the event
- environment: The tracing environment
- version: Version identifier for the model or component
- level: Importance level of the event (info, warning, error)
- status_message: Optional status message for the event
Update is not allowed for LangfuseEvent because events cannot be updated.
This method logs a warning and returns self without making changes.
Returns:
self: Returns the unchanged LangfuseEvent instance
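Because events are immutable, the typical pattern is to record all data in a single call rather than updating afterwards. A minimal sketch, assuming a `create_event` helper on the Langfuse client (names and payloads are illustrative):

```python
from langfuse import get_client

langfuse = get_client()

# Events capture a point in time, so input/output/metadata are set at creation.
event = langfuse.create_event(
    name="cache-hit",
    input={"key": "user:123:profile"},
    metadata={"store": "redis"},
)

# Any later update() logs a warning and returns the event unchanged.
event.update(output="ignored")
```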
```python
class LangfuseOtelSpanAttributes:
    # Langfuse-Trace attributes
    TRACE_NAME = "langfuse.trace.name"
    TRACE_USER_ID = "user.id"
    TRACE_SESSION_ID = "session.id"
    TRACE_TAGS = "langfuse.trace.tags"
    TRACE_PUBLIC = "langfuse.trace.public"
    TRACE_METADATA = "langfuse.trace.metadata"
    TRACE_INPUT = "langfuse.trace.input"
    TRACE_OUTPUT = "langfuse.trace.output"

    # Langfuse-observation attributes
    OBSERVATION_TYPE = "langfuse.observation.type"
    OBSERVATION_METADATA = "langfuse.observation.metadata"
    OBSERVATION_LEVEL = "langfuse.observation.level"
    OBSERVATION_STATUS_MESSAGE = "langfuse.observation.status_message"
    OBSERVATION_INPUT = "langfuse.observation.input"
    OBSERVATION_OUTPUT = "langfuse.observation.output"

    # Langfuse-observation of type Generation attributes
    OBSERVATION_COMPLETION_START_TIME = "langfuse.observation.completion_start_time"
    OBSERVATION_MODEL = "langfuse.observation.model.name"
    OBSERVATION_MODEL_PARAMETERS = "langfuse.observation.model.parameters"
    OBSERVATION_USAGE_DETAILS = "langfuse.observation.usage_details"
    OBSERVATION_COST_DETAILS = "langfuse.observation.cost_details"
    OBSERVATION_PROMPT_NAME = "langfuse.observation.prompt.name"
    OBSERVATION_PROMPT_VERSION = "langfuse.observation.prompt.version"

    # General
    ENVIRONMENT = "langfuse.environment"
    RELEASE = "langfuse.release"
    VERSION = "langfuse.version"

    # Internal
    AS_ROOT = "langfuse.internal.as_root"
```
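Since these constants are plain OpenTelemetry attribute keys, they can in principle be set on spans created outside the Langfuse wrappers and picked up when those spans are exported to Langfuse. A hedged sketch, assuming the Langfuse client has already configured OpenTelemetry export for the process (span name and session id are illustrative):

```python
from opentelemetry import trace

from langfuse import LangfuseOtelSpanAttributes, get_client

get_client()  # assumed to set up OTel export to Langfuse for this process

tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("checkout") as span:
    # Plain string values; exact value encodings for non-string attributes
    # (e.g., tags, metadata) are not shown here.
    span.set_attribute(LangfuseOtelSpanAttributes.TRACE_NAME, "checkout-flow")
    span.set_attribute(LangfuseOtelSpanAttributes.TRACE_SESSION_ID, "session-42")
```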
```python
class LangfuseAgent(LangfuseObservationWrapper):
    """Agent observation for reasoning blocks that act on tools using LLM guidance."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseAgent span."""
        kwargs["as_type"] = "agent"
        super().__init__(**kwargs)
```
Agent observation for reasoning blocks that act on tools using LLM guidance.
```python
class LangfuseTool(LangfuseObservationWrapper):
    """Tool observation representing external tool calls, e.g., calling a weather API."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseTool span."""
        kwargs["as_type"] = "tool"
        super().__init__(**kwargs)
```
Tool observation representing external tool calls, e.g., calling a weather API.
```python
class LangfuseChain(LangfuseObservationWrapper):
    """Chain observation for connecting LLM application steps, e.g., passing context from retriever to LLM."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseChain span."""
        kwargs["as_type"] = "chain"
        super().__init__(**kwargs)
```
Chain observation for connecting LLM application steps, e.g., passing context from retriever to LLM.
```python
class LangfuseEmbedding(LangfuseObservationWrapper):
    """Embedding observation for LLM embedding calls, typically used before retrieval."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseEmbedding span."""
        kwargs["as_type"] = "embedding"
        super().__init__(**kwargs)
```
Embedding observation for LLM embedding calls, typically used before retrieval.
```python
class LangfuseEvaluator(LangfuseObservationWrapper):
    """Evaluator observation for assessing relevance, correctness, or helpfulness of LLM outputs."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseEvaluator span."""
        kwargs["as_type"] = "evaluator"
        super().__init__(**kwargs)
```
Evaluator observation for assessing relevance, correctness, or helpfulness of LLM outputs.
```python
class LangfuseRetriever(LangfuseObservationWrapper):
    """Retriever observation for data retrieval steps, e.g., vector store or database queries."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseRetriever span."""
        kwargs["as_type"] = "retriever"
        super().__init__(**kwargs)
```
Retriever observation for data retrieval steps, e.g., vector store or database queries.
```python
class LangfuseGuardrail(LangfuseObservationWrapper):
    """Guardrail observation for protection, e.g., against jailbreaks or offensive content."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseGuardrail span."""
        kwargs["as_type"] = "guardrail"
        super().__init__(**kwargs)
```
Guardrail observation for protection, e.g., against jailbreaks or offensive content.
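These typed wrappers share the same behavior and differ only in the `as_type` they set; they are normally obtained by starting an observation with a matching type rather than instantiated directly. A minimal sketch, assuming the client's `start_observation` factory documented elsewhere in this reference:

```python
from langfuse import get_client

langfuse = get_client()

# as_type selects the wrapper class, e.g. "retriever" -> LangfuseRetriever.
retriever = langfuse.start_observation(name="vector-search", as_type="retriever")
retriever.update(output={"documents": ["doc-1", "doc-7"]})  # illustrative payload
retriever.end()  # observations started without a context manager must be ended explicitly
```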
```python
class Evaluation:
    """Represents an evaluation result for an experiment item or an entire experiment run."""

    def __init__(
        self,
        *,
        name: str,
        value: Union[int, float, str, bool, None],
        comment: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        data_type: Optional[ScoreDataType] = None,
        config_id: Optional[str] = None,
    ):
        """Initialize an Evaluation with the provided data."""
        self.name = name
        self.value = value
        self.comment = comment
        self.metadata = metadata
        self.data_type = data_type
        self.config_id = config_id
```
Represents an evaluation result for an experiment item or an entire experiment run.
This class provides a strongly-typed way to create evaluation results in evaluator functions. Users must use keyword arguments when instantiating this class.
Attributes:
- name: Unique identifier for the evaluation metric. Should be descriptive and consistent across runs (e.g., "accuracy", "bleu_score", "toxicity"). Used for aggregation and comparison across experiment runs.
- value: The evaluation score or result. Can be:
- Numeric (int/float): For quantitative metrics like accuracy (0.85), BLEU (0.42)
- String: For categorical results like "positive", "negative", "neutral"
- Boolean: For binary assessments like "passes_safety_check"
- None: When evaluation cannot be computed (missing data, API errors, etc.)
- comment: Optional human-readable explanation of the evaluation result. Useful for providing context, explaining scoring rationale, or noting special conditions. Displayed in Langfuse UI for interpretability.
- metadata: Optional structured metadata about the evaluation process. Can include confidence scores, intermediate calculations, model versions, or any other relevant technical details.
- data_type: Optional score data type. Required if value is not NUMERIC. One of NUMERIC, CATEGORICAL, or BOOLEAN. Defaults to NUMERIC.
- config_id: Optional Langfuse score config ID.
Examples:
Basic accuracy evaluation:
```python
from langfuse import Evaluation

def accuracy_evaluator(*, input, output, expected_output=None, **kwargs):
    if not expected_output:
        return Evaluation(name="accuracy", value=None, comment="No expected output")

    is_correct = output.strip().lower() == expected_output.strip().lower()
    return Evaluation(
        name="accuracy",
        value=1.0 if is_correct else 0.0,
        comment="Correct answer" if is_correct else "Incorrect answer",
    )
```
Multi-metric evaluator:
```python
def comprehensive_evaluator(*, input, output, expected_output=None, **kwargs):
    return [
        Evaluation(name="length", value=len(output), comment=f"Output length: {len(output)} chars"),
        Evaluation(name="has_greeting", value="hello" in output.lower(), comment="Contains greeting"),
        Evaluation(
            name="quality",
            value=0.85,
            comment="High quality response",
            metadata={"confidence": 0.92, "model": "gpt-4"},
        ),
    ]
```
Categorical evaluation:
```python
def sentiment_evaluator(*, input, output, **kwargs):
    sentiment = analyze_sentiment(output)  # Returns "positive", "negative", or "neutral"
    return Evaluation(
        name="sentiment",
        value=sentiment,
        comment=f"Response expresses {sentiment} sentiment",
        data_type="CATEGORICAL",
    )
```
Failed evaluation with error handling:
```python
def external_api_evaluator(*, input, output, **kwargs):
    try:
        score = external_api.evaluate(output)
        return Evaluation(name="external_score", value=score)
    except Exception as e:
        return Evaluation(
            name="external_score",
            value=None,
            comment=f"API unavailable: {e}",
            metadata={"error": str(e), "retry_count": 3},
        )
```
Note:
All arguments must be passed as keywords. Positional arguments are not allowed to ensure code clarity and prevent errors from argument reordering.
Initialize an Evaluation with the provided data.
Arguments:
- name: Unique identifier for the evaluation metric.
- value: The evaluation score or result.
- comment: Optional human-readable explanation of the result.
- metadata: Optional structured metadata about the evaluation process.
- data_type: Optional score data type (NUMERIC, CATEGORICAL, or BOOLEAN).
- config_id: Optional Langfuse score config ID.
Note:
All arguments must be provided as keywords. Positional arguments will raise a TypeError.
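The bare `*` in the signature makes every parameter keyword-only, so positional construction fails fast:

```python
from langfuse import Evaluation

# Keyword arguments work as documented above.
ok = Evaluation(name="accuracy", value=0.92, comment="92% of answers matched")

# Positional arguments are rejected by Python itself.
try:
    Evaluation("accuracy", 0.92)
except TypeError as e:
    print(e)  # e.g. "__init__() takes 1 positional argument but 3 were given"
```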