langfuse

Langfuse Python SDK
Installation
The SDK was rewritten in v3 and released in June 2025. Refer to the v3 migration guide for instructions on updating your code.
pip install langfuse
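
A minimal quick-start sketch (not from the README above): it assumes the `LANGFUSE_PUBLIC_KEY`, `LANGFUSE_SECRET_KEY`, and, for self-hosted deployments, `LANGFUSE_BASE_URL` environment variables are set, and the function body is a placeholder for real application logic.

```python
from langfuse import get_client, observe

@observe()  # traces this function as a span in Langfuse
def answer(question: str) -> str:
    return f"Echo: {question}"  # placeholder for real application logic

answer("What is Langfuse?")

# Spans are batched in the background; flush before a short-lived process exits.
get_client().flush()
```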
Docs
Please see our docs for detailed information on this SDK.
1""".. include:: ../README.md""" 2 3from langfuse.experiment import Evaluation 4 5from ._client import client as _client_module 6from ._client.attributes import LangfuseOtelSpanAttributes 7from ._client.constants import ObservationTypeLiteral 8from ._client.get_client import get_client 9from ._client.observe import observe 10from ._client.span import ( 11 LangfuseAgent, 12 LangfuseChain, 13 LangfuseEmbedding, 14 LangfuseEvaluator, 15 LangfuseEvent, 16 LangfuseGeneration, 17 LangfuseGuardrail, 18 LangfuseRetriever, 19 LangfuseSpan, 20 LangfuseTool, 21) 22 23Langfuse = _client_module.Langfuse 24 25__all__ = [ 26 "Langfuse", 27 "get_client", 28 "observe", 29 "ObservationTypeLiteral", 30 "LangfuseSpan", 31 "LangfuseGeneration", 32 "LangfuseEvent", 33 "LangfuseOtelSpanAttributes", 34 "LangfuseAgent", 35 "LangfuseTool", 36 "LangfuseChain", 37 "LangfuseEmbedding", 38 "LangfuseEvaluator", 39 "LangfuseRetriever", 40 "LangfuseGuardrail", 41 "Evaluation", 42 "experiment", 43 "api", 44]
class Langfuse:
    """Main client for Langfuse tracing and platform features.

    This class provides an interface for creating and managing traces, spans,
    and generations in Langfuse as well as interacting with the Langfuse API.

    The client features a thread-safe singleton pattern for each unique public API key,
    ensuring consistent trace context propagation across your application. It implements
    efficient batching of spans with configurable flush settings and includes background
    thread management for media uploads and score ingestion.

    Configuration is flexible through either direct parameters or environment variables,
    with graceful fallbacks and runtime configuration updates.

    Attributes:
        api: Synchronous API client for Langfuse backend communication
        async_api: Asynchronous API client for Langfuse backend communication
        _otel_tracer: Internal LangfuseTracer instance managing OpenTelemetry components

    Parameters:
        public_key (Optional[str]): Your Langfuse public API key. Can also be set via LANGFUSE_PUBLIC_KEY environment variable.
        secret_key (Optional[str]): Your Langfuse secret API key. Can also be set via LANGFUSE_SECRET_KEY environment variable.
        base_url (Optional[str]): The Langfuse API base URL. Defaults to "https://cloud.langfuse.com". Can also be set via LANGFUSE_BASE_URL environment variable.
        host (Optional[str]): Deprecated. Use base_url instead. The Langfuse API host URL. Defaults to "https://cloud.langfuse.com".
        timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds.
        httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created.
        debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable.
        tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable.
        flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable.
        flush_interval (Optional[float]): Time in seconds between batch flushes. Defaults to 5 seconds. Can also be set via LANGFUSE_FLUSH_INTERVAL environment variable.
        environment (Optional[str]): Environment name for tracing. Defaults to 'default'. Can also be set via LANGFUSE_TRACING_ENVIRONMENT environment variable. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'.
        release (Optional[str]): Release version/hash of your application. Used for grouping analytics by release.
        media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable.
        sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable.
        mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API.
        blocked_instrumentation_scopes (Optional[List[str]]): List of instrumentation scope names to block from being exported to Langfuse. Spans from these scopes will be filtered out before being sent to the API. Useful for filtering out spans from specific libraries or frameworks. For exported spans, you can see the instrumentation scope name in the span metadata in Langfuse (`metadata.scope.name`).
        additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and OTLPSpanExporter requests. These headers will be merged with default headers. Note: If httpx_client is provided, additional_headers must be set directly on your custom httpx_client as well.
        tracer_provider (Optional[TracerProvider]): OpenTelemetry TracerProvider to use for Langfuse. Useful for keeping Langfuse tracing disconnected from other OpenTelemetry-span-emitting libraries. Note: the active span context is still shared between TracerProviders, which may lead to broken trace trees.

    Example:
        ```python
        from langfuse import Langfuse

        # Initialize the client (reads from env vars if not provided)
        langfuse = Langfuse(
            public_key="your-public-key",
            secret_key="your-secret-key",
            base_url="https://cloud.langfuse.com",  # Optional, default shown
        )

        # Create a trace span
        with langfuse.start_as_current_span(name="process-query") as span:
            # Your application code here

            # Create a nested generation span for an LLM call
            with span.start_as_current_generation(
                name="generate-response",
                model="gpt-4",
                input={"query": "Tell me about AI"},
                model_parameters={"temperature": 0.7, "max_tokens": 500}
            ) as generation:
                # Generate response here
                response = "AI is a field of computer science..."

                generation.update(
                    output=response,
                    usage_details={"prompt_tokens": 10, "completion_tokens": 50},
                    cost_details={"total_cost": 0.0023}
                )

                # Score the generation (supports NUMERIC, BOOLEAN, CATEGORICAL)
                generation.score(name="relevance", value=0.95, data_type="NUMERIC")
        ```
    """

    _resources: Optional[LangfuseResourceManager] = None
    _mask: Optional[MaskFunction] = None
    _otel_tracer: otel_trace_api.Tracer

    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        base_url: Optional[str] = None,
        host: Optional[str] = None,
        timeout: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        debug: bool = False,
        tracing_enabled: Optional[bool] = True,
        flush_at: Optional[int] = None,
        flush_interval: Optional[float] = None,
        environment: Optional[str] = None,
        release: Optional[str] = None,
        media_upload_thread_count: Optional[int] = None,
        sample_rate: Optional[float] = None,
        mask: Optional[MaskFunction] = None,
        blocked_instrumentation_scopes: Optional[List[str]] = None,
        additional_headers: Optional[Dict[str, str]] = None,
        tracer_provider: Optional[TracerProvider] = None,
    ):
        self._base_url = (
            base_url
            or os.environ.get(LANGFUSE_BASE_URL)
            or host
            or os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com")
        )
        self._environment = environment or cast(
            str, os.environ.get(LANGFUSE_TRACING_ENVIRONMENT)
        )
        self._project_id: Optional[str] = None
        sample_rate = sample_rate or float(os.environ.get(LANGFUSE_SAMPLE_RATE, 1.0))
        if not 0.0 <= sample_rate <= 1.0:
            raise ValueError(
                f"Sample rate must be between 0.0 and 1.0, got {sample_rate}"
            )

        timeout = timeout or int(os.environ.get(LANGFUSE_TIMEOUT, 5))

        self._tracing_enabled = (
            tracing_enabled
            and os.environ.get(LANGFUSE_TRACING_ENABLED, "true").lower() != "false"
        )
        if not self._tracing_enabled:
            langfuse_logger.info(
                "Configuration: Langfuse tracing is explicitly disabled. No data will be sent to the Langfuse API."
            )

        debug = (
            debug if debug else (os.getenv(LANGFUSE_DEBUG, "false").lower() == "true")
        )
        if debug:
            logging.basicConfig(
                format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            langfuse_logger.setLevel(logging.DEBUG)

        public_key = public_key or os.environ.get(LANGFUSE_PUBLIC_KEY)
        if public_key is None:
            langfuse_logger.warning(
                "Authentication error: Langfuse client initialized without public_key. Client will be disabled. "
                "Provide a public_key parameter or set LANGFUSE_PUBLIC_KEY environment variable. "
            )
            self._otel_tracer = otel_trace_api.NoOpTracer()
            return

        secret_key = secret_key or os.environ.get(LANGFUSE_SECRET_KEY)
        if secret_key is None:
            langfuse_logger.warning(
                "Authentication error: Langfuse client initialized without secret_key. Client will be disabled. "
                "Provide a secret_key parameter or set LANGFUSE_SECRET_KEY environment variable. "
            )
            self._otel_tracer = otel_trace_api.NoOpTracer()
            return

        if os.environ.get("OTEL_SDK_DISABLED", "false").lower() == "true":
            langfuse_logger.warning(
                "OTEL_SDK_DISABLED is set. Langfuse tracing will be disabled and no traces will appear in the UI."
            )

        # Initialize api and tracer if requirements are met
        self._resources = LangfuseResourceManager(
            public_key=public_key,
            secret_key=secret_key,
            base_url=self._base_url,
            timeout=timeout,
            environment=self._environment,
            release=release,
            flush_at=flush_at,
            flush_interval=flush_interval,
            httpx_client=httpx_client,
            media_upload_thread_count=media_upload_thread_count,
            sample_rate=sample_rate,
            mask=mask,
            tracing_enabled=self._tracing_enabled,
            blocked_instrumentation_scopes=blocked_instrumentation_scopes,
            additional_headers=additional_headers,
            tracer_provider=tracer_provider,
        )
        self._mask = self._resources.mask

        self._otel_tracer = (
            self._resources.tracer
            if self._tracing_enabled and self._resources.tracer is not None
            else otel_trace_api.NoOpTracer()
        )
        self.api = self._resources.api
        self.async_api = self._resources.async_api

    def start_span(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseSpan:
        """Create a new span for tracing a unit of work.

        This method creates a new span but does not set it as the current span in the
        context. To create and use a span within a context, use start_as_current_span().

        The created span will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the span (e.g., function or operation name)
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span

        Returns:
            A LangfuseSpan object that must be ended with .end() when the operation completes

        Example:
            ```python
            span = langfuse.start_span(name="process-data")
            try:
                # Do work
                span.update(output="result")
            finally:
                span.end()
            ```
        """
        return self.start_observation(
            trace_context=trace_context,
            name=name,
            as_type="span",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
        )

    def start_as_current_span(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseSpan]:
        """Create a new span and set it as the current span in a context manager.

        This method creates a new span and sets it as the current span within a context
        manager. Use this method with a 'with' statement to automatically handle span
        lifecycle within a code block.

        The created span will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the span (e.g., function or operation name)
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span
            end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.

        Returns:
            A context manager that yields a LangfuseSpan

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-query") as span:
                # Do work
                result = process_data()
                span.update(output=result)

                # Create a child span automatically
                with span.start_as_current_span(name="sub-operation") as child_span:
                    # Do sub-operation work
                    child_span.update(output="sub-result")
            ```
        """
        return self.start_as_current_observation(
            trace_context=trace_context,
            name=name,
            as_type="span",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            end_on_exit=end_on_exit,
        )

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["generation"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> LangfuseGeneration: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["span"] = "span",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseSpan: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["agent"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseAgent: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["tool"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseTool: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["chain"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseChain: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["retriever"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseRetriever: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["evaluator"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseEvaluator: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["embedding"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> LangfuseEmbedding: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["guardrail"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseGuardrail: ...

    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: ObservationTypeLiteralNoEvent = "span",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> Union[
        LangfuseSpan,
        LangfuseGeneration,
        LangfuseAgent,
        LangfuseTool,
        LangfuseChain,
        LangfuseRetriever,
        LangfuseEvaluator,
        LangfuseEmbedding,
        LangfuseGuardrail,
    ]:
        """Create a new observation of the specified type.

        This method creates a new observation but does not set it as the current span in the
        context. To create and use an observation within a context, use start_as_current_observation().

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the observation
            as_type: Type of observation to create (defaults to "span")
            input: Input data for the operation
            output: Output data from the operation
            metadata: Additional metadata to associate with the observation
            version: Version identifier for the code or component
            level: Importance level of the observation
            status_message: Optional status message for the observation
            completion_start_time: When the model started generating (for generation types)
            model: Name/identifier of the AI model used (for generation types)
            model_parameters: Parameters used for the model (for generation types)
            usage_details: Token usage information (for generation types)
            cost_details: Cost information (for generation types)
            prompt: Associated prompt template (for generation types)

        Returns:
            An observation object of the appropriate type that must be ended with .end()
        """
        if trace_context:
            trace_id = trace_context.get("trace_id", None)
            parent_span_id = trace_context.get("parent_span_id", None)

            if trace_id:
                remote_parent_span = self._create_remote_parent_span(
                    trace_id=trace_id, parent_span_id=parent_span_id
                )

                with otel_trace_api.use_span(
                    cast(otel_trace_api.Span, remote_parent_span)
                ):
                    otel_span = self._otel_tracer.start_span(name=name)
                    otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True)

                    return self._create_observation_from_otel_span(
                        otel_span=otel_span,
                        as_type=as_type,
                        input=input,
                        output=output,
                        metadata=metadata,
                        version=version,
                        level=level,
                        status_message=status_message,
                        completion_start_time=completion_start_time,
                        model=model,
                        model_parameters=model_parameters,
                        usage_details=usage_details,
                        cost_details=cost_details,
                        prompt=prompt,
                    )

        otel_span = self._otel_tracer.start_span(name=name)

        return self._create_observation_from_otel_span(
            otel_span=otel_span,
            as_type=as_type,
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            usage_details=usage_details,
            cost_details=cost_details,
            prompt=prompt,
        )
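
    # --- Docs note (illustration, not part of the library source) ---------------
    # The overloads above mean the return type of start_observation() follows the
    # as_type argument. For example, a manually managed retriever observation
    # (`documents` is a placeholder):
    #
    #     retriever = langfuse.start_observation(name="vector-lookup", as_type="retriever")
    #     try:
    #         retriever.update(output=documents)
    #     finally:
    #         retriever.end()  # observations created this way must be ended explicitly
    # -----------------------------------------------------------------------------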

    def _create_observation_from_otel_span(
        self,
        *,
        otel_span: otel_trace_api.Span,
        as_type: ObservationTypeLiteralNoEvent,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> Union[
        LangfuseSpan,
        LangfuseGeneration,
        LangfuseAgent,
        LangfuseTool,
        LangfuseChain,
        LangfuseRetriever,
        LangfuseEvaluator,
        LangfuseEmbedding,
        LangfuseGuardrail,
    ]:
        """Create the appropriate observation type from an OTEL span."""
        if as_type in get_observation_types_list(ObservationTypeGenerationLike):
            observation_class = self._get_span_class(as_type)
            # Type ignore to prevent overloads of internal _get_span_class function,
            # issue is that LangfuseEvent could be returned and that classes have diff. args
            return observation_class(  # type: ignore[return-value,call-arg]
                otel_span=otel_span,
                langfuse_client=self,
                environment=self._environment,
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
                completion_start_time=completion_start_time,
                model=model,
                model_parameters=model_parameters,
                usage_details=usage_details,
                cost_details=cost_details,
                prompt=prompt,
            )
        else:
            # For other types (e.g. span, guardrail), create appropriate class without generation properties
            observation_class = self._get_span_class(as_type)
            # Type ignore to prevent overloads of internal _get_span_class function,
            # issue is that LangfuseEvent could be returned and that classes have diff. args
            return observation_class(  # type: ignore[return-value,call-arg]
                otel_span=otel_span,
                langfuse_client=self,
                environment=self._environment,
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
            )
        # span._observation_type = as_type
        # span._otel_span.set_attribute("langfuse.observation.type", as_type)
        # return span

    def start_generation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> LangfuseGeneration:
        """Create a new generation span for model generations.

        DEPRECATED: This method is deprecated and will be removed in a future version.
        Use start_observation(as_type='generation') instead.

        This method creates a specialized span for tracking model generations.
        It includes additional fields specific to model generations such as model name,
        token usage, and cost details.

        The created generation span will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the generation operation
            input: Input data for the model (e.g., prompts)
            output: Output from the model (e.g., completions)
            metadata: Additional metadata to associate with the generation
            version: Version identifier for the model or component
            level: Importance level of the generation (info, warning, error)
            status_message: Optional status message for the generation
            completion_start_time: When the model started generating the response
            model: Name/identifier of the AI model used (e.g., "gpt-4")
            model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
            usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
            cost_details: Cost information for the model call
            prompt: Associated prompt template from Langfuse prompt management

        Returns:
            A LangfuseGeneration object that must be ended with .end() when complete

        Example:
            ```python
            generation = langfuse.start_generation(
                name="answer-generation",
                model="gpt-4",
                input={"prompt": "Explain quantum computing"},
                model_parameters={"temperature": 0.7}
            )
            try:
                # Call model API
                response = llm.generate(...)

                generation.update(
                    output=response.text,
                    usage_details={
                        "prompt_tokens": response.usage.prompt_tokens,
                        "completion_tokens": response.usage.completion_tokens
                    }
                )
            finally:
                generation.end()
            ```
        """
        warnings.warn(
            "start_generation is deprecated and will be removed in a future version. "
            "Use start_observation(as_type='generation') instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.start_observation(
            trace_context=trace_context,
            name=name,
            as_type="generation",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            usage_details=usage_details,
            cost_details=cost_details,
            prompt=prompt,
        )

    def start_as_current_generation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseGeneration]:
        """Create a new generation span and set it as the current span in a context manager.

        DEPRECATED: This method is deprecated and will be removed in a future version.
        Use start_as_current_observation(as_type='generation') instead.

        This method creates a specialized span for model generations and sets it as the
        current span within a context manager. Use this method with a 'with' statement to
        automatically handle the generation span lifecycle within a code block.

        The created generation span will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the generation operation
            input: Input data for the model (e.g., prompts)
            output: Output from the model (e.g., completions)
            metadata: Additional metadata to associate with the generation
            version: Version identifier for the model or component
            level: Importance level of the generation (info, warning, error)
            status_message: Optional status message for the generation
            completion_start_time: When the model started generating the response
            model: Name/identifier of the AI model used (e.g., "gpt-4")
            model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
            usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
            cost_details: Cost information for the model call
            prompt: Associated prompt template from Langfuse prompt management
            end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.

        Returns:
            A context manager that yields a LangfuseGeneration

        Example:
            ```python
            with langfuse.start_as_current_generation(
                name="answer-generation",
                model="gpt-4",
                input={"prompt": "Explain quantum computing"}
            ) as generation:
                # Call model API
                response = llm.generate(...)

                # Update with results
                generation.update(
                    output=response.text,
                    usage_details={
                        "prompt_tokens": response.usage.prompt_tokens,
                        "completion_tokens": response.usage.completion_tokens
                    }
                )
            ```
        """
        warnings.warn(
            "start_as_current_generation is deprecated and will be removed in a future version. "
            "Use start_as_current_observation(as_type='generation') instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.start_as_current_observation(
            trace_context=trace_context,
            name=name,
            as_type="generation",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            usage_details=usage_details,
            cost_details=cost_details,
            prompt=prompt,
            end_on_exit=end_on_exit,
        )

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["generation"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseGeneration]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["span"] = "span",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseSpan]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["agent"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseAgent]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["tool"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseTool]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["chain"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseChain]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["retriever"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseRetriever]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["evaluator"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseEvaluator]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["embedding"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseEmbedding]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["guardrail"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseGuardrail]: ...

    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: ObservationTypeLiteralNoEvent = "span",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        end_on_exit: Optional[bool] = None,
    ) -> Union[
        _AgnosticContextManager[LangfuseGeneration],
        _AgnosticContextManager[LangfuseSpan],
        _AgnosticContextManager[LangfuseAgent],
        _AgnosticContextManager[LangfuseTool],
        _AgnosticContextManager[LangfuseChain],
        _AgnosticContextManager[LangfuseRetriever],
        _AgnosticContextManager[LangfuseEvaluator],
        _AgnosticContextManager[LangfuseEmbedding],
        _AgnosticContextManager[LangfuseGuardrail],
    ]:
        """Create a new observation and set it as the current span in a context manager.

        This method creates a new observation of the specified type and sets it as the
        current span within a context manager. Use this method with a 'with' statement to
        automatically handle the observation lifecycle within a code block.

        The created observation will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the observation (e.g., function or operation name)
            as_type: Type of observation to create (defaults to "span")
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the observation
            version: Version identifier for the code or component
            level: Importance level of the observation (info, warning, error)
            status_message: Optional status message for the observation
            end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.

        The following parameters are available when as_type is "generation" or "embedding":
            completion_start_time: When the model started generating the response
            model: Name/identifier of the AI model used (e.g., "gpt-4")
            model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
            usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
            cost_details: Cost information for the model call
            prompt: Associated prompt template from Langfuse prompt management

        Returns:
            A context manager that yields the appropriate observation type based on as_type

        Example:
            ```python
            # Create a span
            with langfuse.start_as_current_observation(name="process-query", as_type="span") as span:
                # Do work
                result = process_data()
                span.update(output=result)

                # Create a child span automatically
                with span.start_as_current_span(name="sub-operation") as child_span:
                    # Do sub-operation work
                    child_span.update(output="sub-result")

            # Create a tool observation
            with langfuse.start_as_current_observation(name="web-search", as_type="tool") as tool:
                # Do tool work
                results = search_web(query)
                tool.update(output=results)

            # Create a generation observation
            with langfuse.start_as_current_observation(
                name="answer-generation",
                as_type="generation",
                model="gpt-4"
            ) as generation:
                # Generate answer
                response = llm.generate(...)
                generation.update(output=response)
            ```
        """
        if as_type in get_observation_types_list(ObservationTypeGenerationLike):
            if trace_context:
                trace_id = trace_context.get("trace_id", None)
                parent_span_id = trace_context.get("parent_span_id", None)

                if trace_id:
                    remote_parent_span = self._create_remote_parent_span(
                        trace_id=trace_id, parent_span_id=parent_span_id
                    )

                    return cast(
                        Union[
                            _AgnosticContextManager[LangfuseGeneration],
                            _AgnosticContextManager[LangfuseEmbedding],
                        ],
                        self._create_span_with_parent_context(
                            as_type=as_type,
                            name=name,
                            remote_parent_span=remote_parent_span,
                            parent=None,
                            end_on_exit=end_on_exit,
                            input=input,
                            output=output,
                            metadata=metadata,
                            version=version,
                            level=level,
                            status_message=status_message,
                            completion_start_time=completion_start_time,
                            model=model,
                            model_parameters=model_parameters,
                            usage_details=usage_details,
                            cost_details=cost_details,
                            prompt=prompt,
                        ),
                    )

            return cast(
                Union[
                    _AgnosticContextManager[LangfuseGeneration],
                    _AgnosticContextManager[LangfuseEmbedding],
                ],
                self._start_as_current_otel_span_with_processed_media(
                    as_type=as_type,
                    name=name,
                    end_on_exit=end_on_exit,
                    input=input,
                    output=output,
                    metadata=metadata,
                    version=version,
                    level=level,
                    status_message=status_message,
                    completion_start_time=completion_start_time,
                    model=model,
                    model_parameters=model_parameters,
                    usage_details=usage_details,
                    cost_details=cost_details,
                    prompt=prompt,
                ),
            )

        if as_type in get_observation_types_list(ObservationTypeSpanLike):
            if trace_context:
                trace_id = trace_context.get("trace_id", None)
                parent_span_id = trace_context.get("parent_span_id", None)

                if trace_id:
                    remote_parent_span = self._create_remote_parent_span(
                        trace_id=trace_id, parent_span_id=parent_span_id
                    )

                    return cast(
                        Union[
                            _AgnosticContextManager[LangfuseSpan],
                            _AgnosticContextManager[LangfuseAgent],
                            _AgnosticContextManager[LangfuseTool],
                            _AgnosticContextManager[LangfuseChain],
                            _AgnosticContextManager[LangfuseRetriever],
                            _AgnosticContextManager[LangfuseEvaluator],
                            _AgnosticContextManager[LangfuseGuardrail],
                        ],
                        self._create_span_with_parent_context(
                            as_type=as_type,
                            name=name,
                            remote_parent_span=remote_parent_span,
                            parent=None,
                            end_on_exit=end_on_exit,
                            input=input,
                            output=output,
                            metadata=metadata,
                            version=version,
                            level=level,
                            status_message=status_message,
                        ),
                    )

            return cast(
                Union[
                    _AgnosticContextManager[LangfuseSpan],
                    _AgnosticContextManager[LangfuseAgent],
                    _AgnosticContextManager[LangfuseTool],
                    _AgnosticContextManager[LangfuseChain],
                    _AgnosticContextManager[LangfuseRetriever],
                    _AgnosticContextManager[LangfuseEvaluator],
                    _AgnosticContextManager[LangfuseGuardrail],
                ],
                self._start_as_current_otel_span_with_processed_media(
                    as_type=as_type,
                    name=name,
                    end_on_exit=end_on_exit,
                    input=input,
                    output=output,
                    metadata=metadata,
                    version=version,
                    level=level,
                    status_message=status_message,
                ),
            )

        # This should never be reached since all valid types are handled above
        langfuse_logger.warning(
            f"Unknown observation type: {as_type}, falling back to span"
        )
        return self._start_as_current_otel_span_with_processed_media(
            as_type="span",
            name=name,
            end_on_exit=end_on_exit,
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
        )

    def _get_span_class(
        self,
        as_type: ObservationTypeLiteral,
    ) -> Union[
        Type[LangfuseAgent],
        Type[LangfuseTool],
        Type[LangfuseChain],
        Type[LangfuseRetriever],
        Type[LangfuseEvaluator],
        Type[LangfuseEmbedding],
        Type[LangfuseGuardrail],
        Type[LangfuseGeneration],
        Type[LangfuseEvent],
        Type[LangfuseSpan],
    ]:
        """Get the appropriate span class based on as_type."""
        normalized_type = as_type.lower()

        if normalized_type == "agent":
            return LangfuseAgent
        elif normalized_type == "tool":
            return LangfuseTool
        elif normalized_type == "chain":
            return LangfuseChain
        elif normalized_type == "retriever":
            return LangfuseRetriever
        elif normalized_type == "evaluator":
            return LangfuseEvaluator
        elif normalized_type == "embedding":
            return LangfuseEmbedding
        elif normalized_type == "guardrail":
            return LangfuseGuardrail
        elif normalized_type == "generation":
            return LangfuseGeneration
        elif normalized_type == "event":
            return LangfuseEvent
        elif normalized_type == "span":
            return LangfuseSpan
        else:
            return LangfuseSpan

    @_agnosticcontextmanager
    def _create_span_with_parent_context(
        self,
        *,
        name: str,
        parent: Optional[otel_trace_api.Span] = None,
        remote_parent_span: Optional[otel_trace_api.Span] = None,
        as_type: ObservationTypeLiteralNoEvent,
        end_on_exit: Optional[bool] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> Any:
        parent_span = parent or cast(otel_trace_api.Span, remote_parent_span)

        with otel_trace_api.use_span(parent_span):
            with self._start_as_current_otel_span_with_processed_media(
                name=name,
                as_type=as_type,
                end_on_exit=end_on_exit,
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
                completion_start_time=completion_start_time,
                model=model,
                model_parameters=model_parameters,
                usage_details=usage_details,
                cost_details=cost_details,
                prompt=prompt,
            ) as langfuse_span:
                if remote_parent_span is not None:
                    langfuse_span._otel_span.set_attribute(
                        LangfuseOtelSpanAttributes.AS_ROOT, True
                    )

                yield langfuse_span

    @_agnosticcontextmanager
    def _start_as_current_otel_span_with_processed_media(
        self,
        *,
        name: str,
        as_type: Optional[ObservationTypeLiteralNoEvent] = None,
        end_on_exit: Optional[bool] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> Any:
        with self._otel_tracer.start_as_current_span(
            name=name,
            end_on_exit=end_on_exit if end_on_exit is not None else True,
        ) as otel_span:
            span_class = self._get_span_class(
                as_type or "generation"
            )  # default was "generation"
            common_args = {
                "otel_span": otel_span,
                "langfuse_client": self,
                "environment": self._environment,
                "input": input,
                "output": output,
                "metadata": metadata,
                "version": version,
                "level": level,
                "status_message": status_message,
            }

            if span_class in [
                LangfuseGeneration,
                LangfuseEmbedding,
            ]:
                common_args.update(
                    {
                        "completion_start_time": completion_start_time,
                        "model": model,
                        "model_parameters": model_parameters,
                        "usage_details": usage_details,
                        "cost_details": cost_details,
                        "prompt": prompt,
                    }
                )
            # For span-like types (span, agent, tool, chain, retriever, evaluator, guardrail), no generation properties needed

            yield span_class(**common_args)  # type: ignore[arg-type]

    def _get_current_otel_span(self) -> Optional[otel_trace_api.Span]:
        current_span = otel_trace_api.get_current_span()

        if current_span is otel_trace_api.INVALID_SPAN:
            langfuse_logger.warning(
                "Context error: No active span in current context. Operations that depend on an active span will be skipped. "
                "Ensure spans are created with start_as_current_span() or that you're operating within an active span context."
            )
            return None

        return current_span

    def update_current_generation(
        self,
        *,
        name: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> None:
        """Update the current active generation span with new information.

        This method updates the current generation span in the active context with
        additional information. It's useful for adding output, usage stats, or other
        details that become available during or after model generation.

        Args:
            name: The generation name
            input: Updated input data for the model
            output: Output from the model (e.g., completions)
            metadata: Additional metadata to associate with the generation
            version: Version identifier for the model or component
            level: Importance level of the generation (info, warning, error)
            status_message: Optional status message for the generation
            completion_start_time: When the model started generating the response
            model: Name/identifier of the AI model used (e.g., "gpt-4")
            model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
            usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
            cost_details: Cost information for the model call
            prompt: Associated prompt template from Langfuse prompt management

        Example:
            ```python
            with langfuse.start_as_current_generation(name="answer-query") as generation:
                # Initial setup and API call
                response = llm.generate(...)

                # Update with results that weren't available at creation time
                langfuse.update_current_generation(
                    output=response.text,
                    usage_details={
                        "prompt_tokens": response.usage.prompt_tokens,
                        "completion_tokens": response.usage.completion_tokens
                    }
                )
            ```
        """
        if not self._tracing_enabled:
            langfuse_logger.debug(
                "Operation skipped: update_current_generation - Tracing is disabled or client is in no-op mode."
            )
            return

        current_otel_span = self._get_current_otel_span()

        if current_otel_span is not None:
            generation = LangfuseGeneration(
                otel_span=current_otel_span, langfuse_client=self
            )

            if name:
                current_otel_span.update_name(name)

            generation.update(
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
                completion_start_time=completion_start_time,
                model=model,
                model_parameters=model_parameters,
                usage_details=usage_details,
                cost_details=cost_details,
                prompt=prompt,
            )

    def update_current_span(
        self,
        *,
        name: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> None:
        """Update the current active span with new information.

        This method updates the current span in the active context with
        additional information. It's useful for adding outputs or metadata
        that become available during execution.

        Args:
            name: The span name
            input: Updated input data for the operation
            output: Output data from the operation
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-data") as span:
                # Initial processing
                result = process_first_part()

                # Update with intermediate results
                langfuse.update_current_span(metadata={"intermediate_result": result})

                # Continue processing
                final_result = process_second_part(result)

                # Final update
                langfuse.update_current_span(output=final_result)
            ```
        """
        if not self._tracing_enabled:
            langfuse_logger.debug(
                "Operation skipped: update_current_span - Tracing is disabled or client is in no-op mode."
            )
            return

        current_otel_span = self._get_current_otel_span()

        if current_otel_span is not None:
            span = LangfuseSpan(
                otel_span=current_otel_span,
                langfuse_client=self,
                environment=self._environment,
            )

            if name:
                current_otel_span.update_name(name)

            span.update(
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
            )
1680 ) 1681 return 1682 1683 current_otel_span = self._get_current_otel_span() 1684 1685 if current_otel_span is not None: 1686 existing_observation_type = current_otel_span.attributes.get( # type: ignore[attr-defined] 1687 LangfuseOtelSpanAttributes.OBSERVATION_TYPE, "span" 1688 ) 1689 # We need to preserve the class to keep the correct observation type 1690 span_class = self._get_span_class(existing_observation_type) 1691 span = span_class( 1692 otel_span=current_otel_span, 1693 langfuse_client=self, 1694 environment=self._environment, 1695 ) 1696 1697 span.update_trace( 1698 name=name, 1699 user_id=user_id, 1700 session_id=session_id, 1701 version=version, 1702 input=input, 1703 output=output, 1704 metadata=metadata, 1705 tags=tags, 1706 public=public, 1707 ) 1708 1709 def create_event( 1710 self, 1711 *, 1712 trace_context: Optional[TraceContext] = None, 1713 name: str, 1714 input: Optional[Any] = None, 1715 output: Optional[Any] = None, 1716 metadata: Optional[Any] = None, 1717 version: Optional[str] = None, 1718 level: Optional[SpanLevel] = None, 1719 status_message: Optional[str] = None, 1720 ) -> LangfuseEvent: 1721 """Create a new Langfuse observation of type 'EVENT'. 1722 1723 The created Langfuse Event observation will be the child of the current span in the context. 1724 1725 Args: 1726 trace_context: Optional context for connecting to an existing trace 1727 name: Name of the span (e.g., function or operation name) 1728 input: Input data for the operation (can be any JSON-serializable object) 1729 output: Output data from the operation (can be any JSON-serializable object) 1730 metadata: Additional metadata to associate with the span 1731 version: Version identifier for the code or component 1732 level: Importance level of the span (info, warning, error) 1733 status_message: Optional status message for the span 1734 1735 Returns: 1736 The Langfuse Event object 1737 1738 Example: 1739 ```python 1740 event = langfuse.create_event(name="process-event") 1741 ``` 1742 """ 1743 timestamp = time_ns() 1744 1745 if trace_context: 1746 trace_id = trace_context.get("trace_id", None) 1747 parent_span_id = trace_context.get("parent_span_id", None) 1748 1749 if trace_id: 1750 remote_parent_span = self._create_remote_parent_span( 1751 trace_id=trace_id, parent_span_id=parent_span_id 1752 ) 1753 1754 with otel_trace_api.use_span( 1755 cast(otel_trace_api.Span, remote_parent_span) 1756 ): 1757 otel_span = self._otel_tracer.start_span( 1758 name=name, start_time=timestamp 1759 ) 1760 otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True) 1761 1762 return cast( 1763 LangfuseEvent, 1764 LangfuseEvent( 1765 otel_span=otel_span, 1766 langfuse_client=self, 1767 environment=self._environment, 1768 input=input, 1769 output=output, 1770 metadata=metadata, 1771 version=version, 1772 level=level, 1773 status_message=status_message, 1774 ).end(end_time=timestamp), 1775 ) 1776 1777 otel_span = self._otel_tracer.start_span(name=name, start_time=timestamp) 1778 1779 return cast( 1780 LangfuseEvent, 1781 LangfuseEvent( 1782 otel_span=otel_span, 1783 langfuse_client=self, 1784 environment=self._environment, 1785 input=input, 1786 output=output, 1787 metadata=metadata, 1788 version=version, 1789 level=level, 1790 status_message=status_message, 1791 ).end(end_time=timestamp), 1792 ) 1793 1794 def _create_remote_parent_span( 1795 self, *, trace_id: str, parent_span_id: Optional[str] 1796 ) -> Any: 1797 if not self._is_valid_trace_id(trace_id): 1798 langfuse_logger.warning( 1799 f"Passed trace ID 
'{trace_id}' is not a valid 32 lowercase hex char Langfuse trace id. Ignoring trace ID." 1800 ) 1801 1802 if parent_span_id and not self._is_valid_span_id(parent_span_id): 1803 langfuse_logger.warning( 1804 f"Passed span ID '{parent_span_id}' is not a valid 16 lowercase hex char Langfuse span id. Ignoring parent span ID." 1805 ) 1806 1807 int_trace_id = int(trace_id, 16) 1808 int_parent_span_id = ( 1809 int(parent_span_id, 16) 1810 if parent_span_id 1811 else RandomIdGenerator().generate_span_id() 1812 ) 1813 1814 span_context = otel_trace_api.SpanContext( 1815 trace_id=int_trace_id, 1816 span_id=int_parent_span_id, 1817 trace_flags=otel_trace_api.TraceFlags(0x01), # mark span as sampled 1818 is_remote=False, 1819 ) 1820 1821 return trace.NonRecordingSpan(span_context) 1822 1823 def _is_valid_trace_id(self, trace_id: str) -> bool: 1824 pattern = r"^[0-9a-f]{32}$" 1825 1826 return bool(re.match(pattern, trace_id)) 1827 1828 def _is_valid_span_id(self, span_id: str) -> bool: 1829 pattern = r"^[0-9a-f]{16}$" 1830 1831 return bool(re.match(pattern, span_id)) 1832 1833 def _create_observation_id(self, *, seed: Optional[str] = None) -> str: 1834 """Create a unique observation ID for use with Langfuse. 1835 1836 This method generates a unique observation ID (span ID in OpenTelemetry terms) 1837 for use with various Langfuse APIs. It can either generate a random ID or 1838 create a deterministic ID based on a seed string. 1839 1840 Observation IDs must be 16 lowercase hexadecimal characters, representing 8 bytes. 1841 This method ensures the generated ID meets this requirement. If you need to 1842 correlate an external ID with a Langfuse observation ID, use the external ID as 1843 the seed to get a valid, deterministic observation ID. 1844 1845 Args: 1846 seed: Optional string to use as a seed for deterministic ID generation. 1847 If provided, the same seed will always produce the same ID. 1848 If not provided, a random ID will be generated. 1849 1850 Returns: 1851 A 16-character lowercase hexadecimal string representing the observation ID. 1852 1853 Example: 1854 ```python 1855 # Generate a random observation ID 1856 obs_id = langfuse.create_observation_id() 1857 1858 # Generate a deterministic ID based on a seed 1859 user_obs_id = langfuse.create_observation_id(seed="user-123-feedback") 1860 1861 # Correlate an external item ID with a Langfuse observation ID 1862 item_id = "item-789012" 1863 correlated_obs_id = langfuse.create_observation_id(seed=item_id) 1864 1865 # Use the ID with Langfuse APIs 1866 langfuse.create_score( 1867 name="relevance", 1868 value=0.95, 1869 trace_id=trace_id, 1870 observation_id=obs_id 1871 ) 1872 ``` 1873 """ 1874 if not seed: 1875 span_id_int = RandomIdGenerator().generate_span_id() 1876 1877 return self._format_otel_span_id(span_id_int) 1878 1879 return sha256(seed.encode("utf-8")).digest()[:8].hex() 1880 1881 @staticmethod 1882 def create_trace_id(*, seed: Optional[str] = None) -> str: 1883 """Create a unique trace ID for use with Langfuse. 1884 1885 This method generates a unique trace ID for use with various Langfuse APIs. 1886 It can either generate a random ID or create a deterministic ID based on 1887 a seed string. 1888 1889 Trace IDs must be 32 lowercase hexadecimal characters, representing 16 bytes. 1890 This method ensures the generated ID meets this requirement. If you need to 1891 correlate an external ID with a Langfuse trace ID, use the external ID as the 1892 seed to get a valid, deterministic Langfuse trace ID. 
1893 1894 Args: 1895 seed: Optional string to use as a seed for deterministic ID generation. 1896 If provided, the same seed will always produce the same ID. 1897 If not provided, a random ID will be generated. 1898 1899 Returns: 1900 A 32-character lowercase hexadecimal string representing the Langfuse trace ID. 1901 1902 Example: 1903 ```python 1904 # Generate a random trace ID 1905 trace_id = langfuse.create_trace_id() 1906 1907 # Generate a deterministic ID based on a seed 1908 session_trace_id = langfuse.create_trace_id(seed="session-456") 1909 1910 # Correlate an external ID with a Langfuse trace ID 1911 external_id = "external-system-123456" 1912 correlated_trace_id = langfuse.create_trace_id(seed=external_id) 1913 1914 # Use the ID with trace context 1915 with langfuse.start_as_current_span( 1916 name="process-request", 1917 trace_context={"trace_id": trace_id} 1918 ) as span: 1919 # Operation will be part of the specific trace 1920 pass 1921 ``` 1922 """ 1923 if not seed: 1924 trace_id_int = RandomIdGenerator().generate_trace_id() 1925 1926 return Langfuse._format_otel_trace_id(trace_id_int) 1927 1928 return sha256(seed.encode("utf-8")).digest()[:16].hex() 1929 1930 def _get_otel_trace_id(self, otel_span: otel_trace_api.Span) -> str: 1931 span_context = otel_span.get_span_context() 1932 1933 return self._format_otel_trace_id(span_context.trace_id) 1934 1935 def _get_otel_span_id(self, otel_span: otel_trace_api.Span) -> str: 1936 span_context = otel_span.get_span_context() 1937 1938 return self._format_otel_span_id(span_context.span_id) 1939 1940 @staticmethod 1941 def _format_otel_span_id(span_id_int: int) -> str: 1942 """Format an integer span ID to a 16-character lowercase hex string. 1943 1944 Internal method to convert an OpenTelemetry integer span ID to the standard 1945 W3C Trace Context format (16-character lowercase hex string). 1946 1947 Args: 1948 span_id_int: 64-bit integer representing a span ID 1949 1950 Returns: 1951 A 16-character lowercase hexadecimal string 1952 """ 1953 return format(span_id_int, "016x") 1954 1955 @staticmethod 1956 def _format_otel_trace_id(trace_id_int: int) -> str: 1957 """Format an integer trace ID to a 32-character lowercase hex string. 1958 1959 Internal method to convert an OpenTelemetry integer trace ID to the standard 1960 W3C Trace Context format (32-character lowercase hex string). 1961 1962 Args: 1963 trace_id_int: 128-bit integer representing a trace ID 1964 1965 Returns: 1966 A 32-character lowercase hexadecimal string 1967 """ 1968 return format(trace_id_int, "032x") 1969 1970 @overload 1971 def create_score( 1972 self, 1973 *, 1974 name: str, 1975 value: float, 1976 session_id: Optional[str] = None, 1977 dataset_run_id: Optional[str] = None, 1978 trace_id: Optional[str] = None, 1979 observation_id: Optional[str] = None, 1980 score_id: Optional[str] = None, 1981 data_type: Optional[Literal["NUMERIC", "BOOLEAN"]] = None, 1982 comment: Optional[str] = None, 1983 config_id: Optional[str] = None, 1984 metadata: Optional[Any] = None, 1985 ) -> None: ... 1986 1987 @overload 1988 def create_score( 1989 self, 1990 *, 1991 name: str, 1992 value: str, 1993 session_id: Optional[str] = None, 1994 dataset_run_id: Optional[str] = None, 1995 trace_id: Optional[str] = None, 1996 score_id: Optional[str] = None, 1997 observation_id: Optional[str] = None, 1998 data_type: Optional[Literal["CATEGORICAL"]] = "CATEGORICAL", 1999 comment: Optional[str] = None, 2000 config_id: Optional[str] = None, 2001 metadata: Optional[Any] = None, 2002 ) -> None: ... 
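# The paired @overload signatures above encode the value/data_type contract:
# a float value type-checks as a NUMERIC or BOOLEAN score, while a str value
# type-checks as a CATEGORICAL score. The implementation below accepts the
# union and enqueues the score event via self._resources.add_score_task.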
2003 2004 def create_score( 2005 self, 2006 *, 2007 name: str, 2008 value: Union[float, str], 2009 session_id: Optional[str] = None, 2010 dataset_run_id: Optional[str] = None, 2011 trace_id: Optional[str] = None, 2012 observation_id: Optional[str] = None, 2013 score_id: Optional[str] = None, 2014 data_type: Optional[ScoreDataType] = None, 2015 comment: Optional[str] = None, 2016 config_id: Optional[str] = None, 2017 metadata: Optional[Any] = None, 2018 ) -> None: 2019 """Create a score for a specific trace or observation. 2020 2021 This method creates a score for evaluating a Langfuse trace or observation. Scores can be 2022 used to track quality metrics, user feedback, or automated evaluations. 2023 2024 Args: 2025 name: Name of the score (e.g., "relevance", "accuracy") 2026 value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL) 2027 session_id: ID of the Langfuse session to associate the score with 2028 dataset_run_id: ID of the Langfuse dataset run to associate the score with 2029 trace_id: ID of the Langfuse trace to associate the score with 2030 observation_id: Optional ID of the specific observation to score. Trace ID must be provided too. 2031 score_id: Optional custom ID for the score (auto-generated if not provided) 2032 data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL) 2033 comment: Optional comment or explanation for the score 2034 config_id: Optional ID of a score config defined in Langfuse 2035 metadata: Optional metadata to be attached to the score 2036 2037 Example: 2038 ```python 2039 # Create a numeric score for accuracy 2040 langfuse.create_score( 2041 name="accuracy", 2042 value=0.92, 2043 trace_id="abcdef1234567890abcdef1234567890", 2044 data_type="NUMERIC", 2045 comment="High accuracy with minor irrelevant details" 2046 ) 2047 2048 # Create a categorical score for sentiment 2049 langfuse.create_score( 2050 name="sentiment", 2051 value="positive", 2052 trace_id="abcdef1234567890abcdef1234567890", 2053 observation_id="abcdef1234567890", 2054 data_type="CATEGORICAL" 2055 ) 2056 ``` 2057 """ 2058 if not self._tracing_enabled: 2059 return 2060 2061 score_id = score_id or self._create_observation_id() 2062 2063 try: 2064 new_body = ScoreBody( 2065 id=score_id, 2066 sessionId=session_id, 2067 datasetRunId=dataset_run_id, 2068 traceId=trace_id, 2069 observationId=observation_id, 2070 name=name, 2071 value=value, 2072 dataType=data_type, # type: ignore 2073 comment=comment, 2074 configId=config_id, 2075 environment=self._environment, 2076 metadata=metadata, 2077 ) 2078 2079 event = { 2080 "id": self.create_trace_id(), 2081 "type": "score-create", 2082 "timestamp": _get_timestamp(), 2083 "body": new_body, 2084 } 2085 2086 if self._resources is not None: 2087 # Force the score to be in sample if it was for a legacy trace ID, i.e. non-32 hexchar 2088 force_sample = ( 2089 not self._is_valid_trace_id(trace_id) if trace_id else True 2090 ) 2091 2092 self._resources.add_score_task( 2093 event, 2094 force_sample=force_sample, 2095 ) 2096 2097 except Exception as e: 2098 langfuse_logger.exception( 2099 f"Error creating score: Failed to process score event for trace_id={trace_id}, name={name}. Error: {e}" 2100 ) 2101 2102 @overload 2103 def score_current_span( 2104 self, 2105 *, 2106 name: str, 2107 value: float, 2108 score_id: Optional[str] = None, 2109 data_type: Optional[Literal["NUMERIC", "BOOLEAN"]] = None, 2110 comment: Optional[str] = None, 2111 config_id: Optional[str] = None, 2112 ) -> None: ... 
2113 2114 @overload 2115 def score_current_span( 2116 self, 2117 *, 2118 name: str, 2119 value: str, 2120 score_id: Optional[str] = None, 2121 data_type: Optional[Literal["CATEGORICAL"]] = "CATEGORICAL", 2122 comment: Optional[str] = None, 2123 config_id: Optional[str] = None, 2124 ) -> None: ... 2125 2126 def score_current_span( 2127 self, 2128 *, 2129 name: str, 2130 value: Union[float, str], 2131 score_id: Optional[str] = None, 2132 data_type: Optional[ScoreDataType] = None, 2133 comment: Optional[str] = None, 2134 config_id: Optional[str] = None, 2135 ) -> None: 2136 """Create a score for the current active span. 2137 2138 This method scores the currently active span in the context. It's a convenient 2139 way to score the current operation without needing to know its trace and span IDs. 2140 2141 Args: 2142 name: Name of the score (e.g., "relevance", "accuracy") 2143 value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL) 2144 score_id: Optional custom ID for the score (auto-generated if not provided) 2145 data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL) 2146 comment: Optional comment or explanation for the score 2147 config_id: Optional ID of a score config defined in Langfuse 2148 2149 Example: 2150 ```python 2151 with langfuse.start_as_current_generation(name="answer-query") as generation: 2152 # Generate answer 2153 response = generate_answer(...) 2154 generation.update(output=response) 2155 2156 # Score the generation 2157 langfuse.score_current_span( 2158 name="relevance", 2159 value=0.85, 2160 data_type="NUMERIC", 2161 comment="Mostly relevant but contains some tangential information" 2162 ) 2163 ``` 2164 """ 2165 current_span = self._get_current_otel_span() 2166 2167 if current_span is not None: 2168 trace_id = self._get_otel_trace_id(current_span) 2169 observation_id = self._get_otel_span_id(current_span) 2170 2171 langfuse_logger.info( 2172 f"Score: Creating score name='{name}' value={value} for current span ({observation_id}) in trace {trace_id}" 2173 ) 2174 2175 self.create_score( 2176 trace_id=trace_id, 2177 observation_id=observation_id, 2178 name=name, 2179 value=cast(str, value), 2180 score_id=score_id, 2181 data_type=cast(Literal["CATEGORICAL"], data_type), 2182 comment=comment, 2183 config_id=config_id, 2184 ) 2185 2186 @overload 2187 def score_current_trace( 2188 self, 2189 *, 2190 name: str, 2191 value: float, 2192 score_id: Optional[str] = None, 2193 data_type: Optional[Literal["NUMERIC", "BOOLEAN"]] = None, 2194 comment: Optional[str] = None, 2195 config_id: Optional[str] = None, 2196 ) -> None: ... 2197 2198 @overload 2199 def score_current_trace( 2200 self, 2201 *, 2202 name: str, 2203 value: str, 2204 score_id: Optional[str] = None, 2205 data_type: Optional[Literal["CATEGORICAL"]] = "CATEGORICAL", 2206 comment: Optional[str] = None, 2207 config_id: Optional[str] = None, 2208 ) -> None: ... 2209 2210 def score_current_trace( 2211 self, 2212 *, 2213 name: str, 2214 value: Union[float, str], 2215 score_id: Optional[str] = None, 2216 data_type: Optional[ScoreDataType] = None, 2217 comment: Optional[str] = None, 2218 config_id: Optional[str] = None, 2219 ) -> None: 2220 """Create a score for the current trace. 2221 2222 This method scores the trace of the currently active span. Unlike score_current_span, 2223 this method associates the score with the entire trace rather than a specific span. 2224 It's useful for scoring overall performance or quality of the entire operation. 
2225 2226 Args: 2227 name: Name of the score (e.g., "user_satisfaction", "overall_quality") 2228 value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL) 2229 score_id: Optional custom ID for the score (auto-generated if not provided) 2230 data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL) 2231 comment: Optional comment or explanation for the score 2232 config_id: Optional ID of a score config defined in Langfuse 2233 2234 Example: 2235 ```python 2236 with langfuse.start_as_current_span(name="process-user-request") as span: 2237 # Process request 2238 result = process_complete_request() 2239 span.update(output=result) 2240 2241 # Score the overall trace 2242 langfuse.score_current_trace( 2243 name="overall_quality", 2244 value=0.95, 2245 data_type="NUMERIC", 2246 comment="High quality end-to-end response" 2247 ) 2248 ``` 2249 """ 2250 current_span = self._get_current_otel_span() 2251 2252 if current_span is not None: 2253 trace_id = self._get_otel_trace_id(current_span) 2254 2255 langfuse_logger.info( 2256 f"Score: Creating score name='{name}' value={value} for entire trace {trace_id}" 2257 ) 2258 2259 self.create_score( 2260 trace_id=trace_id, 2261 name=name, 2262 value=cast(str, value), 2263 score_id=score_id, 2264 data_type=cast(Literal["CATEGORICAL"], data_type), 2265 comment=comment, 2266 config_id=config_id, 2267 ) 2268 2269 def flush(self) -> None: 2270 """Force flush all pending spans and events to the Langfuse API. 2271 2272 This method manually flushes any pending spans, scores, and other events to the 2273 Langfuse API. It's useful in scenarios where you want to ensure all data is sent 2274 before proceeding, without waiting for the automatic flush interval. 2275 2276 Example: 2277 ```python 2278 # Record some spans and scores 2279 with langfuse.start_as_current_span(name="operation") as span: 2280 # Do work... 2281 pass 2282 2283 # Ensure all data is sent to Langfuse before proceeding 2284 langfuse.flush() 2285 2286 # Continue with other work 2287 ``` 2288 """ 2289 if self._resources is not None: 2290 self._resources.flush() 2291 2292 def shutdown(self) -> None: 2293 """Shut down the Langfuse client and flush all pending data. 2294 2295 This method cleanly shuts down the Langfuse client, ensuring all pending data 2296 is flushed to the API and all background threads are properly terminated. 2297 2298 It's important to call this method when your application is shutting down to 2299 prevent data loss and resource leaks. For most applications, using the client 2300 as a context manager or relying on the automatic shutdown via atexit is sufficient. 2301 2302 Example: 2303 ```python 2304 # Initialize Langfuse 2305 langfuse = Langfuse(public_key="...", secret_key="...") 2306 2307 # Use Langfuse throughout your application 2308 # ... 2309 2310 # When application is shutting down 2311 langfuse.shutdown() 2312 ``` 2313 """ 2314 if self._resources is not None: 2315 self._resources.shutdown() 2316 2317 def get_current_trace_id(self) -> Optional[str]: 2318 """Get the trace ID of the current active span. 2319 2320 This method retrieves the trace ID from the currently active span in the context. 2321 It can be used to get the trace ID for referencing in logs, external systems, 2322 or for creating related operations. 2323 2324 Returns: 2325 The current trace ID as a 32-character lowercase hexadecimal string, 2326 or None if there is no active span. 
2327 2328 Example: 2329 ```python 2330 with langfuse.start_as_current_span(name="process-request") as span: 2331 # Get the current trace ID for reference 2332 trace_id = langfuse.get_current_trace_id() 2333 2334 # Use it for external correlation 2335 log.info(f"Processing request with trace_id: {trace_id}") 2336 2337 # Or pass to another system 2338 external_system.process(data, trace_id=trace_id) 2339 ``` 2340 """ 2341 if not self._tracing_enabled: 2342 langfuse_logger.debug( 2343 "Operation skipped: get_current_trace_id - Tracing is disabled or client is in no-op mode." 2344 ) 2345 return None 2346 2347 current_otel_span = self._get_current_otel_span() 2348 2349 return self._get_otel_trace_id(current_otel_span) if current_otel_span else None 2350 2351 def get_current_observation_id(self) -> Optional[str]: 2352 """Get the observation ID (span ID) of the current active span. 2353 2354 This method retrieves the observation ID from the currently active span in the context. 2355 It can be used to get the observation ID for referencing in logs, external systems, 2356 or for creating scores or other related operations. 2357 2358 Returns: 2359 The current observation ID as a 16-character lowercase hexadecimal string, 2360 or None if there is no active span. 2361 2362 Example: 2363 ```python 2364 with langfuse.start_as_current_span(name="process-user-query") as span: 2365 # Get the current observation ID 2366 observation_id = langfuse.get_current_observation_id() 2367 2368 # Store it for later reference 2369 cache.set(f"query_{query_id}_observation", observation_id) 2370 2371 # Process the query... 2372 ``` 2373 """ 2374 if not self._tracing_enabled: 2375 langfuse_logger.debug( 2376 "Operation skipped: get_current_observation_id - Tracing is disabled or client is in no-op mode." 2377 ) 2378 return None 2379 2380 current_otel_span = self._get_current_otel_span() 2381 2382 return self._get_otel_span_id(current_otel_span) if current_otel_span else None 2383 2384 def _get_project_id(self) -> Optional[str]: 2385 """Fetch and return the current project id. Persisted across requests. Returns None if no project id is found for api keys.""" 2386 if not self._project_id: 2387 proj = self.api.projects.get() 2388 if not proj.data or not proj.data[0].id: 2389 return None 2390 2391 self._project_id = proj.data[0].id 2392 2393 return self._project_id 2394 2395 def get_trace_url(self, *, trace_id: Optional[str] = None) -> Optional[str]: 2396 """Get the URL to view a trace in the Langfuse UI. 2397 2398 This method generates a URL that links directly to a trace in the Langfuse UI. 2399 It's useful for providing links in logs, notifications, or debugging tools. 2400 2401 Args: 2402 trace_id: Optional trace ID to generate a URL for. If not provided, 2403 the trace ID of the current active span will be used. 2404 2405 Returns: 2406 A URL string pointing to the trace in the Langfuse UI, 2407 or None if the project ID couldn't be retrieved or no trace ID is available. 
2408 2409 Example: 2410 ```python 2411 # Get URL for the current trace 2412 with langfuse.start_as_current_span(name="process-request") as span: 2413 trace_url = langfuse.get_trace_url() 2414 log.info(f"Processing trace: {trace_url}") 2415 2416 # Get URL for a specific trace 2417 specific_trace_url = langfuse.get_trace_url(trace_id="1234567890abcdef1234567890abcdef") 2418 send_notification(f"Review needed for trace: {specific_trace_url}") 2419 ``` 2420 """ 2421 project_id = self._get_project_id() 2422 final_trace_id = trace_id or self.get_current_trace_id() 2423 2424 return ( 2425 f"{self._base_url}/project/{project_id}/traces/{final_trace_id}" 2426 if project_id and final_trace_id 2427 else None 2428 ) 2429 2430 def get_dataset( 2431 self, name: str, *, fetch_items_page_size: Optional[int] = 50 2432 ) -> "DatasetClient": 2433 """Fetch a dataset by its name. 2434 2435 Args: 2436 name (str): The name of the dataset to fetch. 2437 fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50. 2438 2439 Returns: 2440 DatasetClient: The dataset with the given name. 2441 """ 2442 try: 2443 langfuse_logger.debug(f"Getting datasets {name}") 2444 dataset = self.api.datasets.get(dataset_name=name) 2445 2446 dataset_items = [] 2447 page = 1 2448 2449 while True: 2450 new_items = self.api.dataset_items.list( 2451 dataset_name=self._url_encode(name, is_url_param=True), 2452 page=page, 2453 limit=fetch_items_page_size, 2454 ) 2455 dataset_items.extend(new_items.data) 2456 2457 if new_items.meta.total_pages <= page: 2458 break 2459 2460 page += 1 2461 2462 items = [DatasetItemClient(i, langfuse=self) for i in dataset_items] 2463 2464 return DatasetClient(dataset, items=items) 2465 2466 except Error as e: 2467 handle_fern_exception(e) 2468 raise e 2469 2470 def run_experiment( 2471 self, 2472 *, 2473 name: str, 2474 run_name: Optional[str] = None, 2475 description: Optional[str] = None, 2476 data: ExperimentData, 2477 task: TaskFunction, 2478 evaluators: List[EvaluatorFunction] = [], 2479 run_evaluators: List[RunEvaluatorFunction] = [], 2480 max_concurrency: int = 50, 2481 metadata: Optional[Dict[str, Any]] = None, 2482 ) -> ExperimentResult: 2483 """Run an experiment on a dataset with automatic tracing and evaluation. 2484 2485 This method executes a task function on each item in the provided dataset, 2486 automatically traces all executions with Langfuse for observability, runs 2487 item-level and run-level evaluators on the outputs, and returns comprehensive 2488 results with evaluation metrics. 2489 2490 The experiment system provides: 2491 - Automatic tracing of all task executions 2492 - Concurrent processing with configurable limits 2493 - Comprehensive error handling that isolates failures 2494 - Integration with Langfuse datasets for experiment tracking 2495 - Flexible evaluation framework supporting both sync and async evaluators 2496 2497 Args: 2498 name: Human-readable name for the experiment. Used for identification 2499 in the Langfuse UI. 2500 run_name: Optional exact name for the experiment run. If provided, this will be 2501 used as the exact dataset run name if the `data` contains Langfuse dataset items. 2502 If not provided, this will default to the experiment name appended with an ISO timestamp. 2503 description: Optional description explaining the experiment's purpose, 2504 methodology, or expected outcomes. 2505 data: Array of data items to process. 
Can be either: 2506 - List of dict-like items with 'input', 'expected_output', 'metadata' keys 2507 - List of Langfuse DatasetItem objects from dataset.items 2508 task: Function that processes each data item and returns output. 2509 Must accept 'item' as a keyword argument and can return sync or async results. 2510 The task function signature should be: task(*, item, **kwargs) -> Any 2511 evaluators: List of functions to evaluate each item's output individually. 2512 Each evaluator receives input, output, expected_output, and metadata. 2513 Can return a single Evaluation dict or a list of Evaluation dicts. 2514 run_evaluators: List of functions to evaluate the entire experiment run. 2515 Each run evaluator receives all item_results and can compute aggregate metrics. 2516 Useful for calculating averages, distributions, or cross-item comparisons. 2517 max_concurrency: Maximum number of concurrent task executions (default: 50). 2518 Controls the number of items processed simultaneously. Adjust based on 2519 API rate limits and system resources. 2520 metadata: Optional metadata dictionary to attach to all experiment traces. 2521 This metadata will be included in every trace created during the experiment. 2522 If `data` consists of Langfuse dataset items, the metadata will be attached to the dataset run, too. 2523 2524 Returns: 2525 ExperimentResult containing: 2526 - run_name: The experiment run name. This is equal to the dataset run name if the experiment ran on a Langfuse dataset. 2527 - item_results: List of results for each processed item with outputs and evaluations 2528 - run_evaluations: List of aggregate evaluation results for the entire run 2529 - dataset_run_id: ID of the dataset run (if using Langfuse datasets) 2530 - dataset_run_url: Direct URL to view results in Langfuse UI (if applicable) 2531 2532 Raises: 2533 ValueError: If required parameters are missing or invalid 2534 Exception: If experiment setup fails (individual item failures are handled gracefully) 2535 2536 Examples: 2537 Basic experiment with local data: 2538 ```python 2539 def summarize_text(*, item, **kwargs): 2540 return f"Summary: {item['input'][:50]}..."
2541 2542 def length_evaluator(*, input, output, expected_output=None, **kwargs): 2543 return { 2544 "name": "output_length", 2545 "value": len(output), 2546 "comment": f"Output contains {len(output)} characters" 2547 } 2548 2549 result = langfuse.run_experiment( 2550 name="Text Summarization Test", 2551 description="Evaluate summarization quality and length", 2552 data=[ 2553 {"input": "Long article text...", "expected_output": "Expected summary"}, 2554 {"input": "Another article...", "expected_output": "Another summary"} 2555 ], 2556 task=summarize_text, 2557 evaluators=[length_evaluator] 2558 ) 2559 2560 print(f"Processed {len(result.item_results)} items") 2561 for item_result in result.item_results: 2562 print(f"Input: {item_result.item['input']}") 2563 print(f"Output: {item_result.output}") 2564 print(f"Evaluations: {item_result.evaluations}") 2565 ``` 2566 2567 Advanced experiment with async task and multiple evaluators: 2568 ```python 2569 async def llm_task(*, item, **kwargs): 2570 # Simulate async LLM call 2571 response = await openai_client.chat.completions.create( 2572 model="gpt-4", 2573 messages=[{"role": "user", "content": item["input"]}] 2574 ) 2575 return response.choices[0].message.content 2576 2577 def accuracy_evaluator(*, input, output, expected_output=None, **kwargs): 2578 if expected_output and expected_output.lower() in output.lower(): 2579 return {"name": "accuracy", "value": 1.0, "comment": "Correct answer"} 2580 return {"name": "accuracy", "value": 0.0, "comment": "Incorrect answer"} 2581 2582 def toxicity_evaluator(*, input, output, expected_output=None, **kwargs): 2583 # Simulate toxicity check 2584 toxicity_score = check_toxicity(output) # Your toxicity checker 2585 return { 2586 "name": "toxicity", 2587 "value": toxicity_score, 2588 "comment": f"Toxicity level: {'high' if toxicity_score > 0.7 else 'low'}" 2589 } 2590 2591 def average_accuracy(*, item_results, **kwargs): 2592 accuracies = [ 2593 eval.value for result in item_results 2594 for eval in result.evaluations 2595 if eval.name == "accuracy" 2596 ] 2597 return { 2598 "name": "average_accuracy", 2599 "value": sum(accuracies) / len(accuracies) if accuracies else 0, 2600 "comment": f"Average accuracy across {len(accuracies)} items" 2601 } 2602 2603 result = langfuse.run_experiment( 2604 name="LLM Safety and Accuracy Test", 2605 description="Evaluate model accuracy and safety across diverse prompts", 2606 data=test_dataset, # Your dataset items 2607 task=llm_task, 2608 evaluators=[accuracy_evaluator, toxicity_evaluator], 2609 run_evaluators=[average_accuracy], 2610 max_concurrency=5, # Limit concurrent API calls 2611 metadata={"model": "gpt-4", "temperature": 0.7} 2612 ) 2613 ``` 2614 2615 Using with Langfuse datasets: 2616 ```python 2617 # Get dataset from Langfuse 2618 dataset = langfuse.get_dataset("my-eval-dataset") 2619 2620 result = dataset.run_experiment( 2621 name="Production Model Evaluation", 2622 description="Monthly evaluation of production model performance", 2623 task=my_production_task, 2624 evaluators=[accuracy_evaluator, latency_evaluator] 2625 ) 2626 2627 # Results automatically linked to dataset in Langfuse UI 2628 print(f"View results: {result.dataset_run_url}") 2629 ``` 2630 2631 Note: 2632 - Task and evaluator functions can be either synchronous or asynchronous 2633 - Individual item failures are logged but don't stop the experiment 2634 - All executions are automatically traced and visible in Langfuse UI 2635 - When using Langfuse datasets, results are automatically linked for easy
comparison 2636 - This method works in both sync and async contexts (Jupyter notebooks, web apps, etc.) 2637 - Async execution is handled automatically with smart event loop detection 2638 """ 2639 return cast( 2640 ExperimentResult, 2641 run_async_safely( 2642 self._run_experiment_async( 2643 name=name, 2644 run_name=self._create_experiment_run_name( 2645 name=name, run_name=run_name 2646 ), 2647 description=description, 2648 data=data, 2649 task=task, 2650 evaluators=evaluators or [], 2651 run_evaluators=run_evaluators or [], 2652 max_concurrency=max_concurrency, 2653 metadata=metadata or {}, 2654 ), 2655 ), 2656 ) 2657 2658 async def _run_experiment_async( 2659 self, 2660 *, 2661 name: str, 2662 run_name: str, 2663 description: Optional[str], 2664 data: ExperimentData, 2665 task: TaskFunction, 2666 evaluators: List[EvaluatorFunction], 2667 run_evaluators: List[RunEvaluatorFunction], 2668 max_concurrency: int, 2669 metadata: Dict[str, Any], 2670 ) -> ExperimentResult: 2671 langfuse_logger.debug( 2672 f"Starting experiment '{name}' run '{run_name}' with {len(data)} items" 2673 ) 2674 2675 # Set up concurrency control 2676 semaphore = asyncio.Semaphore(max_concurrency) 2677 2678 # Process all items 2679 async def process_item(item: ExperimentItem) -> ExperimentItemResult: 2680 async with semaphore: 2681 return await self._process_experiment_item( 2682 item, task, evaluators, name, run_name, description, metadata 2683 ) 2684 2685 # Run all items concurrently 2686 tasks = [process_item(item) for item in data] 2687 item_results = await asyncio.gather(*tasks, return_exceptions=True) 2688 2689 # Filter out any exceptions and log errors 2690 valid_results: List[ExperimentItemResult] = [] 2691 for i, result in enumerate(item_results): 2692 if isinstance(result, Exception): 2693 langfuse_logger.error(f"Item {i} failed: {result}") 2694 elif isinstance(result, ExperimentItemResult): 2695 valid_results.append(result) # type: ignore 2696 2697 # Run experiment-level evaluators 2698 run_evaluations: List[Evaluation] = [] 2699 for run_evaluator in run_evaluators: 2700 try: 2701 evaluations = await _run_evaluator( 2702 run_evaluator, item_results=valid_results 2703 ) 2704 run_evaluations.extend(evaluations) 2705 except Exception as e: 2706 langfuse_logger.error(f"Run evaluator failed: {e}") 2707 2708 # Generate dataset run URL if applicable 2709 dataset_run_id = valid_results[0].dataset_run_id if valid_results else None 2710 dataset_run_url = None 2711 if dataset_run_id and data: 2712 try: 2713 # Check if the first item has dataset_id (for DatasetItem objects) 2714 first_item = data[0] 2715 dataset_id = None 2716 2717 if hasattr(first_item, "dataset_id"): 2718 dataset_id = getattr(first_item, "dataset_id", None) 2719 2720 if dataset_id: 2721 project_id = self._get_project_id() 2722 2723 if project_id: 2724 dataset_run_url = f"{self._base_url}/project/{project_id}/datasets/{dataset_id}/runs/{dataset_run_id}" 2725 2726 except Exception: 2727 pass # URL generation is optional 2728 2729 # Store run-level evaluations as scores 2730 for evaluation in run_evaluations: 2731 try: 2732 if dataset_run_id: 2733 self.create_score( 2734 dataset_run_id=dataset_run_id, 2735 name=evaluation.name or "<unknown>", 2736 value=evaluation.value, # type: ignore 2737 comment=evaluation.comment, 2738 metadata=evaluation.metadata, 2739 data_type=evaluation.data_type, # type: ignore 2740 config_id=evaluation.config_id, 2741 ) 2742 2743 except Exception as e: 2744 langfuse_logger.error(f"Failed to store run evaluation: {e}") 2745 
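# Note on score attachment: the loop above stores run-level evaluations
# against the dataset run (dataset_run_id), whereas item-level evaluations
# were already stored as trace-scoped scores inside _process_experiment_item.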
2746 # Flush scores and traces 2747 self.flush() 2748 2749 return ExperimentResult( 2750 name=name, 2751 run_name=run_name, 2752 description=description, 2753 item_results=valid_results, 2754 run_evaluations=run_evaluations, 2755 dataset_run_id=dataset_run_id, 2756 dataset_run_url=dataset_run_url, 2757 ) 2758 2759 async def _process_experiment_item( 2760 self, 2761 item: ExperimentItem, 2762 task: Callable, 2763 evaluators: List[Callable], 2764 experiment_name: str, 2765 experiment_run_name: str, 2766 experiment_description: Optional[str], 2767 experiment_metadata: Dict[str, Any], 2768 ) -> ExperimentItemResult: 2769 # Execute task with tracing 2770 span_name = "experiment-item-run" 2771 2772 with self.start_as_current_span(name=span_name) as span: 2773 try: 2774 output = await _run_task(task, item) 2775 2776 input_data = ( 2777 item.get("input") 2778 if isinstance(item, dict) 2779 else getattr(item, "input", None) 2780 ) 2781 2782 item_metadata: Dict[str, Any] = {} 2783 2784 if isinstance(item, dict): 2785 item_metadata = item.get("metadata", None) or {} 2786 2787 final_metadata = { 2788 "experiment_name": experiment_name, 2789 "experiment_run_name": experiment_run_name, 2790 **experiment_metadata, 2791 } 2792 2793 if ( 2794 not isinstance(item, dict) 2795 and hasattr(item, "dataset_id") 2796 and hasattr(item, "id") 2797 ): 2798 final_metadata.update( 2799 {"dataset_id": item.dataset_id, "dataset_item_id": item.id} 2800 ) 2801 2802 if isinstance(item_metadata, dict): 2803 final_metadata.update(item_metadata) 2804 2805 span.update( 2806 input=input_data, 2807 output=output, 2808 metadata=final_metadata, 2809 ) 2810 2811 # Get trace ID for linking 2812 trace_id = span.trace_id 2813 dataset_run_id = None 2814 2815 # Link to dataset run if this is a dataset item 2816 if hasattr(item, "id") and hasattr(item, "dataset_id"): 2817 try: 2818 dataset_run_item = self.api.dataset_run_items.create( 2819 request=CreateDatasetRunItemRequest( 2820 runName=experiment_run_name, 2821 runDescription=experiment_description, 2822 metadata=experiment_metadata, 2823 datasetItemId=item.id, # type: ignore 2824 traceId=trace_id, 2825 observationId=span.id, 2826 ) 2827 ) 2828 2829 dataset_run_id = dataset_run_item.dataset_run_id 2830 2831 except Exception as e: 2832 langfuse_logger.error(f"Failed to create dataset run item: {e}") 2833 2834 # Run evaluators 2835 evaluations = [] 2836 2837 for evaluator in evaluators: 2838 try: 2839 expected_output = None 2840 2841 if isinstance(item, dict): 2842 expected_output = item.get("expected_output") 2843 elif hasattr(item, "expected_output"): 2844 expected_output = item.expected_output 2845 2846 eval_metadata: Optional[Dict[str, Any]] = None 2847 2848 if isinstance(item, dict): 2849 eval_metadata = item.get("metadata") 2850 elif hasattr(item, "metadata"): 2851 eval_metadata = item.metadata 2852 2853 eval_results = await _run_evaluator( 2854 evaluator, 2855 input=input_data, 2856 output=output, 2857 expected_output=expected_output, 2858 metadata=eval_metadata, 2859 ) 2860 evaluations.extend(eval_results) 2861 2862 # Store evaluations as scores 2863 for evaluation in eval_results: 2864 self.create_score( 2865 trace_id=trace_id, 2866 name=evaluation.name, 2867 value=evaluation.value, # type: ignore 2868 comment=evaluation.comment, 2869 metadata=evaluation.metadata, 2870 config_id=evaluation.config_id, 2871 data_type=evaluation.data_type, # type: ignore 2872 ) 2873 2874 except Exception as e: 2875 langfuse_logger.error(f"Evaluator failed: {e}") 2876 2877 return 
ExperimentItemResult( 2878 item=item, 2879 output=output, 2880 evaluations=evaluations, 2881 trace_id=trace_id, 2882 dataset_run_id=dataset_run_id, 2883 ) 2884 2885 except Exception as e: 2886 span.update( 2887 output=f"Error: {str(e)}", level="ERROR", status_message=str(e) 2888 ) 2889 raise e 2890 2891 def _create_experiment_run_name( 2892 self, *, name: Optional[str] = None, run_name: Optional[str] = None 2893 ) -> str: 2894 if run_name: 2895 return run_name 2896 2897 iso_timestamp = _get_timestamp().isoformat().replace("+00:00", "Z") 2898 2899 return f"{name} - {iso_timestamp}" 2900 2901 def auth_check(self) -> bool: 2902 """Check if the provided credentials (public and secret key) are valid. 2903 2904 Raises: 2905 Exception: If no projects were found for the provided credentials. 2906 2907 Note: 2908 This method is blocking. It is discouraged to use it in production code. 2909 """ 2910 try: 2911 projects = self.api.projects.get() 2912 langfuse_logger.debug( 2913 f"Auth check successful, found {len(projects.data)} projects" 2914 ) 2915 if len(projects.data) == 0: 2916 raise Exception( 2917 "Auth check failed, no project found for the keys provided." 2918 ) 2919 return True 2920 2921 except AttributeError as e: 2922 langfuse_logger.warning( 2923 f"Auth check failed: Client not properly initialized. Error: {e}" 2924 ) 2925 return False 2926 2927 except Error as e: 2928 handle_fern_exception(e) 2929 raise e 2930 2931 def create_dataset( 2932 self, 2933 *, 2934 name: str, 2935 description: Optional[str] = None, 2936 metadata: Optional[Any] = None, 2937 ) -> Dataset: 2938 """Create a dataset with the given name on Langfuse. 2939 2940 Args: 2941 name: Name of the dataset to create. 2942 description: Description of the dataset. Defaults to None. 2943 metadata: Additional metadata. Defaults to None. 2944 2945 Returns: 2946 Dataset: The created dataset as returned by the Langfuse API. 2947 """ 2948 try: 2949 body = CreateDatasetRequest( 2950 name=name, description=description, metadata=metadata 2951 ) 2952 langfuse_logger.debug(f"Creating datasets {body}") 2953 2954 return self.api.datasets.create(request=body) 2955 2956 except Error as e: 2957 handle_fern_exception(e) 2958 raise e 2959 2960 def create_dataset_item( 2961 self, 2962 *, 2963 dataset_name: str, 2964 input: Optional[Any] = None, 2965 expected_output: Optional[Any] = None, 2966 metadata: Optional[Any] = None, 2967 source_trace_id: Optional[str] = None, 2968 source_observation_id: Optional[str] = None, 2969 status: Optional[DatasetStatus] = None, 2970 id: Optional[str] = None, 2971 ) -> DatasetItem: 2972 """Create a dataset item. 2973 2974 Upserts if an item with id already exists. 2975 2976 Args: 2977 dataset_name: Name of the dataset in which the dataset item should be created. 2978 input: Input data. Defaults to None. Can contain any dict, list or scalar. 2979 expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar. 2980 metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar. 2981 source_trace_id: Id of the source trace. Defaults to None. 2982 source_observation_id: Id of the source observation. Defaults to None. 2983 status: Status of the dataset item. Defaults to ACTIVE for newly created items. 2984 id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets. 2985 2986 Returns: 2987 DatasetItem: The created dataset item as returned by the Langfuse API. 
2988 2989 Example: 2990 ```python 2991 from langfuse import Langfuse 2992 2993 langfuse = Langfuse() 2994 2995 # Uploading items to the Langfuse dataset named "capital_cities" 2996 langfuse.create_dataset_item( 2997 dataset_name="capital_cities", 2998 input={"input": {"country": "Italy"}}, 2999 expected_output={"expected_output": "Rome"}, 3000 metadata={"foo": "bar"} 3001 ) 3002 ``` 3003 """ 3004 try: 3005 body = CreateDatasetItemRequest( 3006 datasetName=dataset_name, 3007 input=input, 3008 expectedOutput=expected_output, 3009 metadata=metadata, 3010 sourceTraceId=source_trace_id, 3011 sourceObservationId=source_observation_id, 3012 status=status, 3013 id=id, 3014 ) 3015 langfuse_logger.debug(f"Creating dataset item {body}") 3016 return self.api.dataset_items.create(request=body) 3017 except Error as e: 3018 handle_fern_exception(e) 3019 raise e 3020 3021 def resolve_media_references( 3022 self, 3023 *, 3024 obj: Any, 3025 resolve_with: Literal["base64_data_uri"], 3026 max_depth: int = 10, 3027 content_fetch_timeout_seconds: int = 5, 3028 ) -> Any: 3029 """Replace media reference strings in an object with base64 data URIs. 3030 3031 This method recursively traverses an object (up to max_depth) looking for media reference strings 3032 in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using 3033 the provided Langfuse client and replaces the reference string with a base64 data URI. 3034 3035 If fetching media content fails for a reference string, a warning is logged and the reference 3036 string is left unchanged. 3037 3038 Args: 3039 obj: The object to process. Can be a primitive value, array, or nested object. 3040 If the object has a __dict__ attribute, a dict will be returned instead of the original object type. 3041 resolve_with: The representation of the media content to replace the media reference string with. 3042 Currently only "base64_data_uri" is supported. 3043 max_depth: int: The maximum depth to traverse the object. Default is 10. 3044 content_fetch_timeout_seconds: int: The timeout in seconds for fetching media content. Default is 5. 3045 3046 Returns: 3047 A deep copy of the input object with all media references replaced with base64 data URIs where possible. 3048 If the input object has a __dict__ attribute, a dict will be returned instead of the original object type. 3049 3050 Example: 3051 obj = { 3052 "image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@", 3053 "nested": { 3054 "pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@" 3055 } 3056 } 3057 3058 result = langfuse_client.resolve_media_references(obj=obj, resolve_with="base64_data_uri") 3059 3060 # Result: 3061 # { 3062 # "image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...", 3063 # "nested": { 3064 # "pdf": "data:application/pdf;base64,JVBERi0xLjcK..." 3065 # } 3066 # } 3067 """ 3068 return LangfuseMedia.resolve_media_references( 3069 langfuse_client=self, 3070 obj=obj, 3071 resolve_with=resolve_with, 3072 max_depth=max_depth, 3073 content_fetch_timeout_seconds=content_fetch_timeout_seconds, 3074 ) 3075 3076 @overload 3077 def get_prompt( 3078 self, 3079 name: str, 3080 *, 3081 version: Optional[int] = None, 3082 label: Optional[str] = None, 3083 type: Literal["chat"], 3084 cache_ttl_seconds: Optional[int] = None, 3085 fallback: Optional[List[ChatMessageDict]] = None, 3086 max_retries: Optional[int] = None, 3087 fetch_timeout_seconds: Optional[int] = None, 3088 ) -> ChatPromptClient: ...
3089 3090 @overload 3091 def get_prompt( 3092 self, 3093 name: str, 3094 *, 3095 version: Optional[int] = None, 3096 label: Optional[str] = None, 3097 type: Literal["text"] = "text", 3098 cache_ttl_seconds: Optional[int] = None, 3099 fallback: Optional[str] = None, 3100 max_retries: Optional[int] = None, 3101 fetch_timeout_seconds: Optional[int] = None, 3102 ) -> TextPromptClient: ... 3103 3104 def get_prompt( 3105 self, 3106 name: str, 3107 *, 3108 version: Optional[int] = None, 3109 label: Optional[str] = None, 3110 type: Literal["chat", "text"] = "text", 3111 cache_ttl_seconds: Optional[int] = None, 3112 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None, 3113 max_retries: Optional[int] = None, 3114 fetch_timeout_seconds: Optional[int] = None, 3115 ) -> PromptClient: 3116 """Get a prompt. 3117 3118 This method attempts to fetch the requested prompt from the local cache. If the prompt is not found 3119 in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again 3120 and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will 3121 return the expired prompt as a fallback. 3122 3123 Args: 3124 name (str): The name of the prompt to retrieve. 3125 3126 Keyword Args: 3127 version (Optional[int]): The version of the prompt to retrieve. If neither version nor label is specified, the `production` label is returned. Specify either version or label, not both. 3128 label: Optional[str]: The label of the prompt to retrieve. If neither version nor label is specified, the `production` label is returned. Specify either version or label, not both. 3129 cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a 3130 keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0. 3131 type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text". 3132 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None. 3133 max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries have an exponential backoff with a maximum delay of 10 seconds. 3134 fetch_timeout_seconds: Optional[int]: The timeout in seconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 5 seconds. 3135 3136 Returns: 3137 The prompt object retrieved from the cache or directly fetched if not cached or expired of type 3138 - TextPromptClient, if type argument is 'text'. 3139 - ChatPromptClient, if type argument is 'chat'. 3140 3141 Raises: 3142 Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an 3143 expired prompt in the cache, in which case it logs a warning and returns the expired prompt. 3144 """ 3145 if self._resources is None: 3146 raise Error( 3147 "SDK is not correctly initialized. Check the init logs for more details."
3148 ) 3149 if version is not None and label is not None: 3150 raise ValueError("Cannot specify both version and label at the same time.") 3151 3152 if not name: 3153 raise ValueError("Prompt name cannot be empty.") 3154 3155 cache_key = PromptCache.generate_cache_key(name, version=version, label=label) 3156 bounded_max_retries = self._get_bounded_max_retries( 3157 max_retries, default_max_retries=2, max_retries_upper_bound=4 3158 ) 3159 3160 langfuse_logger.debug(f"Getting prompt '{cache_key}'") 3161 cached_prompt = self._resources.prompt_cache.get(cache_key) 3162 3163 if cached_prompt is None or cache_ttl_seconds == 0: 3164 langfuse_logger.debug( 3165 f"Prompt '{cache_key}' not found in cache or caching disabled." 3166 ) 3167 try: 3168 return self._fetch_prompt_and_update_cache( 3169 name, 3170 version=version, 3171 label=label, 3172 ttl_seconds=cache_ttl_seconds, 3173 max_retries=bounded_max_retries, 3174 fetch_timeout_seconds=fetch_timeout_seconds, 3175 ) 3176 except Exception as e: 3177 if fallback: 3178 langfuse_logger.warning( 3179 f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}" 3180 ) 3181 3182 fallback_client_args: Dict[str, Any] = { 3183 "name": name, 3184 "prompt": fallback, 3185 "type": type, 3186 "version": version or 0, 3187 "config": {}, 3188 "labels": [label] if label else [], 3189 "tags": [], 3190 } 3191 3192 if type == "text": 3193 return TextPromptClient( 3194 prompt=Prompt_Text(**fallback_client_args), 3195 is_fallback=True, 3196 ) 3197 3198 if type == "chat": 3199 return ChatPromptClient( 3200 prompt=Prompt_Chat(**fallback_client_args), 3201 is_fallback=True, 3202 ) 3203 3204 raise e 3205 3206 if cached_prompt.is_expired(): 3207 langfuse_logger.debug(f"Stale prompt '{cache_key}' found in cache.") 3208 try: 3209 # refresh prompt in background thread, refresh_prompt deduplicates tasks 3210 langfuse_logger.debug(f"Refreshing prompt '{cache_key}' in background.") 3211 3212 def refresh_task() -> None: 3213 self._fetch_prompt_and_update_cache( 3214 name, 3215 version=version, 3216 label=label, 3217 ttl_seconds=cache_ttl_seconds, 3218 max_retries=bounded_max_retries, 3219 fetch_timeout_seconds=fetch_timeout_seconds, 3220 ) 3221 3222 self._resources.prompt_cache.add_refresh_prompt_task( 3223 cache_key, 3224 refresh_task, 3225 ) 3226 langfuse_logger.debug( 3227 f"Returning stale prompt '{cache_key}' from cache." 3228 ) 3229 # return stale prompt 3230 return cached_prompt.value 3231 3232 except Exception as e: 3233 langfuse_logger.warning( 3234 f"Error when refreshing cached prompt '{cache_key}', returning cached version. 
Error: {e}" 3235 ) 3236 # creation of refresh prompt task failed, return stale prompt 3237 return cached_prompt.value 3238 3239 return cached_prompt.value 3240 3241 def _fetch_prompt_and_update_cache( 3242 self, 3243 name: str, 3244 *, 3245 version: Optional[int] = None, 3246 label: Optional[str] = None, 3247 ttl_seconds: Optional[int] = None, 3248 max_retries: int, 3249 fetch_timeout_seconds: Optional[int], 3250 ) -> PromptClient: 3251 cache_key = PromptCache.generate_cache_key(name, version=version, label=label) 3252 langfuse_logger.debug(f"Fetching prompt '{cache_key}' from server...") 3253 3254 try: 3255 3256 @backoff.on_exception( 3257 backoff.constant, Exception, max_tries=max_retries + 1, logger=None 3258 ) 3259 def fetch_prompts() -> Any: 3260 return self.api.prompts.get( 3261 self._url_encode(name), 3262 version=version, 3263 label=label, 3264 request_options={ 3265 "timeout_in_seconds": fetch_timeout_seconds, 3266 } 3267 if fetch_timeout_seconds is not None 3268 else None, 3269 ) 3270 3271 prompt_response = fetch_prompts() 3272 3273 prompt: PromptClient 3274 if prompt_response.type == "chat": 3275 prompt = ChatPromptClient(prompt_response) 3276 else: 3277 prompt = TextPromptClient(prompt_response) 3278 3279 if self._resources is not None: 3280 self._resources.prompt_cache.set(cache_key, prompt, ttl_seconds) 3281 3282 return prompt 3283 3284 except Exception as e: 3285 langfuse_logger.error( 3286 f"Error while fetching prompt '{cache_key}': {str(e)}" 3287 ) 3288 raise e 3289 3290 def _get_bounded_max_retries( 3291 self, 3292 max_retries: Optional[int], 3293 *, 3294 default_max_retries: int = 2, 3295 max_retries_upper_bound: int = 4, 3296 ) -> int: 3297 if max_retries is None: 3298 return default_max_retries 3299 3300 bounded_max_retries = min( 3301 max(max_retries, 0), 3302 max_retries_upper_bound, 3303 ) 3304 3305 return bounded_max_retries 3306 3307 @overload 3308 def create_prompt( 3309 self, 3310 *, 3311 name: str, 3312 prompt: List[Union[ChatMessageDict, ChatMessageWithPlaceholdersDict]], 3313 labels: List[str] = [], 3314 tags: Optional[List[str]] = None, 3315 type: Optional[Literal["chat"]], 3316 config: Optional[Any] = None, 3317 commit_message: Optional[str] = None, 3318 ) -> ChatPromptClient: ... 3319 3320 @overload 3321 def create_prompt( 3322 self, 3323 *, 3324 name: str, 3325 prompt: str, 3326 labels: List[str] = [], 3327 tags: Optional[List[str]] = None, 3328 type: Optional[Literal["text"]] = "text", 3329 config: Optional[Any] = None, 3330 commit_message: Optional[str] = None, 3331 ) -> TextPromptClient: ... 3332 3333 def create_prompt( 3334 self, 3335 *, 3336 name: str, 3337 prompt: Union[ 3338 str, List[Union[ChatMessageDict, ChatMessageWithPlaceholdersDict]] 3339 ], 3340 labels: List[str] = [], 3341 tags: Optional[List[str]] = None, 3342 type: Optional[Literal["chat", "text"]] = "text", 3343 config: Optional[Any] = None, 3344 commit_message: Optional[str] = None, 3345 ) -> PromptClient: 3346 """Create a new prompt in Langfuse. 3347 3348 Keyword Args: 3349 name : The name of the prompt to be created. 3350 prompt : The content of the prompt to be created. 3351 is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead. 3352 labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label. 3353 tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt. 
3354 config: Additional structured data to be saved with the prompt. Defaults to None. 3355 type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text". 3356 commit_message: Optional string describing the change. 3357 3358 Returns: 3359 TextPromptClient: The prompt if type argument is 'text'. 3360 ChatPromptClient: The prompt if type argument is 'chat'. 3361 """ 3362 try: 3363 langfuse_logger.debug(f"Creating prompt {name=}, {labels=}") 3364 3365 if type == "chat": 3366 if not isinstance(prompt, list): 3367 raise ValueError( 3368 "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes." 3369 ) 3370 request: Union[CreatePromptRequest_Chat, CreatePromptRequest_Text] = ( 3371 CreatePromptRequest_Chat( 3372 name=name, 3373 prompt=cast(Any, prompt), 3374 labels=labels, 3375 tags=tags, 3376 config=config or {}, 3377 commitMessage=commit_message, 3378 type="chat", 3379 ) 3380 ) 3381 server_prompt = self.api.prompts.create(request=request) 3382 3383 if self._resources is not None: 3384 self._resources.prompt_cache.invalidate(name) 3385 3386 return ChatPromptClient(prompt=cast(Prompt_Chat, server_prompt)) 3387 3388 if not isinstance(prompt, str): 3389 raise ValueError("For 'text' type, 'prompt' must be a string.") 3390 3391 request = CreatePromptRequest_Text( 3392 name=name, 3393 prompt=prompt, 3394 labels=labels, 3395 tags=tags, 3396 config=config or {}, 3397 commitMessage=commit_message, 3398 type="text", 3399 ) 3400 3401 server_prompt = self.api.prompts.create(request=request) 3402 3403 if self._resources is not None: 3404 self._resources.prompt_cache.invalidate(name) 3405 3406 return TextPromptClient(prompt=cast(Prompt_Text, server_prompt)) 3407 3408 except Error as e: 3409 handle_fern_exception(e) 3410 raise e 3411 3412 def update_prompt( 3413 self, 3414 *, 3415 name: str, 3416 version: int, 3417 new_labels: List[str] = [], 3418 ) -> Any: 3419 """Update an existing prompt version in Langfuse. The Langfuse SDK prompt cache is invalidated for all prompts with the specified name. 3420 3421 Args: 3422 name (str): The name of the prompt to update. 3423 version (int): The version number of the prompt to update. 3424 new_labels (List[str], optional): New labels to assign to the prompt version. Labels are unique across versions. The "latest" label is reserved and managed by Langfuse. Defaults to []. 3425 3426 Returns: 3427 Prompt: The updated prompt from the Langfuse API. 3428 3429 """ 3430 updated_prompt = self.api.prompt_version.update( 3431 name=self._url_encode(name), 3432 version=version, 3433 new_labels=new_labels, 3434 ) 3435 3436 if self._resources is not None: 3437 self._resources.prompt_cache.invalidate(name) 3438 3439 return updated_prompt 3440 3441 def _url_encode(self, url: str, *, is_url_param: Optional[bool] = False) -> str: 3442 # httpx ≥ 0.28 does its own WHATWG-compliant quoting (e.g. encodes bare 3443 # “%”, “?”, “#”, “|”, … in query/path parts). Re-quoting here would 3444 # double-encode, so we skip when the value is about to be sent straight 3445 # to httpx (`is_url_param=True`) and the installed version is ≥ 0.28.
3446        if is_url_param and Version(httpx.__version__) >= Version("0.28.0"): 3447            return url 3448 3449        # urllib.parse.quote does not escape slashes "/" by default; we pass 3450        # safe="" to force escaping of slashes as well. 3451        # This is necessary for prompts in prompt folders 3452        return urllib.parse.quote(url, safe="") 3453 3454    def clear_prompt_cache(self) -> None: 3455        """Clear the entire prompt cache, removing all cached prompts. 3456 3457        This method is useful when you want to force a complete refresh of all 3458        cached prompts, for example after major updates or when you need to 3459        ensure the latest versions are fetched from the server. 3460        """ 3461        if self._resources is not None: 3462            self._resources.prompt_cache.clear()
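The prompt-management methods above pair naturally: create_prompt writes a new version and invalidates the local cache for that name, while update_prompt re-labels an existing version. A minimal sketch, assuming a configured client; the prompt names and contents are illustrative only:

```python
from langfuse import Langfuse

langfuse = Langfuse()  # reads LANGFUSE_* env vars

# Create a text prompt; the 'production' label makes it the default-served version
text_prompt = langfuse.create_prompt(
    name="movie-critic",  # hypothetical prompt name
    prompt="As a critic, rate the movie {{movie}} from 1-10.",
    type="text",
    labels=["production"],
    commit_message="initial version",
)

# Create a chat prompt from a list of role/content messages
chat_prompt = langfuse.create_prompt(
    name="movie-critic-chat",  # hypothetical prompt name
    prompt=[{"role": "system", "content": "You are a film critic."}],
    type="chat",
    labels=["production"],
)

# Later: promote version 2 by assigning it the 'production' label.
# The SDK prompt cache for this name is invalidated as documented above.
langfuse.update_prompt(name="movie-critic", version=2, new_labels=["production"])
```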
Main client for Langfuse tracing and platform features.
This class provides an interface for creating and managing traces, spans, and generations in Langfuse as well as interacting with the Langfuse API.
The client features a thread-safe singleton pattern for each unique public API key, ensuring consistent trace context propagation across your application. It implements efficient batching of spans with configurable flush settings and includes background thread management for media uploads and score ingestion.
Configuration is flexible through either direct parameters or environment variables, with graceful fallbacks and runtime configuration updates.
Attributes:
- api: Synchronous API client for Langfuse backend communication
- async_api: Asynchronous API client for Langfuse backend communication
- _otel_tracer: Internal LangfuseTracer instance managing OpenTelemetry components
Arguments:
- public_key (Optional[str]): Your Langfuse public API key. Can also be set via LANGFUSE_PUBLIC_KEY environment variable.
- secret_key (Optional[str]): Your Langfuse secret API key. Can also be set via LANGFUSE_SECRET_KEY environment variable.
- base_url (Optional[str]): The Langfuse API base URL. Defaults to "https://cloud.langfuse.com". Can also be set via LANGFUSE_BASE_URL environment variable.
- host (Optional[str]): Deprecated. Use base_url instead. The Langfuse API host URL. Defaults to "https://cloud.langfuse.com".
- timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds.
- httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created.
- debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable.
- tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable.
- flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable.
- flush_interval (Optional[float]): Time in seconds between batch flushes. Defaults to 5 seconds. Can also be set via LANGFUSE_FLUSH_INTERVAL environment variable.
- environment (Optional[str]): Environment name for tracing. Default is 'default'. Can also be set via LANGFUSE_TRACING_ENVIRONMENT environment variable. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'.
- release (Optional[str]): Release version/hash of your application. Used for grouping analytics by release.
- media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable.
- sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable.
- mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API.
- blocked_instrumentation_scopes (Optional[List[str]]): List of instrumentation scope names to block from being exported to Langfuse. Spans from these scopes will be filtered out before being sent to the API. Useful for filtering out spans from specific libraries or frameworks. For exported spans, you can see the instrumentation scope name in the span metadata in Langfuse (metadata.scope.name).
- additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and OTLPSpanExporter requests. These headers will be merged with default headers. Note: If httpx_client is provided, additional_headers must be set directly on your custom httpx_client as well.
- tracer_provider (Optional[TracerProvider]): OpenTelemetry TracerProvider to use for Langfuse. Setting this can be useful to keep Langfuse tracing disconnected from other OpenTelemetry-span-emitting libraries. Note: To track active spans, the context is still shared between TracerProviders. This may lead to broken trace trees.
Example:
    from langfuse import Langfuse

    # Initialize the client (reads from env vars if not provided)
    langfuse = Langfuse(
        public_key="your-public-key",
        secret_key="your-secret-key",
        base_url="https://cloud.langfuse.com",  # Optional, default shown
    )

    # Create a trace span
    with langfuse.start_as_current_span(name="process-query") as span:
        # Your application code here

        # Create a nested generation span for an LLM call
        with span.start_as_current_generation(
            name="generate-response",
            model="gpt-4",
            input={"query": "Tell me about AI"},
            model_parameters={"temperature": 0.7, "max_tokens": 500}
        ) as generation:
            # Generate response here
            response = "AI is a field of computer science..."

            generation.update(
                output=response,
                usage_details={"prompt_tokens": 10, "completion_tokens": 50},
                cost_details={"total_cost": 0.0023}
            )

            # Score the generation (supports NUMERIC, BOOLEAN, CATEGORICAL)
            generation.score(name="relevance", value=0.95, data_type="NUMERIC")
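Because every constructor argument falls back to an environment variable, zero-argument initialization is common in production. A small sketch, assuming the standard LANGFUSE_* variables documented above are set in the process environment (shown inline here only for illustration):

```python
import os

from langfuse import Langfuse

# Typically configured outside the process, e.g. in the deployment environment
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-..."   # placeholder key
os.environ["LANGFUSE_SECRET_KEY"] = "sk-..."   # placeholder key
os.environ["LANGFUSE_BASE_URL"] = "https://cloud.langfuse.com"
os.environ["LANGFUSE_SAMPLE_RATE"] = "0.25"    # sample 25% of traces

langfuse = Langfuse()  # all settings resolved from the environment
```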
196 def __init__( 197 self, 198 *, 199 public_key: Optional[str] = None, 200 secret_key: Optional[str] = None, 201 base_url: Optional[str] = None, 202 host: Optional[str] = None, 203 timeout: Optional[int] = None, 204 httpx_client: Optional[httpx.Client] = None, 205 debug: bool = False, 206 tracing_enabled: Optional[bool] = True, 207 flush_at: Optional[int] = None, 208 flush_interval: Optional[float] = None, 209 environment: Optional[str] = None, 210 release: Optional[str] = None, 211 media_upload_thread_count: Optional[int] = None, 212 sample_rate: Optional[float] = None, 213 mask: Optional[MaskFunction] = None, 214 blocked_instrumentation_scopes: Optional[List[str]] = None, 215 additional_headers: Optional[Dict[str, str]] = None, 216 tracer_provider: Optional[TracerProvider] = None, 217 ): 218 self._base_url = ( 219 base_url 220 or os.environ.get(LANGFUSE_BASE_URL) 221 or host 222 or os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com") 223 ) 224 self._environment = environment or cast( 225 str, os.environ.get(LANGFUSE_TRACING_ENVIRONMENT) 226 ) 227 self._project_id: Optional[str] = None 228 sample_rate = sample_rate or float(os.environ.get(LANGFUSE_SAMPLE_RATE, 1.0)) 229 if not 0.0 <= sample_rate <= 1.0: 230 raise ValueError( 231 f"Sample rate must be between 0.0 and 1.0, got {sample_rate}" 232 ) 233 234 timeout = timeout or int(os.environ.get(LANGFUSE_TIMEOUT, 5)) 235 236 self._tracing_enabled = ( 237 tracing_enabled 238 and os.environ.get(LANGFUSE_TRACING_ENABLED, "true").lower() != "false" 239 ) 240 if not self._tracing_enabled: 241 langfuse_logger.info( 242 "Configuration: Langfuse tracing is explicitly disabled. No data will be sent to the Langfuse API." 243 ) 244 245 debug = ( 246 debug if debug else (os.getenv(LANGFUSE_DEBUG, "false").lower() == "true") 247 ) 248 if debug: 249 logging.basicConfig( 250 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 251 ) 252 langfuse_logger.setLevel(logging.DEBUG) 253 254 public_key = public_key or os.environ.get(LANGFUSE_PUBLIC_KEY) 255 if public_key is None: 256 langfuse_logger.warning( 257 "Authentication error: Langfuse client initialized without public_key. Client will be disabled. " 258 "Provide a public_key parameter or set LANGFUSE_PUBLIC_KEY environment variable. " 259 ) 260 self._otel_tracer = otel_trace_api.NoOpTracer() 261 return 262 263 secret_key = secret_key or os.environ.get(LANGFUSE_SECRET_KEY) 264 if secret_key is None: 265 langfuse_logger.warning( 266 "Authentication error: Langfuse client initialized without secret_key. Client will be disabled. " 267 "Provide a secret_key parameter or set LANGFUSE_SECRET_KEY environment variable. " 268 ) 269 self._otel_tracer = otel_trace_api.NoOpTracer() 270 return 271 272 if os.environ.get("OTEL_SDK_DISABLED", "false").lower() == "true": 273 langfuse_logger.warning( 274 "OTEL_SDK_DISABLED is set. Langfuse tracing will be disabled and no traces will appear in the UI." 
275 ) 276 277 # Initialize api and tracer if requirements are met 278 self._resources = LangfuseResourceManager( 279 public_key=public_key, 280 secret_key=secret_key, 281 base_url=self._base_url, 282 timeout=timeout, 283 environment=self._environment, 284 release=release, 285 flush_at=flush_at, 286 flush_interval=flush_interval, 287 httpx_client=httpx_client, 288 media_upload_thread_count=media_upload_thread_count, 289 sample_rate=sample_rate, 290 mask=mask, 291 tracing_enabled=self._tracing_enabled, 292 blocked_instrumentation_scopes=blocked_instrumentation_scopes, 293 additional_headers=additional_headers, 294 tracer_provider=tracer_provider, 295 ) 296 self._mask = self._resources.mask 297 298 self._otel_tracer = ( 299 self._resources.tracer 300 if self._tracing_enabled and self._resources.tracer is not None 301 else otel_trace_api.NoOpTracer() 302 ) 303 self.api = self._resources.api 304 self.async_api = self._resources.async_api
306 def start_span( 307 self, 308 *, 309 trace_context: Optional[TraceContext] = None, 310 name: str, 311 input: Optional[Any] = None, 312 output: Optional[Any] = None, 313 metadata: Optional[Any] = None, 314 version: Optional[str] = None, 315 level: Optional[SpanLevel] = None, 316 status_message: Optional[str] = None, 317 ) -> LangfuseSpan: 318 """Create a new span for tracing a unit of work. 319 320 This method creates a new span but does not set it as the current span in the 321 context. To create and use a span within a context, use start_as_current_span(). 322 323 The created span will be the child of the current span in the context. 324 325 Args: 326 trace_context: Optional context for connecting to an existing trace 327 name: Name of the span (e.g., function or operation name) 328 input: Input data for the operation (can be any JSON-serializable object) 329 output: Output data from the operation (can be any JSON-serializable object) 330 metadata: Additional metadata to associate with the span 331 version: Version identifier for the code or component 332 level: Importance level of the span (info, warning, error) 333 status_message: Optional status message for the span 334 335 Returns: 336 A LangfuseSpan object that must be ended with .end() when the operation completes 337 338 Example: 339 ```python 340 span = langfuse.start_span(name="process-data") 341 try: 342 # Do work 343 span.update(output="result") 344 finally: 345 span.end() 346 ``` 347 """ 348 return self.start_observation( 349 trace_context=trace_context, 350 name=name, 351 as_type="span", 352 input=input, 353 output=output, 354 metadata=metadata, 355 version=version, 356 level=level, 357 status_message=status_message, 358 )
Create a new span for tracing a unit of work.
This method creates a new span but does not set it as the current span in the context. To create and use a span within a context, use start_as_current_span().
The created span will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Returns:
A LangfuseSpan object that must be ended with .end() when the operation completes
Example:
    span = langfuse.start_span(name="process-data")
    try:
        # Do work
        span.update(output="result")
    finally:
        span.end()
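The trace_context parameter lets a span attach to a trace that was started elsewhere, for example in an upstream service. A sketch, assuming a 32-hex-char trace ID received from the caller:

```python
# Attach this span to an existing trace rather than starting a new one
span = langfuse.start_span(
    name="downstream-work",
    trace_context={
        "trace_id": "abcdef1234567890abcdef1234567890",  # from the upstream service
        "parent_span_id": "abcdef1234567890",            # optional parent span
    },
)
try:
    span.update(output="done")
finally:
    span.end()
```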
360 def start_as_current_span( 361 self, 362 *, 363 trace_context: Optional[TraceContext] = None, 364 name: str, 365 input: Optional[Any] = None, 366 output: Optional[Any] = None, 367 metadata: Optional[Any] = None, 368 version: Optional[str] = None, 369 level: Optional[SpanLevel] = None, 370 status_message: Optional[str] = None, 371 end_on_exit: Optional[bool] = None, 372 ) -> _AgnosticContextManager[LangfuseSpan]: 373 """Create a new span and set it as the current span in a context manager. 374 375 This method creates a new span and sets it as the current span within a context 376 manager. Use this method with a 'with' statement to automatically handle span 377 lifecycle within a code block. 378 379 The created span will be the child of the current span in the context. 380 381 Args: 382 trace_context: Optional context for connecting to an existing trace 383 name: Name of the span (e.g., function or operation name) 384 input: Input data for the operation (can be any JSON-serializable object) 385 output: Output data from the operation (can be any JSON-serializable object) 386 metadata: Additional metadata to associate with the span 387 version: Version identifier for the code or component 388 level: Importance level of the span (info, warning, error) 389 status_message: Optional status message for the span 390 end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks. 391 392 Returns: 393 A context manager that yields a LangfuseSpan 394 395 Example: 396 ```python 397 with langfuse.start_as_current_span(name="process-query") as span: 398 # Do work 399 result = process_data() 400 span.update(output=result) 401 402 # Create a child span automatically 403 with span.start_as_current_span(name="sub-operation") as child_span: 404 # Do sub-operation work 405 child_span.update(output="sub-result") 406 ``` 407 """ 408 return self.start_as_current_observation( 409 trace_context=trace_context, 410 name=name, 411 as_type="span", 412 input=input, 413 output=output, 414 metadata=metadata, 415 version=version, 416 level=level, 417 status_message=status_message, 418 end_on_exit=end_on_exit, 419 )
Create a new span and set it as the current span in a context manager.
This method creates a new span and sets it as the current span within a context manager. Use this method with a 'with' statement to automatically handle span lifecycle within a code block.
The created span will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
- end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.
Returns:
A context manager that yields a LangfuseSpan
Example:
    with langfuse.start_as_current_span(name="process-query") as span:
        # Do work
        result = process_data()
        span.update(output=result)

        # Create a child span automatically
        with span.start_as_current_span(name="sub-operation") as child_span:
            # Do sub-operation work
            child_span.update(output="sub-result")
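When a span must outlive the with block, for instance because work continues on a background thread, end_on_exit=False keeps it open. A sketch of the manual-end pattern; the threading setup is an illustrative assumption:

```python
import threading

with langfuse.start_as_current_span(name="async-handoff", end_on_exit=False) as span:
    pass  # the context exits here, but the span remains open

def finish_in_background() -> None:
    # ... background work happens here ...
    span.update(output="background result")
    span.end()  # must be ended manually to avoid leaking the span

threading.Thread(target=finish_in_background).start()
```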
568 def start_observation( 569 self, 570 *, 571 trace_context: Optional[TraceContext] = None, 572 name: str, 573 as_type: ObservationTypeLiteralNoEvent = "span", 574 input: Optional[Any] = None, 575 output: Optional[Any] = None, 576 metadata: Optional[Any] = None, 577 version: Optional[str] = None, 578 level: Optional[SpanLevel] = None, 579 status_message: Optional[str] = None, 580 completion_start_time: Optional[datetime] = None, 581 model: Optional[str] = None, 582 model_parameters: Optional[Dict[str, MapValue]] = None, 583 usage_details: Optional[Dict[str, int]] = None, 584 cost_details: Optional[Dict[str, float]] = None, 585 prompt: Optional[PromptClient] = None, 586 ) -> Union[ 587 LangfuseSpan, 588 LangfuseGeneration, 589 LangfuseAgent, 590 LangfuseTool, 591 LangfuseChain, 592 LangfuseRetriever, 593 LangfuseEvaluator, 594 LangfuseEmbedding, 595 LangfuseGuardrail, 596 ]: 597 """Create a new observation of the specified type. 598 599 This method creates a new observation but does not set it as the current span in the 600 context. To create and use an observation within a context, use start_as_current_observation(). 601 602 Args: 603 trace_context: Optional context for connecting to an existing trace 604 name: Name of the observation 605 as_type: Type of observation to create (defaults to "span") 606 input: Input data for the operation 607 output: Output data from the operation 608 metadata: Additional metadata to associate with the observation 609 version: Version identifier for the code or component 610 level: Importance level of the observation 611 status_message: Optional status message for the observation 612 completion_start_time: When the model started generating (for generation types) 613 model: Name/identifier of the AI model used (for generation types) 614 model_parameters: Parameters used for the model (for generation types) 615 usage_details: Token usage information (for generation types) 616 cost_details: Cost information (for generation types) 617 prompt: Associated prompt template (for generation types) 618 619 Returns: 620 An observation object of the appropriate type that must be ended with .end() 621 """ 622 if trace_context: 623 trace_id = trace_context.get("trace_id", None) 624 parent_span_id = trace_context.get("parent_span_id", None) 625 626 if trace_id: 627 remote_parent_span = self._create_remote_parent_span( 628 trace_id=trace_id, parent_span_id=parent_span_id 629 ) 630 631 with otel_trace_api.use_span( 632 cast(otel_trace_api.Span, remote_parent_span) 633 ): 634 otel_span = self._otel_tracer.start_span(name=name) 635 otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True) 636 637 return self._create_observation_from_otel_span( 638 otel_span=otel_span, 639 as_type=as_type, 640 input=input, 641 output=output, 642 metadata=metadata, 643 version=version, 644 level=level, 645 status_message=status_message, 646 completion_start_time=completion_start_time, 647 model=model, 648 model_parameters=model_parameters, 649 usage_details=usage_details, 650 cost_details=cost_details, 651 prompt=prompt, 652 ) 653 654 otel_span = self._otel_tracer.start_span(name=name) 655 656 return self._create_observation_from_otel_span( 657 otel_span=otel_span, 658 as_type=as_type, 659 input=input, 660 output=output, 661 metadata=metadata, 662 version=version, 663 level=level, 664 status_message=status_message, 665 completion_start_time=completion_start_time, 666 model=model, 667 model_parameters=model_parameters, 668 usage_details=usage_details, 669 cost_details=cost_details, 670 
prompt=prompt, 671 )
Create a new observation of the specified type.
This method creates a new observation but does not set it as the current span in the context. To create and use an observation within a context, use start_as_current_observation().
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the observation
- as_type: Type of observation to create (defaults to "span")
- input: Input data for the operation
- output: Output data from the operation
- metadata: Additional metadata to associate with the observation
- version: Version identifier for the code or component
- level: Importance level of the observation
- status_message: Optional status message for the observation
- completion_start_time: When the model started generating (for generation types)
- model: Name/identifier of the AI model used (for generation types)
- model_parameters: Parameters used for the model (for generation types)
- usage_details: Token usage information (for generation types)
- cost_details: Cost information (for generation types)
- prompt: Associated prompt template (for generation types)
Returns:
An observation object of the appropriate type that must be ended with .end()
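The rendered docstring above carries no example, so here is a minimal sketch of the non-context-manager form; the observation types are taken from the return-type union in the signature, and vector_store is a hypothetical retriever backend:

```python
# A retriever observation for a vector-store lookup
retriever = langfuse.start_observation(name="fetch-docs", as_type="retriever")
try:
    docs = vector_store.query("quantum computing")  # hypothetical backend call
    retriever.update(output=docs)
finally:
    retriever.end()

# A generation observation carries the model-specific fields
generation = langfuse.start_observation(
    name="summarize",
    as_type="generation",
    model="gpt-4",
    model_parameters={"temperature": 0.2},
)
try:
    generation.update(output="...")
finally:
    generation.end()
```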
743 def start_generation( 744 self, 745 *, 746 trace_context: Optional[TraceContext] = None, 747 name: str, 748 input: Optional[Any] = None, 749 output: Optional[Any] = None, 750 metadata: Optional[Any] = None, 751 version: Optional[str] = None, 752 level: Optional[SpanLevel] = None, 753 status_message: Optional[str] = None, 754 completion_start_time: Optional[datetime] = None, 755 model: Optional[str] = None, 756 model_parameters: Optional[Dict[str, MapValue]] = None, 757 usage_details: Optional[Dict[str, int]] = None, 758 cost_details: Optional[Dict[str, float]] = None, 759 prompt: Optional[PromptClient] = None, 760 ) -> LangfuseGeneration: 761 """Create a new generation span for model generations. 762 763 DEPRECATED: This method is deprecated and will be removed in a future version. 764 Use start_observation(as_type='generation') instead. 765 766 This method creates a specialized span for tracking model generations. 767 It includes additional fields specific to model generations such as model name, 768 token usage, and cost details. 769 770 The created generation span will be the child of the current span in the context. 771 772 Args: 773 trace_context: Optional context for connecting to an existing trace 774 name: Name of the generation operation 775 input: Input data for the model (e.g., prompts) 776 output: Output from the model (e.g., completions) 777 metadata: Additional metadata to associate with the generation 778 version: Version identifier for the model or component 779 level: Importance level of the generation (info, warning, error) 780 status_message: Optional status message for the generation 781 completion_start_time: When the model started generating the response 782 model: Name/identifier of the AI model used (e.g., "gpt-4") 783 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 784 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 785 cost_details: Cost information for the model call 786 prompt: Associated prompt template from Langfuse prompt management 787 788 Returns: 789 A LangfuseGeneration object that must be ended with .end() when complete 790 791 Example: 792 ```python 793 generation = langfuse.start_generation( 794 name="answer-generation", 795 model="gpt-4", 796 input={"prompt": "Explain quantum computing"}, 797 model_parameters={"temperature": 0.7} 798 ) 799 try: 800 # Call model API 801 response = llm.generate(...) 802 803 generation.update( 804 output=response.text, 805 usage_details={ 806 "prompt_tokens": response.usage.prompt_tokens, 807 "completion_tokens": response.usage.completion_tokens 808 } 809 ) 810 finally: 811 generation.end() 812 ``` 813 """ 814 warnings.warn( 815 "start_generation is deprecated and will be removed in a future version. " 816 "Use start_observation(as_type='generation') instead.", 817 DeprecationWarning, 818 stacklevel=2, 819 ) 820 return self.start_observation( 821 trace_context=trace_context, 822 name=name, 823 as_type="generation", 824 input=input, 825 output=output, 826 metadata=metadata, 827 version=version, 828 level=level, 829 status_message=status_message, 830 completion_start_time=completion_start_time, 831 model=model, 832 model_parameters=model_parameters, 833 usage_details=usage_details, 834 cost_details=cost_details, 835 prompt=prompt, 836 )
Create a new generation span for model generations.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_observation(as_type='generation') instead.
This method creates a specialized span for tracking model generations. It includes additional fields specific to model generations such as model name, token usage, and cost details.
The created generation span will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the generation operation
- input: Input data for the model (e.g., prompts)
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Returns:
A LangfuseGeneration object that must be ended with .end() when complete
Example:
    generation = langfuse.start_generation(
        name="answer-generation",
        model="gpt-4",
        input={"prompt": "Explain quantum computing"},
        model_parameters={"temperature": 0.7}
    )
    try:
        # Call model API
        response = llm.generate(...)

        generation.update(
            output=response.text,
            usage_details={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        )
    finally:
        generation.end()
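Since start_generation is deprecated, the same observation can be produced with start_observation(as_type="generation"), as the deprecation notice above recommends; the migration is a one-line change:

```python
# Deprecated form:
# generation = langfuse.start_generation(name="answer-generation", model="gpt-4")

# Replacement: identical keyword arguments, plus as_type="generation"
generation = langfuse.start_observation(
    name="answer-generation",
    as_type="generation",
    model="gpt-4",
    model_parameters={"temperature": 0.7},
)
try:
    generation.update(output="...")
finally:
    generation.end()
```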
838 def start_as_current_generation( 839 self, 840 *, 841 trace_context: Optional[TraceContext] = None, 842 name: str, 843 input: Optional[Any] = None, 844 output: Optional[Any] = None, 845 metadata: Optional[Any] = None, 846 version: Optional[str] = None, 847 level: Optional[SpanLevel] = None, 848 status_message: Optional[str] = None, 849 completion_start_time: Optional[datetime] = None, 850 model: Optional[str] = None, 851 model_parameters: Optional[Dict[str, MapValue]] = None, 852 usage_details: Optional[Dict[str, int]] = None, 853 cost_details: Optional[Dict[str, float]] = None, 854 prompt: Optional[PromptClient] = None, 855 end_on_exit: Optional[bool] = None, 856 ) -> _AgnosticContextManager[LangfuseGeneration]: 857 """Create a new generation span and set it as the current span in a context manager. 858 859 DEPRECATED: This method is deprecated and will be removed in a future version. 860 Use start_as_current_observation(as_type='generation') instead. 861 862 This method creates a specialized span for model generations and sets it as the 863 current span within a context manager. Use this method with a 'with' statement to 864 automatically handle the generation span lifecycle within a code block. 865 866 The created generation span will be the child of the current span in the context. 867 868 Args: 869 trace_context: Optional context for connecting to an existing trace 870 name: Name of the generation operation 871 input: Input data for the model (e.g., prompts) 872 output: Output from the model (e.g., completions) 873 metadata: Additional metadata to associate with the generation 874 version: Version identifier for the model or component 875 level: Importance level of the generation (info, warning, error) 876 status_message: Optional status message for the generation 877 completion_start_time: When the model started generating the response 878 model: Name/identifier of the AI model used (e.g., "gpt-4") 879 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 880 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 881 cost_details: Cost information for the model call 882 prompt: Associated prompt template from Langfuse prompt management 883 end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks. 884 885 Returns: 886 A context manager that yields a LangfuseGeneration 887 888 Example: 889 ```python 890 with langfuse.start_as_current_generation( 891 name="answer-generation", 892 model="gpt-4", 893 input={"prompt": "Explain quantum computing"} 894 ) as generation: 895 # Call model API 896 response = llm.generate(...) 897 898 # Update with results 899 generation.update( 900 output=response.text, 901 usage_details={ 902 "prompt_tokens": response.usage.prompt_tokens, 903 "completion_tokens": response.usage.completion_tokens 904 } 905 ) 906 ``` 907 """ 908 warnings.warn( 909 "start_as_current_generation is deprecated and will be removed in a future version. 
" 910 "Use start_as_current_observation(as_type='generation') instead.", 911 DeprecationWarning, 912 stacklevel=2, 913 ) 914 return self.start_as_current_observation( 915 trace_context=trace_context, 916 name=name, 917 as_type="generation", 918 input=input, 919 output=output, 920 metadata=metadata, 921 version=version, 922 level=level, 923 status_message=status_message, 924 completion_start_time=completion_start_time, 925 model=model, 926 model_parameters=model_parameters, 927 usage_details=usage_details, 928 cost_details=cost_details, 929 prompt=prompt, 930 end_on_exit=end_on_exit, 931 )
Create a new generation span and set it as the current span in a context manager.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_as_current_observation(as_type='generation') instead.
This method creates a specialized span for model generations and sets it as the current span within a context manager. Use this method with a 'with' statement to automatically handle the generation span lifecycle within a code block.
The created generation span will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the generation operation
- input: Input data for the model (e.g., prompts)
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
- end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.
Returns:
A context manager that yields a LangfuseGeneration
Example:
    with langfuse.start_as_current_generation(
        name="answer-generation",
        model="gpt-4",
        input={"prompt": "Explain quantum computing"}
    ) as generation:
        # Call model API
        response = llm.generate(...)

        # Update with results
        generation.update(
            output=response.text,
            usage_details={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        )
1089 def start_as_current_observation( 1090 self, 1091 *, 1092 trace_context: Optional[TraceContext] = None, 1093 name: str, 1094 as_type: ObservationTypeLiteralNoEvent = "span", 1095 input: Optional[Any] = None, 1096 output: Optional[Any] = None, 1097 metadata: Optional[Any] = None, 1098 version: Optional[str] = None, 1099 level: Optional[SpanLevel] = None, 1100 status_message: Optional[str] = None, 1101 completion_start_time: Optional[datetime] = None, 1102 model: Optional[str] = None, 1103 model_parameters: Optional[Dict[str, MapValue]] = None, 1104 usage_details: Optional[Dict[str, int]] = None, 1105 cost_details: Optional[Dict[str, float]] = None, 1106 prompt: Optional[PromptClient] = None, 1107 end_on_exit: Optional[bool] = None, 1108 ) -> Union[ 1109 _AgnosticContextManager[LangfuseGeneration], 1110 _AgnosticContextManager[LangfuseSpan], 1111 _AgnosticContextManager[LangfuseAgent], 1112 _AgnosticContextManager[LangfuseTool], 1113 _AgnosticContextManager[LangfuseChain], 1114 _AgnosticContextManager[LangfuseRetriever], 1115 _AgnosticContextManager[LangfuseEvaluator], 1116 _AgnosticContextManager[LangfuseEmbedding], 1117 _AgnosticContextManager[LangfuseGuardrail], 1118 ]: 1119 """Create a new observation and set it as the current span in a context manager. 1120 1121 This method creates a new observation of the specified type and sets it as the 1122 current span within a context manager. Use this method with a 'with' statement to 1123 automatically handle the observation lifecycle within a code block. 1124 1125 The created observation will be the child of the current span in the context. 1126 1127 Args: 1128 trace_context: Optional context for connecting to an existing trace 1129 name: Name of the observation (e.g., function or operation name) 1130 as_type: Type of observation to create (defaults to "span") 1131 input: Input data for the operation (can be any JSON-serializable object) 1132 output: Output data from the operation (can be any JSON-serializable object) 1133 metadata: Additional metadata to associate with the observation 1134 version: Version identifier for the code or component 1135 level: Importance level of the observation (info, warning, error) 1136 status_message: Optional status message for the observation 1137 end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks. 1138 1139 The following parameters are available when as_type is: "generation" or "embedding". 
1140 completion_start_time: When the model started generating the response 1141 model: Name/identifier of the AI model used (e.g., "gpt-4") 1142 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1143 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1144 cost_details: Cost information for the model call 1145 prompt: Associated prompt template from Langfuse prompt management 1146 1147 Returns: 1148 A context manager that yields the appropriate observation type based on as_type 1149 1150 Example: 1151 ```python 1152 # Create a span 1153 with langfuse.start_as_current_observation(name="process-query", as_type="span") as span: 1154 # Do work 1155 result = process_data() 1156 span.update(output=result) 1157 1158 # Create a child span automatically 1159 with span.start_as_current_span(name="sub-operation") as child_span: 1160 # Do sub-operation work 1161 child_span.update(output="sub-result") 1162 1163 # Create a tool observation 1164 with langfuse.start_as_current_observation(name="web-search", as_type="tool") as tool: 1165 # Do tool work 1166 results = search_web(query) 1167 tool.update(output=results) 1168 1169 # Create a generation observation 1170 with langfuse.start_as_current_observation( 1171 name="answer-generation", 1172 as_type="generation", 1173 model="gpt-4" 1174 ) as generation: 1175 # Generate answer 1176 response = llm.generate(...) 1177 generation.update(output=response) 1178 ``` 1179 """ 1180 if as_type in get_observation_types_list(ObservationTypeGenerationLike): 1181 if trace_context: 1182 trace_id = trace_context.get("trace_id", None) 1183 parent_span_id = trace_context.get("parent_span_id", None) 1184 1185 if trace_id: 1186 remote_parent_span = self._create_remote_parent_span( 1187 trace_id=trace_id, parent_span_id=parent_span_id 1188 ) 1189 1190 return cast( 1191 Union[ 1192 _AgnosticContextManager[LangfuseGeneration], 1193 _AgnosticContextManager[LangfuseEmbedding], 1194 ], 1195 self._create_span_with_parent_context( 1196 as_type=as_type, 1197 name=name, 1198 remote_parent_span=remote_parent_span, 1199 parent=None, 1200 end_on_exit=end_on_exit, 1201 input=input, 1202 output=output, 1203 metadata=metadata, 1204 version=version, 1205 level=level, 1206 status_message=status_message, 1207 completion_start_time=completion_start_time, 1208 model=model, 1209 model_parameters=model_parameters, 1210 usage_details=usage_details, 1211 cost_details=cost_details, 1212 prompt=prompt, 1213 ), 1214 ) 1215 1216 return cast( 1217 Union[ 1218 _AgnosticContextManager[LangfuseGeneration], 1219 _AgnosticContextManager[LangfuseEmbedding], 1220 ], 1221 self._start_as_current_otel_span_with_processed_media( 1222 as_type=as_type, 1223 name=name, 1224 end_on_exit=end_on_exit, 1225 input=input, 1226 output=output, 1227 metadata=metadata, 1228 version=version, 1229 level=level, 1230 status_message=status_message, 1231 completion_start_time=completion_start_time, 1232 model=model, 1233 model_parameters=model_parameters, 1234 usage_details=usage_details, 1235 cost_details=cost_details, 1236 prompt=prompt, 1237 ), 1238 ) 1239 1240 if as_type in get_observation_types_list(ObservationTypeSpanLike): 1241 if trace_context: 1242 trace_id = trace_context.get("trace_id", None) 1243 parent_span_id = trace_context.get("parent_span_id", None) 1244 1245 if trace_id: 1246 remote_parent_span = self._create_remote_parent_span( 1247 trace_id=trace_id, parent_span_id=parent_span_id 1248 ) 1249 1250 return cast( 1251 Union[ 1252 
_AgnosticContextManager[LangfuseSpan], 1253 _AgnosticContextManager[LangfuseAgent], 1254 _AgnosticContextManager[LangfuseTool], 1255 _AgnosticContextManager[LangfuseChain], 1256 _AgnosticContextManager[LangfuseRetriever], 1257 _AgnosticContextManager[LangfuseEvaluator], 1258 _AgnosticContextManager[LangfuseGuardrail], 1259 ], 1260 self._create_span_with_parent_context( 1261 as_type=as_type, 1262 name=name, 1263 remote_parent_span=remote_parent_span, 1264 parent=None, 1265 end_on_exit=end_on_exit, 1266 input=input, 1267 output=output, 1268 metadata=metadata, 1269 version=version, 1270 level=level, 1271 status_message=status_message, 1272 ), 1273 ) 1274 1275 return cast( 1276 Union[ 1277 _AgnosticContextManager[LangfuseSpan], 1278 _AgnosticContextManager[LangfuseAgent], 1279 _AgnosticContextManager[LangfuseTool], 1280 _AgnosticContextManager[LangfuseChain], 1281 _AgnosticContextManager[LangfuseRetriever], 1282 _AgnosticContextManager[LangfuseEvaluator], 1283 _AgnosticContextManager[LangfuseGuardrail], 1284 ], 1285 self._start_as_current_otel_span_with_processed_media( 1286 as_type=as_type, 1287 name=name, 1288 end_on_exit=end_on_exit, 1289 input=input, 1290 output=output, 1291 metadata=metadata, 1292 version=version, 1293 level=level, 1294 status_message=status_message, 1295 ), 1296 ) 1297 1298 # This should never be reached since all valid types are handled above 1299 langfuse_logger.warning( 1300 f"Unknown observation type: {as_type}, falling back to span" 1301 ) 1302 return self._start_as_current_otel_span_with_processed_media( 1303 as_type="span", 1304 name=name, 1305 end_on_exit=end_on_exit, 1306 input=input, 1307 output=output, 1308 metadata=metadata, 1309 version=version, 1310 level=level, 1311 status_message=status_message, 1312 )
Create a new observation and set it as the current span in a context manager.
This method creates a new observation of the specified type and sets it as the current span within a context manager. Use this method with a 'with' statement to automatically handle the observation lifecycle within a code block.
The created observation will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the observation (e.g., function or operation name)
- as_type: Type of observation to create (defaults to "span")
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the observation
- version: Version identifier for the code or component
- level: Importance level of the observation (info, warning, error)
- status_message: Optional status message for the observation
- end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.
- The following parameters are available only when as_type is "generation" or "embedding":
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Returns:
A context manager that yields the appropriate observation type based on as_type
Example:
    # Create a span
    with langfuse.start_as_current_observation(name="process-query", as_type="span") as span:
        # Do work
        result = process_data()
        span.update(output=result)

        # Create a child span automatically
        with span.start_as_current_span(name="sub-operation") as child_span:
            # Do sub-operation work
            child_span.update(output="sub-result")

    # Create a tool observation
    with langfuse.start_as_current_observation(name="web-search", as_type="tool") as tool:
        # Do tool work
        results = search_web(query)
        tool.update(output=results)

    # Create a generation observation
    with langfuse.start_as_current_observation(
        name="answer-generation",
        as_type="generation",
        model="gpt-4"
    ) as generation:
        # Generate answer
        response = llm.generate(...)
        generation.update(output=response)
1473 def update_current_generation( 1474 self, 1475 *, 1476 name: Optional[str] = None, 1477 input: Optional[Any] = None, 1478 output: Optional[Any] = None, 1479 metadata: Optional[Any] = None, 1480 version: Optional[str] = None, 1481 level: Optional[SpanLevel] = None, 1482 status_message: Optional[str] = None, 1483 completion_start_time: Optional[datetime] = None, 1484 model: Optional[str] = None, 1485 model_parameters: Optional[Dict[str, MapValue]] = None, 1486 usage_details: Optional[Dict[str, int]] = None, 1487 cost_details: Optional[Dict[str, float]] = None, 1488 prompt: Optional[PromptClient] = None, 1489 ) -> None: 1490 """Update the current active generation span with new information. 1491 1492 This method updates the current generation span in the active context with 1493 additional information. It's useful for adding output, usage stats, or other 1494 details that become available during or after model generation. 1495 1496 Args: 1497 name: The generation name 1498 input: Updated input data for the model 1499 output: Output from the model (e.g., completions) 1500 metadata: Additional metadata to associate with the generation 1501 version: Version identifier for the model or component 1502 level: Importance level of the generation (info, warning, error) 1503 status_message: Optional status message for the generation 1504 completion_start_time: When the model started generating the response 1505 model: Name/identifier of the AI model used (e.g., "gpt-4") 1506 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1507 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1508 cost_details: Cost information for the model call 1509 prompt: Associated prompt template from Langfuse prompt management 1510 1511 Example: 1512 ```python 1513 with langfuse.start_as_current_generation(name="answer-query") as generation: 1514 # Initial setup and API call 1515 response = llm.generate(...) 1516 1517 # Update with results that weren't available at creation time 1518 langfuse.update_current_generation( 1519 output=response.text, 1520 usage_details={ 1521 "prompt_tokens": response.usage.prompt_tokens, 1522 "completion_tokens": response.usage.completion_tokens 1523 } 1524 ) 1525 ``` 1526 """ 1527 if not self._tracing_enabled: 1528 langfuse_logger.debug( 1529 "Operation skipped: update_current_generation - Tracing is disabled or client is in no-op mode." 1530 ) 1531 return 1532 1533 current_otel_span = self._get_current_otel_span() 1534 1535 if current_otel_span is not None: 1536 generation = LangfuseGeneration( 1537 otel_span=current_otel_span, langfuse_client=self 1538 ) 1539 1540 if name: 1541 current_otel_span.update_name(name) 1542 1543 generation.update( 1544 input=input, 1545 output=output, 1546 metadata=metadata, 1547 version=version, 1548 level=level, 1549 status_message=status_message, 1550 completion_start_time=completion_start_time, 1551 model=model, 1552 model_parameters=model_parameters, 1553 usage_details=usage_details, 1554 cost_details=cost_details, 1555 prompt=prompt, 1556 )
Update the current active generation span with new information.
This method updates the current generation span in the active context with additional information. It's useful for adding output, usage stats, or other details that become available during or after model generation.
Arguments:
- name: The generation name
- input: Updated input data for the model
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Example:
with langfuse.start_as_current_generation(name="answer-query") as generation: # Initial setup and API call response = llm.generate(...) # Update with results that weren't available at creation time langfuse.update_current_generation( output=response.text, usage_details={ "prompt_tokens": response.usage.prompt_tokens, "completion_tokens": response.usage.completion_tokens } )
1558 def update_current_span( 1559 self, 1560 *, 1561 name: Optional[str] = None, 1562 input: Optional[Any] = None, 1563 output: Optional[Any] = None, 1564 metadata: Optional[Any] = None, 1565 version: Optional[str] = None, 1566 level: Optional[SpanLevel] = None, 1567 status_message: Optional[str] = None, 1568 ) -> None: 1569 """Update the current active span with new information. 1570 1571 This method updates the current span in the active context with 1572 additional information. It's useful for adding outputs or metadata 1573 that become available during execution. 1574 1575 Args: 1576 name: The span name 1577 input: Updated input data for the operation 1578 output: Output data from the operation 1579 metadata: Additional metadata to associate with the span 1580 version: Version identifier for the code or component 1581 level: Importance level of the span (info, warning, error) 1582 status_message: Optional status message for the span 1583 1584 Example: 1585 ```python 1586 with langfuse.start_as_current_span(name="process-data") as span: 1587 # Initial processing 1588 result = process_first_part() 1589 1590 # Update with intermediate results 1591 langfuse.update_current_span(metadata={"intermediate_result": result}) 1592 1593 # Continue processing 1594 final_result = process_second_part(result) 1595 1596 # Final update 1597 langfuse.update_current_span(output=final_result) 1598 ``` 1599 """ 1600 if not self._tracing_enabled: 1601 langfuse_logger.debug( 1602 "Operation skipped: update_current_span - Tracing is disabled or client is in no-op mode." 1603 ) 1604 return 1605 1606 current_otel_span = self._get_current_otel_span() 1607 1608 if current_otel_span is not None: 1609 span = LangfuseSpan( 1610 otel_span=current_otel_span, 1611 langfuse_client=self, 1612 environment=self._environment, 1613 ) 1614 1615 if name: 1616 current_otel_span.update_name(name) 1617 1618 span.update( 1619 input=input, 1620 output=output, 1621 metadata=metadata, 1622 version=version, 1623 level=level, 1624 status_message=status_message, 1625 )
Update the current active span with new information.
This method updates the current span in the active context with additional information. It's useful for adding outputs or metadata that become available during execution.
Arguments:
- name: The span name
- input: Updated input data for the operation
- output: Output data from the operation
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Example:
with langfuse.start_as_current_span(name="process-data") as span: # Initial processing result = process_first_part() # Update with intermediate results langfuse.update_current_span(metadata={"intermediate_result": result}) # Continue processing final_result = process_second_part(result) # Final update langfuse.update_current_span(output=final_result)
1627 def update_current_trace( 1628 self, 1629 *, 1630 name: Optional[str] = None, 1631 user_id: Optional[str] = None, 1632 session_id: Optional[str] = None, 1633 version: Optional[str] = None, 1634 input: Optional[Any] = None, 1635 output: Optional[Any] = None, 1636 metadata: Optional[Any] = None, 1637 tags: Optional[List[str]] = None, 1638 public: Optional[bool] = None, 1639 ) -> None: 1640 """Update the current trace with additional information. 1641 1642 This method updates the Langfuse trace that the current span belongs to. It's useful for 1643 adding trace-level metadata like user ID, session ID, or tags that apply to 1644 the entire Langfuse trace rather than just a single observation. 1645 1646 Args: 1647 name: Updated name for the Langfuse trace 1648 user_id: ID of the user who initiated the Langfuse trace 1649 session_id: Session identifier for grouping related Langfuse traces 1650 version: Version identifier for the application or service 1651 input: Input data for the overall Langfuse trace 1652 output: Output data from the overall Langfuse trace 1653 metadata: Additional metadata to associate with the Langfuse trace 1654 tags: List of tags to categorize the Langfuse trace 1655 public: Whether the Langfuse trace should be publicly accessible 1656 1657 Example: 1658 ```python 1659 with langfuse.start_as_current_span(name="handle-request") as span: 1660 # Get user information 1661 user = authenticate_user(request) 1662 1663 # Update trace with user context 1664 langfuse.update_current_trace( 1665 user_id=user.id, 1666 session_id=request.session_id, 1667 tags=["production", "web-app"] 1668 ) 1669 1670 # Continue processing 1671 response = process_request(request) 1672 1673 # Update span with results 1674 span.update(output=response) 1675 ``` 1676 """ 1677 if not self._tracing_enabled: 1678 langfuse_logger.debug( 1679 "Operation skipped: update_current_trace - Tracing is disabled or client is in no-op mode." 1680 ) 1681 return 1682 1683 current_otel_span = self._get_current_otel_span() 1684 1685 if current_otel_span is not None: 1686 existing_observation_type = current_otel_span.attributes.get( # type: ignore[attr-defined] 1687 LangfuseOtelSpanAttributes.OBSERVATION_TYPE, "span" 1688 ) 1689 # We need to preserve the class to keep the correct observation type 1690 span_class = self._get_span_class(existing_observation_type) 1691 span = span_class( 1692 otel_span=current_otel_span, 1693 langfuse_client=self, 1694 environment=self._environment, 1695 ) 1696 1697 span.update_trace( 1698 name=name, 1699 user_id=user_id, 1700 session_id=session_id, 1701 version=version, 1702 input=input, 1703 output=output, 1704 metadata=metadata, 1705 tags=tags, 1706 public=public, 1707 )
Update the current trace with additional information.
This method updates the Langfuse trace that the current span belongs to. It's useful for adding trace-level metadata like user ID, session ID, or tags that apply to the entire Langfuse trace rather than just a single observation.
Arguments:
- name: Updated name for the Langfuse trace
- user_id: ID of the user who initiated the Langfuse trace
- session_id: Session identifier for grouping related Langfuse traces
- version: Version identifier for the application or service
- input: Input data for the overall Langfuse trace
- output: Output data from the overall Langfuse trace
- metadata: Additional metadata to associate with the Langfuse trace
- tags: List of tags to categorize the Langfuse trace
- public: Whether the Langfuse trace should be publicly accessible
Example:
with langfuse.start_as_current_span(name="handle-request") as span: # Get user information user = authenticate_user(request) # Update trace with user context langfuse.update_current_trace( user_id=user.id, session_id=request.session_id, tags=["production", "web-app"] ) # Continue processing response = process_request(request) # Update span with results span.update(output=response)
1709 def create_event( 1710 self, 1711 *, 1712 trace_context: Optional[TraceContext] = None, 1713 name: str, 1714 input: Optional[Any] = None, 1715 output: Optional[Any] = None, 1716 metadata: Optional[Any] = None, 1717 version: Optional[str] = None, 1718 level: Optional[SpanLevel] = None, 1719 status_message: Optional[str] = None, 1720 ) -> LangfuseEvent: 1721 """Create a new Langfuse observation of type 'EVENT'. 1722 1723 The created Langfuse Event observation will be the child of the current span in the context. 1724 1725 Args: 1726 trace_context: Optional context for connecting to an existing trace 1727 name: Name of the span (e.g., function or operation name) 1728 input: Input data for the operation (can be any JSON-serializable object) 1729 output: Output data from the operation (can be any JSON-serializable object) 1730 metadata: Additional metadata to associate with the span 1731 version: Version identifier for the code or component 1732 level: Importance level of the span (info, warning, error) 1733 status_message: Optional status message for the span 1734 1735 Returns: 1736 The Langfuse Event object 1737 1738 Example: 1739 ```python 1740 event = langfuse.create_event(name="process-event") 1741 ``` 1742 """ 1743 timestamp = time_ns() 1744 1745 if trace_context: 1746 trace_id = trace_context.get("trace_id", None) 1747 parent_span_id = trace_context.get("parent_span_id", None) 1748 1749 if trace_id: 1750 remote_parent_span = self._create_remote_parent_span( 1751 trace_id=trace_id, parent_span_id=parent_span_id 1752 ) 1753 1754 with otel_trace_api.use_span( 1755 cast(otel_trace_api.Span, remote_parent_span) 1756 ): 1757 otel_span = self._otel_tracer.start_span( 1758 name=name, start_time=timestamp 1759 ) 1760 otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True) 1761 1762 return cast( 1763 LangfuseEvent, 1764 LangfuseEvent( 1765 otel_span=otel_span, 1766 langfuse_client=self, 1767 environment=self._environment, 1768 input=input, 1769 output=output, 1770 metadata=metadata, 1771 version=version, 1772 level=level, 1773 status_message=status_message, 1774 ).end(end_time=timestamp), 1775 ) 1776 1777 otel_span = self._otel_tracer.start_span(name=name, start_time=timestamp) 1778 1779 return cast( 1780 LangfuseEvent, 1781 LangfuseEvent( 1782 otel_span=otel_span, 1783 langfuse_client=self, 1784 environment=self._environment, 1785 input=input, 1786 output=output, 1787 metadata=metadata, 1788 version=version, 1789 level=level, 1790 status_message=status_message, 1791 ).end(end_time=timestamp), 1792 )
Create a new Langfuse observation of type 'EVENT'.
The created Langfuse Event observation will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Returns:
The Langfuse Event object
Example:
    event = langfuse.create_event(name="process-event")
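Events are point-in-time observations: as the source below shows, they are ended immediately at creation, so all data must be passed up front. A sketch with illustrative payloads:

```python
langfuse.create_event(
    name="cache-miss",
    input={"key": "user:42"},
    metadata={"store": "redis"},
    status_message="key not found, falling back to DB",
)
```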
1881 @staticmethod 1882 def create_trace_id(*, seed: Optional[str] = None) -> str: 1883 """Create a unique trace ID for use with Langfuse. 1884 1885 This method generates a unique trace ID for use with various Langfuse APIs. 1886 It can either generate a random ID or create a deterministic ID based on 1887 a seed string. 1888 1889 Trace IDs must be 32 lowercase hexadecimal characters, representing 16 bytes. 1890 This method ensures the generated ID meets this requirement. If you need to 1891 correlate an external ID with a Langfuse trace ID, use the external ID as the 1892 seed to get a valid, deterministic Langfuse trace ID. 1893 1894 Args: 1895 seed: Optional string to use as a seed for deterministic ID generation. 1896 If provided, the same seed will always produce the same ID. 1897 If not provided, a random ID will be generated. 1898 1899 Returns: 1900 A 32-character lowercase hexadecimal string representing the Langfuse trace ID. 1901 1902 Example: 1903 ```python 1904 # Generate a random trace ID 1905 trace_id = langfuse.create_trace_id() 1906 1907 # Generate a deterministic ID based on a seed 1908 session_trace_id = langfuse.create_trace_id(seed="session-456") 1909 1910 # Correlate an external ID with a Langfuse trace ID 1911 external_id = "external-system-123456" 1912 correlated_trace_id = langfuse.create_trace_id(seed=external_id) 1913 1914 # Use the ID with trace context 1915 with langfuse.start_as_current_span( 1916 name="process-request", 1917 trace_context={"trace_id": trace_id} 1918 ) as span: 1919 # Operation will be part of the specific trace 1920 pass 1921 ``` 1922 """ 1923 if not seed: 1924 trace_id_int = RandomIdGenerator().generate_trace_id() 1925 1926 return Langfuse._format_otel_trace_id(trace_id_int) 1927 1928 return sha256(seed.encode("utf-8")).digest()[:16].hex()
Create a unique trace ID for use with Langfuse.
This method generates a unique trace ID for use with various Langfuse APIs. It can either generate a random ID or create a deterministic ID based on a seed string.
Trace IDs must be 32 lowercase hexadecimal characters, representing 16 bytes. This method ensures the generated ID meets this requirement. If you need to correlate an external ID with a Langfuse trace ID, use the external ID as the seed to get a valid, deterministic Langfuse trace ID.
Arguments:
- seed: Optional string to use as a seed for deterministic ID generation. If provided, the same seed will always produce the same ID. If not provided, a random ID will be generated.
Returns:
A 32-character lowercase hexadecimal string representing the Langfuse trace ID.
Example:
```python
# Generate a random trace ID
trace_id = langfuse.create_trace_id()

# Generate a deterministic ID based on a seed
session_trace_id = langfuse.create_trace_id(seed="session-456")

# Correlate an external ID with a Langfuse trace ID
external_id = "external-system-123456"
correlated_trace_id = langfuse.create_trace_id(seed=external_id)

# Use the ID with trace context
with langfuse.start_as_current_span(
    name="process-request",
    trace_context={"trace_id": trace_id}
) as span:
    # Operation will be part of the specific trace
    pass
```
2004 def create_score( 2005 self, 2006 *, 2007 name: str, 2008 value: Union[float, str], 2009 session_id: Optional[str] = None, 2010 dataset_run_id: Optional[str] = None, 2011 trace_id: Optional[str] = None, 2012 observation_id: Optional[str] = None, 2013 score_id: Optional[str] = None, 2014 data_type: Optional[ScoreDataType] = None, 2015 comment: Optional[str] = None, 2016 config_id: Optional[str] = None, 2017 metadata: Optional[Any] = None, 2018 ) -> None: 2019 """Create a score for a specific trace or observation. 2020 2021 This method creates a score for evaluating a Langfuse trace or observation. Scores can be 2022 used to track quality metrics, user feedback, or automated evaluations. 2023 2024 Args: 2025 name: Name of the score (e.g., "relevance", "accuracy") 2026 value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL) 2027 session_id: ID of the Langfuse session to associate the score with 2028 dataset_run_id: ID of the Langfuse dataset run to associate the score with 2029 trace_id: ID of the Langfuse trace to associate the score with 2030 observation_id: Optional ID of the specific observation to score. Trace ID must be provided too. 2031 score_id: Optional custom ID for the score (auto-generated if not provided) 2032 data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL) 2033 comment: Optional comment or explanation for the score 2034 config_id: Optional ID of a score config defined in Langfuse 2035 metadata: Optional metadata to be attached to the score 2036 2037 Example: 2038 ```python 2039 # Create a numeric score for accuracy 2040 langfuse.create_score( 2041 name="accuracy", 2042 value=0.92, 2043 trace_id="abcdef1234567890abcdef1234567890", 2044 data_type="NUMERIC", 2045 comment="High accuracy with minor irrelevant details" 2046 ) 2047 2048 # Create a categorical score for sentiment 2049 langfuse.create_score( 2050 name="sentiment", 2051 value="positive", 2052 trace_id="abcdef1234567890abcdef1234567890", 2053 observation_id="abcdef1234567890", 2054 data_type="CATEGORICAL" 2055 ) 2056 ``` 2057 """ 2058 if not self._tracing_enabled: 2059 return 2060 2061 score_id = score_id or self._create_observation_id() 2062 2063 try: 2064 new_body = ScoreBody( 2065 id=score_id, 2066 sessionId=session_id, 2067 datasetRunId=dataset_run_id, 2068 traceId=trace_id, 2069 observationId=observation_id, 2070 name=name, 2071 value=value, 2072 dataType=data_type, # type: ignore 2073 comment=comment, 2074 configId=config_id, 2075 environment=self._environment, 2076 metadata=metadata, 2077 ) 2078 2079 event = { 2080 "id": self.create_trace_id(), 2081 "type": "score-create", 2082 "timestamp": _get_timestamp(), 2083 "body": new_body, 2084 } 2085 2086 if self._resources is not None: 2087 # Force the score to be in sample if it was for a legacy trace ID, i.e. non-32 hexchar 2088 force_sample = ( 2089 not self._is_valid_trace_id(trace_id) if trace_id else True 2090 ) 2091 2092 self._resources.add_score_task( 2093 event, 2094 force_sample=force_sample, 2095 ) 2096 2097 except Exception as e: 2098 langfuse_logger.exception( 2099 f"Error creating score: Failed to process score event for trace_id={trace_id}, name={name}. Error: {e}" 2100 )
Create a score for a specific trace or observation.
This method creates a score for evaluating a Langfuse trace or observation. Scores can be used to track quality metrics, user feedback, or automated evaluations.
Arguments:
- name: Name of the score (e.g., "relevance", "accuracy")
- value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
- session_id: ID of the Langfuse session to associate the score with
- dataset_run_id: ID of the Langfuse dataset run to associate the score with
- trace_id: ID of the Langfuse trace to associate the score with
- observation_id: Optional ID of the specific observation to score. Trace ID must be provided too.
- score_id: Optional custom ID for the score (auto-generated if not provided)
- data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
- comment: Optional comment or explanation for the score
- config_id: Optional ID of a score config defined in Langfuse
- metadata: Optional metadata to be attached to the score
Example:
```python
# Create a numeric score for accuracy
langfuse.create_score(
    name="accuracy",
    value=0.92,
    trace_id="abcdef1234567890abcdef1234567890",
    data_type="NUMERIC",
    comment="High accuracy with minor irrelevant details"
)

# Create a categorical score for sentiment
langfuse.create_score(
    name="sentiment",
    value="positive",
    trace_id="abcdef1234567890abcdef1234567890",
    observation_id="abcdef1234567890",
    data_type="CATEGORICAL"
)
```
2126 def score_current_span( 2127 self, 2128 *, 2129 name: str, 2130 value: Union[float, str], 2131 score_id: Optional[str] = None, 2132 data_type: Optional[ScoreDataType] = None, 2133 comment: Optional[str] = None, 2134 config_id: Optional[str] = None, 2135 ) -> None: 2136 """Create a score for the current active span. 2137 2138 This method scores the currently active span in the context. It's a convenient 2139 way to score the current operation without needing to know its trace and span IDs. 2140 2141 Args: 2142 name: Name of the score (e.g., "relevance", "accuracy") 2143 value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL) 2144 score_id: Optional custom ID for the score (auto-generated if not provided) 2145 data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL) 2146 comment: Optional comment or explanation for the score 2147 config_id: Optional ID of a score config defined in Langfuse 2148 2149 Example: 2150 ```python 2151 with langfuse.start_as_current_generation(name="answer-query") as generation: 2152 # Generate answer 2153 response = generate_answer(...) 2154 generation.update(output=response) 2155 2156 # Score the generation 2157 langfuse.score_current_span( 2158 name="relevance", 2159 value=0.85, 2160 data_type="NUMERIC", 2161 comment="Mostly relevant but contains some tangential information" 2162 ) 2163 ``` 2164 """ 2165 current_span = self._get_current_otel_span() 2166 2167 if current_span is not None: 2168 trace_id = self._get_otel_trace_id(current_span) 2169 observation_id = self._get_otel_span_id(current_span) 2170 2171 langfuse_logger.info( 2172 f"Score: Creating score name='{name}' value={value} for current span ({observation_id}) in trace {trace_id}" 2173 ) 2174 2175 self.create_score( 2176 trace_id=trace_id, 2177 observation_id=observation_id, 2178 name=name, 2179 value=cast(str, value), 2180 score_id=score_id, 2181 data_type=cast(Literal["CATEGORICAL"], data_type), 2182 comment=comment, 2183 config_id=config_id, 2184 )
Create a score for the current active span.
This method scores the currently active span in the context. It's a convenient way to score the current operation without needing to know its trace and span IDs.
Arguments:
- name: Name of the score (e.g., "relevance", "accuracy")
- value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
- score_id: Optional custom ID for the score (auto-generated if not provided)
- data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
- comment: Optional comment or explanation for the score
- config_id: Optional ID of a score config defined in Langfuse
Example:
```python
with langfuse.start_as_current_generation(name="answer-query") as generation:
    # Generate answer
    response = generate_answer(...)
    generation.update(output=response)

    # Score the generation
    langfuse.score_current_span(
        name="relevance",
        value=0.85,
        data_type="NUMERIC",
        comment="Mostly relevant but contains some tangential information"
    )
```
2210 def score_current_trace( 2211 self, 2212 *, 2213 name: str, 2214 value: Union[float, str], 2215 score_id: Optional[str] = None, 2216 data_type: Optional[ScoreDataType] = None, 2217 comment: Optional[str] = None, 2218 config_id: Optional[str] = None, 2219 ) -> None: 2220 """Create a score for the current trace. 2221 2222 This method scores the trace of the currently active span. Unlike score_current_span, 2223 this method associates the score with the entire trace rather than a specific span. 2224 It's useful for scoring overall performance or quality of the entire operation. 2225 2226 Args: 2227 name: Name of the score (e.g., "user_satisfaction", "overall_quality") 2228 value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL) 2229 score_id: Optional custom ID for the score (auto-generated if not provided) 2230 data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL) 2231 comment: Optional comment or explanation for the score 2232 config_id: Optional ID of a score config defined in Langfuse 2233 2234 Example: 2235 ```python 2236 with langfuse.start_as_current_span(name="process-user-request") as span: 2237 # Process request 2238 result = process_complete_request() 2239 span.update(output=result) 2240 2241 # Score the overall trace 2242 langfuse.score_current_trace( 2243 name="overall_quality", 2244 value=0.95, 2245 data_type="NUMERIC", 2246 comment="High quality end-to-end response" 2247 ) 2248 ``` 2249 """ 2250 current_span = self._get_current_otel_span() 2251 2252 if current_span is not None: 2253 trace_id = self._get_otel_trace_id(current_span) 2254 2255 langfuse_logger.info( 2256 f"Score: Creating score name='{name}' value={value} for entire trace {trace_id}" 2257 ) 2258 2259 self.create_score( 2260 trace_id=trace_id, 2261 name=name, 2262 value=cast(str, value), 2263 score_id=score_id, 2264 data_type=cast(Literal["CATEGORICAL"], data_type), 2265 comment=comment, 2266 config_id=config_id, 2267 )
Create a score for the current trace.
This method scores the trace of the currently active span. Unlike score_current_span, this method associates the score with the entire trace rather than a specific span. It's useful for scoring overall performance or quality of the entire operation.
Arguments:
- name: Name of the score (e.g., "user_satisfaction", "overall_quality")
- value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
- score_id: Optional custom ID for the score (auto-generated if not provided)
- data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
- comment: Optional comment or explanation for the score
- config_id: Optional ID of a score config defined in Langfuse
Example:
```python
with langfuse.start_as_current_span(name="process-user-request") as span:
    # Process request
    result = process_complete_request()
    span.update(output=result)

    # Score the overall trace
    langfuse.score_current_trace(
        name="overall_quality",
        value=0.95,
        data_type="NUMERIC",
        comment="High quality end-to-end response"
    )
```
2269 def flush(self) -> None: 2270 """Force flush all pending spans and events to the Langfuse API. 2271 2272 This method manually flushes any pending spans, scores, and other events to the 2273 Langfuse API. It's useful in scenarios where you want to ensure all data is sent 2274 before proceeding, without waiting for the automatic flush interval. 2275 2276 Example: 2277 ```python 2278 # Record some spans and scores 2279 with langfuse.start_as_current_span(name="operation") as span: 2280 # Do work... 2281 pass 2282 2283 # Ensure all data is sent to Langfuse before proceeding 2284 langfuse.flush() 2285 2286 # Continue with other work 2287 ``` 2288 """ 2289 if self._resources is not None: 2290 self._resources.flush()
Force flush all pending spans and events to the Langfuse API.
This method manually flushes any pending spans, scores, and other events to the Langfuse API. It's useful in scenarios where you want to ensure all data is sent before proceeding, without waiting for the automatic flush interval.
Example:
```python
# Record some spans and scores
with langfuse.start_as_current_span(name="operation") as span:
    # Do work...
    pass

# Ensure all data is sent to Langfuse before proceeding
langfuse.flush()

# Continue with other work
```
2292 def shutdown(self) -> None: 2293 """Shut down the Langfuse client and flush all pending data. 2294 2295 This method cleanly shuts down the Langfuse client, ensuring all pending data 2296 is flushed to the API and all background threads are properly terminated. 2297 2298 It's important to call this method when your application is shutting down to 2299 prevent data loss and resource leaks. For most applications, using the client 2300 as a context manager or relying on the automatic shutdown via atexit is sufficient. 2301 2302 Example: 2303 ```python 2304 # Initialize Langfuse 2305 langfuse = Langfuse(public_key="...", secret_key="...") 2306 2307 # Use Langfuse throughout your application 2308 # ... 2309 2310 # When application is shutting down 2311 langfuse.shutdown() 2312 ``` 2313 """ 2314 if self._resources is not None: 2315 self._resources.shutdown()
Shut down the Langfuse client and flush all pending data.
This method cleanly shuts down the Langfuse client, ensuring all pending data is flushed to the API and all background threads are properly terminated.
It's important to call this method when your application is shutting down to prevent data loss and resource leaks. For most applications, using the client as a context manager or relying on the automatic shutdown via atexit is sufficient.
Example:
```python
# Initialize Langfuse
langfuse = Langfuse(public_key="...", secret_key="...")

# Use Langfuse throughout your application
# ...

# When application is shutting down
langfuse.shutdown()
```
2317 def get_current_trace_id(self) -> Optional[str]: 2318 """Get the trace ID of the current active span. 2319 2320 This method retrieves the trace ID from the currently active span in the context. 2321 It can be used to get the trace ID for referencing in logs, external systems, 2322 or for creating related operations. 2323 2324 Returns: 2325 The current trace ID as a 32-character lowercase hexadecimal string, 2326 or None if there is no active span. 2327 2328 Example: 2329 ```python 2330 with langfuse.start_as_current_span(name="process-request") as span: 2331 # Get the current trace ID for reference 2332 trace_id = langfuse.get_current_trace_id() 2333 2334 # Use it for external correlation 2335 log.info(f"Processing request with trace_id: {trace_id}") 2336 2337 # Or pass to another system 2338 external_system.process(data, trace_id=trace_id) 2339 ``` 2340 """ 2341 if not self._tracing_enabled: 2342 langfuse_logger.debug( 2343 "Operation skipped: get_current_trace_id - Tracing is disabled or client is in no-op mode." 2344 ) 2345 return None 2346 2347 current_otel_span = self._get_current_otel_span() 2348 2349 return self._get_otel_trace_id(current_otel_span) if current_otel_span else None
Get the trace ID of the current active span.
This method retrieves the trace ID from the currently active span in the context. It can be used to get the trace ID for referencing in logs, external systems, or for creating related operations.
Returns:
The current trace ID as a 32-character lowercase hexadecimal string, or None if there is no active span.
Example:
```python
with langfuse.start_as_current_span(name="process-request") as span:
    # Get the current trace ID for reference
    trace_id = langfuse.get_current_trace_id()

    # Use it for external correlation
    log.info(f"Processing request with trace_id: {trace_id}")

    # Or pass to another system
    external_system.process(data, trace_id=trace_id)
```
2351 def get_current_observation_id(self) -> Optional[str]: 2352 """Get the observation ID (span ID) of the current active span. 2353 2354 This method retrieves the observation ID from the currently active span in the context. 2355 It can be used to get the observation ID for referencing in logs, external systems, 2356 or for creating scores or other related operations. 2357 2358 Returns: 2359 The current observation ID as a 16-character lowercase hexadecimal string, 2360 or None if there is no active span. 2361 2362 Example: 2363 ```python 2364 with langfuse.start_as_current_span(name="process-user-query") as span: 2365 # Get the current observation ID 2366 observation_id = langfuse.get_current_observation_id() 2367 2368 # Store it for later reference 2369 cache.set(f"query_{query_id}_observation", observation_id) 2370 2371 # Process the query... 2372 ``` 2373 """ 2374 if not self._tracing_enabled: 2375 langfuse_logger.debug( 2376 "Operation skipped: get_current_observation_id - Tracing is disabled or client is in no-op mode." 2377 ) 2378 return None 2379 2380 current_otel_span = self._get_current_otel_span() 2381 2382 return self._get_otel_span_id(current_otel_span) if current_otel_span else None
Get the observation ID (span ID) of the current active span.
This method retrieves the observation ID from the currently active span in the context. It can be used to get the observation ID for referencing in logs, external systems, or for creating scores or other related operations.
Returns:
The current observation ID as a 16-character lowercase hexadecimal string, or None if there is no active span.
Example:
```python
with langfuse.start_as_current_span(name="process-user-query") as span:
    # Get the current observation ID
    observation_id = langfuse.get_current_observation_id()

    # Store it for later reference
    cache.set(f"query_{query_id}_observation", observation_id)

    # Process the query...
```
2395 def get_trace_url(self, *, trace_id: Optional[str] = None) -> Optional[str]: 2396 """Get the URL to view a trace in the Langfuse UI. 2397 2398 This method generates a URL that links directly to a trace in the Langfuse UI. 2399 It's useful for providing links in logs, notifications, or debugging tools. 2400 2401 Args: 2402 trace_id: Optional trace ID to generate a URL for. If not provided, 2403 the trace ID of the current active span will be used. 2404 2405 Returns: 2406 A URL string pointing to the trace in the Langfuse UI, 2407 or None if the project ID couldn't be retrieved or no trace ID is available. 2408 2409 Example: 2410 ```python 2411 # Get URL for the current trace 2412 with langfuse.start_as_current_span(name="process-request") as span: 2413 trace_url = langfuse.get_trace_url() 2414 log.info(f"Processing trace: {trace_url}") 2415 2416 # Get URL for a specific trace 2417 specific_trace_url = langfuse.get_trace_url(trace_id="1234567890abcdef1234567890abcdef") 2418 send_notification(f"Review needed for trace: {specific_trace_url}") 2419 ``` 2420 """ 2421 project_id = self._get_project_id() 2422 final_trace_id = trace_id or self.get_current_trace_id() 2423 2424 return ( 2425 f"{self._base_url}/project/{project_id}/traces/{final_trace_id}" 2426 if project_id and final_trace_id 2427 else None 2428 )
Get the URL to view a trace in the Langfuse UI.
This method generates a URL that links directly to a trace in the Langfuse UI. It's useful for providing links in logs, notifications, or debugging tools.
Arguments:
- trace_id: Optional trace ID to generate a URL for. If not provided, the trace ID of the current active span will be used.
Returns:
A URL string pointing to the trace in the Langfuse UI, or None if the project ID couldn't be retrieved or no trace ID is available.
Example:
```python
# Get URL for the current trace
with langfuse.start_as_current_span(name="process-request") as span:
    trace_url = langfuse.get_trace_url()
    log.info(f"Processing trace: {trace_url}")

# Get URL for a specific trace
specific_trace_url = langfuse.get_trace_url(trace_id="1234567890abcdef1234567890abcdef")
send_notification(f"Review needed for trace: {specific_trace_url}")
```
2430 def get_dataset( 2431 self, name: str, *, fetch_items_page_size: Optional[int] = 50 2432 ) -> "DatasetClient": 2433 """Fetch a dataset by its name. 2434 2435 Args: 2436 name (str): The name of the dataset to fetch. 2437 fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50. 2438 2439 Returns: 2440 DatasetClient: The dataset with the given name. 2441 """ 2442 try: 2443 langfuse_logger.debug(f"Getting datasets {name}") 2444 dataset = self.api.datasets.get(dataset_name=name) 2445 2446 dataset_items = [] 2447 page = 1 2448 2449 while True: 2450 new_items = self.api.dataset_items.list( 2451 dataset_name=self._url_encode(name, is_url_param=True), 2452 page=page, 2453 limit=fetch_items_page_size, 2454 ) 2455 dataset_items.extend(new_items.data) 2456 2457 if new_items.meta.total_pages <= page: 2458 break 2459 2460 page += 1 2461 2462 items = [DatasetItemClient(i, langfuse=self) for i in dataset_items] 2463 2464 return DatasetClient(dataset, items=items) 2465 2466 except Error as e: 2467 handle_fern_exception(e) 2468 raise e
Fetch a dataset by its name.
Arguments:
- name (str): The name of the dataset to fetch.
- fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.
Returns:
DatasetClient: The dataset with the given name.
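Example (a minimal sketch, assuming a dataset named "capital_cities" already exists, e.g. populated via create_dataset_item below; item fields mirror the DatasetItem API object):
```python
from langfuse import Langfuse

langfuse = Langfuse()

# Fetch the dataset; items are retrieved in pages of fetch_items_page_size
dataset = langfuse.get_dataset("capital_cities", fetch_items_page_size=100)

for item in dataset.items:
    print(item.input, item.expected_output)
```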
2470 def run_experiment( 2471 self, 2472 *, 2473 name: str, 2474 run_name: Optional[str] = None, 2475 description: Optional[str] = None, 2476 data: ExperimentData, 2477 task: TaskFunction, 2478 evaluators: List[EvaluatorFunction] = [], 2479 run_evaluators: List[RunEvaluatorFunction] = [], 2480 max_concurrency: int = 50, 2481 metadata: Optional[Dict[str, Any]] = None, 2482 ) -> ExperimentResult: 2483 """Run an experiment on a dataset with automatic tracing and evaluation. 2484 2485 This method executes a task function on each item in the provided dataset, 2486 automatically traces all executions with Langfuse for observability, runs 2487 item-level and run-level evaluators on the outputs, and returns comprehensive 2488 results with evaluation metrics. 2489 2490 The experiment system provides: 2491 - Automatic tracing of all task executions 2492 - Concurrent processing with configurable limits 2493 - Comprehensive error handling that isolates failures 2494 - Integration with Langfuse datasets for experiment tracking 2495 - Flexible evaluation framework supporting both sync and async evaluators 2496 2497 Args: 2498 name: Human-readable name for the experiment. Used for identification 2499 in the Langfuse UI. 2500 run_name: Optional exact name for the experiment run. If provided, this will be 2501 used as the exact dataset run name if the `data` contains Langfuse dataset items. 2502 If not provided, this will default to the experiment name appended with an ISO timestamp. 2503 description: Optional description explaining the experiment's purpose, 2504 methodology, or expected outcomes. 2505 data: Array of data items to process. Can be either: 2506 - List of dict-like items with 'input', 'expected_output', 'metadata' keys 2507 - List of Langfuse DatasetItem objects from dataset.items 2508 task: Function that processes each data item and returns output. 2509 Must accept 'item' as keyword argument and can return sync or async results. 2510 The task function signature should be: task(*, item, **kwargs) -> Any 2511 evaluators: List of functions to evaluate each item's output individually. 2512 Each evaluator receives input, output, expected_output, and metadata. 2513 Can return single Evaluation dict or list of Evaluation dicts. 2514 run_evaluators: List of functions to evaluate the entire experiment run. 2515 Each run evaluator receives all item_results and can compute aggregate metrics. 2516 Useful for calculating averages, distributions, or cross-item comparisons. 2517 max_concurrency: Maximum number of concurrent task executions (default: 50). 2518 Controls the number of items processed simultaneously. Adjust based on 2519 API rate limits and system resources. 2520 metadata: Optional metadata dictionary to attach to all experiment traces. 2521 This metadata will be included in every trace created during the experiment. 2522 If `data` are Langfuse dataset items, the metadata will be attached to the dataset run, too. 2523 2524 Returns: 2525 ExperimentResult containing: 2526 - run_name: The experiment run name. This is equal to the dataset run name if experiment was on Langfuse dataset. 
2527 - item_results: List of results for each processed item with outputs and evaluations 2528 - run_evaluations: List of aggregate evaluation results for the entire run 2529 - dataset_run_id: ID of the dataset run (if using Langfuse datasets) 2530 - dataset_run_url: Direct URL to view results in Langfuse UI (if applicable) 2531 2532 Raises: 2533 ValueError: If required parameters are missing or invalid 2534 Exception: If experiment setup fails (individual item failures are handled gracefully) 2535 2536 Examples: 2537 Basic experiment with local data: 2538 ```python 2539 def summarize_text(*, item, **kwargs): 2540 return f"Summary: {item['input'][:50]}..." 2541 2542 def length_evaluator(*, input, output, expected_output=None, **kwargs): 2543 return { 2544 "name": "output_length", 2545 "value": len(output), 2546 "comment": f"Output contains {len(output)} characters" 2547 } 2548 2549 result = langfuse.run_experiment( 2550 name="Text Summarization Test", 2551 description="Evaluate summarization quality and length", 2552 data=[ 2553 {"input": "Long article text...", "expected_output": "Expected summary"}, 2554 {"input": "Another article...", "expected_output": "Another summary"} 2555 ], 2556 task=summarize_text, 2557 evaluators=[length_evaluator] 2558 ) 2559 2560 print(f"Processed {len(result.item_results)} items") 2561 for item_result in result.item_results: 2562 print(f"Input: {item_result.item['input']}") 2563 print(f"Output: {item_result.output}") 2564 print(f"Evaluations: {item_result.evaluations}") 2565 ``` 2566 2567 Advanced experiment with async task and multiple evaluators: 2568 ```python 2569 async def llm_task(*, item, **kwargs): 2570 # Simulate async LLM call 2571 response = await openai_client.chat.completions.create( 2572 model="gpt-4", 2573 messages=[{"role": "user", "content": item["input"]}] 2574 ) 2575 return response.choices[0].message.content 2576 2577 def accuracy_evaluator(*, input, output, expected_output=None, **kwargs): 2578 if expected_output and expected_output.lower() in output.lower(): 2579 return {"name": "accuracy", "value": 1.0, "comment": "Correct answer"} 2580 return {"name": "accuracy", "value": 0.0, "comment": "Incorrect answer"} 2581 2582 def toxicity_evaluator(*, input, output, expected_output=None, **kwargs): 2583 # Simulate toxicity check 2584 toxicity_score = check_toxicity(output) # Your toxicity checker 2585 return { 2586 "name": "toxicity", 2587 "value": toxicity_score, 2588 "comment": f"Toxicity level: {'high' if toxicity_score > 0.7 else 'low'}" 2589 } 2590 2591 def average_accuracy(*, item_results, **kwargs): 2592 accuracies = [ 2593 eval.value for result in item_results 2594 for eval in result.evaluations 2595 if eval.name == "accuracy" 2596 ] 2597 return { 2598 "name": "average_accuracy", 2599 "value": sum(accuracies) / len(accuracies) if accuracies else 0, 2600 "comment": f"Average accuracy across {len(accuracies)} items" 2601 } 2602 2603 result = langfuse.run_experiment( 2604 name="LLM Safety and Accuracy Test", 2605 description="Evaluate model accuracy and safety across diverse prompts", 2606 data=test_dataset, # Your dataset items 2607 task=llm_task, 2608 evaluators=[accuracy_evaluator, toxicity_evaluator], 2609 run_evaluators=[average_accuracy], 2610 max_concurrency=5, # Limit concurrent API calls 2611 metadata={"model": "gpt-4", "temperature": 0.7} 2612 ) 2613 ``` 2614 2615 Using with Langfuse datasets: 2616 ```python 2617 # Get dataset from Langfuse 2618 dataset = langfuse.get_dataset("my-eval-dataset") 2619 2620 result = 
dataset.run_experiment( 2621 name="Production Model Evaluation", 2622 description="Monthly evaluation of production model performance", 2623 task=my_production_task, 2624 evaluators=[accuracy_evaluator, latency_evaluator] 2625 ) 2626 2627 # Results automatically linked to dataset in Langfuse UI 2628 print(f"View results: {result['dataset_run_url']}") 2629 ``` 2630 2631 Note: 2632 - Task and evaluator functions can be either synchronous or asynchronous 2633 - Individual item failures are logged but don't stop the experiment 2634 - All executions are automatically traced and visible in Langfuse UI 2635 - When using Langfuse datasets, results are automatically linked for easy comparison 2636 - This method works in both sync and async contexts (Jupyter notebooks, web apps, etc.) 2637 - Async execution is handled automatically with smart event loop detection 2638 """ 2639 return cast( 2640 ExperimentResult, 2641 run_async_safely( 2642 self._run_experiment_async( 2643 name=name, 2644 run_name=self._create_experiment_run_name( 2645 name=name, run_name=run_name 2646 ), 2647 description=description, 2648 data=data, 2649 task=task, 2650 evaluators=evaluators or [], 2651 run_evaluators=run_evaluators or [], 2652 max_concurrency=max_concurrency, 2653 metadata=metadata or {}, 2654 ), 2655 ), 2656 )
Run an experiment on a dataset with automatic tracing and evaluation.
This method executes a task function on each item in the provided dataset, automatically traces all executions with Langfuse for observability, runs item-level and run-level evaluators on the outputs, and returns comprehensive results with evaluation metrics.
The experiment system provides:
- Automatic tracing of all task executions
- Concurrent processing with configurable limits
- Comprehensive error handling that isolates failures
- Integration with Langfuse datasets for experiment tracking
- Flexible evaluation framework supporting both sync and async evaluators
Arguments:
- name: Human-readable name for the experiment. Used for identification in the Langfuse UI.
- run_name: Optional exact name for the experiment run. If provided, this will be used as the exact dataset run name if `data` contains Langfuse dataset items. If not provided, it defaults to the experiment name appended with an ISO timestamp.
- description: Optional description explaining the experiment's purpose, methodology, or expected outcomes.
- data: Array of data items to process. Can be either:
  - List of dict-like items with 'input', 'expected_output', 'metadata' keys
  - List of Langfuse DatasetItem objects from dataset.items
- task: Function that processes each data item and returns output. Must accept 'item' as keyword argument and can return sync or async results. The task function signature should be: task(*, item, **kwargs) -> Any
- evaluators: List of functions to evaluate each item's output individually. Each evaluator receives input, output, expected_output, and metadata. Can return single Evaluation dict or list of Evaluation dicts.
- run_evaluators: List of functions to evaluate the entire experiment run. Each run evaluator receives all item_results and can compute aggregate metrics. Useful for calculating averages, distributions, or cross-item comparisons.
- max_concurrency: Maximum number of concurrent task executions (default: 50). Controls the number of items processed simultaneously. Adjust based on API rate limits and system resources.
- metadata: Optional metadata dictionary to attach to all experiment traces. This metadata will be included in every trace created during the experiment. If `data` items are Langfuse dataset items, the metadata will be attached to the dataset run, too.
Returns:
ExperimentResult containing:
- run_name: The experiment run name. This equals the dataset run name if the experiment ran on a Langfuse dataset.
- item_results: List of results for each processed item with outputs and evaluations
- run_evaluations: List of aggregate evaluation results for the entire run
- dataset_run_id: ID of the dataset run (if using Langfuse datasets)
- dataset_run_url: Direct URL to view results in Langfuse UI (if applicable)
Raises:
- ValueError: If required parameters are missing or invalid
- Exception: If experiment setup fails (individual item failures are handled gracefully)
Examples:
Basic experiment with local data:
```python
def summarize_text(*, item, **kwargs):
    return f"Summary: {item['input'][:50]}..."

def length_evaluator(*, input, output, expected_output=None, **kwargs):
    return {
        "name": "output_length",
        "value": len(output),
        "comment": f"Output contains {len(output)} characters"
    }

result = langfuse.run_experiment(
    name="Text Summarization Test",
    description="Evaluate summarization quality and length",
    data=[
        {"input": "Long article text...", "expected_output": "Expected summary"},
        {"input": "Another article...", "expected_output": "Another summary"}
    ],
    task=summarize_text,
    evaluators=[length_evaluator]
)

print(f"Processed {len(result.item_results)} items")
for item_result in result.item_results:
    print(f"Input: {item_result.item['input']}")
    print(f"Output: {item_result.output}")
    print(f"Evaluations: {item_result.evaluations}")
```
Advanced experiment with async task and multiple evaluators:
```python
async def llm_task(*, item, **kwargs):
    # Simulate async LLM call
    response = await openai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": item["input"]}]
    )
    return response.choices[0].message.content

def accuracy_evaluator(*, input, output, expected_output=None, **kwargs):
    if expected_output and expected_output.lower() in output.lower():
        return {"name": "accuracy", "value": 1.0, "comment": "Correct answer"}
    return {"name": "accuracy", "value": 0.0, "comment": "Incorrect answer"}

def toxicity_evaluator(*, input, output, expected_output=None, **kwargs):
    # Simulate toxicity check
    toxicity_score = check_toxicity(output)  # Your toxicity checker
    return {
        "name": "toxicity",
        "value": toxicity_score,
        "comment": f"Toxicity level: {'high' if toxicity_score > 0.7 else 'low'}"
    }

def average_accuracy(*, item_results, **kwargs):
    accuracies = [
        eval.value for result in item_results
        for eval in result.evaluations
        if eval.name == "accuracy"
    ]
    return {
        "name": "average_accuracy",
        "value": sum(accuracies) / len(accuracies) if accuracies else 0,
        "comment": f"Average accuracy across {len(accuracies)} items"
    }

result = langfuse.run_experiment(
    name="LLM Safety and Accuracy Test",
    description="Evaluate model accuracy and safety across diverse prompts",
    data=test_dataset,  # Your dataset items
    task=llm_task,
    evaluators=[accuracy_evaluator, toxicity_evaluator],
    run_evaluators=[average_accuracy],
    max_concurrency=5,  # Limit concurrent API calls
    metadata={"model": "gpt-4", "temperature": 0.7}
)
```
Using with Langfuse datasets:
```python
# Get dataset from Langfuse
dataset = langfuse.get_dataset("my-eval-dataset")

result = dataset.run_experiment(
    name="Production Model Evaluation",
    description="Monthly evaluation of production model performance",
    task=my_production_task,
    evaluators=[accuracy_evaluator, latency_evaluator]
)

# Results automatically linked to dataset in Langfuse UI
print(f"View results: {result['dataset_run_url']}")
```
Note:
- Task and evaluator functions can be either synchronous or asynchronous
- Individual item failures are logged but don't stop the experiment
- All executions are automatically traced and visible in Langfuse UI
- When using Langfuse datasets, results are automatically linked for easy comparison
- This method works in both sync and async contexts (Jupyter notebooks, web apps, etc.)
- Async execution is handled automatically with smart event loop detection
2901 def auth_check(self) -> bool: 2902 """Check if the provided credentials (public and secret key) are valid. 2903 2904 Raises: 2905 Exception: If no projects were found for the provided credentials. 2906 2907 Note: 2908 This method is blocking. It is discouraged to use it in production code. 2909 """ 2910 try: 2911 projects = self.api.projects.get() 2912 langfuse_logger.debug( 2913 f"Auth check successful, found {len(projects.data)} projects" 2914 ) 2915 if len(projects.data) == 0: 2916 raise Exception( 2917 "Auth check failed, no project found for the keys provided." 2918 ) 2919 return True 2920 2921 except AttributeError as e: 2922 langfuse_logger.warning( 2923 f"Auth check failed: Client not properly initialized. Error: {e}" 2924 ) 2925 return False 2926 2927 except Error as e: 2928 handle_fern_exception(e) 2929 raise e
Check if the provided credentials (public and secret key) are valid.
Raises:
- Exception: If no projects were found for the provided credentials.
Note:
This method is blocking; avoid calling it on hot paths in production code.
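Example (a short sketch; since the call is blocking, it is best placed in startup code rather than a request path):
```python
from langfuse import Langfuse

langfuse = Langfuse(public_key="...", secret_key="...")

# Blocking check; returns True if the keys resolve to at least one project
if langfuse.auth_check():
    print("Langfuse credentials are valid")
```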
2931 def create_dataset( 2932 self, 2933 *, 2934 name: str, 2935 description: Optional[str] = None, 2936 metadata: Optional[Any] = None, 2937 ) -> Dataset: 2938 """Create a dataset with the given name on Langfuse. 2939 2940 Args: 2941 name: Name of the dataset to create. 2942 description: Description of the dataset. Defaults to None. 2943 metadata: Additional metadata. Defaults to None. 2944 2945 Returns: 2946 Dataset: The created dataset as returned by the Langfuse API. 2947 """ 2948 try: 2949 body = CreateDatasetRequest( 2950 name=name, description=description, metadata=metadata 2951 ) 2952 langfuse_logger.debug(f"Creating datasets {body}") 2953 2954 return self.api.datasets.create(request=body) 2955 2956 except Error as e: 2957 handle_fern_exception(e) 2958 raise e
Create a dataset with the given name on Langfuse.
Arguments:
- name: Name of the dataset to create.
- description: Description of the dataset. Defaults to None.
- metadata: Additional metadata. Defaults to None.
Returns:
Dataset: The created dataset as returned by the Langfuse API.
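Example (a minimal sketch; the description and metadata values are illustrative):
```python
dataset = langfuse.create_dataset(
    name="capital_cities",
    description="Country and capital city pairs for evaluation",
    metadata={"owner": "eval-team"}  # illustrative metadata
)
```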
2960 def create_dataset_item( 2961 self, 2962 *, 2963 dataset_name: str, 2964 input: Optional[Any] = None, 2965 expected_output: Optional[Any] = None, 2966 metadata: Optional[Any] = None, 2967 source_trace_id: Optional[str] = None, 2968 source_observation_id: Optional[str] = None, 2969 status: Optional[DatasetStatus] = None, 2970 id: Optional[str] = None, 2971 ) -> DatasetItem: 2972 """Create a dataset item. 2973 2974 Upserts if an item with id already exists. 2975 2976 Args: 2977 dataset_name: Name of the dataset in which the dataset item should be created. 2978 input: Input data. Defaults to None. Can contain any dict, list or scalar. 2979 expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar. 2980 metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar. 2981 source_trace_id: Id of the source trace. Defaults to None. 2982 source_observation_id: Id of the source observation. Defaults to None. 2983 status: Status of the dataset item. Defaults to ACTIVE for newly created items. 2984 id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets. 2985 2986 Returns: 2987 DatasetItem: The created dataset item as returned by the Langfuse API. 2988 2989 Example: 2990 ```python 2991 from langfuse import Langfuse 2992 2993 langfuse = Langfuse() 2994 2995 # Uploading items to the Langfuse dataset named "capital_cities" 2996 langfuse.create_dataset_item( 2997 dataset_name="capital_cities", 2998 input={"input": {"country": "Italy"}}, 2999 expected_output={"expected_output": "Rome"}, 3000 metadata={"foo": "bar"} 3001 ) 3002 ``` 3003 """ 3004 try: 3005 body = CreateDatasetItemRequest( 3006 datasetName=dataset_name, 3007 input=input, 3008 expectedOutput=expected_output, 3009 metadata=metadata, 3010 sourceTraceId=source_trace_id, 3011 sourceObservationId=source_observation_id, 3012 status=status, 3013 id=id, 3014 ) 3015 langfuse_logger.debug(f"Creating dataset item {body}") 3016 return self.api.dataset_items.create(request=body) 3017 except Error as e: 3018 handle_fern_exception(e) 3019 raise e
Create a dataset item.
Upserts if an item with id already exists.
Arguments:
- dataset_name: Name of the dataset in which the dataset item should be created.
- input: Input data. Defaults to None. Can contain any dict, list or scalar.
- expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar.
- metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar.
- source_trace_id: Id of the source trace. Defaults to None.
- source_observation_id: Id of the source observation. Defaults to None.
- status: Status of the dataset item. Defaults to ACTIVE for newly created items.
- id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets.
Returns:
DatasetItem: The created dataset item as returned by the Langfuse API.
Example:
```python
from langfuse import Langfuse

langfuse = Langfuse()

# Uploading items to the Langfuse dataset named "capital_cities"
langfuse.create_dataset_item(
    dataset_name="capital_cities",
    input={"input": {"country": "Italy"}},
    expected_output={"expected_output": "Rome"},
    metadata={"foo": "bar"}
)
```
3021 def resolve_media_references( 3022 self, 3023 *, 3024 obj: Any, 3025 resolve_with: Literal["base64_data_uri"], 3026 max_depth: int = 10, 3027 content_fetch_timeout_seconds: int = 5, 3028 ) -> Any: 3029 """Replace media reference strings in an object with base64 data URIs. 3030 3031 This method recursively traverses an object (up to max_depth) looking for media reference strings 3032 in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using 3033 the provided Langfuse client and replaces the reference string with a base64 data URI. 3034 3035 If fetching media content fails for a reference string, a warning is logged and the reference 3036 string is left unchanged. 3037 3038 Args: 3039 obj: The object to process. Can be a primitive value, array, or nested object. 3040 If the object has a __dict__ attribute, a dict will be returned instead of the original object type. 3041 resolve_with: The representation of the media content to replace the media reference string with. 3042 Currently only "base64_data_uri" is supported. 3043 max_depth: int: The maximum depth to traverse the object. Default is 10. 3044 content_fetch_timeout_seconds: int: The timeout in seconds for fetching media content. Default is 5. 3045 3046 Returns: 3047 A deep copy of the input object with all media references replaced with base64 data URIs where possible. 3048 If the input object has a __dict__ attribute, a dict will be returned instead of the original object type. 3049 3050 Example: 3051 obj = { 3052 "image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@", 3053 "nested": { 3054 "pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@" 3055 } 3056 } 3057 3058 result = await LangfuseMedia.resolve_media_references(obj, langfuse_client) 3059 3060 # Result: 3061 # { 3062 # "image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...", 3063 # "nested": { 3064 # "pdf": "data:application/pdf;base64,JVBERi0xLjcK..." 3065 # } 3066 # } 3067 """ 3068 return LangfuseMedia.resolve_media_references( 3069 langfuse_client=self, 3070 obj=obj, 3071 resolve_with=resolve_with, 3072 max_depth=max_depth, 3073 content_fetch_timeout_seconds=content_fetch_timeout_seconds, 3074 )
Replace media reference strings in an object with base64 data URIs.
This method recursively traverses an object (up to max_depth) looking for media reference strings in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using the provided Langfuse client and replaces the reference string with a base64 data URI.
If fetching media content fails for a reference string, a warning is logged and the reference string is left unchanged.
Arguments:
- obj: The object to process. Can be a primitive value, array, or nested object. If the object has a __dict__ attribute, a dict will be returned instead of the original object type.
- resolve_with: The representation of the media content to replace the media reference string with. Currently only "base64_data_uri" is supported.
- max_depth: int: The maximum depth to traverse the object. Default is 10.
- content_fetch_timeout_seconds: int: The timeout in seconds for fetching media content. Default is 5.
Returns:
A deep copy of the input object with all media references replaced with base64 data URIs where possible. If the input object has a __dict__ attribute, a dict will be returned instead of the original object type.
Example:
```python
obj = {
    "image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
    "nested": {
        "pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@"
    }
}

# Call matches the documented signature: synchronous, keyword-only arguments
result = langfuse.resolve_media_references(obj=obj, resolve_with="base64_data_uri")

# Result:
# {
#     "image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...",
#     "nested": {
#         "pdf": "data:application/pdf;base64,JVBERi0xLjcK..."
#     }
# }
```
3104 def get_prompt( 3105 self, 3106 name: str, 3107 *, 3108 version: Optional[int] = None, 3109 label: Optional[str] = None, 3110 type: Literal["chat", "text"] = "text", 3111 cache_ttl_seconds: Optional[int] = None, 3112 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None, 3113 max_retries: Optional[int] = None, 3114 fetch_timeout_seconds: Optional[int] = None, 3115 ) -> PromptClient: 3116 """Get a prompt. 3117 3118 This method attempts to fetch the requested prompt from the local cache. If the prompt is not found 3119 in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again 3120 and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will 3121 return the expired prompt as a fallback. 3122 3123 Args: 3124 name (str): The name of the prompt to retrieve. 3125 3126 Keyword Args: 3127 version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both. 3128 label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both. 3129 cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a 3130 keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0. 3131 type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text". 3132 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None. 3133 max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries have an exponential backoff with a maximum delay of 10 seconds. 3134 fetch_timeout_seconds: Optional[int]: The timeout in milliseconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 5 seconds per default. 3135 3136 Returns: 3137 The prompt object retrieved from the cache or directly fetched if not cached or expired of type 3138 - TextPromptClient, if type argument is 'text'. 3139 - ChatPromptClient, if type argument is 'chat'. 3140 3141 Raises: 3142 Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an 3143 expired prompt in the cache, in which case it logs a warning and returns the expired prompt. 3144 """ 3145 if self._resources is None: 3146 raise Error( 3147 "SDK is not correctly initialized. Check the init logs for more details." 3148 ) 3149 if version is not None and label is not None: 3150 raise ValueError("Cannot specify both version and label at the same time.") 3151 3152 if not name: 3153 raise ValueError("Prompt name cannot be empty.") 3154 3155 cache_key = PromptCache.generate_cache_key(name, version=version, label=label) 3156 bounded_max_retries = self._get_bounded_max_retries( 3157 max_retries, default_max_retries=2, max_retries_upper_bound=4 3158 ) 3159 3160 langfuse_logger.debug(f"Getting prompt '{cache_key}'") 3161 cached_prompt = self._resources.prompt_cache.get(cache_key) 3162 3163 if cached_prompt is None or cache_ttl_seconds == 0: 3164 langfuse_logger.debug( 3165 f"Prompt '{cache_key}' not found in cache or caching disabled." 
3166 ) 3167 try: 3168 return self._fetch_prompt_and_update_cache( 3169 name, 3170 version=version, 3171 label=label, 3172 ttl_seconds=cache_ttl_seconds, 3173 max_retries=bounded_max_retries, 3174 fetch_timeout_seconds=fetch_timeout_seconds, 3175 ) 3176 except Exception as e: 3177 if fallback: 3178 langfuse_logger.warning( 3179 f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}" 3180 ) 3181 3182 fallback_client_args: Dict[str, Any] = { 3183 "name": name, 3184 "prompt": fallback, 3185 "type": type, 3186 "version": version or 0, 3187 "config": {}, 3188 "labels": [label] if label else [], 3189 "tags": [], 3190 } 3191 3192 if type == "text": 3193 return TextPromptClient( 3194 prompt=Prompt_Text(**fallback_client_args), 3195 is_fallback=True, 3196 ) 3197 3198 if type == "chat": 3199 return ChatPromptClient( 3200 prompt=Prompt_Chat(**fallback_client_args), 3201 is_fallback=True, 3202 ) 3203 3204 raise e 3205 3206 if cached_prompt.is_expired(): 3207 langfuse_logger.debug(f"Stale prompt '{cache_key}' found in cache.") 3208 try: 3209 # refresh prompt in background thread, refresh_prompt deduplicates tasks 3210 langfuse_logger.debug(f"Refreshing prompt '{cache_key}' in background.") 3211 3212 def refresh_task() -> None: 3213 self._fetch_prompt_and_update_cache( 3214 name, 3215 version=version, 3216 label=label, 3217 ttl_seconds=cache_ttl_seconds, 3218 max_retries=bounded_max_retries, 3219 fetch_timeout_seconds=fetch_timeout_seconds, 3220 ) 3221 3222 self._resources.prompt_cache.add_refresh_prompt_task( 3223 cache_key, 3224 refresh_task, 3225 ) 3226 langfuse_logger.debug( 3227 f"Returning stale prompt '{cache_key}' from cache." 3228 ) 3229 # return stale prompt 3230 return cached_prompt.value 3231 3232 except Exception as e: 3233 langfuse_logger.warning( 3234 f"Error when refreshing cached prompt '{cache_key}', returning cached version. Error: {e}" 3235 ) 3236 # creation of refresh prompt task failed, return stale prompt 3237 return cached_prompt.value 3238 3239 return cached_prompt.value
Get a prompt.
This method attempts to fetch the requested prompt from the local cache. If the prompt is not found in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will return the expired prompt as a fallback.
Arguments:
- name (str): The name of the prompt to retrieve.
Keyword Args:
- version (Optional[int]): The version of the prompt to retrieve. If neither label nor version is specified, the `production` label is returned. Specify either version or label, not both.
- label (Optional[str]): The label of the prompt to retrieve. If neither label nor version is specified, the `production` label is returned. Specify either version or label, not both.
- cache_ttl_seconds (Optional[int]): Time-to-live in seconds for caching the prompt. Must be specified as a keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0.
- type (Literal["chat", "text"]): The type of the prompt to retrieve. Defaults to "text".
- fallback (Union[Optional[List[ChatMessageDict]], Optional[str]]): The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None.
- max_retries (Optional[int]): The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries have an exponential backoff with a maximum delay of 10 seconds.
- fetch_timeout_seconds (Optional[int]): The timeout in seconds for fetching the prompt. Defaults to the SDK-wide timeout, which is 5 seconds by default.
Returns:
The prompt object, retrieved from the cache or fetched directly if not cached or expired. Depending on the type argument, it is:
- TextPromptClient, if type argument is 'text'.
- ChatPromptClient, if type argument is 'chat'.
Raises:
- Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an expired prompt in the cache, in which case it logs a warning and returns the expired prompt.
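Example (a minimal sketch; the prompt name "movie-critic" and its {{title}} variable are illustrative, and compile fills in the double-curly-brace variables mentioned above):
```python
# Fetch the production-labeled prompt; the fallback guards the very first
# call, when the cache is still empty and the fetch might fail
prompt = langfuse.get_prompt(
    "movie-critic",
    type="text",
    fallback="Summarize the movie {{title}} in one sentence.",
    cache_ttl_seconds=120,
)

compiled = prompt.compile(title="Dune")
```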
3333 def create_prompt( 3334 self, 3335 *, 3336 name: str, 3337 prompt: Union[ 3338 str, List[Union[ChatMessageDict, ChatMessageWithPlaceholdersDict]] 3339 ], 3340 labels: List[str] = [], 3341 tags: Optional[List[str]] = None, 3342 type: Optional[Literal["chat", "text"]] = "text", 3343 config: Optional[Any] = None, 3344 commit_message: Optional[str] = None, 3345 ) -> PromptClient: 3346 """Create a new prompt in Langfuse. 3347 3348 Keyword Args: 3349 name : The name of the prompt to be created. 3350 prompt : The content of the prompt to be created. 3351 is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead. 3352 labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label. 3353 tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt. 3354 config: Additional structured data to be saved with the prompt. Defaults to None. 3355 type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text". 3356 commit_message: Optional string describing the change. 3357 3358 Returns: 3359 TextPromptClient: The prompt if type argument is 'text'. 3360 ChatPromptClient: The prompt if type argument is 'chat'. 3361 """ 3362 try: 3363 langfuse_logger.debug(f"Creating prompt {name=}, {labels=}") 3364 3365 if type == "chat": 3366 if not isinstance(prompt, list): 3367 raise ValueError( 3368 "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes." 3369 ) 3370 request: Union[CreatePromptRequest_Chat, CreatePromptRequest_Text] = ( 3371 CreatePromptRequest_Chat( 3372 name=name, 3373 prompt=cast(Any, prompt), 3374 labels=labels, 3375 tags=tags, 3376 config=config or {}, 3377 commitMessage=commit_message, 3378 type="chat", 3379 ) 3380 ) 3381 server_prompt = self.api.prompts.create(request=request) 3382 3383 if self._resources is not None: 3384 self._resources.prompt_cache.invalidate(name) 3385 3386 return ChatPromptClient(prompt=cast(Prompt_Chat, server_prompt)) 3387 3388 if not isinstance(prompt, str): 3389 raise ValueError("For 'text' type, 'prompt' must be a string.") 3390 3391 request = CreatePromptRequest_Text( 3392 name=name, 3393 prompt=prompt, 3394 labels=labels, 3395 tags=tags, 3396 config=config or {}, 3397 commitMessage=commit_message, 3398 type="text", 3399 ) 3400 3401 server_prompt = self.api.prompts.create(request=request) 3402 3403 if self._resources is not None: 3404 self._resources.prompt_cache.invalidate(name) 3405 3406 return TextPromptClient(prompt=cast(Prompt_Text, server_prompt)) 3407 3408 except Error as e: 3409 handle_fern_exception(e) 3410 raise e
Create a new prompt in Langfuse.
Keyword Args:
- name: The name of the prompt to be created.
- prompt: The content of the prompt to be created.
- is_active [DEPRECATED]: A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead.
- labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label.
- tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt.
- config: Additional structured data to be saved with the prompt. Defaults to None.
- type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text".
- commit_message: Optional string describing the change.
Returns:
- TextPromptClient: The prompt if type argument is 'text'.
- ChatPromptClient: The prompt if type argument is 'chat'.
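A sketch of creating a chat prompt that is served by default (the name, messages, and commit message are illustrative):
```python
prompt = langfuse.create_prompt(
    name="support-agent",
    type="chat",
    prompt=[
        {"role": "system", "content": "You are a concise support agent."},
        {"role": "user", "content": "{{question}}"},
    ],
    labels=["production"],  # the 'production' label makes this the default-served version
    commit_message="Initial version",
)
```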
3412 def update_prompt( 3413 self, 3414 *, 3415 name: str, 3416 version: int, 3417 new_labels: List[str] = [], 3418 ) -> Any: 3419 """Update an existing prompt version in Langfuse. The Langfuse SDK prompt cache is invalidated for all prompts with the specified name. 3420 3421 Args: 3422 name (str): The name of the prompt to update. 3423 version (int): The version number of the prompt to update. 3424 new_labels (List[str], optional): New labels to assign to the prompt version. Labels are unique across versions. The "latest" label is reserved and managed by Langfuse. Defaults to []. 3425 3426 Returns: 3427 Prompt: The updated prompt from the Langfuse API. 3428 3429 """ 3430 updated_prompt = self.api.prompt_version.update( 3431 name=self._url_encode(name), 3432 version=version, 3433 new_labels=new_labels, 3434 ) 3435 3436 if self._resources is not None: 3437 self._resources.prompt_cache.invalidate(name) 3438 3439 return updated_prompt
Update an existing prompt version in Langfuse. The Langfuse SDK prompt cache is invalidated for all prompts with the specified name.
Arguments:
- name (str): The name of the prompt to update.
- version (int): The version number of the prompt to update.
- new_labels (List[str], optional): New labels to assign to the prompt version. Labels are unique across versions. The "latest" label is reserved and managed by Langfuse. Defaults to [].
Returns:
Prompt: The updated prompt from the Langfuse API.
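For example, promoting a specific version to production (the name and version are illustrative):
```python
updated = langfuse.update_prompt(
    name="support-agent",
    version=2,
    # Labels are unique across versions, so 'production' moves to version 2.
    new_labels=["production"],
)
```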
3454 def clear_prompt_cache(self) -> None: 3455 """Clear the entire prompt cache, removing all cached prompts. 3456 3457 This method is useful when you want to force a complete refresh of all 3458 cached prompts, for example after major updates or when you need to 3459 ensure the latest versions are fetched from the server. 3460 """ 3461 if self._resources is not None: 3462 self._resources.prompt_cache.clear()
Clear the entire prompt cache, removing all cached prompts.
This method is useful when you want to force a complete refresh of all cached prompts, for example after major updates or when you need to ensure the latest versions are fetched from the server.
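A quick sketch, assuming an initialized client named langfuse:
```python
langfuse.clear_prompt_cache()
# The next fetch bypasses the now-empty cache and hits the server.
prompt = langfuse.get_prompt("support-agent")
```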
59def get_client(*, public_key: Optional[str] = None) -> Langfuse: 60 """Get or create a Langfuse client instance. 61 62 Returns an existing Langfuse client or creates a new one if none exists. In multi-project setups, 63 providing a public_key is required. Multi-project support is experimental - see Langfuse docs. 64 65 Behavior: 66 - Single project: Returns existing client or creates new one 67 - Multi-project: Requires public_key to return specific client 68 - No public_key in multi-project: Returns disabled client to prevent data leakage 69 70 The function uses a singleton pattern per public_key to conserve resources and maintain state. 71 72 Args: 73 public_key (Optional[str]): Project identifier 74 - With key: Returns client for that project 75 - Without key: Returns single client or disabled client if multiple exist 76 77 Returns: 78 Langfuse: Client instance in one of three states: 79 1. Client for specified public_key 80 2. Default client for single-project setup 81 3. Disabled client when multiple projects exist without key 82 83 Security: 84 Disables tracing when multiple projects exist without explicit key to prevent 85 cross-project data leakage. Multi-project setups are experimental. 86 87 Example: 88 ```python 89 # Single project 90 client = get_client() # Default client 91 92 # In multi-project usage: 93 client_a = get_client(public_key="project_a_key") # Returns project A's client 94 client_b = get_client(public_key="project_b_key") # Returns project B's client 95 96 # Without specific key in multi-project setup: 97 client = get_client() # Returns disabled client for safety 98 ``` 99 """ 100 with LangfuseResourceManager._lock: 101 active_instances = LangfuseResourceManager._instances 102 103 # If no explicit public_key provided, check execution context 104 if not public_key: 105 public_key = _current_public_key.get(None) 106 107 if not public_key: 108 if len(active_instances) == 0: 109 # No clients initialized yet, create default instance 110 return Langfuse() 111 112 if len(active_instances) == 1: 113 # Only one client exists, safe to use without specifying key 114 instance = list(active_instances.values())[0] 115 116 # Initialize with the credentials bound to the instance 117 # This is important if the original instance was instantiated 118 # via constructor arguments 119 return _create_client_from_instance(instance) 120 121 else: 122 # Multiple clients exist but no key specified - disable tracing 123 # to prevent cross-project data leakage 124 langfuse_logger.warning( 125 "No 'langfuse_public_key' passed to decorated function, but multiple langfuse clients are instantiated in current process. Skipping tracing for this function to avoid cross-project leakage." 126 ) 127 return Langfuse( 128 tracing_enabled=False, public_key="fake", secret_key="fake" 129 ) 130 131 else: 132 # Specific key provided, look up existing instance 133 target_instance: Optional[LangfuseResourceManager] = active_instances.get( 134 public_key, None 135 ) 136 137 if target_instance is None: 138 # No instance found with this key - client not initialized properly 139 langfuse_logger.warning( 140 f"No Langfuse client with public key {public_key} has been initialized. Skipping tracing for decorated function." 141 ) 142 return Langfuse( 143 tracing_enabled=False, public_key="fake", secret_key="fake" 144 ) 145 146 # target_instance is guaranteed to be not None at this point 147 return _create_client_from_instance(target_instance, public_key)
Get or create a Langfuse client instance.
Returns an existing Langfuse client or creates a new one if none exists. In multi-project setups, providing a public_key is required. Multi-project support is experimental - see Langfuse docs.
Behavior:
- Single project: Returns existing client or creates new one
- Multi-project: Requires public_key to return specific client
- No public_key in multi-project: Returns disabled client to prevent data leakage
The function uses a singleton pattern per public_key to conserve resources and maintain state.
Arguments:
- public_key (Optional[str]): Project identifier
- With key: Returns client for that project
- Without key: Returns single client or disabled client if multiple exist
 
Returns:
Langfuse: Client instance in one of three states:
1. Client for specified public_key
2. Default client for single-project setup
3. Disabled client when multiple projects exist without key
Security:
Disables tracing when multiple projects exist without explicit key to prevent cross-project data leakage. Multi-project setups are experimental.
Example:
```python
# Single project
client = get_client()  # Default client

# In multi-project usage:
client_a = get_client(public_key="project_a_key")  # Returns project A's client
client_b = get_client(public_key="project_b_key")  # Returns project B's client

# Without specific key in multi-project setup:
client = get_client()  # Returns disabled client for safety
```
90 def observe( 91 self, 92 func: Optional[F] = None, 93 *, 94 name: Optional[str] = None, 95 as_type: Optional[ObservationTypeLiteralNoEvent] = None, 96 capture_input: Optional[bool] = None, 97 capture_output: Optional[bool] = None, 98 transform_to_string: Optional[Callable[[Iterable], str]] = None, 99 ) -> Union[F, Callable[[F], F]]: 100 """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions. 101 102 This decorator provides seamless integration of Langfuse observability into your codebase. It automatically creates 103 spans or generations around function execution, capturing timing, inputs/outputs, and error states. The decorator 104 intelligently handles both synchronous and asynchronous functions, preserving function signatures and type hints. 105 106 Using OpenTelemetry's distributed tracing system, it maintains proper trace context propagation throughout your application, 107 enabling you to see hierarchical traces of function calls with detailed performance metrics and function-specific details. 108 109 Args: 110 func (Optional[Callable]): The function to decorate. When used with parentheses @observe(), this will be None. 111 name (Optional[str]): Custom name for the created trace or span. If not provided, the function name is used. 112 as_type (Optional[Literal]): Set the observation type. Supported values: 113 "generation", "span", "agent", "tool", "chain", "retriever", "embedding", "evaluator", "guardrail". 114 Observation types are highlighted in the Langfuse UI for filtering and visualization. 115 The types "generation" and "embedding" create a span on which additional attributes such as model metrics 116 can be set. 117 118 Returns: 119 Callable: A wrapped version of the original function that automatically creates and manages Langfuse spans. 120 121 Example: 122 For general function tracing with automatic naming: 123 ```python 124 @observe() 125 def process_user_request(user_id, query): 126 # Function is automatically traced with name "process_user_request" 127 return get_response(query) 128 ``` 129 130 For language model generation tracking: 131 ```python 132 @observe(name="answer-generation", as_type="generation") 133 async def generate_answer(query): 134 # Creates a generation-type span with extended LLM metrics 135 response = await openai.chat.completions.create( 136 model="gpt-4", 137 messages=[{"role": "user", "content": query}] 138 ) 139 return response.choices[0].message.content 140 ``` 141 142 For trace context propagation between functions: 143 ```python 144 @observe() 145 def main_process(): 146 # Parent span is created 147 return sub_process() # Child span automatically connected to parent 148 149 @observe() 150 def sub_process(): 151 # Automatically becomes a child span of main_process 152 return "result" 153 ``` 154 155 Raises: 156 Exception: Propagates any exceptions from the wrapped function after logging them in the trace. 157 158 Notes: 159 - The decorator preserves the original function's signature, docstring, and return type. 160 - Proper parent-child relationships between spans are automatically maintained. 
161 - Special keyword arguments can be passed to control tracing: 162 - langfuse_trace_id: Explicitly set the trace ID for this function call 163 - langfuse_parent_observation_id: Explicitly set the parent span ID 164 - langfuse_public_key: Use a specific Langfuse project (when multiple clients exist) 165 - For async functions, the decorator returns an async function wrapper. 166 - For sync functions, the decorator returns a synchronous wrapper. 167 """ 168 valid_types = set(get_observation_types_list(ObservationTypeLiteralNoEvent)) 169 if as_type is not None and as_type not in valid_types: 170 self._log.warning( 171 f"Invalid as_type '{as_type}'. Valid types are: {', '.join(sorted(valid_types))}. Defaulting to 'span'." 172 ) 173 as_type = "span" 174 175 function_io_capture_enabled = os.environ.get( 176 LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED, "True" 177 ).lower() not in ("false", "0") 178 179 should_capture_input = ( 180 capture_input if capture_input is not None else function_io_capture_enabled 181 ) 182 183 should_capture_output = ( 184 capture_output 185 if capture_output is not None 186 else function_io_capture_enabled 187 ) 188 189 def decorator(func: F) -> F: 190 return ( 191 self._async_observe( 192 func, 193 name=name, 194 as_type=as_type, 195 capture_input=should_capture_input, 196 capture_output=should_capture_output, 197 transform_to_string=transform_to_string, 198 ) 199 if asyncio.iscoroutinefunction(func) 200 else self._sync_observe( 201 func, 202 name=name, 203 as_type=as_type, 204 capture_input=should_capture_input, 205 capture_output=should_capture_output, 206 transform_to_string=transform_to_string, 207 ) 208 ) 209 210 """Handle decorator with or without parentheses. 211 212 This logic enables the decorator to work both with and without parentheses: 213 - @observe - Python passes the function directly to the decorator 214 - @observe() - Python calls the decorator first, which must return a function decorator 215 216 When called without arguments (@observe), the func parameter contains the function to decorate, 217 so we directly apply the decorator to it. When called with parentheses (@observe()), 218 func is None, so we return the decorator function itself for Python to apply in the next step. 219 """ 220 if func is None: 221 return decorator 222 else: 223 return decorator(func)
Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
This decorator provides seamless integration of Langfuse observability into your codebase. It automatically creates spans or generations around function execution, capturing timing, inputs/outputs, and error states. The decorator intelligently handles both synchronous and asynchronous functions, preserving function signatures and type hints.
Using OpenTelemetry's distributed tracing system, it maintains proper trace context propagation throughout your application, enabling you to see hierarchical traces of function calls with detailed performance metrics and function-specific details.
Arguments:
- func (Optional[Callable]): The function to decorate. When used with parentheses @observe(), this will be None.
- name (Optional[str]): Custom name for the created trace or span. If not provided, the function name is used.
- as_type (Optional[Literal]): Set the observation type. Supported values: "generation", "span", "agent", "tool", "chain", "retriever", "embedding", "evaluator", "guardrail". Observation types are highlighted in the Langfuse UI for filtering and visualization. The types "generation" and "embedding" create a span on which additional attributes such as model metrics can be set.
Returns:
Callable: A wrapped version of the original function that automatically creates and manages Langfuse spans.
Example:
For general function tracing with automatic naming:
```python
@observe()
def process_user_request(user_id, query):
    # Function is automatically traced with name "process_user_request"
    return get_response(query)
```
For language model generation tracking:
```python
@observe(name="answer-generation", as_type="generation")
async def generate_answer(query):
    # Creates a generation-type span with extended LLM metrics
    response = await openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": query}]
    )
    return response.choices[0].message.content
```
For trace context propagation between functions:
```python
@observe()
def main_process():
    # Parent span is created
    return sub_process()  # Child span automatically connected to parent

@observe()
def sub_process():
    # Automatically becomes a child span of main_process
    return "result"
```
Raises:
- Exception: Propagates any exceptions from the wrapped function after logging them in the trace.
Notes:
- The decorator preserves the original function's signature, docstring, and return type.
- Proper parent-child relationships between spans are automatically maintained.
- Special keyword arguments can be passed to control tracing (see the sketch after this list):
- langfuse_trace_id: Explicitly set the trace ID for this function call
- langfuse_parent_observation_id: Explicitly set the parent span ID
- langfuse_public_key: Use a specific Langfuse project (when multiple clients exist)
- For async functions, the decorator returns an async function wrapper.
- For sync functions, the decorator returns a synchronous wrapper.
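A minimal sketch of these call-time keyword arguments, assuming a pre-generated 32-hex-character trace ID (the value below is illustrative):
```python
from langfuse import observe

@observe()
def handle_request(payload):
    return {"ok": True, "payload": payload}

# Attach this call to an existing trace via the special kwarg;
# the decorator interprets langfuse_-prefixed kwargs rather than
# forwarding them to the wrapped function.
result = handle_request(
    {"user": 42},
    langfuse_trace_id="0af7651916cd43dd8448eb211c80319c",
)
```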
1147class LangfuseSpan(LangfuseObservationWrapper): 1148 """Standard span implementation for general operations in Langfuse. 1149 1150 This class represents a general-purpose span that can be used to trace 1151 any operation in your application. It extends the base LangfuseObservationWrapper 1152 with specific methods for creating child spans, generations, and updating 1153 span-specific attributes. If possible, use a more specific type for 1154 better observability and insights. 1155 """ 1156 1157 def __init__( 1158 self, 1159 *, 1160 otel_span: otel_trace_api.Span, 1161 langfuse_client: "Langfuse", 1162 input: Optional[Any] = None, 1163 output: Optional[Any] = None, 1164 metadata: Optional[Any] = None, 1165 environment: Optional[str] = None, 1166 version: Optional[str] = None, 1167 level: Optional[SpanLevel] = None, 1168 status_message: Optional[str] = None, 1169 ): 1170 """Initialize a new LangfuseSpan. 1171 1172 Args: 1173 otel_span: The OpenTelemetry span to wrap 1174 langfuse_client: Reference to the parent Langfuse client 1175 input: Input data for the span (any JSON-serializable object) 1176 output: Output data from the span (any JSON-serializable object) 1177 metadata: Additional metadata to associate with the span 1178 environment: The tracing environment 1179 version: Version identifier for the code or component 1180 level: Importance level of the span (info, warning, error) 1181 status_message: Optional status message for the span 1182 """ 1183 super().__init__( 1184 otel_span=otel_span, 1185 as_type="span", 1186 langfuse_client=langfuse_client, 1187 input=input, 1188 output=output, 1189 metadata=metadata, 1190 environment=environment, 1191 version=version, 1192 level=level, 1193 status_message=status_message, 1194 ) 1195 1196 def start_span( 1197 self, 1198 name: str, 1199 input: Optional[Any] = None, 1200 output: Optional[Any] = None, 1201 metadata: Optional[Any] = None, 1202 version: Optional[str] = None, 1203 level: Optional[SpanLevel] = None, 1204 status_message: Optional[str] = None, 1205 ) -> "LangfuseSpan": 1206 """Create a new child span. 1207 1208 This method creates a new child span with this span as the parent. 1209 Unlike start_as_current_span(), this method does not set the new span 1210 as the current span in the context. 
1211 1212 Args: 1213 name: Name of the span (e.g., function or operation name) 1214 input: Input data for the operation 1215 output: Output data from the operation 1216 metadata: Additional metadata to associate with the span 1217 version: Version identifier for the code or component 1218 level: Importance level of the span (info, warning, error) 1219 status_message: Optional status message for the span 1220 1221 Returns: 1222 A new LangfuseSpan that must be ended with .end() when complete 1223 1224 Example: 1225 ```python 1226 parent_span = langfuse.start_span(name="process-request") 1227 try: 1228 # Create a child span 1229 child_span = parent_span.start_span(name="validate-input") 1230 try: 1231 # Do validation work 1232 validation_result = validate(request_data) 1233 child_span.update(output=validation_result) 1234 finally: 1235 child_span.end() 1236 1237 # Continue with parent span 1238 result = process_validated_data(validation_result) 1239 parent_span.update(output=result) 1240 finally: 1241 parent_span.end() 1242 ``` 1243 """ 1244 return self.start_observation( 1245 name=name, 1246 as_type="span", 1247 input=input, 1248 output=output, 1249 metadata=metadata, 1250 version=version, 1251 level=level, 1252 status_message=status_message, 1253 ) 1254 1255 def start_as_current_span( 1256 self, 1257 *, 1258 name: str, 1259 input: Optional[Any] = None, 1260 output: Optional[Any] = None, 1261 metadata: Optional[Any] = None, 1262 version: Optional[str] = None, 1263 level: Optional[SpanLevel] = None, 1264 status_message: Optional[str] = None, 1265 ) -> _AgnosticContextManager["LangfuseSpan"]: 1266 """[DEPRECATED] Create a new child span and set it as the current span in a context manager. 1267 1268 DEPRECATED: This method is deprecated and will be removed in a future version. 1269 Use start_as_current_observation(as_type='span') instead. 1270 1271 This method creates a new child span and sets it as the current span within 1272 a context manager. It should be used with a 'with' statement to automatically 1273 manage the span's lifecycle. 1274 1275 Args: 1276 name: Name of the span (e.g., function or operation name) 1277 input: Input data for the operation 1278 output: Output data from the operation 1279 metadata: Additional metadata to associate with the span 1280 version: Version identifier for the code or component 1281 level: Importance level of the span (info, warning, error) 1282 status_message: Optional status message for the span 1283 1284 Returns: 1285 A context manager that yields a new LangfuseSpan 1286 1287 Example: 1288 ```python 1289 with langfuse.start_as_current_span(name="process-request") as parent_span: 1290 # Parent span is active here 1291 1292 # Create a child span with context management 1293 with parent_span.start_as_current_span(name="validate-input") as child_span: 1294 # Child span is active here 1295 validation_result = validate(request_data) 1296 child_span.update(output=validation_result) 1297 1298 # Back to parent span context 1299 result = process_validated_data(validation_result) 1300 parent_span.update(output=result) 1301 ``` 1302 """ 1303 warnings.warn( 1304 "start_as_current_span is deprecated and will be removed in a future version. 
" 1305 "Use start_as_current_observation(as_type='span') instead.", 1306 DeprecationWarning, 1307 stacklevel=2, 1308 ) 1309 return self.start_as_current_observation( 1310 name=name, 1311 as_type="span", 1312 input=input, 1313 output=output, 1314 metadata=metadata, 1315 version=version, 1316 level=level, 1317 status_message=status_message, 1318 ) 1319 1320 def start_generation( 1321 self, 1322 *, 1323 name: str, 1324 input: Optional[Any] = None, 1325 output: Optional[Any] = None, 1326 metadata: Optional[Any] = None, 1327 version: Optional[str] = None, 1328 level: Optional[SpanLevel] = None, 1329 status_message: Optional[str] = None, 1330 completion_start_time: Optional[datetime] = None, 1331 model: Optional[str] = None, 1332 model_parameters: Optional[Dict[str, MapValue]] = None, 1333 usage_details: Optional[Dict[str, int]] = None, 1334 cost_details: Optional[Dict[str, float]] = None, 1335 prompt: Optional[PromptClient] = None, 1336 ) -> "LangfuseGeneration": 1337 """[DEPRECATED] Create a new child generation span. 1338 1339 DEPRECATED: This method is deprecated and will be removed in a future version. 1340 Use start_observation(as_type='generation') instead. 1341 1342 This method creates a new child generation span with this span as the parent. 1343 Generation spans are specialized for AI/LLM operations and include additional 1344 fields for model information, usage stats, and costs. 1345 1346 Unlike start_as_current_generation(), this method does not set the new span 1347 as the current span in the context. 1348 1349 Args: 1350 name: Name of the generation operation 1351 input: Input data for the model (e.g., prompts) 1352 output: Output from the model (e.g., completions) 1353 metadata: Additional metadata to associate with the generation 1354 version: Version identifier for the model or component 1355 level: Importance level of the generation (info, warning, error) 1356 status_message: Optional status message for the generation 1357 completion_start_time: When the model started generating the response 1358 model: Name/identifier of the AI model used (e.g., "gpt-4") 1359 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1360 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1361 cost_details: Cost information for the model call 1362 prompt: Associated prompt template from Langfuse prompt management 1363 1364 Returns: 1365 A new LangfuseGeneration that must be ended with .end() when complete 1366 1367 Example: 1368 ```python 1369 span = langfuse.start_span(name="process-query") 1370 try: 1371 # Create a generation child span 1372 generation = span.start_generation( 1373 name="generate-answer", 1374 model="gpt-4", 1375 input={"prompt": "Explain quantum computing"} 1376 ) 1377 try: 1378 # Call model API 1379 response = llm.generate(...) 1380 1381 generation.update( 1382 output=response.text, 1383 usage_details={ 1384 "prompt_tokens": response.usage.prompt_tokens, 1385 "completion_tokens": response.usage.completion_tokens 1386 } 1387 ) 1388 finally: 1389 generation.end() 1390 1391 # Continue with parent span 1392 span.update(output={"answer": response.text, "source": "gpt-4"}) 1393 finally: 1394 span.end() 1395 ``` 1396 """ 1397 warnings.warn( 1398 "start_generation is deprecated and will be removed in a future version. 
" 1399 "Use start_observation(as_type='generation') instead.", 1400 DeprecationWarning, 1401 stacklevel=2, 1402 ) 1403 return self.start_observation( 1404 name=name, 1405 as_type="generation", 1406 input=input, 1407 output=output, 1408 metadata=metadata, 1409 version=version, 1410 level=level, 1411 status_message=status_message, 1412 completion_start_time=completion_start_time, 1413 model=model, 1414 model_parameters=model_parameters, 1415 usage_details=usage_details, 1416 cost_details=cost_details, 1417 prompt=prompt, 1418 ) 1419 1420 def start_as_current_generation( 1421 self, 1422 *, 1423 name: str, 1424 input: Optional[Any] = None, 1425 output: Optional[Any] = None, 1426 metadata: Optional[Any] = None, 1427 version: Optional[str] = None, 1428 level: Optional[SpanLevel] = None, 1429 status_message: Optional[str] = None, 1430 completion_start_time: Optional[datetime] = None, 1431 model: Optional[str] = None, 1432 model_parameters: Optional[Dict[str, MapValue]] = None, 1433 usage_details: Optional[Dict[str, int]] = None, 1434 cost_details: Optional[Dict[str, float]] = None, 1435 prompt: Optional[PromptClient] = None, 1436 ) -> _AgnosticContextManager["LangfuseGeneration"]: 1437 """[DEPRECATED] Create a new child generation span and set it as the current span in a context manager. 1438 1439 DEPRECATED: This method is deprecated and will be removed in a future version. 1440 Use start_as_current_observation(as_type='generation') instead. 1441 1442 This method creates a new child generation span and sets it as the current span 1443 within a context manager. Generation spans are specialized for AI/LLM operations 1444 and include additional fields for model information, usage stats, and costs. 1445 1446 Args: 1447 name: Name of the generation operation 1448 input: Input data for the model (e.g., prompts) 1449 output: Output from the model (e.g., completions) 1450 metadata: Additional metadata to associate with the generation 1451 version: Version identifier for the model or component 1452 level: Importance level of the generation (info, warning, error) 1453 status_message: Optional status message for the generation 1454 completion_start_time: When the model started generating the response 1455 model: Name/identifier of the AI model used (e.g., "gpt-4") 1456 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1457 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1458 cost_details: Cost information for the model call 1459 prompt: Associated prompt template from Langfuse prompt management 1460 1461 Returns: 1462 A context manager that yields a new LangfuseGeneration 1463 1464 Example: 1465 ```python 1466 with langfuse.start_as_current_span(name="process-request") as span: 1467 # Prepare data 1468 query = preprocess_user_query(user_input) 1469 1470 # Create a generation span with context management 1471 with span.start_as_current_generation( 1472 name="generate-answer", 1473 model="gpt-4", 1474 input={"query": query} 1475 ) as generation: 1476 # Generation span is active here 1477 response = llm.generate(query) 1478 1479 # Update with results 1480 generation.update( 1481 output=response.text, 1482 usage_details={ 1483 "prompt_tokens": response.usage.prompt_tokens, 1484 "completion_tokens": response.usage.completion_tokens 1485 } 1486 ) 1487 1488 # Back to parent span context 1489 span.update(output={"answer": response.text, "source": "gpt-4"}) 1490 ``` 1491 """ 1492 warnings.warn( 1493 "start_as_current_generation is deprecated and will be 
removed in a future version. " 1494 "Use start_as_current_observation(as_type='generation') instead.", 1495 DeprecationWarning, 1496 stacklevel=2, 1497 ) 1498 return self.start_as_current_observation( 1499 name=name, 1500 as_type="generation", 1501 input=input, 1502 output=output, 1503 metadata=metadata, 1504 version=version, 1505 level=level, 1506 status_message=status_message, 1507 completion_start_time=completion_start_time, 1508 model=model, 1509 model_parameters=model_parameters, 1510 usage_details=usage_details, 1511 cost_details=cost_details, 1512 prompt=prompt, 1513 ) 1514 1515 def create_event( 1516 self, 1517 *, 1518 name: str, 1519 input: Optional[Any] = None, 1520 output: Optional[Any] = None, 1521 metadata: Optional[Any] = None, 1522 version: Optional[str] = None, 1523 level: Optional[SpanLevel] = None, 1524 status_message: Optional[str] = None, 1525 ) -> "LangfuseEvent": 1526 """Create a new Langfuse observation of type 'EVENT'. 1527 1528 Args: 1529 name: Name of the span (e.g., function or operation name) 1530 input: Input data for the operation (can be any JSON-serializable object) 1531 output: Output data from the operation (can be any JSON-serializable object) 1532 metadata: Additional metadata to associate with the span 1533 version: Version identifier for the code or component 1534 level: Importance level of the span (info, warning, error) 1535 status_message: Optional status message for the span 1536 1537 Returns: 1538 The LangfuseEvent object 1539 1540 Example: 1541 ```python 1542 event = langfuse.create_event(name="process-event") 1543 ``` 1544 """ 1545 timestamp = time_ns() 1546 1547 with otel_trace_api.use_span(self._otel_span): 1548 new_otel_span = self._langfuse_client._otel_tracer.start_span( 1549 name=name, start_time=timestamp 1550 ) 1551 1552 return cast( 1553 "LangfuseEvent", 1554 LangfuseEvent( 1555 otel_span=new_otel_span, 1556 langfuse_client=self._langfuse_client, 1557 input=input, 1558 output=output, 1559 metadata=metadata, 1560 environment=self._environment, 1561 version=version, 1562 level=level, 1563 status_message=status_message, 1564 ).end(end_time=timestamp), 1565 )
Standard span implementation for general operations in Langfuse.
This class represents a general-purpose span that can be used to trace any operation in your application. It extends the base LangfuseObservationWrapper with specific methods for creating child spans, generations, and updating span-specific attributes. If possible, use a more specific type for better observability and insights.
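Since the class recommends a more specific type where possible, here is a sketch of that advice using start_observation with a typed child (the names and the search_index helper are hypothetical):
```python
span = langfuse.start_span(name="handle-request")
try:
    # Typed child observation; "retriever" is one of the supported
    # observation types listed for as_type.
    retriever = span.start_observation(name="fetch-context", as_type="retriever")
    try:
        docs = search_index(query)  # hypothetical retrieval helper
        retriever.update(output=docs)
    finally:
        retriever.end()
finally:
    span.end()
```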
1157 def __init__( 1158 self, 1159 *, 1160 otel_span: otel_trace_api.Span, 1161 langfuse_client: "Langfuse", 1162 input: Optional[Any] = None, 1163 output: Optional[Any] = None, 1164 metadata: Optional[Any] = None, 1165 environment: Optional[str] = None, 1166 version: Optional[str] = None, 1167 level: Optional[SpanLevel] = None, 1168 status_message: Optional[str] = None, 1169 ): 1170 """Initialize a new LangfuseSpan. 1171 1172 Args: 1173 otel_span: The OpenTelemetry span to wrap 1174 langfuse_client: Reference to the parent Langfuse client 1175 input: Input data for the span (any JSON-serializable object) 1176 output: Output data from the span (any JSON-serializable object) 1177 metadata: Additional metadata to associate with the span 1178 environment: The tracing environment 1179 version: Version identifier for the code or component 1180 level: Importance level of the span (info, warning, error) 1181 status_message: Optional status message for the span 1182 """ 1183 super().__init__( 1184 otel_span=otel_span, 1185 as_type="span", 1186 langfuse_client=langfuse_client, 1187 input=input, 1188 output=output, 1189 metadata=metadata, 1190 environment=environment, 1191 version=version, 1192 level=level, 1193 status_message=status_message, 1194 )
Initialize a new LangfuseSpan.
Arguments:
- otel_span: The OpenTelemetry span to wrap
- langfuse_client: Reference to the parent Langfuse client
- input: Input data for the span (any JSON-serializable object)
- output: Output data from the span (any JSON-serializable object)
- metadata: Additional metadata to associate with the span
- environment: The tracing environment
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
1196 def start_span( 1197 self, 1198 name: str, 1199 input: Optional[Any] = None, 1200 output: Optional[Any] = None, 1201 metadata: Optional[Any] = None, 1202 version: Optional[str] = None, 1203 level: Optional[SpanLevel] = None, 1204 status_message: Optional[str] = None, 1205 ) -> "LangfuseSpan": 1206 """Create a new child span. 1207 1208 This method creates a new child span with this span as the parent. 1209 Unlike start_as_current_span(), this method does not set the new span 1210 as the current span in the context. 1211 1212 Args: 1213 name: Name of the span (e.g., function or operation name) 1214 input: Input data for the operation 1215 output: Output data from the operation 1216 metadata: Additional metadata to associate with the span 1217 version: Version identifier for the code or component 1218 level: Importance level of the span (info, warning, error) 1219 status_message: Optional status message for the span 1220 1221 Returns: 1222 A new LangfuseSpan that must be ended with .end() when complete 1223 1224 Example: 1225 ```python 1226 parent_span = langfuse.start_span(name="process-request") 1227 try: 1228 # Create a child span 1229 child_span = parent_span.start_span(name="validate-input") 1230 try: 1231 # Do validation work 1232 validation_result = validate(request_data) 1233 child_span.update(output=validation_result) 1234 finally: 1235 child_span.end() 1236 1237 # Continue with parent span 1238 result = process_validated_data(validation_result) 1239 parent_span.update(output=result) 1240 finally: 1241 parent_span.end() 1242 ``` 1243 """ 1244 return self.start_observation( 1245 name=name, 1246 as_type="span", 1247 input=input, 1248 output=output, 1249 metadata=metadata, 1250 version=version, 1251 level=level, 1252 status_message=status_message, 1253 )
Create a new child span.
This method creates a new child span with this span as the parent. Unlike start_as_current_span(), this method does not set the new span as the current span in the context.
Arguments:
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation
- output: Output data from the operation
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Returns:
A new LangfuseSpan that must be ended with .end() when complete
Example:
```python
parent_span = langfuse.start_span(name="process-request")
try:
    # Create a child span
    child_span = parent_span.start_span(name="validate-input")
    try:
        # Do validation work
        validation_result = validate(request_data)
        child_span.update(output=validation_result)
    finally:
        child_span.end()

    # Continue with parent span
    result = process_validated_data(validation_result)
    parent_span.update(output=result)
finally:
    parent_span.end()
```
1255 def start_as_current_span( 1256 self, 1257 *, 1258 name: str, 1259 input: Optional[Any] = None, 1260 output: Optional[Any] = None, 1261 metadata: Optional[Any] = None, 1262 version: Optional[str] = None, 1263 level: Optional[SpanLevel] = None, 1264 status_message: Optional[str] = None, 1265 ) -> _AgnosticContextManager["LangfuseSpan"]: 1266 """[DEPRECATED] Create a new child span and set it as the current span in a context manager. 1267 1268 DEPRECATED: This method is deprecated and will be removed in a future version. 1269 Use start_as_current_observation(as_type='span') instead. 1270 1271 This method creates a new child span and sets it as the current span within 1272 a context manager. It should be used with a 'with' statement to automatically 1273 manage the span's lifecycle. 1274 1275 Args: 1276 name: Name of the span (e.g., function or operation name) 1277 input: Input data for the operation 1278 output: Output data from the operation 1279 metadata: Additional metadata to associate with the span 1280 version: Version identifier for the code or component 1281 level: Importance level of the span (info, warning, error) 1282 status_message: Optional status message for the span 1283 1284 Returns: 1285 A context manager that yields a new LangfuseSpan 1286 1287 Example: 1288 ```python 1289 with langfuse.start_as_current_span(name="process-request") as parent_span: 1290 # Parent span is active here 1291 1292 # Create a child span with context management 1293 with parent_span.start_as_current_span(name="validate-input") as child_span: 1294 # Child span is active here 1295 validation_result = validate(request_data) 1296 child_span.update(output=validation_result) 1297 1298 # Back to parent span context 1299 result = process_validated_data(validation_result) 1300 parent_span.update(output=result) 1301 ``` 1302 """ 1303 warnings.warn( 1304 "start_as_current_span is deprecated and will be removed in a future version. " 1305 "Use start_as_current_observation(as_type='span') instead.", 1306 DeprecationWarning, 1307 stacklevel=2, 1308 ) 1309 return self.start_as_current_observation( 1310 name=name, 1311 as_type="span", 1312 input=input, 1313 output=output, 1314 metadata=metadata, 1315 version=version, 1316 level=level, 1317 status_message=status_message, 1318 )
[DEPRECATED] Create a new child span and set it as the current span in a context manager.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_as_current_observation(as_type='span') instead.
This method creates a new child span and sets it as the current span within a context manager. It should be used with a 'with' statement to automatically manage the span's lifecycle.
Arguments:
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation
- output: Output data from the operation
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Returns:
A context manager that yields a new LangfuseSpan
Example:
```python
with langfuse.start_as_current_span(name="process-request") as parent_span:
    # Parent span is active here

    # Create a child span with context management
    with parent_span.start_as_current_span(name="validate-input") as child_span:
        # Child span is active here
        validation_result = validate(request_data)
        child_span.update(output=validation_result)

    # Back to parent span context
    result = process_validated_data(validation_result)
    parent_span.update(output=result)
```
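Because this method is deprecated, the equivalent call through the replacement named in the notice would look like this sketch:
```python
with parent_span.start_as_current_observation(
    name="validate-input", as_type="span"
) as child_span:
    child_span.update(output=validate(request_data))
```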
1320 def start_generation( 1321 self, 1322 *, 1323 name: str, 1324 input: Optional[Any] = None, 1325 output: Optional[Any] = None, 1326 metadata: Optional[Any] = None, 1327 version: Optional[str] = None, 1328 level: Optional[SpanLevel] = None, 1329 status_message: Optional[str] = None, 1330 completion_start_time: Optional[datetime] = None, 1331 model: Optional[str] = None, 1332 model_parameters: Optional[Dict[str, MapValue]] = None, 1333 usage_details: Optional[Dict[str, int]] = None, 1334 cost_details: Optional[Dict[str, float]] = None, 1335 prompt: Optional[PromptClient] = None, 1336 ) -> "LangfuseGeneration": 1337 """[DEPRECATED] Create a new child generation span. 1338 1339 DEPRECATED: This method is deprecated and will be removed in a future version. 1340 Use start_observation(as_type='generation') instead. 1341 1342 This method creates a new child generation span with this span as the parent. 1343 Generation spans are specialized for AI/LLM operations and include additional 1344 fields for model information, usage stats, and costs. 1345 1346 Unlike start_as_current_generation(), this method does not set the new span 1347 as the current span in the context. 1348 1349 Args: 1350 name: Name of the generation operation 1351 input: Input data for the model (e.g., prompts) 1352 output: Output from the model (e.g., completions) 1353 metadata: Additional metadata to associate with the generation 1354 version: Version identifier for the model or component 1355 level: Importance level of the generation (info, warning, error) 1356 status_message: Optional status message for the generation 1357 completion_start_time: When the model started generating the response 1358 model: Name/identifier of the AI model used (e.g., "gpt-4") 1359 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1360 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1361 cost_details: Cost information for the model call 1362 prompt: Associated prompt template from Langfuse prompt management 1363 1364 Returns: 1365 A new LangfuseGeneration that must be ended with .end() when complete 1366 1367 Example: 1368 ```python 1369 span = langfuse.start_span(name="process-query") 1370 try: 1371 # Create a generation child span 1372 generation = span.start_generation( 1373 name="generate-answer", 1374 model="gpt-4", 1375 input={"prompt": "Explain quantum computing"} 1376 ) 1377 try: 1378 # Call model API 1379 response = llm.generate(...) 1380 1381 generation.update( 1382 output=response.text, 1383 usage_details={ 1384 "prompt_tokens": response.usage.prompt_tokens, 1385 "completion_tokens": response.usage.completion_tokens 1386 } 1387 ) 1388 finally: 1389 generation.end() 1390 1391 # Continue with parent span 1392 span.update(output={"answer": response.text, "source": "gpt-4"}) 1393 finally: 1394 span.end() 1395 ``` 1396 """ 1397 warnings.warn( 1398 "start_generation is deprecated and will be removed in a future version. " 1399 "Use start_observation(as_type='generation') instead.", 1400 DeprecationWarning, 1401 stacklevel=2, 1402 ) 1403 return self.start_observation( 1404 name=name, 1405 as_type="generation", 1406 input=input, 1407 output=output, 1408 metadata=metadata, 1409 version=version, 1410 level=level, 1411 status_message=status_message, 1412 completion_start_time=completion_start_time, 1413 model=model, 1414 model_parameters=model_parameters, 1415 usage_details=usage_details, 1416 cost_details=cost_details, 1417 prompt=prompt, 1418 )
[DEPRECATED] Create a new child generation span.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_observation(as_type='generation') instead.
This method creates a new child generation span with this span as the parent. Generation spans are specialized for AI/LLM operations and include additional fields for model information, usage stats, and costs.
Unlike start_as_current_generation(), this method does not set the new span as the current span in the context.
Arguments:
- name: Name of the generation operation
- input: Input data for the model (e.g., prompts)
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Returns:
A new LangfuseGeneration that must be ended with .end() when complete
Example:
```python
span = langfuse.start_span(name="process-query")
try:
    # Create a generation child span
    generation = span.start_generation(
        name="generate-answer",
        model="gpt-4",
        input={"prompt": "Explain quantum computing"}
    )
    try:
        # Call model API
        response = llm.generate(...)

        generation.update(
            output=response.text,
            usage_details={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        )
    finally:
        generation.end()

    # Continue with parent span
    span.update(output={"answer": response.text, "source": "gpt-4"})
finally:
    span.end()
```
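The same child generation through the non-deprecated start_observation API, as a sketch:
```python
generation = span.start_observation(
    name="generate-answer",
    as_type="generation",
    model="gpt-4",
    input={"prompt": "Explain quantum computing"},
)
try:
    response = llm.generate(...)
    generation.update(output=response.text)
finally:
    generation.end()
```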
1420 def start_as_current_generation( 1421 self, 1422 *, 1423 name: str, 1424 input: Optional[Any] = None, 1425 output: Optional[Any] = None, 1426 metadata: Optional[Any] = None, 1427 version: Optional[str] = None, 1428 level: Optional[SpanLevel] = None, 1429 status_message: Optional[str] = None, 1430 completion_start_time: Optional[datetime] = None, 1431 model: Optional[str] = None, 1432 model_parameters: Optional[Dict[str, MapValue]] = None, 1433 usage_details: Optional[Dict[str, int]] = None, 1434 cost_details: Optional[Dict[str, float]] = None, 1435 prompt: Optional[PromptClient] = None, 1436 ) -> _AgnosticContextManager["LangfuseGeneration"]: 1437 """[DEPRECATED] Create a new child generation span and set it as the current span in a context manager. 1438 1439 DEPRECATED: This method is deprecated and will be removed in a future version. 1440 Use start_as_current_observation(as_type='generation') instead. 1441 1442 This method creates a new child generation span and sets it as the current span 1443 within a context manager. Generation spans are specialized for AI/LLM operations 1444 and include additional fields for model information, usage stats, and costs. 1445 1446 Args: 1447 name: Name of the generation operation 1448 input: Input data for the model (e.g., prompts) 1449 output: Output from the model (e.g., completions) 1450 metadata: Additional metadata to associate with the generation 1451 version: Version identifier for the model or component 1452 level: Importance level of the generation (info, warning, error) 1453 status_message: Optional status message for the generation 1454 completion_start_time: When the model started generating the response 1455 model: Name/identifier of the AI model used (e.g., "gpt-4") 1456 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1457 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1458 cost_details: Cost information for the model call 1459 prompt: Associated prompt template from Langfuse prompt management 1460 1461 Returns: 1462 A context manager that yields a new LangfuseGeneration 1463 1464 Example: 1465 ```python 1466 with langfuse.start_as_current_span(name="process-request") as span: 1467 # Prepare data 1468 query = preprocess_user_query(user_input) 1469 1470 # Create a generation span with context management 1471 with span.start_as_current_generation( 1472 name="generate-answer", 1473 model="gpt-4", 1474 input={"query": query} 1475 ) as generation: 1476 # Generation span is active here 1477 response = llm.generate(query) 1478 1479 # Update with results 1480 generation.update( 1481 output=response.text, 1482 usage_details={ 1483 "prompt_tokens": response.usage.prompt_tokens, 1484 "completion_tokens": response.usage.completion_tokens 1485 } 1486 ) 1487 1488 # Back to parent span context 1489 span.update(output={"answer": response.text, "source": "gpt-4"}) 1490 ``` 1491 """ 1492 warnings.warn( 1493 "start_as_current_generation is deprecated and will be removed in a future version. 
" 1494 "Use start_as_current_observation(as_type='generation') instead.", 1495 DeprecationWarning, 1496 stacklevel=2, 1497 ) 1498 return self.start_as_current_observation( 1499 name=name, 1500 as_type="generation", 1501 input=input, 1502 output=output, 1503 metadata=metadata, 1504 version=version, 1505 level=level, 1506 status_message=status_message, 1507 completion_start_time=completion_start_time, 1508 model=model, 1509 model_parameters=model_parameters, 1510 usage_details=usage_details, 1511 cost_details=cost_details, 1512 prompt=prompt, 1513 )
[DEPRECATED] Create a new child generation span and set it as the current span in a context manager.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_as_current_observation(as_type='generation') instead.
This method creates a new child generation span and sets it as the current span within a context manager. Generation spans are specialized for AI/LLM operations and include additional fields for model information, usage stats, and costs.
Arguments:
- name: Name of the generation operation
- input: Input data for the model (e.g., prompts)
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Returns:
A context manager that yields a new LangfuseGeneration
Example:
```python
with langfuse.start_as_current_span(name="process-request") as span:
    # Prepare data
    query = preprocess_user_query(user_input)

    # Create a generation span with context management
    with span.start_as_current_generation(
        name="generate-answer",
        model="gpt-4",
        input={"query": query}
    ) as generation:
        # Generation span is active here
        response = llm.generate(query)

        # Update with results
        generation.update(
            output=response.text,
            usage_details={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        )

    # Back to parent span context
    span.update(output={"answer": response.text, "source": "gpt-4"})
```
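And the non-deprecated form of the same pattern, per the deprecation notice:
```python
with span.start_as_current_observation(
    name="generate-answer",
    as_type="generation",
    model="gpt-4",
    input={"query": query},
) as generation:
    response = llm.generate(query)
    generation.update(output=response.text)
```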
1515 def create_event( 1516 self, 1517 *, 1518 name: str, 1519 input: Optional[Any] = None, 1520 output: Optional[Any] = None, 1521 metadata: Optional[Any] = None, 1522 version: Optional[str] = None, 1523 level: Optional[SpanLevel] = None, 1524 status_message: Optional[str] = None, 1525 ) -> "LangfuseEvent": 1526 """Create a new Langfuse observation of type 'EVENT'. 1527 1528 Args: 1529 name: Name of the span (e.g., function or operation name) 1530 input: Input data for the operation (can be any JSON-serializable object) 1531 output: Output data from the operation (can be any JSON-serializable object) 1532 metadata: Additional metadata to associate with the span 1533 version: Version identifier for the code or component 1534 level: Importance level of the span (info, warning, error) 1535 status_message: Optional status message for the span 1536 1537 Returns: 1538 The LangfuseEvent object 1539 1540 Example: 1541 ```python 1542 event = langfuse.create_event(name="process-event") 1543 ``` 1544 """ 1545 timestamp = time_ns() 1546 1547 with otel_trace_api.use_span(self._otel_span): 1548 new_otel_span = self._langfuse_client._otel_tracer.start_span( 1549 name=name, start_time=timestamp 1550 ) 1551 1552 return cast( 1553 "LangfuseEvent", 1554 LangfuseEvent( 1555 otel_span=new_otel_span, 1556 langfuse_client=self._langfuse_client, 1557 input=input, 1558 output=output, 1559 metadata=metadata, 1560 environment=self._environment, 1561 version=version, 1562 level=level, 1563 status_message=status_message, 1564 ).end(end_time=timestamp), 1565 )
Create a new Langfuse observation of type 'EVENT'.
Arguments:
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Returns:
The LangfuseEvent object
Example:
```python
event = langfuse.create_event(name="process-event")
```
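A slightly fuller sketch using the fields from the Args list above; the event name, payload, and level value are illustrative (level follows the SpanLevel literals):
```python
event = parent_span.create_event(
    name="cache-miss",
    input={"key": "user:42"},
    level="WARNING",
    status_message="Key not found in cache",
)
```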
1568class LangfuseGeneration(LangfuseObservationWrapper): 1569 """Specialized span implementation for AI model generations in Langfuse. 1570 1571 This class represents a generation span specifically designed for tracking 1572 AI/LLM operations. It extends the base LangfuseObservationWrapper with specialized 1573 attributes for model details, token usage, and costs. 1574 """ 1575 1576 def __init__( 1577 self, 1578 *, 1579 otel_span: otel_trace_api.Span, 1580 langfuse_client: "Langfuse", 1581 input: Optional[Any] = None, 1582 output: Optional[Any] = None, 1583 metadata: Optional[Any] = None, 1584 environment: Optional[str] = None, 1585 version: Optional[str] = None, 1586 level: Optional[SpanLevel] = None, 1587 status_message: Optional[str] = None, 1588 completion_start_time: Optional[datetime] = None, 1589 model: Optional[str] = None, 1590 model_parameters: Optional[Dict[str, MapValue]] = None, 1591 usage_details: Optional[Dict[str, int]] = None, 1592 cost_details: Optional[Dict[str, float]] = None, 1593 prompt: Optional[PromptClient] = None, 1594 ): 1595 """Initialize a new LangfuseGeneration span. 1596 1597 Args: 1598 otel_span: The OpenTelemetry span to wrap 1599 langfuse_client: Reference to the parent Langfuse client 1600 input: Input data for the generation (e.g., prompts) 1601 output: Output from the generation (e.g., completions) 1602 metadata: Additional metadata to associate with the generation 1603 environment: The tracing environment 1604 version: Version identifier for the model or component 1605 level: Importance level of the generation (info, warning, error) 1606 status_message: Optional status message for the generation 1607 completion_start_time: When the model started generating the response 1608 model: Name/identifier of the AI model used (e.g., "gpt-4") 1609 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1610 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1611 cost_details: Cost information for the model call 1612 prompt: Associated prompt template from Langfuse prompt management 1613 """ 1614 super().__init__( 1615 as_type="generation", 1616 otel_span=otel_span, 1617 langfuse_client=langfuse_client, 1618 input=input, 1619 output=output, 1620 metadata=metadata, 1621 environment=environment, 1622 version=version, 1623 level=level, 1624 status_message=status_message, 1625 completion_start_time=completion_start_time, 1626 model=model, 1627 model_parameters=model_parameters, 1628 usage_details=usage_details, 1629 cost_details=cost_details, 1630 prompt=prompt, 1631 )
Specialized span implementation for AI model generations in Langfuse.
This class represents a generation span specifically designed for tracking AI/LLM operations. It extends the base LangfuseObservationWrapper with specialized attributes for model details, token usage, and costs.
1576 def __init__( 1577 self, 1578 *, 1579 otel_span: otel_trace_api.Span, 1580 langfuse_client: "Langfuse", 1581 input: Optional[Any] = None, 1582 output: Optional[Any] = None, 1583 metadata: Optional[Any] = None, 1584 environment: Optional[str] = None, 1585 version: Optional[str] = None, 1586 level: Optional[SpanLevel] = None, 1587 status_message: Optional[str] = None, 1588 completion_start_time: Optional[datetime] = None, 1589 model: Optional[str] = None, 1590 model_parameters: Optional[Dict[str, MapValue]] = None, 1591 usage_details: Optional[Dict[str, int]] = None, 1592 cost_details: Optional[Dict[str, float]] = None, 1593 prompt: Optional[PromptClient] = None, 1594 ): 1595 """Initialize a new LangfuseGeneration span. 1596 1597 Args: 1598 otel_span: The OpenTelemetry span to wrap 1599 langfuse_client: Reference to the parent Langfuse client 1600 input: Input data for the generation (e.g., prompts) 1601 output: Output from the generation (e.g., completions) 1602 metadata: Additional metadata to associate with the generation 1603 environment: The tracing environment 1604 version: Version identifier for the model or component 1605 level: Importance level of the generation (info, warning, error) 1606 status_message: Optional status message for the generation 1607 completion_start_time: When the model started generating the response 1608 model: Name/identifier of the AI model used (e.g., "gpt-4") 1609 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1610 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1611 cost_details: Cost information for the model call 1612 prompt: Associated prompt template from Langfuse prompt management 1613 """ 1614 super().__init__( 1615 as_type="generation", 1616 otel_span=otel_span, 1617 langfuse_client=langfuse_client, 1618 input=input, 1619 output=output, 1620 metadata=metadata, 1621 environment=environment, 1622 version=version, 1623 level=level, 1624 status_message=status_message, 1625 completion_start_time=completion_start_time, 1626 model=model, 1627 model_parameters=model_parameters, 1628 usage_details=usage_details, 1629 cost_details=cost_details, 1630 prompt=prompt, 1631 )
Initialize a new LangfuseGeneration span.
Arguments:
- otel_span: The OpenTelemetry span to wrap
- langfuse_client: Reference to the parent Langfuse client
- input: Input data for the generation (e.g., prompts)
- output: Output from the generation (e.g., completions)
- metadata: Additional metadata to associate with the generation
- environment: The tracing environment
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
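In practice you rarely construct LangfuseGeneration directly; the client's factory helpers create and contextualize it for you. A minimal sketch, assuming a configured client and the v3 start_as_current_generation helper; the model call and the usage/cost numbers are placeholders:

from langfuse import get_client

langfuse = get_client()  # reads LANGFUSE_* environment variables

# start_as_current_generation yields a LangfuseGeneration and makes it the
# active OpenTelemetry span for the duration of the block
with langfuse.start_as_current_generation(
    name="summarize",
    model="gpt-4",
    model_parameters={"temperature": 0.2},
    input={"text": "..."},
) as generation:
    completion = "..."  # placeholder for the actual LLM call
    generation.update(
        output=completion,
        usage_details={"prompt_tokens": 120, "completion_tokens": 45},
        cost_details={"total": 0.00042},  # hypothetical USD amount
    )

langfuse.flush()  # force-send batched spans, useful in short-lived scripts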
class LangfuseEvent(LangfuseObservationWrapper):
    """Specialized span implementation for Langfuse Events."""

    def __init__(
        self,
        *,
        otel_span: otel_trace_api.Span,
        langfuse_client: "Langfuse",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        environment: Optional[str] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ):
        """Initialize a new LangfuseEvent span.

        Args:
            otel_span: The OpenTelemetry span to wrap
            langfuse_client: Reference to the parent Langfuse client
            input: Input data for the event
            output: Output from the event
            metadata: Additional metadata to associate with the event
            environment: The tracing environment
            version: Version identifier for the component
            level: Importance level of the event (info, warning, error)
            status_message: Optional status message for the event
        """
        super().__init__(
            otel_span=otel_span,
            as_type="event",
            langfuse_client=langfuse_client,
            input=input,
            output=output,
            metadata=metadata,
            environment=environment,
            version=version,
            level=level,
            status_message=status_message,
        )

    def update(
        self,
        *,
        name: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        **kwargs: Any,
    ) -> "LangfuseEvent":
        """Update is not allowed for LangfuseEvent because events cannot be updated.

        This method logs a warning and returns self without making changes.

        Returns:
            self: Returns the unchanged LangfuseEvent instance
        """
        langfuse_logger.warning(
            "Attempted to update LangfuseEvent observation. Events cannot be updated after creation."
        )
        return self
Specialized span implementation for Langfuse Events.
    def __init__(
        self,
        *,
        otel_span: otel_trace_api.Span,
        langfuse_client: "Langfuse",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        environment: Optional[str] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ):
        """Initialize a new LangfuseEvent span.

        Args:
            otel_span: The OpenTelemetry span to wrap
            langfuse_client: Reference to the parent Langfuse client
            input: Input data for the event
            output: Output from the event
            metadata: Additional metadata to associate with the event
            environment: The tracing environment
            version: Version identifier for the component
            level: Importance level of the event (info, warning, error)
            status_message: Optional status message for the event
        """
        super().__init__(
            otel_span=otel_span,
            as_type="event",
            langfuse_client=langfuse_client,
            input=input,
            output=output,
            metadata=metadata,
            environment=environment,
            version=version,
            level=level,
            status_message=status_message,
        )
Initialize a new LangfuseEvent span.
Arguments:
- otel_span: The OpenTelemetry span to wrap
- langfuse_client: Reference to the parent Langfuse client
- input: Input data for the event
- output: Output from the event
- metadata: Additional metadata to associate with the event
- environment: The tracing environment
- version: Version identifier for the component
- level: Importance level of the event (info, warning, error)
- status_message: Optional status message for the event
    def update(
        self,
        *,
        name: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        **kwargs: Any,
    ) -> "LangfuseEvent":
        """Update is not allowed for LangfuseEvent because events cannot be updated.

        This method logs a warning and returns self without making changes.

        Returns:
            self: Returns the unchanged LangfuseEvent instance
        """
        langfuse_logger.warning(
            "Attempted to update LangfuseEvent observation. Events cannot be updated after creation."
        )
        return self
Update is not allowed for LangfuseEvent because events cannot be updated.
This method logs a warning and returns self without making changes.
Returns:
self: Returns the unchanged LangfuseEvent instance
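Because events are point-in-time observations and update() is a no-op, all data must be supplied at creation. A short sketch, assuming the client exposes a create_event helper as in the v3 docs:

from langfuse import get_client

langfuse = get_client()

# events carry their full payload up front; a later update() only logs a warning
langfuse.create_event(
    name="cache-hit",
    input={"key": "user:42"},
    output={"hit": True},
    level="DEBUG",
)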
class LangfuseOtelSpanAttributes:
    # Langfuse-Trace attributes
    TRACE_NAME = "langfuse.trace.name"
    TRACE_USER_ID = "user.id"
    TRACE_SESSION_ID = "session.id"
    TRACE_TAGS = "langfuse.trace.tags"
    TRACE_PUBLIC = "langfuse.trace.public"
    TRACE_METADATA = "langfuse.trace.metadata"
    TRACE_INPUT = "langfuse.trace.input"
    TRACE_OUTPUT = "langfuse.trace.output"

    # Langfuse-observation attributes
    OBSERVATION_TYPE = "langfuse.observation.type"
    OBSERVATION_METADATA = "langfuse.observation.metadata"
    OBSERVATION_LEVEL = "langfuse.observation.level"
    OBSERVATION_STATUS_MESSAGE = "langfuse.observation.status_message"
    OBSERVATION_INPUT = "langfuse.observation.input"
    OBSERVATION_OUTPUT = "langfuse.observation.output"

    # Langfuse-observation of type Generation attributes
    OBSERVATION_COMPLETION_START_TIME = "langfuse.observation.completion_start_time"
    OBSERVATION_MODEL = "langfuse.observation.model.name"
    OBSERVATION_MODEL_PARAMETERS = "langfuse.observation.model.parameters"
    OBSERVATION_USAGE_DETAILS = "langfuse.observation.usage_details"
    OBSERVATION_COST_DETAILS = "langfuse.observation.cost_details"
    OBSERVATION_PROMPT_NAME = "langfuse.observation.prompt.name"
    OBSERVATION_PROMPT_VERSION = "langfuse.observation.prompt.version"

    # General
    ENVIRONMENT = "langfuse.environment"
    RELEASE = "langfuse.release"
    VERSION = "langfuse.version"

    # Internal
    AS_ROOT = "langfuse.internal.as_root"
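These constants are plain OpenTelemetry attribute keys, so they can also be set on spans created by other instrumentation. A sketch of that pattern, under the assumption that the spans are exported through the Langfuse OTel pipeline (otherwise the attributes are inert):

from opentelemetry import trace

from langfuse import LangfuseOtelSpanAttributes

tracer = trace.get_tracer("my-app")

with tracer.start_as_current_span("lookup") as span:
    # tag the span so Langfuse renders level and status message in the UI
    span.set_attribute(LangfuseOtelSpanAttributes.OBSERVATION_LEVEL, "WARNING")
    span.set_attribute(
        LangfuseOtelSpanAttributes.OBSERVATION_STATUS_MESSAGE, "fallback index used"
    )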
class LangfuseAgent(LangfuseObservationWrapper):
    """Agent observation for reasoning blocks that act on tools using LLM guidance."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseAgent span."""
        kwargs["as_type"] = "agent"
        super().__init__(**kwargs)
Agent observation for reasoning blocks that act on tools using LLM guidance.
class LangfuseTool(LangfuseObservationWrapper):
    """Tool observation representing external tool calls, e.g., calling a weather API."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseTool span."""
        kwargs["as_type"] = "tool"
        super().__init__(**kwargs)
Tool observation representing external tool calls, e.g., calling a weather API.
class LangfuseChain(LangfuseObservationWrapper):
    """Chain observation for connecting LLM application steps, e.g. passing context from retriever to LLM."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseChain span."""
        kwargs["as_type"] = "chain"
        super().__init__(**kwargs)
Chain observation for connecting LLM application steps, e.g. passing context from retriever to LLM.
class LangfuseEmbedding(LangfuseObservationWrapper):
    """Embedding observation for LLM embedding calls, typically used before retrieval."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseEmbedding span."""
        kwargs["as_type"] = "embedding"
        super().__init__(**kwargs)
Embedding observation for LLM embedding calls, typically used before retrieval.
class LangfuseEvaluator(LangfuseObservationWrapper):
    """Evaluator observation for assessing relevance, correctness, or helpfulness of LLM outputs."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseEvaluator span."""
        kwargs["as_type"] = "evaluator"
        super().__init__(**kwargs)
Evaluator observation for assessing relevance, correctness, or helpfulness of LLM outputs.
class LangfuseRetriever(LangfuseObservationWrapper):
    """Retriever observation for data retrieval steps, e.g. vector store or database queries."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseRetriever span."""
        kwargs["as_type"] = "retriever"
        super().__init__(**kwargs)
Retriever observation for data retrieval steps, e.g. vector store or database queries.
class LangfuseGuardrail(LangfuseObservationWrapper):
    """Guardrail observation for protection e.g. against jailbreaks or offensive content."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseGuardrail span."""
        kwargs["as_type"] = "guardrail"
        super().__init__(**kwargs)
Guardrail observation for protection e.g. against jailbreaks or offensive content.
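These typed wrappers are normally obtained through the generic entry points rather than instantiated directly; the as_type argument selects which class is used. A hedged sketch, assuming an SDK version whose observe decorator and start_as_current_observation helper accept the extended observation types listed above:

from langfuse import get_client, observe

langfuse = get_client()

@observe(as_type="tool")
def get_weather(city: str) -> dict:
    # traced as a LangfuseTool observation
    return {"city": city, "temp_c": 21}

# agent span wrapping the tool call
with langfuse.start_as_current_observation(name="route", as_type="agent") as agent:
    get_weather("Berlin")
    agent.update(output="used weather tool")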
class Evaluation:
    """Represents an evaluation result for an experiment item or an entire experiment run.

    This class provides a strongly-typed way to create evaluation results in evaluator functions.
    Users must use keyword arguments when instantiating this class.

    Attributes:
        name: Unique identifier for the evaluation metric. Should be descriptive
            and consistent across runs (e.g., "accuracy", "bleu_score", "toxicity").
            Used for aggregation and comparison across experiment runs.
        value: The evaluation score or result. Can be:
            - Numeric (int/float): For quantitative metrics like accuracy (0.85), BLEU (0.42)
            - String: For categorical results like "positive", "negative", "neutral"
            - Boolean: For binary assessments like "passes_safety_check"
            - None: When evaluation cannot be computed (missing data, API errors, etc.)
        comment: Optional human-readable explanation of the evaluation result.
            Useful for providing context, explaining scoring rationale, or noting
            special conditions. Displayed in Langfuse UI for interpretability.
        metadata: Optional structured metadata about the evaluation process.
            Can include confidence scores, intermediate calculations, model versions,
            or any other relevant technical details.
        data_type: Optional score data type. Required if value is not NUMERIC.
            One of NUMERIC, CATEGORICAL, or BOOLEAN. Defaults to NUMERIC.
        config_id: Optional Langfuse score config ID.

    Examples:
        Basic accuracy evaluation:
        ```python
        from langfuse import Evaluation

        def accuracy_evaluator(*, input, output, expected_output=None, **kwargs):
            if not expected_output:
                return Evaluation(name="accuracy", value=None, comment="No expected output")

            is_correct = output.strip().lower() == expected_output.strip().lower()
            return Evaluation(
                name="accuracy",
                value=1.0 if is_correct else 0.0,
                comment="Correct answer" if is_correct else "Incorrect answer"
            )
        ```

        Multi-metric evaluator:
        ```python
        def comprehensive_evaluator(*, input, output, expected_output=None, **kwargs):
            return [
                Evaluation(name="length", value=len(output), comment=f"Output length: {len(output)} chars"),
                Evaluation(name="has_greeting", value="hello" in output.lower(), comment="Contains greeting"),
                Evaluation(
                    name="quality",
                    value=0.85,
                    comment="High quality response",
                    metadata={"confidence": 0.92, "model": "gpt-4"}
                )
            ]
        ```

        Categorical evaluation:
        ```python
        def sentiment_evaluator(*, input, output, **kwargs):
            sentiment = analyze_sentiment(output)  # Returns "positive", "negative", or "neutral"
            return Evaluation(
                name="sentiment",
                value=sentiment,
                comment=f"Response expresses {sentiment} sentiment",
                data_type="CATEGORICAL"
            )
        ```

        Failed evaluation with error handling:
        ```python
        def external_api_evaluator(*, input, output, **kwargs):
            try:
                score = external_api.evaluate(output)
                return Evaluation(name="external_score", value=score)
            except Exception as e:
                return Evaluation(
                    name="external_score",
                    value=None,
                    comment=f"API unavailable: {e}",
                    metadata={"error": str(e), "retry_count": 3}
                )
        ```

    Note:
        All arguments must be passed as keywords. Positional arguments are not allowed
        to ensure code clarity and prevent errors from argument reordering.
    """

    def __init__(
        self,
        *,
        name: str,
        value: Union[int, float, str, bool, None],
        comment: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        data_type: Optional[ScoreDataType] = None,
        config_id: Optional[str] = None,
    ):
        """Initialize an Evaluation with the provided data.

        Args:
            name: Unique identifier for the evaluation metric.
            value: The evaluation score or result.
            comment: Optional human-readable explanation of the result.
            metadata: Optional structured metadata about the evaluation process.
            data_type: Optional score data type (NUMERIC, CATEGORICAL, or BOOLEAN).
            config_id: Optional Langfuse score config ID.

        Note:
            All arguments must be provided as keywords. Positional arguments will raise a TypeError.
        """
        self.name = name
        self.value = value
        self.comment = comment
        self.metadata = metadata
        self.data_type = data_type
        self.config_id = config_id
Represents an evaluation result for an experiment item or an entire experiment run.
This class provides a strongly-typed way to create evaluation results in evaluator functions. Users must use keyword arguments when instantiating this class.
Attributes:
- name: Unique identifier for the evaluation metric. Should be descriptive and consistent across runs (e.g., "accuracy", "bleu_score", "toxicity"). Used for aggregation and comparison across experiment runs.
- value: The evaluation score or result. Can be:
  - Numeric (int/float): For quantitative metrics like accuracy (0.85), BLEU (0.42)
  - String: For categorical results like "positive", "negative", "neutral"
  - Boolean: For binary assessments like "passes_safety_check"
  - None: When evaluation cannot be computed (missing data, API errors, etc.)
- comment: Optional human-readable explanation of the evaluation result. Useful for providing context, explaining scoring rationale, or noting special conditions. Displayed in Langfuse UI for interpretability.
- metadata: Optional structured metadata about the evaluation process. Can include confidence scores, intermediate calculations, model versions, or any other relevant technical details.
- data_type: Optional score data type. Required if value is not NUMERIC. One of NUMERIC, CATEGORICAL, or BOOLEAN. Defaults to NUMERIC.
- config_id: Optional Langfuse score config ID.
Examples:
Basic accuracy evaluation:
from langfuse import Evaluation

def accuracy_evaluator(*, input, output, expected_output=None, **kwargs):
    if not expected_output:
        return Evaluation(name="accuracy", value=None, comment="No expected output")

    is_correct = output.strip().lower() == expected_output.strip().lower()
    return Evaluation(
        name="accuracy",
        value=1.0 if is_correct else 0.0,
        comment="Correct answer" if is_correct else "Incorrect answer"
    )

Multi-metric evaluator:

def comprehensive_evaluator(*, input, output, expected_output=None, **kwargs):
    return [
        Evaluation(name="length", value=len(output), comment=f"Output length: {len(output)} chars"),
        Evaluation(name="has_greeting", value="hello" in output.lower(), comment="Contains greeting"),
        Evaluation(
            name="quality",
            value=0.85,
            comment="High quality response",
            metadata={"confidence": 0.92, "model": "gpt-4"}
        )
    ]

Categorical evaluation:

def sentiment_evaluator(*, input, output, **kwargs):
    sentiment = analyze_sentiment(output)  # Returns "positive", "negative", or "neutral"
    return Evaluation(
        name="sentiment",
        value=sentiment,
        comment=f"Response expresses {sentiment} sentiment",
        data_type="CATEGORICAL"
    )

Failed evaluation with error handling:

def external_api_evaluator(*, input, output, **kwargs):
    try:
        score = external_api.evaluate(output)
        return Evaluation(name="external_score", value=score)
    except Exception as e:
        return Evaluation(
            name="external_score",
            value=None,
            comment=f"API unavailable: {e}",
            metadata={"error": str(e), "retry_count": 3}
        )
Note:
All arguments must be passed as keywords. Positional arguments are not allowed to ensure code clarity and prevent errors from argument reordering.
    def __init__(
        self,
        *,
        name: str,
        value: Union[int, float, str, bool, None],
        comment: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        data_type: Optional[ScoreDataType] = None,
        config_id: Optional[str] = None,
    ):
        """Initialize an Evaluation with the provided data.

        Args:
            name: Unique identifier for the evaluation metric.
            value: The evaluation score or result.
            comment: Optional human-readable explanation of the result.
            metadata: Optional structured metadata about the evaluation process.
            data_type: Optional score data type (NUMERIC, CATEGORICAL, or BOOLEAN).
            config_id: Optional Langfuse score config ID.

        Note:
            All arguments must be provided as keywords. Positional arguments will raise a TypeError.
        """
        self.name = name
        self.value = value
        self.comment = comment
        self.metadata = metadata
        self.data_type = data_type
        self.config_id = config_id
Initialize an Evaluation with the provided data.
Arguments:
- name: Unique identifier for the evaluation metric.
- value: The evaluation score or result.
- comment: Optional human-readable explanation of the result.
- metadata: Optional structured metadata about the evaluation process.
- data_type: Optional score data type (NUMERIC, CATEGORICAL, or BOOLEAN).
- config_id: Optional Langfuse score config ID.
Note:
All arguments must be provided as keywords. Positional arguments will raise a TypeError.
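Evaluation objects are typically returned from evaluator callables passed to an experiment run rather than used standalone. A minimal end-to-end sketch under stated assumptions: the run_experiment entry point and the exact task/evaluator parameter names follow the SDK's experiment guide, and the one-item dataset and hardcoded task are purely illustrative:

from langfuse import Evaluation, get_client

langfuse = get_client()

def accuracy_evaluator(*, input, output, expected_output=None, **kwargs):
    if expected_output is None:
        return Evaluation(name="accuracy", value=None, comment="No expected output")
    return Evaluation(name="accuracy", value=float(output == expected_output))

def echo_task(*, item, **kwargs):
    # placeholder "model": the answer is hardcoded for illustration
    return "Paris"

result = langfuse.run_experiment(
    name="capitals-smoke-test",
    data=[{"input": "What is the capital of France?", "expected_output": "Paris"}],
    task=echo_task,
    evaluators=[accuracy_evaluator],
)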