langfuse
Langfuse Python SDK
Installation
The SDK was rewritten in v3 and released in June 2025. Refer to the v3 migration guide for instructions on updating your code.
pip install langfuse
Docs
Please see our docs for detailed information on this SDK.
""".. include:: ../README.md"""

from langfuse.experiment import Evaluation

from ._client import client as _client_module
from ._client.attributes import LangfuseOtelSpanAttributes
from ._client.constants import ObservationTypeLiteral
from ._client.get_client import get_client
from ._client.observe import observe
from ._client.span import (
    LangfuseAgent,
    LangfuseChain,
    LangfuseEmbedding,
    LangfuseEvaluator,
    LangfuseEvent,
    LangfuseGeneration,
    LangfuseGuardrail,
    LangfuseRetriever,
    LangfuseSpan,
    LangfuseTool,
)

Langfuse = _client_module.Langfuse

__all__ = [
    "Langfuse",
    "get_client",
    "observe",
    "ObservationTypeLiteral",
    "LangfuseSpan",
    "LangfuseGeneration",
    "LangfuseEvent",
    "LangfuseOtelSpanAttributes",
    "LangfuseAgent",
    "LangfuseTool",
    "LangfuseChain",
    "LangfuseEmbedding",
    "LangfuseEvaluator",
    "LangfuseRetriever",
    "LangfuseGuardrail",
    "Evaluation",
    "experiment",
    "api",
]
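The `Langfuse` constructor in this module resolves most settings in the order explicit argument, then environment variable, then built-in default. The standalone sketch below mirrors two of those expressions so their edge cases are easy to see. It is not SDK code: the helper names `resolve_sample_rate` and `resolve_tracing_enabled` and the `environ` parameter are hypothetical, and the environment variable names are assumed to match the ones named in the constructor docstring.

```python
import os
from typing import Mapping, Optional


def resolve_sample_rate(
    sample_rate: Optional[float] = None,
    environ: Optional[Mapping[str, str]] = None,
) -> float:
    """Hypothetical helper mirroring the `sample_rate or float(env)` idiom."""
    environ = os.environ if environ is None else environ
    # `or` falls through on any falsy value, so an explicit 0.0 is
    # treated the same as "not provided".
    resolved = sample_rate or float(environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
    if not 0.0 <= resolved <= 1.0:
        raise ValueError(f"Sample rate must be between 0.0 and 1.0, got {resolved}")
    return resolved


def resolve_tracing_enabled(
    tracing_enabled: Optional[bool] = True,
    environ: Optional[Mapping[str, str]] = None,
) -> bool:
    """Hypothetical helper: tracing is on only if both sources allow it."""
    environ = os.environ if environ is None else environ
    return bool(
        tracing_enabled
        and environ.get("LANGFUSE_TRACING_ENABLED", "true").lower() != "false"
    )
```

Two consequences follow from these idioms: passing `sample_rate=0.0` does not disable sampling, because the value falls through to the environment variable or the default of 1.0, and the environment variable can switch tracing off but cannot switch it back on once `tracing_enabled=False` is passed.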
class Langfuse:
    """Main client for Langfuse tracing and platform features.

    This class provides an interface for creating and managing traces, spans,
    and generations in Langfuse as well as interacting with the Langfuse API.

    The client features a thread-safe singleton pattern for each unique public API key,
    ensuring consistent trace context propagation across your application. It implements
    efficient batching of spans with configurable flush settings and includes background
    thread management for media uploads and score ingestion.

    Configuration is flexible through either direct parameters or environment variables,
    with graceful fallbacks and runtime configuration updates.

    Attributes:
        api: Synchronous API client for Langfuse backend communication
        async_api: Asynchronous API client for Langfuse backend communication
        _otel_tracer: Internal LangfuseTracer instance managing OpenTelemetry components

    Parameters:
        public_key (Optional[str]): Your Langfuse public API key. Can also be set via LANGFUSE_PUBLIC_KEY environment variable.
        secret_key (Optional[str]): Your Langfuse secret API key. Can also be set via LANGFUSE_SECRET_KEY environment variable.
        host (Optional[str]): The Langfuse API host URL. Defaults to "https://cloud.langfuse.com". Can also be set via LANGFUSE_HOST environment variable.
        timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds.
        httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created.
        debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable.
        tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable.
        flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable.
        flush_interval (Optional[float]): Time in seconds between batch flushes. Defaults to 5 seconds. Can also be set via LANGFUSE_FLUSH_INTERVAL environment variable.
        environment (Optional[str]): Environment name for tracing. Default is 'default'. Can also be set via LANGFUSE_TRACING_ENVIRONMENT environment variable. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'.
        release (Optional[str]): Release version/hash of your application. Used for grouping analytics by release.
        media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable.
        sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable.
        mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API.
        blocked_instrumentation_scopes (Optional[List[str]]): List of instrumentation scope names to block from being exported to Langfuse. Spans from these scopes will be filtered out before being sent to the API. Useful for filtering out spans from specific libraries or frameworks. For exported spans, you can see the instrumentation scope name in the span metadata in Langfuse (`metadata.scope.name`).
        additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and OTLPSpanExporter requests. These headers will be merged with default headers. Note: If httpx_client is provided, additional_headers must be set directly on your custom httpx_client as well.
        tracer_provider (Optional[TracerProvider]): OpenTelemetry TracerProvider to use for Langfuse. This can be useful to set to have disconnected tracing between Langfuse and other OpenTelemetry-span emitting libraries.
            Note: To track active spans, the context is still shared between TracerProviders. This may lead to broken trace trees.

    Example:
        ```python
        from langfuse import Langfuse

        # Initialize the client (reads from env vars if not provided)
        langfuse = Langfuse(
            public_key="your-public-key",
            secret_key="your-secret-key",
            host="https://cloud.langfuse.com",  # Optional, default shown
        )

        # Create a trace span
        with langfuse.start_as_current_span(name="process-query") as span:
            # Your application code here

            # Create a nested generation span for an LLM call
            with span.start_as_current_generation(
                name="generate-response",
                model="gpt-4",
                input={"query": "Tell me about AI"},
                model_parameters={"temperature": 0.7, "max_tokens": 500}
            ) as generation:
                # Generate response here
                response = "AI is a field of computer science..."

                generation.update(
                    output=response,
                    usage_details={"prompt_tokens": 10, "completion_tokens": 50},
                    cost_details={"total_cost": 0.0023}
                )

                # Score the generation (supports NUMERIC, BOOLEAN, CATEGORICAL)
                generation.score(name="relevance", value=0.95, data_type="NUMERIC")
        ```
    """

    _resources: Optional[LangfuseResourceManager] = None
    _mask: Optional[MaskFunction] = None
    _otel_tracer: otel_trace_api.Tracer

    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        timeout: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        debug: bool = False,
        tracing_enabled: Optional[bool] = True,
        flush_at: Optional[int] = None,
        flush_interval: Optional[float] = None,
        environment: Optional[str] = None,
        release: Optional[str] = None,
        media_upload_thread_count: Optional[int] = None,
        sample_rate: Optional[float] = None,
        mask: Optional[MaskFunction] = None,
        blocked_instrumentation_scopes: Optional[List[str]] = None,
        additional_headers: Optional[Dict[str, str]] = None,
        tracer_provider: Optional[TracerProvider] = None,
    ):
        self._host = host or cast(
            str, os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com")
        )
        self._environment = environment or cast(
            str, os.environ.get(LANGFUSE_TRACING_ENVIRONMENT)
        )
        self._project_id: Optional[str] = None
        sample_rate = sample_rate or float(os.environ.get(LANGFUSE_SAMPLE_RATE, 1.0))
        if not 0.0 <= sample_rate <= 1.0:
            raise ValueError(
                f"Sample rate must be between 0.0 and 1.0, got {sample_rate}"
            )

        timeout = timeout or int(os.environ.get(LANGFUSE_TIMEOUT, 5))

        self._tracing_enabled = (
            tracing_enabled
            and os.environ.get(LANGFUSE_TRACING_ENABLED, "true").lower() != "false"
        )
        if not self._tracing_enabled:
            langfuse_logger.info(
                "Configuration: Langfuse tracing is explicitly disabled. No data will be sent to the Langfuse API."
            )

        debug = (
            debug if debug else (os.getenv(LANGFUSE_DEBUG, "false").lower() == "true")
        )
        if debug:
            logging.basicConfig(
                format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            langfuse_logger.setLevel(logging.DEBUG)

        public_key = public_key or os.environ.get(LANGFUSE_PUBLIC_KEY)
        if public_key is None:
            langfuse_logger.warning(
                "Authentication error: Langfuse client initialized without public_key. Client will be disabled. "
                "Provide a public_key parameter or set LANGFUSE_PUBLIC_KEY environment variable. "
            )
            self._otel_tracer = otel_trace_api.NoOpTracer()
            return

        secret_key = secret_key or os.environ.get(LANGFUSE_SECRET_KEY)
        if secret_key is None:
            langfuse_logger.warning(
                "Authentication error: Langfuse client initialized without secret_key. Client will be disabled. "
                "Provide a secret_key parameter or set LANGFUSE_SECRET_KEY environment variable. "
            )
            self._otel_tracer = otel_trace_api.NoOpTracer()
            return

        if os.environ.get("OTEL_SDK_DISABLED", "false").lower() == "true":
            langfuse_logger.warning(
                "OTEL_SDK_DISABLED is set. Langfuse tracing will be disabled and no traces will appear in the UI."
            )

        # Initialize api and tracer if requirements are met
        self._resources = LangfuseResourceManager(
            public_key=public_key,
            secret_key=secret_key,
            host=self._host,
            timeout=timeout,
            environment=self._environment,
            release=release,
            flush_at=flush_at,
            flush_interval=flush_interval,
            httpx_client=httpx_client,
            media_upload_thread_count=media_upload_thread_count,
            sample_rate=sample_rate,
            mask=mask,
            tracing_enabled=self._tracing_enabled,
            blocked_instrumentation_scopes=blocked_instrumentation_scopes,
            additional_headers=additional_headers,
            tracer_provider=tracer_provider,
        )
        self._mask = self._resources.mask

        self._otel_tracer = (
            self._resources.tracer
            if self._tracing_enabled and self._resources.tracer is not None
            else otel_trace_api.NoOpTracer()
        )
        self.api = self._resources.api
        self.async_api = self._resources.async_api

    def start_span(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseSpan:
        """Create a new span for tracing a unit of work.

        This method creates a new span but does not set it as the current span in the
        context. To create and use a span within a context, use start_as_current_span().

        The created span will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the span (e.g., function or operation name)
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span

        Returns:
            A LangfuseSpan object that must be ended with .end() when the operation completes

        Example:
            ```python
            span = langfuse.start_span(name="process-data")
            try:
                # Do work
                span.update(output="result")
            finally:
                span.end()
            ```
        """
        return self.start_observation(
            trace_context=trace_context,
            name=name,
            as_type="span",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
        )

    def start_as_current_span(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseSpan]:
        """Create a new span and set it as the current span in a context manager.

        This method creates a new span and sets it as the current span within a context
        manager. Use this method with a 'with' statement to automatically handle span
        lifecycle within a code block.

        The created span will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the span (e.g., function or operation name)
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span
            end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.

        Returns:
            A context manager that yields a LangfuseSpan

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-query") as span:
                # Do work
                result = process_data()
                span.update(output=result)

                # Create a child span automatically
                with span.start_as_current_span(name="sub-operation") as child_span:
                    # Do sub-operation work
                    child_span.update(output="sub-result")
            ```
        """
        return self.start_as_current_observation(
            trace_context=trace_context,
            name=name,
            as_type="span",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            end_on_exit=end_on_exit,
        )

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["generation"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> LangfuseGeneration: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["span"] = "span",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseSpan: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["agent"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseAgent: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["tool"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseTool: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["chain"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseChain: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["retriever"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseRetriever: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["evaluator"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseEvaluator: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["embedding"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> LangfuseEmbedding: ...

    @overload
    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["guardrail"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseGuardrail: ...
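The run of `@overload` declarations above lets a type checker infer the return type from the literal passed as `as_type`, while a single implementation does the work at runtime. A reduced sketch of the same pattern, with two observation types instead of nine, might look as follows. `ToySpan`, `ToyGeneration`, and the `_CLASSES` lookup table are hypothetical stand-ins; the SDK itself resolves the class through its internal `_get_span_class` helper, which is not reproduced here.

```python
from typing import Dict, Literal, Optional, Type, Union, overload


class ToySpan:
    """Hypothetical stand-in for a plain observation."""

    def __init__(self, name: str) -> None:
        self.name = name


class ToyGeneration(ToySpan):
    """Hypothetical stand-in for an observation with model fields."""

    def __init__(self, name: str, model: Optional[str] = None) -> None:
        super().__init__(name)
        self.model = model


# Assumed lookup table; the real SDK uses its own internal mapping.
_CLASSES: Dict[str, Type[ToySpan]] = {"span": ToySpan, "generation": ToyGeneration}


@overload
def start_observation(
    *, name: str, as_type: Literal["generation"], model: Optional[str] = None
) -> ToyGeneration: ...


@overload
def start_observation(*, name: str, as_type: Literal["span"] = "span") -> ToySpan: ...


def start_observation(
    *, name: str, as_type: str = "span", model: Optional[str] = None
) -> Union[ToySpan, ToyGeneration]:
    """Single runtime implementation behind both typed signatures."""
    if as_type not in _CLASSES:
        raise ValueError(f"Unknown observation type: {as_type}")
    if as_type == "generation":
        # Only generation-like types receive the model-specific arguments.
        return ToyGeneration(name, model=model)
    return _CLASSES[as_type](name)
```

With these signatures, a type checker treats `start_observation(name="x", as_type="generation")` as returning `ToyGeneration`, so attributes such as `model` are available without a cast.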

    def start_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: ObservationTypeLiteralNoEvent = "span",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> Union[
        LangfuseSpan,
        LangfuseGeneration,
        LangfuseAgent,
        LangfuseTool,
        LangfuseChain,
        LangfuseRetriever,
        LangfuseEvaluator,
        LangfuseEmbedding,
        LangfuseGuardrail,
    ]:
        """Create a new observation of the specified type.

        This method creates a new observation but does not set it as the current span in the
        context. To create and use an observation within a context, use start_as_current_observation().

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the observation
            as_type: Type of observation to create (defaults to "span")
            input: Input data for the operation
            output: Output data from the operation
            metadata: Additional metadata to associate with the observation
            version: Version identifier for the code or component
            level: Importance level of the observation
            status_message: Optional status message for the observation
            completion_start_time: When the model started generating (for generation types)
            model: Name/identifier of the AI model used (for generation types)
            model_parameters: Parameters used for the model (for generation types)
            usage_details: Token usage information (for generation types)
            cost_details: Cost information (for generation types)
            prompt: Associated prompt template (for generation types)

        Returns:
            An observation object of the appropriate type that must be ended with .end()
        """
        if trace_context:
            trace_id = trace_context.get("trace_id", None)
            parent_span_id = trace_context.get("parent_span_id", None)

            if trace_id:
                remote_parent_span = self._create_remote_parent_span(
                    trace_id=trace_id, parent_span_id=parent_span_id
                )

                with otel_trace_api.use_span(
                    cast(otel_trace_api.Span, remote_parent_span)
                ):
                    otel_span = self._otel_tracer.start_span(name=name)
                    otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True)

                    return self._create_observation_from_otel_span(
                        otel_span=otel_span,
                        as_type=as_type,
                        input=input,
                        output=output,
                        metadata=metadata,
                        version=version,
                        level=level,
                        status_message=status_message,
                        completion_start_time=completion_start_time,
                        model=model,
                        model_parameters=model_parameters,
                        usage_details=usage_details,
                        cost_details=cost_details,
                        prompt=prompt,
                    )

        otel_span = self._otel_tracer.start_span(name=name)

        return self._create_observation_from_otel_span(
            otel_span=otel_span,
            as_type=as_type,
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            usage_details=usage_details,
            cost_details=cost_details,
            prompt=prompt,
        )

    def _create_observation_from_otel_span(
        self,
        *,
        otel_span: otel_trace_api.Span,
        as_type: ObservationTypeLiteralNoEvent,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> Union[
        LangfuseSpan,
        LangfuseGeneration,
        LangfuseAgent,
        LangfuseTool,
        LangfuseChain,
        LangfuseRetriever,
        LangfuseEvaluator,
        LangfuseEmbedding,
        LangfuseGuardrail,
    ]:
        """Create the appropriate observation type from an OTEL span."""
        if as_type in get_observation_types_list(ObservationTypeGenerationLike):
            observation_class = self._get_span_class(as_type)
            # Type ignore to prevent overloads of internal _get_span_class function,
            # issue is that LangfuseEvent could be returned and that classes have diff. args
            return observation_class(  # type: ignore[return-value,call-arg]
                otel_span=otel_span,
                langfuse_client=self,
                environment=self._environment,
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
                completion_start_time=completion_start_time,
                model=model,
                model_parameters=model_parameters,
                usage_details=usage_details,
                cost_details=cost_details,
                prompt=prompt,
            )
        else:
            # For other types (e.g. span, guardrail), create appropriate class without generation properties
            observation_class = self._get_span_class(as_type)
            # Type ignore to prevent overloads of internal _get_span_class function,
            # issue is that LangfuseEvent could be returned and that classes have diff. args
            return observation_class(  # type: ignore[return-value,call-arg]
                otel_span=otel_span,
                langfuse_client=self,
                environment=self._environment,
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
            )
            # span._observation_type = as_type
            # span._otel_span.set_attribute("langfuse.observation.type", as_type)
            # return span

    def start_generation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> LangfuseGeneration:
        """Create a new generation span for model generations.

        DEPRECATED: This method is deprecated and will be removed in a future version.
        Use start_observation(as_type='generation') instead.

        This method creates a specialized span for tracking model generations.
        It includes additional fields specific to model generations such as model name,
        token usage, and cost details.

        The created generation span will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the generation operation
            input: Input data for the model (e.g., prompts)
            output: Output from the model (e.g., completions)
            metadata: Additional metadata to associate with the generation
            version: Version identifier for the model or component
            level: Importance level of the generation (info, warning, error)
            status_message: Optional status message for the generation
            completion_start_time: When the model started generating the response
            model: Name/identifier of the AI model used (e.g., "gpt-4")
            model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
            usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
            cost_details: Cost information for the model call
            prompt: Associated prompt template from Langfuse prompt management

        Returns:
            A LangfuseGeneration object that must be ended with .end() when complete

        Example:
            ```python
            generation = langfuse.start_generation(
                name="answer-generation",
                model="gpt-4",
                input={"prompt": "Explain quantum computing"},
                model_parameters={"temperature": 0.7}
            )
            try:
                # Call model API
                response = llm.generate(...)

                generation.update(
                    output=response.text,
                    usage_details={
                        "prompt_tokens": response.usage.prompt_tokens,
                        "completion_tokens": response.usage.completion_tokens
                    }
                )
            finally:
                generation.end()
            ```
        """
        warnings.warn(
            "start_generation is deprecated and will be removed in a future version. "
            "Use start_observation(as_type='generation') instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.start_observation(
            trace_context=trace_context,
            name=name,
            as_type="generation",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            usage_details=usage_details,
            cost_details=cost_details,
            prompt=prompt,
        )

    def start_as_current_generation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseGeneration]:
        """Create a new generation span and set it as the current span in a context manager.

        DEPRECATED: This method is deprecated and will be removed in a future version.
        Use start_as_current_observation(as_type='generation') instead.

        This method creates a specialized span for model generations and sets it as the
        current span within a context manager. Use this method with a 'with' statement to
        automatically handle the generation span lifecycle within a code block.

        The created generation span will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the generation operation
            input: Input data for the model (e.g., prompts)
            output: Output from the model (e.g., completions)
            metadata: Additional metadata to associate with the generation
            version: Version identifier for the model or component
            level: Importance level of the generation (info, warning, error)
            status_message: Optional status message for the generation
            completion_start_time: When the model started generating the response
            model: Name/identifier of the AI model used (e.g., "gpt-4")
            model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
            usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
            cost_details: Cost information for the model call
            prompt: Associated prompt template from Langfuse prompt management
            end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.

        Returns:
            A context manager that yields a LangfuseGeneration

        Example:
            ```python
            with langfuse.start_as_current_generation(
                name="answer-generation",
                model="gpt-4",
                input={"prompt": "Explain quantum computing"}
            ) as generation:
                # Call model API
                response = llm.generate(...)

                # Update with results
                generation.update(
                    output=response.text,
                    usage_details={
                        "prompt_tokens": response.usage.prompt_tokens,
                        "completion_tokens": response.usage.completion_tokens
                    }
                )
            ```
        """
        warnings.warn(
            "start_as_current_generation is deprecated and will be removed in a future version. "
            "Use start_as_current_observation(as_type='generation') instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.start_as_current_observation(
            trace_context=trace_context,
            name=name,
            as_type="generation",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            usage_details=usage_details,
            cost_details=cost_details,
            prompt=prompt,
            end_on_exit=end_on_exit,
        )

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["generation"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseGeneration]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["span"] = "span",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseSpan]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["agent"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseAgent]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["tool"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseTool]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["chain"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseChain]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["retriever"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseRetriever]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["evaluator"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseEvaluator]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["embedding"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseEmbedding]: ...

    @overload
    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: Literal["guardrail"],
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseGuardrail]: ...

    def start_as_current_observation(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        as_type: ObservationTypeLiteralNoEvent = "span",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        end_on_exit: Optional[bool] = None,
    ) -> Union[
        _AgnosticContextManager[LangfuseGeneration],
        _AgnosticContextManager[LangfuseSpan],
        _AgnosticContextManager[LangfuseAgent],
        _AgnosticContextManager[LangfuseTool],
        _AgnosticContextManager[LangfuseChain],
        _AgnosticContextManager[LangfuseRetriever],
        _AgnosticContextManager[LangfuseEvaluator],
        _AgnosticContextManager[LangfuseEmbedding],
        _AgnosticContextManager[LangfuseGuardrail],
    ]:
        """Create a new observation and set it as the current span in a context manager.

        This method creates a new observation of the specified type and sets it as the
        current span within a context manager. Use this method with a 'with' statement to
        automatically handle the observation lifecycle within a code block.

        The created observation will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the observation (e.g., function or operation name)
            as_type: Type of observation to create (defaults to "span")
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the observation
            version: Version identifier for the code or component
            level: Importance level of the observation (info, warning, error)
            status_message: Optional status message for the observation
            end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.

            The following parameters are only available when as_type is "generation" or "embedding":
            completion_start_time: When the model started generating the response
            model: Name/identifier of the AI model used (e.g., "gpt-4")
            model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
            usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
            cost_details: Cost information for the model call
            prompt: Associated prompt template from Langfuse prompt management

        Returns:
            A context manager that yields the appropriate observation type based on as_type

        Example:
            ```python
            # Create a span
            with langfuse.start_as_current_observation(name="process-query", as_type="span") as span:
                # Do work
                result = process_data()
                span.update(output=result)

                # Create a child span automatically
                with span.start_as_current_span(name="sub-operation") as child_span:
                    # Do sub-operation work
                    child_span.update(output="sub-result")

            # Create a tool observation
            with langfuse.start_as_current_observation(name="web-search", as_type="tool") as tool:
                # Do tool work
                results = search_web(query)
                tool.update(output=results)

            # Create a generation observation
            with langfuse.start_as_current_observation(
                name="answer-generation",
                as_type="generation",
                model="gpt-4"
            ) as generation:
                # Generate answer
                response = llm.generate(...)
                generation.update(output=response)
            ```
        """
        if as_type in get_observation_types_list(ObservationTypeGenerationLike):
            if trace_context:
                trace_id = trace_context.get("trace_id", None)
                parent_span_id = trace_context.get("parent_span_id", None)

                if trace_id:
                    remote_parent_span = self._create_remote_parent_span(
                        trace_id=trace_id, parent_span_id=parent_span_id
                    )

                    return cast(
                        Union[
                            _AgnosticContextManager[LangfuseGeneration],
                            _AgnosticContextManager[LangfuseEmbedding],
                        ],
                        self._create_span_with_parent_context(
                            as_type=as_type,
                            name=name,
                            remote_parent_span=remote_parent_span,
                            parent=None,
                            end_on_exit=end_on_exit,
                            input=input,
                            output=output,
                            metadata=metadata,
                            version=version,
                            level=level,
                            status_message=status_message,
                            completion_start_time=completion_start_time,
                            model=model,
                            model_parameters=model_parameters,
                            usage_details=usage_details,
                            cost_details=cost_details,
                            prompt=prompt,
                        ),
                    )

            return cast(
                Union[
                    _AgnosticContextManager[LangfuseGeneration],
                    _AgnosticContextManager[LangfuseEmbedding],
                ],
                self._start_as_current_otel_span_with_processed_media(
                    as_type=as_type,
                    name=name,
                    end_on_exit=end_on_exit,
                    input=input,
                    output=output,
                    metadata=metadata,
                    version=version,
                    level=level,
                    status_message=status_message,
                    completion_start_time=completion_start_time,
                    model=model,
                    model_parameters=model_parameters,
                    usage_details=usage_details,
                    cost_details=cost_details,
                    prompt=prompt,
                ),
            )

        if as_type in get_observation_types_list(ObservationTypeSpanLike):
            if trace_context:
                trace_id = trace_context.get("trace_id", None)
                parent_span_id = trace_context.get("parent_span_id", None)

                if trace_id:
                    remote_parent_span = self._create_remote_parent_span(
                        trace_id=trace_id, parent_span_id=parent_span_id
                    )

                    return cast(
                        Union[
                            _AgnosticContextManager[LangfuseSpan],
                            _AgnosticContextManager[LangfuseAgent],
                            _AgnosticContextManager[LangfuseTool],
                            _AgnosticContextManager[LangfuseChain],
                            _AgnosticContextManager[LangfuseRetriever],
                            _AgnosticContextManager[LangfuseEvaluator],
                            _AgnosticContextManager[LangfuseGuardrail],
                        ],
                        self._create_span_with_parent_context(
                            as_type=as_type,
                            name=name,
                            remote_parent_span=remote_parent_span,
                            parent=None,
                            end_on_exit=end_on_exit,
                            input=input,
                            output=output,
                            metadata=metadata,
                            version=version,
                            level=level,
                            status_message=status_message,
                        ),
                    )

            return cast(
                Union[
                    _AgnosticContextManager[LangfuseSpan],
                    _AgnosticContextManager[LangfuseAgent],
                    _AgnosticContextManager[LangfuseTool],
                    _AgnosticContextManager[LangfuseChain],
                    _AgnosticContextManager[LangfuseRetriever],
                    _AgnosticContextManager[LangfuseEvaluator],
                    _AgnosticContextManager[LangfuseGuardrail],
                ],
                self._start_as_current_otel_span_with_processed_media(
                    as_type=as_type,
                    name=name,
                    end_on_exit=end_on_exit,
                    input=input,
                    output=output,
                    metadata=metadata,
                    version=version,
                    level=level,
                    status_message=status_message,
                ),
            )

        # This should never be reached since all valid types are handled above
        langfuse_logger.warning(
            f"Unknown observation type: {as_type}, falling back to span"
        )
        return self._start_as_current_otel_span_with_processed_media(
            as_type="span",
            name=name,
            end_on_exit=end_on_exit,
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
        )

    def _get_span_class(
        self,
        as_type: ObservationTypeLiteral,
    ) -> Union[
        Type[LangfuseAgent],
        Type[LangfuseTool],
        Type[LangfuseChain],
        Type[LangfuseRetriever],
        Type[LangfuseEvaluator],
        Type[LangfuseEmbedding],
        Type[LangfuseGuardrail],
        Type[LangfuseGeneration],
        Type[LangfuseEvent],
        Type[LangfuseSpan],
    ]:
        """Get the appropriate span class based on as_type."""
        normalized_type = as_type.lower()

        if normalized_type == "agent":
            return LangfuseAgent
        elif normalized_type == "tool":
            return LangfuseTool
        elif normalized_type == "chain":
            return LangfuseChain
        elif normalized_type == "retriever":
            return LangfuseRetriever
        elif normalized_type == "evaluator":
            return LangfuseEvaluator
        elif normalized_type == "embedding":
            return LangfuseEmbedding
        elif normalized_type == "guardrail":
            return LangfuseGuardrail
        elif normalized_type == "generation":
            return LangfuseGeneration
        elif normalized_type == "event":
            return LangfuseEvent
        elif normalized_type == "span":
            return LangfuseSpan
        else:
            return LangfuseSpan

    @_agnosticcontextmanager
    def _create_span_with_parent_context(
        self,
        *,
        name: str,
        parent: Optional[otel_trace_api.Span] = None,
        remote_parent_span: Optional[otel_trace_api.Span] = None,
        as_type: ObservationTypeLiteralNoEvent,
        end_on_exit: Optional[bool] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> Any:
        parent_span = parent or cast(otel_trace_api.Span, remote_parent_span)

        with otel_trace_api.use_span(parent_span):
            with self._start_as_current_otel_span_with_processed_media(
                name=name,
                as_type=as_type,
                end_on_exit=end_on_exit,
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
                completion_start_time=completion_start_time,
                model=model,
                model_parameters=model_parameters,
                usage_details=usage_details,
                cost_details=cost_details,
                prompt=prompt,
            ) as langfuse_span:
                if remote_parent_span is not None:
                    langfuse_span._otel_span.set_attribute(
                        LangfuseOtelSpanAttributes.AS_ROOT, True
                    )

                yield langfuse_span

    @_agnosticcontextmanager
    def _start_as_current_otel_span_with_processed_media(
        self,
        *,
        name: str,
        as_type: Optional[ObservationTypeLiteralNoEvent] = None,
        end_on_exit: Optional[bool] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> Any:
        with self._otel_tracer.start_as_current_span(
            name=name,
            end_on_exit=end_on_exit if end_on_exit is not None else True,
        ) as otel_span:
            span_class = self._get_span_class(
                as_type or "generation"
            )  # default was "generation"
            common_args = {
                "otel_span": otel_span,
                "langfuse_client": self,
                "environment": self._environment,
                "input": input,
                "output": output,
                "metadata": metadata,
                "version": version,
                "level": level,
                "status_message": status_message,
            }

            if span_class in [
                LangfuseGeneration,
                LangfuseEmbedding,
            ]:
                common_args.update(
                    {
                        "completion_start_time": completion_start_time,
                        "model": model,
                        "model_parameters": model_parameters,
                        "usage_details": usage_details,
                        "cost_details": cost_details,
                        "prompt": prompt,
                    }
                )
            # For span-like types (span, agent, tool, chain, retriever, evaluator, guardrail), no generation properties needed

            yield span_class(**common_args)  # type: ignore[arg-type]

    def _get_current_otel_span(self) -> Optional[otel_trace_api.Span]:
        current_span = otel_trace_api.get_current_span()

        if current_span is otel_trace_api.INVALID_SPAN:
            langfuse_logger.warning(
                "Context error: No active span in current context. Operations that depend on an active span will be skipped. "
                "Ensure spans are created with start_as_current_span() or that you're operating within an active span context."
            )
            return None

        return current_span

    def update_current_generation(
        self,
        *,
        name: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> None:
        """Update the current active generation span with new information.

        This method updates the current generation span in the active context with
        additional information. It's useful for adding output, usage stats, or other
        details that become available during or after model generation.

        Args:
            name: The generation name
            input: Updated input data for the model
            output: Output from the model (e.g., completions)
            metadata: Additional metadata to associate with the generation
            version: Version identifier for the model or component
            level: Importance level of the generation (info, warning, error)
            status_message: Optional status message for the generation
            completion_start_time: When the model started generating the response
            model: Name/identifier of the AI model used (e.g., "gpt-4")
            model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
            usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
            cost_details: Cost information for the model call
            prompt: Associated prompt template from Langfuse prompt management

        Example:
            ```python
            with langfuse.start_as_current_observation(name="answer-query", as_type="generation") as generation:
                # Initial setup and API call
                response = llm.generate(...)

                # Update with results that weren't available at creation time
                langfuse.update_current_generation(
                    output=response.text,
                    usage_details={
                        "prompt_tokens": response.usage.prompt_tokens,
                        "completion_tokens": response.usage.completion_tokens
                    }
                )
            ```
        """
        if not self._tracing_enabled:
            langfuse_logger.debug(
                "Operation skipped: update_current_generation - Tracing is disabled or client is in no-op mode."
            )
            return

        current_otel_span = self._get_current_otel_span()

        if current_otel_span is not None:
            generation = LangfuseGeneration(
                otel_span=current_otel_span, langfuse_client=self
            )

            if name:
                current_otel_span.update_name(name)

            generation.update(
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
                completion_start_time=completion_start_time,
                model=model,
                model_parameters=model_parameters,
                usage_details=usage_details,
                cost_details=cost_details,
                prompt=prompt,
            )

    def update_current_span(
        self,
        *,
        name: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> None:
        """Update the current active span with new information.

        This method updates the current span in the active context with
        additional information. It's useful for adding outputs or metadata
        that become available during execution.

        Args:
            name: The span name
            input: Updated input data for the operation
            output: Output data from the operation
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-data") as span:
                # Initial processing
                result = process_first_part()

                # Update with intermediate results
                langfuse.update_current_span(metadata={"intermediate_result": result})

                # Continue processing
                final_result = process_second_part(result)

                # Final update
                langfuse.update_current_span(output=final_result)
            ```
        """
        if not self._tracing_enabled:
            langfuse_logger.debug(
                "Operation skipped: update_current_span - Tracing is disabled or client is in no-op mode."
            )
            return

        current_otel_span = self._get_current_otel_span()

        if current_otel_span is not None:
            span = LangfuseSpan(
                otel_span=current_otel_span,
                langfuse_client=self,
                environment=self._environment,
            )

            if name:
                current_otel_span.update_name(name)

            span.update(
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
            )

    def update_current_trace(
        self,
        *,
        name: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ) -> None:
        """Update the current trace with additional information.

        This method updates the Langfuse trace that the current span belongs to. It's useful for
        adding trace-level metadata like user ID, session ID, or tags that apply to
        the entire Langfuse trace rather than just a single observation.

        Args:
            name: Updated name for the Langfuse trace
            user_id: ID of the user who initiated the Langfuse trace
            session_id: Session identifier for grouping related Langfuse traces
            version: Version identifier for the application or service
            input: Input data for the overall Langfuse trace
            output: Output data from the overall Langfuse trace
            metadata: Additional metadata to associate with the Langfuse trace
            tags: List of tags to categorize the Langfuse trace
            public: Whether the Langfuse trace should be publicly accessible

        Example:
            ```python
            with langfuse.start_as_current_span(name="handle-request") as span:
                # Get user information
                user = authenticate_user(request)

                # Update trace with user context
                langfuse.update_current_trace(
                    user_id=user.id,
                    session_id=request.session_id,
                    tags=["production", "web-app"]
                )

                # Continue processing
                response = process_request(request)

                # Update span with results
                span.update(output=response)
            ```
        """
        if not self._tracing_enabled:
            langfuse_logger.debug(
                "Operation skipped: update_current_trace - Tracing is disabled or client is in no-op mode."
            )
            return

        current_otel_span = self._get_current_otel_span()

        if current_otel_span is not None:
            existing_observation_type = current_otel_span.attributes.get(  # type: ignore[attr-defined]
                LangfuseOtelSpanAttributes.OBSERVATION_TYPE, "span"
            )
            # We need to preserve the class to keep the correct observation type
            span_class = self._get_span_class(existing_observation_type)
            span = span_class(
                otel_span=current_otel_span,
                langfuse_client=self,
                environment=self._environment,
            )

            span.update_trace(
                name=name,
                user_id=user_id,
                session_id=session_id,
                version=version,
                input=input,
                output=output,
                metadata=metadata,
                tags=tags,
                public=public,
            )

    def create_event(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseEvent:
        """Create a new Langfuse observation of type 'EVENT'.

        The created Langfuse Event observation will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the event (e.g., function or operation name)
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the event
            version: Version identifier for the code or component
            level: Importance level of the event (info, warning, error)
            status_message: Optional status message for the event

        Returns:
            The Langfuse Event object

        Example:
            ```python
            event = langfuse.create_event(name="process-event")
            ```
        """
        timestamp = time_ns()

        if trace_context:
            trace_id = trace_context.get("trace_id", None)
            parent_span_id = trace_context.get("parent_span_id", None)

            if trace_id:
                remote_parent_span = self._create_remote_parent_span(
                    trace_id=trace_id, parent_span_id=parent_span_id
                )

                with otel_trace_api.use_span(
                    cast(otel_trace_api.Span, remote_parent_span)
                ):
                    otel_span = self._otel_tracer.start_span(
                        name=name, start_time=timestamp
                    )
                    otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True)

                    return cast(
                        LangfuseEvent,
                        LangfuseEvent(
                            otel_span=otel_span,
                            langfuse_client=self,
                            environment=self._environment,
                            input=input,
                            output=output,
                            metadata=metadata,
                            version=version,
                            level=level,
                            status_message=status_message,
                        ).end(end_time=timestamp),
                    )

        otel_span = self._otel_tracer.start_span(name=name, start_time=timestamp)

        return cast(
            LangfuseEvent,
            LangfuseEvent(
                otel_span=otel_span,
                langfuse_client=self,
                environment=self._environment,
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
            ).end(end_time=timestamp),
        )

    def _create_remote_parent_span(
        self, *, trace_id: str, parent_span_id: Optional[str]
    ) -> Any:
        # Note: invalid IDs are only logged here; the values are still parsed
        # below, so a non-hexadecimal ID raises ValueError in int(..., 16).
        if not self._is_valid_trace_id(trace_id):
            langfuse_logger.warning(
                f"Passed trace ID '{trace_id}' is not a valid 32 lowercase hex char Langfuse trace id. Ignoring trace ID."
            )

        if parent_span_id and not self._is_valid_span_id(parent_span_id):
            langfuse_logger.warning(
                f"Passed span ID '{parent_span_id}' is not a valid 16 lowercase hex char Langfuse span id. Ignoring parent span ID."
            )

        int_trace_id = int(trace_id, 16)
        int_parent_span_id = (
            int(parent_span_id, 16)
            if parent_span_id
            else RandomIdGenerator().generate_span_id()
        )

        span_context = otel_trace_api.SpanContext(
            trace_id=int_trace_id,
            span_id=int_parent_span_id,
            trace_flags=otel_trace_api.TraceFlags(0x01),  # mark span as sampled
            is_remote=False,
        )

        return trace.NonRecordingSpan(span_context)

    def _is_valid_trace_id(self, trace_id: str) -> bool:
        pattern = r"^[0-9a-f]{32}$"

        return bool(re.match(pattern, trace_id))

    def _is_valid_span_id(self, span_id: str) -> bool:
        pattern = r"^[0-9a-f]{16}$"

        return bool(re.match(pattern, span_id))

    def _create_observation_id(self, *, seed: Optional[str] = None) -> str:
        """Create a unique observation ID for use with Langfuse.

        This method generates a unique observation ID (span ID in OpenTelemetry terms)
        for use with various Langfuse APIs. It can either generate a random ID or
        create a deterministic ID based on a seed string.

        Observation IDs must be 16 lowercase hexadecimal characters, representing 8 bytes.
        This method ensures the generated ID meets this requirement. If you need to
        correlate an external ID with a Langfuse observation ID, use the external ID as
        the seed to get a valid, deterministic observation ID.

        Args:
            seed: Optional string to use as a seed for deterministic ID generation.
                 If provided, the same seed will always produce the same ID.
                 If not provided, a random ID will be generated.

        Returns:
            A 16-character lowercase hexadecimal string representing the observation ID.

        Example:
            ```python
            # Generate a random observation ID
            obs_id = langfuse._create_observation_id()

            # Generate a deterministic ID based on a seed
            user_obs_id = langfuse._create_observation_id(seed="user-123-feedback")

            # Correlate an external item ID with a Langfuse observation ID
            item_id = "item-789012"
            correlated_obs_id = langfuse._create_observation_id(seed=item_id)

            # Use the ID with Langfuse APIs
            langfuse.create_score(
                name="relevance",
                value=0.95,
                trace_id=trace_id,
                observation_id=obs_id
            )
            ```
        """
        if not seed:
            span_id_int = RandomIdGenerator().generate_span_id()

            return self._format_otel_span_id(span_id_int)

        return sha256(seed.encode("utf-8")).digest()[:8].hex()

    @staticmethod
    def create_trace_id(*, seed: Optional[str] = None) -> str:
        """Create a unique trace ID for use with Langfuse.

        This method generates a unique trace ID for use with various Langfuse APIs.
        It can either generate a random ID or create a deterministic ID based on
        a seed string.

        Trace IDs must be 32 lowercase hexadecimal characters, representing 16 bytes.
        This method ensures the generated ID meets this requirement. If you need to
        correlate an external ID with a Langfuse trace ID, use the external ID as the
        seed to get a valid, deterministic Langfuse trace ID.
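The seeded case can be reproduced with the standard library alone. The sketch below follows the same derivation as this method's body (the first 16 bytes of the SHA-256 digest of the seed, hex-encoded); `seeded_trace_id` and `TRACE_ID_PATTERN` are hypothetical helper names used only for illustration.

```python
import re
from hashlib import sha256

# Same shape the client checks for: 32 lowercase hex characters.
TRACE_ID_PATTERN = re.compile(r"^[0-9a-f]{32}$")


def seeded_trace_id(seed: str) -> str:
    # First 16 bytes of the SHA-256 digest, rendered as 32 hex characters.
    return sha256(seed.encode("utf-8")).digest()[:16].hex()


first = seeded_trace_id("session-456")
second = seeded_trace_id("session-456")
other = seeded_trace_id("session-457")
```

Because the ID depends only on the seed, two services that share an external identifier can arrive at the same trace ID without exchanging it.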

        Args:
            seed: Optional string to use as a seed for deterministic ID generation.
                 If provided, the same seed will always produce the same ID.
                 If not provided, a random ID will be generated.

        Returns:
            A 32-character lowercase hexadecimal string representing the Langfuse trace ID.

        Example:
            ```python
            # Generate a random trace ID
            trace_id = langfuse.create_trace_id()

            # Generate a deterministic ID based on a seed
            session_trace_id = langfuse.create_trace_id(seed="session-456")

            # Correlate an external ID with a Langfuse trace ID
            external_id = "external-system-123456"
            correlated_trace_id = langfuse.create_trace_id(seed=external_id)

            # Use the ID with trace context
            with langfuse.start_as_current_span(
                name="process-request",
                trace_context={"trace_id": trace_id}
            ) as span:
                # Operation will be part of the specific trace
                pass
            ```
        """
        if not seed:
            trace_id_int = RandomIdGenerator().generate_trace_id()

            return Langfuse._format_otel_trace_id(trace_id_int)

        return sha256(seed.encode("utf-8")).digest()[:16].hex()

    def _get_otel_trace_id(self, otel_span: otel_trace_api.Span) -> str:
        span_context = otel_span.get_span_context()

        return self._format_otel_trace_id(span_context.trace_id)

    def _get_otel_span_id(self, otel_span: otel_trace_api.Span) -> str:
        span_context = otel_span.get_span_context()

        return self._format_otel_span_id(span_context.span_id)

    @staticmethod
    def _format_otel_span_id(span_id_int: int) -> str:
        """Format an integer span ID to a 16-character lowercase hex string.

        Internal method to convert an OpenTelemetry integer span ID to the standard
        W3C Trace Context format (16-character lowercase hex string).
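As a quick illustration of the zero-padded formatting described above, the following stand-alone sketch uses the same format specs as this method and its trace ID counterpart; `format_span_id` and `format_trace_id` are hypothetical free-function names chosen for the example.

```python
def format_span_id(span_id_int: int) -> str:
    # Zero-pad to 16 lowercase hex characters (64 bits).
    return format(span_id_int, "016x")


def format_trace_id(trace_id_int: int) -> str:
    # Zero-pad to 32 lowercase hex characters (128 bits).
    return format(trace_id_int, "032x")


short_id = format_span_id(255)
round_trip = int(short_id, 16)
padded_trace_id = format_trace_id(1)
```

The padding matters for small integers: without it, the hex string would be shorter than the fixed width the ID format requires.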

        Args:
            span_id_int: 64-bit integer representing a span ID

        Returns:
            A 16-character lowercase hexadecimal string
        """
        return format(span_id_int, "016x")

    @staticmethod
    def _format_otel_trace_id(trace_id_int: int) -> str:
        """Format an integer trace ID to a 32-character lowercase hex string.

        Internal method to convert an OpenTelemetry integer trace ID to the standard
        W3C Trace Context format (32-character lowercase hex string).

        Args:
            trace_id_int: 128-bit integer representing a trace ID

        Returns:
            A 32-character lowercase hexadecimal string
        """
        return format(trace_id_int, "032x")

    @overload
    def create_score(
        self,
        *,
        name: str,
        value: float,
        session_id: Optional[str] = None,
        dataset_run_id: Optional[str] = None,
        trace_id: Optional[str] = None,
        observation_id: Optional[str] = None,
        score_id: Optional[str] = None,
        data_type: Optional[Literal["NUMERIC", "BOOLEAN"]] = None,
        comment: Optional[str] = None,
        config_id: Optional[str] = None,
        metadata: Optional[Any] = None,
    ) -> None: ...

    @overload
    def create_score(
        self,
        *,
        name: str,
        value: str,
        session_id: Optional[str] = None,
        dataset_run_id: Optional[str] = None,
        trace_id: Optional[str] = None,
        score_id: Optional[str] = None,
        observation_id: Optional[str] = None,
        data_type: Optional[Literal["CATEGORICAL"]] = "CATEGORICAL",
        comment: Optional[str] = None,
        config_id: Optional[str] = None,
        metadata: Optional[Any] = None,
    ) -> None: ...

    def create_score(
        self,
        *,
        name: str,
        value: Union[float, str],
        session_id: Optional[str] = None,
        dataset_run_id: Optional[str] = None,
        trace_id: Optional[str] = None,
        observation_id: Optional[str] = None,
        score_id: Optional[str] = None,
        data_type: Optional[ScoreDataType] = None,
        comment: Optional[str] = None,
        config_id: Optional[str] = None,
        metadata: Optional[Any] = None,
    ) -> None:
        """Create a score for a specific trace or observation.

        This method creates a score for evaluating a Langfuse trace or observation. Scores can be
        used to track quality metrics, user feedback, or automated evaluations.

        Args:
            name: Name of the score (e.g., "relevance", "accuracy")
            value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
            session_id: ID of the Langfuse session to associate the score with
            dataset_run_id: ID of the Langfuse dataset run to associate the score with
            trace_id: ID of the Langfuse trace to associate the score with
            observation_id: Optional ID of the specific observation to score. Trace ID must be provided too.
            score_id: Optional custom ID for the score (auto-generated if not provided)
            data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
            comment: Optional comment or explanation for the score
            config_id: Optional ID of a score config defined in Langfuse
            metadata: Optional metadata to be attached to the score

        Example:
            ```python
            # Create a numeric score for accuracy
            langfuse.create_score(
                name="accuracy",
                value=0.92,
                trace_id="abcdef1234567890abcdef1234567890",
                data_type="NUMERIC",
                comment="High accuracy with minor irrelevant details"
            )

            # Create a categorical score for sentiment
            langfuse.create_score(
                name="sentiment",
                value="positive",
                trace_id="abcdef1234567890abcdef1234567890",
                observation_id="abcdef1234567890",
                data_type="CATEGORICAL"
            )
            ```
        """
        if not self._tracing_enabled:
            return

        score_id = score_id or self._create_observation_id()

        try:
            new_body = ScoreBody(
                id=score_id,
                sessionId=session_id,
                datasetRunId=dataset_run_id,
                traceId=trace_id,
                observationId=observation_id,
                name=name,
                value=value,
                dataType=data_type,  # type: ignore
                comment=comment,
                configId=config_id,
                environment=self._environment,
                metadata=metadata,
            )

            event = {
                "id": self.create_trace_id(),
                "type": "score-create",
                "timestamp": _get_timestamp(),
                "body": new_body,
            }

            if self._resources is not None:
                # Force the score to be in sample if it was for a legacy trace ID, i.e. non-32 hexchar
                force_sample = (
                    not self._is_valid_trace_id(trace_id) if trace_id else True
                )

                self._resources.add_score_task(
                    event,
                    force_sample=force_sample,
                )

        except Exception as e:
            langfuse_logger.exception(
                f"Error creating score: Failed to process score event for trace_id={trace_id}, name={name}. 
Error: {e}"
            )

    @overload
    def score_current_span(
        self,
        *,
        name: str,
        value: float,
        score_id: Optional[str] = None,
        data_type: Optional[Literal["NUMERIC", "BOOLEAN"]] = None,
        comment: Optional[str] = None,
        config_id: Optional[str] = None,
    ) -> None: ...

    @overload
    def score_current_span(
        self,
        *,
        name: str,
        value: str,
        score_id: Optional[str] = None,
        data_type: Optional[Literal["CATEGORICAL"]] = "CATEGORICAL",
        comment: Optional[str] = None,
        config_id: Optional[str] = None,
    ) -> None: ...

    def score_current_span(
        self,
        *,
        name: str,
        value: Union[float, str],
        score_id: Optional[str] = None,
        data_type: Optional[ScoreDataType] = None,
        comment: Optional[str] = None,
        config_id: Optional[str] = None,
    ) -> None:
        """Create a score for the current active span.

        This method scores the currently active span in the context. It's a convenient
        way to score the current operation without needing to know its trace and span IDs.

        Args:
            name: Name of the score (e.g., "relevance", "accuracy")
            value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
            score_id: Optional custom ID for the score (auto-generated if not provided)
            data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
            comment: Optional comment or explanation for the score
            config_id: Optional ID of a score config defined in Langfuse

        Example:
            ```python
            with langfuse.start_as_current_generation(name="answer-query") as generation:
                # Generate answer
                response = generate_answer(...)
                generation.update(output=response)

                # Score the generation
                langfuse.score_current_span(
                    name="relevance",
                    value=0.85,
                    data_type="NUMERIC",
                    comment="Mostly relevant but contains some tangential information"
                )
            ```
        """
        current_span = self._get_current_otel_span()

        if current_span is not None:
            trace_id = self._get_otel_trace_id(current_span)
            observation_id = self._get_otel_span_id(current_span)

            langfuse_logger.info(
                f"Score: Creating score name='{name}' value={value} for current span ({observation_id}) in trace {trace_id}"
            )

            self.create_score(
                trace_id=trace_id,
                observation_id=observation_id,
                name=name,
                value=cast(str, value),
                score_id=score_id,
                data_type=cast(Literal["CATEGORICAL"], data_type),
                comment=comment,
                config_id=config_id,
            )

    @overload
    def score_current_trace(
        self,
        *,
        name: str,
        value: float,
        score_id: Optional[str] = None,
        data_type: Optional[Literal["NUMERIC", "BOOLEAN"]] = None,
        comment: Optional[str] = None,
        config_id: Optional[str] = None,
    ) -> None: ...

    @overload
    def score_current_trace(
        self,
        *,
        name: str,
        value: str,
        score_id: Optional[str] = None,
        data_type: Optional[Literal["CATEGORICAL"]] = "CATEGORICAL",
        comment: Optional[str] = None,
        config_id: Optional[str] = None,
    ) -> None: ...

    def score_current_trace(
        self,
        *,
        name: str,
        value: Union[float, str],
        score_id: Optional[str] = None,
        data_type: Optional[ScoreDataType] = None,
        comment: Optional[str] = None,
        config_id: Optional[str] = None,
    ) -> None:
        """Create a score for the current trace.

        This method scores the trace of the currently active span. Unlike score_current_span,
        this method associates the score with the entire trace rather than a specific span.
        It's useful for scoring overall performance or quality of the entire operation.

        Args:
            name: Name of the score (e.g., "user_satisfaction", "overall_quality")
            value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
            score_id: Optional custom ID for the score (auto-generated if not provided)
            data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
            comment: Optional comment or explanation for the score
            config_id: Optional ID of a score config defined in Langfuse

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-user-request") as span:
                # Process request
                result = process_complete_request()
                span.update(output=result)

                # Score the overall trace
                langfuse.score_current_trace(
                    name="overall_quality",
                    value=0.95,
                    data_type="NUMERIC",
                    comment="High quality end-to-end response"
                )
            ```
        """
        current_span = self._get_current_otel_span()

        if current_span is not None:
            trace_id = self._get_otel_trace_id(current_span)

            langfuse_logger.info(
                f"Score: Creating score name='{name}' value={value} for entire trace {trace_id}"
            )

            self.create_score(
                trace_id=trace_id,
                name=name,
                value=cast(str, value),
                score_id=score_id,
                data_type=cast(Literal["CATEGORICAL"], data_type),
                comment=comment,
                config_id=config_id,
            )

    def flush(self) -> None:
        """Force flush all pending spans and events to the Langfuse API.

        This method manually flushes any pending spans, scores, and other events to the
        Langfuse API. It's useful in scenarios where you want to ensure all data is sent
        before proceeding, without waiting for the automatic flush interval.

        Example:
            ```python
            # Record some spans and scores
            with langfuse.start_as_current_span(name="operation") as span:
                # Do work...
                pass

            # Ensure all data is sent to Langfuse before proceeding
            langfuse.flush()

            # Continue with other work
            ```
        """
        if self._resources is not None:
            self._resources.flush()

    def shutdown(self) -> None:
        """Shut down the Langfuse client and flush all pending data.

        This method cleanly shuts down the Langfuse client, ensuring all pending data
        is flushed to the API and all background threads are properly terminated.

        It's important to call this method when your application is shutting down to
        prevent data loss and resource leaks. For most applications, using the client
        as a context manager or relying on the automatic shutdown via atexit is sufficient.

        Example:
            ```python
            # Initialize Langfuse
            langfuse = Langfuse(public_key="...", secret_key="...")

            # Use Langfuse throughout your application
            # ...

            # When application is shutting down
            langfuse.shutdown()
            ```
        """
        if self._resources is not None:
            self._resources.shutdown()

    def get_current_trace_id(self) -> Optional[str]:
        """Get the trace ID of the current active span.

        This method retrieves the trace ID from the currently active span in the context.
        It can be used to get the trace ID for referencing in logs, external systems,
        or for creating related operations.

        Returns:
            The current trace ID as a 32-character lowercase hexadecimal string,
            or None if there is no active span.

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-request") as span:
                # Get the current trace ID for reference
                trace_id = langfuse.get_current_trace_id()

                # Use it for external correlation
                log.info(f"Processing request with trace_id: {trace_id}")

                # Or pass to another system
                external_system.process(data, trace_id=trace_id)
            ```
        """
        if not self._tracing_enabled:
            langfuse_logger.debug(
                "Operation skipped: get_current_trace_id - Tracing is disabled or client is in no-op mode."
            )
            return None

        current_otel_span = self._get_current_otel_span()

        return self._get_otel_trace_id(current_otel_span) if current_otel_span else None

    def get_current_observation_id(self) -> Optional[str]:
        """Get the observation ID (span ID) of the current active span.

        This method retrieves the observation ID from the currently active span in the context.
        It can be used to get the observation ID for referencing in logs, external systems,
        or for creating scores or other related operations.

        Returns:
            The current observation ID as a 16-character lowercase hexadecimal string,
            or None if there is no active span.

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-user-query") as span:
                # Get the current observation ID
                observation_id = langfuse.get_current_observation_id()

                # Store it for later reference
                cache.set(f"query_{query_id}_observation", observation_id)

                # Process the query...
            ```
        """
        if not self._tracing_enabled:
            langfuse_logger.debug(
                "Operation skipped: get_current_observation_id - Tracing is disabled or client is in no-op mode."
            )
            return None

        current_otel_span = self._get_current_otel_span()

        return self._get_otel_span_id(current_otel_span) if current_otel_span else None

    def _get_project_id(self) -> Optional[str]:
        """Fetch and return the current project id. Persisted across requests. Returns None if no project id is found for api keys."""
        if not self._project_id:
            proj = self.api.projects.get()
            if not proj.data or not proj.data[0].id:
                return None

            self._project_id = proj.data[0].id

        return self._project_id

    def get_trace_url(self, *, trace_id: Optional[str] = None) -> Optional[str]:
        """Get the URL to view a trace in the Langfuse UI.

        This method generates a URL that links directly to a trace in the Langfuse UI.
        It's useful for providing links in logs, notifications, or debugging tools.

        Args:
            trace_id: Optional trace ID to generate a URL for. If not provided,
                the trace ID of the current active span will be used.

        Returns:
            A URL string pointing to the trace in the Langfuse UI,
            or None if the project ID couldn't be retrieved or no trace ID is available.
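
The URL follows the pattern `{host}/project/{project_id}/traces/{trace_id}` used in the method body. A minimal standalone sketch of that assembly, assuming a hypothetical helper `build_trace_url` that is not part of the SDK:

```python
from typing import Optional


def build_trace_url(
    host: str, project_id: Optional[str], trace_id: Optional[str]
) -> Optional[str]:
    # Hypothetical helper: both IDs are required, otherwise no URL can be built.
    if project_id and trace_id:
        return f"{host}/project/{project_id}/traces/{trace_id}"
    return None


# Sample values for illustration only; the project ID here is made up.
url = build_trace_url(
    "https://cloud.langfuse.com", "my-project", "1234567890abcdef1234567890abcdef"
)
missing = build_trace_url("https://cloud.langfuse.com", None, "1234567890abcdef1234567890abcdef")
```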

        Example:
            ```python
            # Get URL for the current trace
            with langfuse.start_as_current_span(name="process-request") as span:
                trace_url = langfuse.get_trace_url()
                log.info(f"Processing trace: {trace_url}")

            # Get URL for a specific trace
            specific_trace_url = langfuse.get_trace_url(trace_id="1234567890abcdef1234567890abcdef")
            send_notification(f"Review needed for trace: {specific_trace_url}")
            ```
        """
        project_id = self._get_project_id()
        final_trace_id = trace_id or self.get_current_trace_id()

        return (
            f"{self._host}/project/{project_id}/traces/{final_trace_id}"
            if project_id and final_trace_id
            else None
        )

    def get_dataset(
        self, name: str, *, fetch_items_page_size: Optional[int] = 50
    ) -> "DatasetClient":
        """Fetch a dataset by its name.

        Args:
            name (str): The name of the dataset to fetch.
            fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.

        Returns:
            DatasetClient: The dataset with the given name.
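
The chunked fetch is a page-based loop that stops once the reported total page count is reached. A minimal sketch of that pattern, using a hypothetical in-memory `fake_list_page` in place of the dataset items endpoint (both function names are illustrative and not part of the SDK):

```python
from typing import Callable, List, Tuple


def fetch_all_pages(
    list_page: Callable[[int, int], Tuple[List[str], int]], page_size: int = 50
) -> List[str]:
    # list_page(page, limit) is a hypothetical stand-in for the list endpoint;
    # it returns (items_on_page, total_pages).
    items: List[str] = []
    page = 1
    while True:
        data, total_pages = list_page(page, page_size)
        items.extend(data)
        # Stop once the current page is the last one (or there are no pages).
        if total_pages <= page:
            break
        page += 1
    return items


def fake_list_page(page: int, limit: int) -> Tuple[List[str], int]:
    # In-memory stand-in holding 120 items.
    all_items = [f"item-{i}" for i in range(120)]
    total_pages = (len(all_items) + limit - 1) // limit
    start = (page - 1) * limit
    return all_items[start : start + limit], total_pages


fetched = fetch_all_pages(fake_list_page, page_size=50)
```

With 120 items and a page size of 50, the loop requests three pages and then stops.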
        """
        try:
            langfuse_logger.debug(f"Getting datasets {name}")
            dataset = self.api.datasets.get(dataset_name=name)

            dataset_items = []
            page = 1

            while True:
                new_items = self.api.dataset_items.list(
                    dataset_name=self._url_encode(name, is_url_param=True),
                    page=page,
                    limit=fetch_items_page_size,
                )
                dataset_items.extend(new_items.data)

                if new_items.meta.total_pages <= page:
                    break

                page += 1

            items = [DatasetItemClient(i, langfuse=self) for i in dataset_items]

            return DatasetClient(dataset, items=items)

        except Error as e:
            handle_fern_exception(e)
            raise e

    def run_experiment(
        self,
        *,
        name: str,
        run_name: Optional[str] = None,
        description: Optional[str] = None,
        data: ExperimentData,
        task: TaskFunction,
        evaluators: List[EvaluatorFunction] = [],
        run_evaluators: List[RunEvaluatorFunction] = [],
        max_concurrency: int = 50,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> ExperimentResult:
        """Run an experiment on a dataset with automatic tracing and evaluation.

        This method executes a task function on each item in the provided dataset,
        automatically traces all executions with Langfuse for observability, runs
        item-level and run-level evaluators on the outputs, and returns comprehensive
        results with evaluation metrics.

        The experiment system provides:
        - Automatic tracing of all task executions
        - Concurrent processing with configurable limits
        - Comprehensive error handling that isolates failures
        - Integration with Langfuse datasets for experiment tracking
        - Flexible evaluation framework supporting both sync and async evaluators

        Args:
            name: Human-readable name for the experiment. Used for identification
                in the Langfuse UI.
            run_name: Optional exact name for the experiment run.
                If provided, this will be used as the exact dataset run name if the `data` contains Langfuse dataset items.
                If not provided, this will default to the experiment name appended with an ISO timestamp.
            description: Optional description explaining the experiment's purpose,
                methodology, or expected outcomes.
            data: Array of data items to process. Can be either:
                - List of dict-like items with 'input', 'expected_output', 'metadata' keys
                - List of Langfuse DatasetItem objects from dataset.items
            task: Function that processes each data item and returns output.
                Must accept 'item' as keyword argument and can return sync or async results.
                The task function signature should be: task(*, item, **kwargs) -> Any
            evaluators: List of functions to evaluate each item's output individually.
                Each evaluator receives input, output, expected_output, and metadata.
                Can return single Evaluation dict or list of Evaluation dicts.
            run_evaluators: List of functions to evaluate the entire experiment run.
                Each run evaluator receives all item_results and can compute aggregate metrics.
                Useful for calculating averages, distributions, or cross-item comparisons.
            max_concurrency: Maximum number of concurrent task executions (default: 50).
                Controls the number of items processed simultaneously. Adjust based on
                API rate limits and system resources.
            metadata: Optional metadata dictionary to attach to all experiment traces.
                This metadata will be included in every trace created during the experiment.
                If `data` are Langfuse dataset items, the metadata will be attached to the dataset run, too.

        Returns:
            ExperimentResult containing:
            - run_name: The experiment run name. This is equal to the dataset run name if experiment was on Langfuse dataset.
            - item_results: List of results for each processed item with outputs and evaluations
            - run_evaluations: List of aggregate evaluation results for the entire run
            - dataset_run_id: ID of the dataset run (if using Langfuse datasets)
            - dataset_run_url: Direct URL to view results in Langfuse UI (if applicable)

        Raises:
            ValueError: If required parameters are missing or invalid
            Exception: If experiment setup fails (individual item failures are handled gracefully)

        Examples:
            Basic experiment with local data:
            ```python
            def summarize_text(*, item, **kwargs):
                return f"Summary: {item['input'][:50]}..."

            def length_evaluator(*, input, output, expected_output=None, **kwargs):
                return {
                    "name": "output_length",
                    "value": len(output),
                    "comment": f"Output contains {len(output)} characters"
                }

            result = langfuse.run_experiment(
                name="Text Summarization Test",
                description="Evaluate summarization quality and length",
                data=[
                    {"input": "Long article text...", "expected_output": "Expected summary"},
                    {"input": "Another article...", "expected_output": "Another summary"}
                ],
                task=summarize_text,
                evaluators=[length_evaluator]
            )

            print(f"Processed {len(result.item_results)} items")
            for item_result in result.item_results:
                print(f"Input: {item_result.item['input']}")
                print(f"Output: {item_result.output}")
                print(f"Evaluations: {item_result.evaluations}")
            ```

            Advanced experiment with async task and multiple evaluators:
            ```python
            async def llm_task(*, item, **kwargs):
                # Simulate async LLM call
                response = await openai_client.chat.completions.create(
                    model="gpt-4",
                    messages=[{"role": "user", "content": item["input"]}]
                )
                return response.choices[0].message.content

            def accuracy_evaluator(*, input, output, expected_output=None, **kwargs):
                if expected_output 
                and expected_output.lower() in output.lower():
                    return {"name": "accuracy", "value": 1.0, "comment": "Correct answer"}
                return {"name": "accuracy", "value": 0.0, "comment": "Incorrect answer"}

            def toxicity_evaluator(*, input, output, expected_output=None, **kwargs):
                # Simulate toxicity check
                toxicity_score = check_toxicity(output)  # Your toxicity checker
                return {
                    "name": "toxicity",
                    "value": toxicity_score,
                    "comment": f"Toxicity level: {'high' if toxicity_score > 0.7 else 'low'}"
                }

            def average_accuracy(*, item_results, **kwargs):
                accuracies = [
                    evaluation.value for result in item_results
                    for evaluation in result.evaluations
                    if evaluation.name == "accuracy"
                ]
                return {
                    "name": "average_accuracy",
                    "value": sum(accuracies) / len(accuracies) if accuracies else 0,
                    "comment": f"Average accuracy across {len(accuracies)} items"
                }

            result = langfuse.run_experiment(
                name="LLM Safety and Accuracy Test",
                description="Evaluate model accuracy and safety across diverse prompts",
                data=test_dataset,  # Your dataset items
                task=llm_task,
                evaluators=[accuracy_evaluator, toxicity_evaluator],
                run_evaluators=[average_accuracy],
                max_concurrency=5,  # Limit concurrent API calls
                metadata={"model": "gpt-4", "temperature": 0.7}
            )
            ```

            Using with Langfuse datasets:
            ```python
            # Get dataset from Langfuse
            dataset = langfuse.get_dataset("my-eval-dataset")

            result = dataset.run_experiment(
                name="Production Model Evaluation",
                description="Monthly evaluation of production model performance",
                task=my_production_task,
                evaluators=[accuracy_evaluator, latency_evaluator]
            )

            # Results automatically linked to dataset in Langfuse UI
            print(f"View results: {result.dataset_run_url}")
            ```

        Note:
            - Task and evaluator functions can be either synchronous or asynchronous
            - Individual item failures are logged but don't stop the experiment
            - All executions are automatically traced and visible in Langfuse UI
            - When using Langfuse datasets, results are automatically linked for easy comparison
            - This method works in both sync and async contexts (Jupyter notebooks, web apps, etc.)
            - Async execution is handled automatically with smart event loop detection
        """
        return cast(
            ExperimentResult,
            run_async_safely(
                self._run_experiment_async(
                    name=name,
                    run_name=self._create_experiment_run_name(
                        name=name, run_name=run_name
                    ),
                    description=description,
                    data=data,
                    task=task,
                    evaluators=evaluators or [],
                    run_evaluators=run_evaluators or [],
                    max_concurrency=max_concurrency,
                    metadata=metadata or {},
                ),
            ),
        )

    async def _run_experiment_async(
        self,
        *,
        name: str,
        run_name: str,
        description: Optional[str],
        data: ExperimentData,
        task: TaskFunction,
        evaluators: List[EvaluatorFunction],
        run_evaluators: List[RunEvaluatorFunction],
        max_concurrency: int,
        metadata: Dict[str, Any],
    ) -> ExperimentResult:
        langfuse_logger.debug(
            f"Starting experiment '{name}' run '{run_name}' with {len(data)} items"
        )

        # Set up concurrency control
        semaphore = asyncio.Semaphore(max_concurrency)

        # Process all items
        async def process_item(item: ExperimentItem) -> ExperimentItemResult:
            async with semaphore:
                return await self._process_experiment_item(
                    item, task, evaluators, name, run_name, description, metadata
                )

        # Run all items concurrently
        tasks = [process_item(item) for item in data]
        item_results = await asyncio.gather(*tasks, return_exceptions=True)

        # Filter out any exceptions and log errors
        valid_results: List[ExperimentItemResult] = []
        for i, result in enumerate(item_results):
            if 
                isinstance(result, Exception):
                langfuse_logger.error(f"Item {i} failed: {result}")
            elif isinstance(result, ExperimentItemResult):
                valid_results.append(result)  # type: ignore

        # Run experiment-level evaluators
        run_evaluations: List[Evaluation] = []
        for run_evaluator in run_evaluators:
            try:
                evaluations = await _run_evaluator(
                    run_evaluator, item_results=valid_results
                )
                run_evaluations.extend(evaluations)
            except Exception as e:
                langfuse_logger.error(f"Run evaluator failed: {e}")

        # Generate dataset run URL if applicable
        dataset_run_id = valid_results[0].dataset_run_id if valid_results else None
        dataset_run_url = None
        if dataset_run_id and data:
            try:
                # Check if the first item has dataset_id (for DatasetItem objects)
                first_item = data[0]
                dataset_id = None

                if hasattr(first_item, "dataset_id"):
                    dataset_id = getattr(first_item, "dataset_id", None)

                if dataset_id:
                    project_id = self._get_project_id()

                    if project_id:
                        dataset_run_url = f"{self._host}/project/{project_id}/datasets/{dataset_id}/runs/{dataset_run_id}"

            except Exception:
                pass  # URL generation is optional

        # Store run-level evaluations as scores
        for evaluation in run_evaluations:
            try:
                if dataset_run_id:
                    self.create_score(
                        dataset_run_id=dataset_run_id,
                        name=evaluation.name or "<unknown>",
                        value=evaluation.value,  # type: ignore
                        comment=evaluation.comment,
                        metadata=evaluation.metadata,
                        data_type=evaluation.data_type,  # type: ignore
                    )

            except Exception as e:
                langfuse_logger.error(f"Failed to store run evaluation: {e}")

        # Flush scores and traces
        self.flush()

        return ExperimentResult(
            name=name,
            run_name=run_name,
            description=description,
            item_results=valid_results,
            run_evaluations=run_evaluations,
            dataset_run_id=dataset_run_id,
            dataset_run_url=dataset_run_url,
        )

    async def _process_experiment_item(
        self,
        item: ExperimentItem,
        task: Callable,
        evaluators: List[Callable],
        experiment_name: str,
        experiment_run_name: str,
        experiment_description: Optional[str],
        experiment_metadata: Dict[str, Any],
    ) -> ExperimentItemResult:
        # Execute task with tracing
        span_name = "experiment-item-run"

        with self.start_as_current_span(name=span_name) as span:
            try:
                output = await _run_task(task, item)

                input_data = (
                    item.get("input")
                    if isinstance(item, dict)
                    else getattr(item, "input", None)
                )

                item_metadata: Dict[str, Any] = {}

                if isinstance(item, dict):
                    item_metadata = item.get("metadata", None) or {}

                final_metadata = {
                    "experiment_name": experiment_name,
                    "experiment_run_name": experiment_run_name,
                    **experiment_metadata,
                }

                if (
                    not isinstance(item, dict)
                    and hasattr(item, "dataset_id")
                    and hasattr(item, "id")
                ):
                    final_metadata.update(
                        {"dataset_id": item.dataset_id, "dataset_item_id": item.id}
                    )

                if isinstance(item_metadata, dict):
                    final_metadata.update(item_metadata)

                span.update(
                    input=input_data,
                    output=output,
                    metadata=final_metadata,
                )

                # Get trace ID for linking
                trace_id = span.trace_id
                dataset_run_id = None

                # Link to dataset run if this is a dataset item
                if hasattr(item, "id") and hasattr(item, "dataset_id"):
                    try:
                        dataset_run_item = self.api.dataset_run_items.create(
                            request=CreateDatasetRunItemRequest(
                                runName=experiment_run_name,
                                runDescription=experiment_description,
                                metadata=experiment_metadata,
                                datasetItemId=item.id,  # type: ignore
                                traceId=trace_id,
                                observationId=span.id,
                            )
                        )

                        dataset_run_id = 
                            dataset_run_item.dataset_run_id

                    except Exception as e:
                        langfuse_logger.error(f"Failed to create dataset run item: {e}")

                # Run evaluators
                evaluations = []

                for evaluator in evaluators:
                    try:
                        expected_output = None

                        if isinstance(item, dict):
                            expected_output = item.get("expected_output")
                        elif hasattr(item, "expected_output"):
                            expected_output = item.expected_output

                        eval_metadata: Optional[Dict[str, Any]] = None

                        if isinstance(item, dict):
                            eval_metadata = item.get("metadata")
                        elif hasattr(item, "metadata"):
                            eval_metadata = item.metadata

                        eval_results = await _run_evaluator(
                            evaluator,
                            input=input_data,
                            output=output,
                            expected_output=expected_output,
                            metadata=eval_metadata,
                        )
                        evaluations.extend(eval_results)

                        # Store evaluations as scores
                        for evaluation in eval_results:
                            self.create_score(
                                trace_id=trace_id,
                                name=evaluation.name,
                                value=evaluation.value or -1,
                                comment=evaluation.comment,
                                metadata=evaluation.metadata,
                            )

                    except Exception as e:
                        langfuse_logger.error(f"Evaluator failed: {e}")

                return ExperimentItemResult(
                    item=item,
                    output=output,
                    evaluations=evaluations,
                    trace_id=trace_id,
                    dataset_run_id=dataset_run_id,
                )

            except Exception as e:
                span.update(
                    output=f"Error: {str(e)}", level="ERROR", status_message=str(e)
                )
                raise e

    def _create_experiment_run_name(
        self, *, name: Optional[str] = None, run_name: Optional[str] = None
    ) -> str:
        if run_name:
            return run_name

        iso_timestamp = _get_timestamp().isoformat().replace("+00:00", "Z")

        return f"{name} - {iso_timestamp}"

    def auth_check(self) -> bool:
        """Check if the provided credentials (public and secret key) are valid.

        Raises:
            Exception: If no projects were found for the provided credentials.

        Note:
            This method is blocking. It is discouraged to use it in production code.
        """
        try:
            projects = self.api.projects.get()
            langfuse_logger.debug(
                f"Auth check successful, found {len(projects.data)} projects"
            )
            if len(projects.data) == 0:
                raise Exception(
                    "Auth check failed, no project found for the keys provided."
                )
            return True

        except AttributeError as e:
            langfuse_logger.warning(
                f"Auth check failed: Client not properly initialized. Error: {e}"
            )
            return False

        except Error as e:
            handle_fern_exception(e)
            raise e

    def create_dataset(
        self,
        *,
        name: str,
        description: Optional[str] = None,
        metadata: Optional[Any] = None,
    ) -> Dataset:
        """Create a dataset with the given name on Langfuse.

        Args:
            name: Name of the dataset to create.
            description: Description of the dataset. Defaults to None.
            metadata: Additional metadata. Defaults to None.

        Returns:
            Dataset: The created dataset as returned by the Langfuse API.
        """
        try:
            body = CreateDatasetRequest(
                name=name, description=description, metadata=metadata
            )
            langfuse_logger.debug(f"Creating datasets {body}")

            return self.api.datasets.create(request=body)

        except Error as e:
            handle_fern_exception(e)
            raise e

    def create_dataset_item(
        self,
        *,
        dataset_name: str,
        input: Optional[Any] = None,
        expected_output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        source_trace_id: Optional[str] = None,
        source_observation_id: Optional[str] = None,
        status: Optional[DatasetStatus] = None,
        id: Optional[str] = None,
    ) -> DatasetItem:
        """Create a dataset item.

        Upserts if an item with id already exists.

        Args:
            dataset_name: Name of the dataset in which the dataset item should be created.
            input: Input data. Defaults to None. Can contain any dict, list or scalar.
            expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar.
            metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar.
            source_trace_id: Id of the source trace. Defaults to None.
            source_observation_id: Id of the source observation. Defaults to None.
            status: Status of the dataset item. Defaults to ACTIVE for newly created items.
            id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets.

        Returns:
            DatasetItem: The created dataset item as returned by the Langfuse API.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Uploading items to the Langfuse dataset named "capital_cities"
            langfuse.create_dataset_item(
                dataset_name="capital_cities",
                input={"input": {"country": "Italy"}},
                expected_output={"expected_output": "Rome"},
                metadata={"foo": "bar"}
            )
            ```
        """
        try:
            body = CreateDatasetItemRequest(
                datasetName=dataset_name,
                input=input,
                expectedOutput=expected_output,
                metadata=metadata,
                sourceTraceId=source_trace_id,
                sourceObservationId=source_observation_id,
                status=status,
                id=id,
            )
            langfuse_logger.debug(f"Creating dataset item {body}")
            return self.api.dataset_items.create(request=body)
        except Error as e:
            handle_fern_exception(e)
            raise e

    def resolve_media_references(
        self,
        *,
        obj: Any,
        resolve_with: Literal["base64_data_uri"],
        max_depth: int = 10,
        content_fetch_timeout_seconds: int = 5,
    ) -> Any:
        """Replace media reference strings in an object with base64 data URIs.

        This method recursively traverses an object (up to max_depth) looking for media reference strings
        in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using
        the provided Langfuse client and replaces the reference string with a base64 data URI.

        If fetching media content fails for a reference string, a warning is logged and the reference
        string is left unchanged.

        Args:
            obj: The object to process. Can be a primitive value, array, or nested object.
                If the object has a __dict__ attribute, a dict will be returned instead of the original object type.
            resolve_with: The representation of the media content to replace the media reference string with.
                Currently only "base64_data_uri" is supported.
            max_depth: int: The maximum depth to traverse the object. Default is 10.
            content_fetch_timeout_seconds: int: The timeout in seconds for fetching media content. Default is 5.

        Returns:
            A deep copy of the input object with all media references replaced with base64 data URIs where possible.
            If the input object has a __dict__ attribute, a dict will be returned instead of the original object type.

        Example:
            obj = {
                "image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
                "nested": {
                    "pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@"
                }
            }

            # The method is synchronous and takes keyword-only arguments
            result = langfuse.resolve_media_references(
                obj=obj, resolve_with="base64_data_uri"
            )

            # Result:
            # {
            #     "image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...",
            #     "nested": {
            #         "pdf": "data:application/pdf;base64,JVBERi0xLjcK..."
            #     }
            # }
        """
        return LangfuseMedia.resolve_media_references(
            langfuse_client=self,
            obj=obj,
            resolve_with=resolve_with,
            max_depth=max_depth,
            content_fetch_timeout_seconds=content_fetch_timeout_seconds,
        )

    @overload
    def get_prompt(
        self,
        name: str,
        *,
        version: Optional[int] = None,
        label: Optional[str] = None,
        type: Literal["chat"],
        cache_ttl_seconds: Optional[int] = None,
        fallback: Optional[List[ChatMessageDict]] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> ChatPromptClient: ...

    @overload
    def get_prompt(
        self,
        name: str,
        *,
        version: Optional[int] = None,
        label: Optional[str] = None,
        type: Literal["text"] = "text",
        cache_ttl_seconds: Optional[int] = None,
        fallback: Optional[str] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> TextPromptClient: ...

    def get_prompt(
        self,
        name: str,
        *,
        version: Optional[int] = None,
        label: Optional[str] = None,
        type: Literal["chat", "text"] = "text",
        cache_ttl_seconds: Optional[int] = None,
        fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> PromptClient:
        """Get a prompt.

        This method attempts to fetch the requested prompt from the local cache. If the prompt is not found
        in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again
        and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will
        return the expired prompt as a fallback.

        Args:
            name (str): The name of the prompt to retrieve.

        Keyword Args:
            version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
            label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
            cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a
            keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0.
            type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text".
            fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None.
            max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries have an exponential backoff with a maximum delay of 10 seconds.
            fetch_timeout_seconds: Optional[int]: The timeout in seconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 5 seconds by default.

        Returns:
            The prompt object retrieved from the cache or directly fetched if not cached or expired of type
            - TextPromptClient, if type argument is 'text'.
            - ChatPromptClient, if type argument is 'chat'.

        Raises:
            Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an
            expired prompt in the cache, in which case it logs a warning and returns the expired prompt.
        """
        if self._resources is None:
            raise Error(
                "SDK is not correctly initialized. Check the init logs for more details."
            )
        if version is not None and label is not None:
            raise ValueError("Cannot specify both version and label at the same time.")

        if not name:
            raise ValueError("Prompt name cannot be empty.")

        cache_key = PromptCache.generate_cache_key(name, version=version, label=label)
        bounded_max_retries = self._get_bounded_max_retries(
            max_retries, default_max_retries=2, max_retries_upper_bound=4
        )

        langfuse_logger.debug(f"Getting prompt '{cache_key}'")
        cached_prompt = self._resources.prompt_cache.get(cache_key)

        if cached_prompt is None or cache_ttl_seconds == 0:
            langfuse_logger.debug(
                f"Prompt '{cache_key}' not found in cache or caching disabled."
            )
            try:
                return self._fetch_prompt_and_update_cache(
                    name,
                    version=version,
                    label=label,
                    ttl_seconds=cache_ttl_seconds,
                    max_retries=bounded_max_retries,
                    fetch_timeout_seconds=fetch_timeout_seconds,
                )
            except Exception as e:
                if fallback:
                    langfuse_logger.warning(
                        f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}"
                    )

                    fallback_client_args: Dict[str, Any] = {
                        "name": name,
                        "prompt": fallback,
                        "type": type,
                        "version": version or 0,
                        "config": {},
                        "labels": [label] if label else [],
                        "tags": [],
                    }

                    if type == "text":
                        return TextPromptClient(
                            prompt=Prompt_Text(**fallback_client_args),
                            is_fallback=True,
                        )

                    if type == "chat":
                        return ChatPromptClient(
                            prompt=Prompt_Chat(**fallback_client_args),
                            is_fallback=True,
                        )

                raise e

        if cached_prompt.is_expired():
            langfuse_logger.debug(f"Stale prompt '{cache_key}' found in cache.")
            try:
                # refresh prompt in background thread, refresh_prompt deduplicates tasks
                langfuse_logger.debug(f"Refreshing prompt '{cache_key}' in background.")

                def refresh_task() -> None:
                    self._fetch_prompt_and_update_cache(
                        name,
                        version=version,
                        label=label,
                        ttl_seconds=cache_ttl_seconds,
                        max_retries=bounded_max_retries,
                        fetch_timeout_seconds=fetch_timeout_seconds,
                    )

                self._resources.prompt_cache.add_refresh_prompt_task(
                    cache_key,
                    refresh_task,
                )
                langfuse_logger.debug(
                    f"Returning stale prompt '{cache_key}' from cache."
                )
                # return stale prompt
                return cached_prompt.value

            except Exception as e:
                langfuse_logger.warning(
                    f"Error when refreshing cached prompt '{cache_key}', returning cached version. Error: {e}"
                )
                # creation of refresh prompt task failed, return stale prompt
                return cached_prompt.value

        return cached_prompt.value

    def _fetch_prompt_and_update_cache(
        self,
        name: str,
        *,
        version: Optional[int] = None,
        label: Optional[str] = None,
        ttl_seconds: Optional[int] = None,
        max_retries: int,
        fetch_timeout_seconds: Optional[int],
    ) -> PromptClient:
        cache_key = PromptCache.generate_cache_key(name, version=version, label=label)
        langfuse_logger.debug(f"Fetching prompt '{cache_key}' from server...")

        try:

            @backoff.on_exception(
                backoff.constant, Exception, max_tries=max_retries + 1, logger=None
            )
            def fetch_prompts() -> Any:
                return self.api.prompts.get(
                    self._url_encode(name),
                    version=version,
                    label=label,
                    request_options={
                        "timeout_in_seconds": fetch_timeout_seconds,
                    }
                    if fetch_timeout_seconds is not None
                    else None,
                )

            prompt_response = fetch_prompts()

            prompt: PromptClient
            if prompt_response.type == "chat":
                prompt = ChatPromptClient(prompt_response)
            else:
                prompt = TextPromptClient(prompt_response)

            if self._resources is not None:
                self._resources.prompt_cache.set(cache_key, prompt, ttl_seconds)

            return prompt

        except Exception as e:
            langfuse_logger.error(
                f"Error while fetching prompt '{cache_key}': {str(e)}"
            )
            raise e

    def _get_bounded_max_retries(
        self,
        max_retries: Optional[int],
        *,
        default_max_retries: int = 2,
        max_retries_upper_bound: int = 4,
    ) -> int:
        if max_retries is None:
            return default_max_retries

        bounded_max_retries = min(
            max(max_retries, 0),
            max_retries_upper_bound,
        )

        return bounded_max_retries

    @overload
    def create_prompt(
        self,
        *,
        name: str,
        prompt: List[Union[ChatMessageDict, ChatMessageWithPlaceholdersDict]],
        labels: List[str] = [],
        tags: Optional[List[str]] = None,
        type: Optional[Literal["chat"]],
        config: Optional[Any] = None,
        commit_message: Optional[str] = None,
    ) -> ChatPromptClient: ...

    @overload
    def create_prompt(
        self,
        *,
        name: str,
        prompt: str,
        labels: List[str] = [],
        tags: Optional[List[str]] = None,
        type: Optional[Literal["text"]] = "text",
        config: Optional[Any] = None,
        commit_message: Optional[str] = None,
    ) -> TextPromptClient: ...

    def create_prompt(
        self,
        *,
        name: str,
        prompt: Union[
            str, List[Union[ChatMessageDict, ChatMessageWithPlaceholdersDict]]
        ],
        labels: List[str] = [],
        tags: Optional[List[str]] = None,
        type: Optional[Literal["chat", "text"]] = "text",
        config: Optional[Any] = None,
        commit_message: Optional[str] = None,
    ) -> PromptClient:
        """Create a new prompt in Langfuse.

        Keyword Args:
            name : The name of the prompt to be created.
            prompt : The content of the prompt to be created.
            is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead.
            labels: The labels of the prompt. Defaults to an empty list. To create a default-served prompt, add the 'production' label.
            tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt.
            config: Additional structured data to be saved with the prompt. Defaults to None.
            type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text".
            commit_message: Optional string describing the change.

        Returns:
            TextPromptClient: The prompt if type argument is 'text'.
            ChatPromptClient: The prompt if type argument is 'chat'.
        """
        try:
            langfuse_logger.debug(f"Creating prompt {name=}, {labels=}")

            if type == "chat":
                if not isinstance(prompt, list):
                    raise ValueError(
                        "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes."
                    )
                request: Union[CreatePromptRequest_Chat, CreatePromptRequest_Text] = (
                    CreatePromptRequest_Chat(
                        name=name,
                        prompt=cast(Any, prompt),
                        labels=labels,
                        tags=tags,
                        config=config or {},
                        commitMessage=commit_message,
                        type="chat",
                    )
                )
                server_prompt = self.api.prompts.create(request=request)

                if self._resources is not None:
                    self._resources.prompt_cache.invalidate(name)

                return ChatPromptClient(prompt=cast(Prompt_Chat, server_prompt))

            if not isinstance(prompt, str):
                raise ValueError("For 'text' type, 'prompt' must be a string.")

            request = CreatePromptRequest_Text(
                name=name,
                prompt=prompt,
                labels=labels,
                tags=tags,
                config=config or {},
                commitMessage=commit_message,
                type="text",
            )

            server_prompt = self.api.prompts.create(request=request)

            if self._resources is not None:
                self._resources.prompt_cache.invalidate(name)

            return TextPromptClient(prompt=cast(Prompt_Text, server_prompt))

        except Error as e:
            handle_fern_exception(e)
            raise e

    def update_prompt(
        self,
        *,
        name: str,
        version: int,
        new_labels: List[str] = [],
    ) -> Any:
        """Update an existing prompt version in Langfuse. The Langfuse SDK prompt cache is invalidated for all prompts with the specified name.

        Args:
            name (str): The name of the prompt to update.
            version (int): The version number of the prompt to update.
            new_labels (List[str], optional): New labels to assign to the prompt version. Labels are unique across versions. The "latest" label is reserved and managed by Langfuse. Defaults to [].

        Returns:
            Prompt: The updated prompt from the Langfuse API.

        """
        updated_prompt = self.api.prompt_version.update(
            name=self._url_encode(name),
            version=version,
            new_labels=new_labels,
        )

        if self._resources is not None:
            self._resources.prompt_cache.invalidate(name)

        return updated_prompt

    def _url_encode(self, url: str, *, is_url_param: Optional[bool] = False) -> str:
        # httpx ≥ 0.28 does its own WHATWG-compliant quoting (eg. encodes bare
        # “%”, “?”, “#”, “|”, … in query/path parts). Re-quoting here would
        # double-encode, so we skip when the value is about to be sent straight
        # to httpx (`is_url_param=True`) and the installed version is ≥ 0.28.
        if is_url_param and Version(httpx.__version__) >= Version("0.28.0"):
            return url

        # urllib.parse.quote does not escape slashes "/" by default; we need to
        # add safe="" to force escaping of slashes.
        # This is necessary for prompts in prompt folders
        return urllib.parse.quote(url, safe="")

    def clear_prompt_cache(self) -> None:
        """Clear the entire prompt cache, removing all cached prompts.

        This method is useful when you want to force a complete refresh of all
        cached prompts, for example after major updates or when you need to
        ensure the latest versions are fetched from the server.
        """
        if self._resources is not None:
            self._resources.prompt_cache.clear()
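The caching behaviour described for get_prompt (return an expired entry immediately and refresh it in a background thread, with at most one refresh per key) can be illustrated with a small stand-alone sketch. Everything here is hypothetical: `StaleWhileRevalidateCache`, `wait_for_refresh`, and the `fetch` callable are illustrative names, not part of the Langfuse SDK, and the real `PromptCache` may be organised quite differently.

```python
import threading
import time
from typing import Callable, Dict, Optional, Tuple


class StaleWhileRevalidateCache:
    """Toy cache: serve expired entries at once and refresh them in the background."""

    def __init__(self, fetch: Callable[[str], str], ttl_seconds: float = 60.0) -> None:
        self._fetch = fetch  # hypothetical loader, e.g. a server call
        self._ttl = ttl_seconds
        self._entries: Dict[str, Tuple[str, float]] = {}  # key -> (value, expires_at)
        self._refreshing: Dict[str, threading.Thread] = {}  # key -> running refresh
        self._lock = threading.Lock()

    def _store(self, key: str, value: str) -> None:
        # Caller must hold self._lock.
        self._entries[key] = (value, time.monotonic() + self._ttl)

    def _refresh(self, key: str) -> None:
        try:
            value: Optional[str] = self._fetch(key)
        except Exception:
            value = None  # refresh failed: keep serving the stale entry
        with self._lock:
            if value is not None:
                self._store(key, value)
            self._refreshing.pop(key, None)

    def get(self, key: str) -> str:
        with self._lock:
            entry = self._entries.get(key)
        if entry is None:
            # Cache miss: fetch synchronously, so errors reach the caller.
            value = self._fetch(key)
            with self._lock:
                self._store(key, value)
            return value
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            with self._lock:
                # Deduplicate: at most one refresh thread per key.
                if key not in self._refreshing:
                    thread = threading.Thread(
                        target=self._refresh, args=(key,), daemon=True
                    )
                    self._refreshing[key] = thread
                    thread.start()
        return value  # possibly stale, but returned without waiting

    def wait_for_refresh(self, key: str) -> None:
        """Block until a pending refresh for key (if any) has finished."""
        with self._lock:
            thread = self._refreshing.get(key)
        if thread is not None:
            thread.join()
```

The design trade-off is latency over freshness: a caller never waits on the network once a value has been cached, at the cost of occasionally seeing a value that is one refresh behind.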
Main client for Langfuse tracing and platform features.
This class provides an interface for creating and managing traces, spans, and generations in Langfuse as well as interacting with the Langfuse API.
The client features a thread-safe singleton pattern for each unique public API key, ensuring consistent trace context propagation across your application. It implements efficient batching of spans with configurable flush settings and includes background thread management for media uploads and score ingestion.
Configuration is flexible through either direct parameters or environment variables, with graceful fallbacks and runtime configuration updates.
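As a rough illustration of that precedence (explicit argument first, then environment variable, then built-in default), here is a minimal sketch. `resolve_setting` is a hypothetical helper, not an SDK function. It also checks `is not None`, whereas the `__init__` source on this page uses `or`, so in the real client a falsy explicit value also falls through to the environment.

```python
import os
from typing import Mapping, Optional


def resolve_setting(
    explicit: Optional[str],
    env_var: str,
    default: str,
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Return the explicit value, else the environment value, else the default."""
    source = os.environ if env is None else env  # injectable for testing
    if explicit is not None:
        return explicit
    return source.get(env_var, default)


# Usage: no argument and no variable set, so the default applies.
host = resolve_setting(None, "LANGFUSE_HOST", "https://cloud.langfuse.com", env={})
```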
Attributes:
- api: Synchronous API client for Langfuse backend communication
- async_api: Asynchronous API client for Langfuse backend communication
- _otel_tracer: Internal LangfuseTracer instance managing OpenTelemetry components
Arguments:
- public_key (Optional[str]): Your Langfuse public API key. Can also be set via LANGFUSE_PUBLIC_KEY environment variable.
- secret_key (Optional[str]): Your Langfuse secret API key. Can also be set via LANGFUSE_SECRET_KEY environment variable.
- host (Optional[str]): The Langfuse API host URL. Defaults to "https://cloud.langfuse.com". Can also be set via LANGFUSE_HOST environment variable.
- timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds.
- httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created.
- debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable.
- tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable.
- flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable.
- flush_interval (Optional[float]): Time in seconds between batch flushes. Defaults to 5 seconds. Can also be set via LANGFUSE_FLUSH_INTERVAL environment variable.
- environment (Optional[str]): Environment name for tracing. Default is 'default'. Can also be set via LANGFUSE_TRACING_ENVIRONMENT environment variable. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'.
- release (Optional[str]): Release version/hash of your application. Used for grouping analytics by release.
- media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable.
- sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable.
- mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API.
- blocked_instrumentation_scopes (Optional[List[str]]): List of instrumentation scope names to block from being exported to Langfuse. Spans from these scopes will be filtered out before being sent to the API. Useful for filtering out spans from specific libraries or frameworks. For exported spans, you can see the instrumentation scope name in the span metadata in Langfuse (metadata.scope.name).
- additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and OTLPSpanExporter requests. These headers will be merged with default headers. Note: If httpx_client is provided, additional_headers must be set directly on your custom httpx_client as well.
- tracer_provider (Optional[TracerProvider]): OpenTelemetry TracerProvider to use for Langfuse. Setting this can be useful to keep tracing disconnected between Langfuse and other OpenTelemetry-span emitting libraries. Note: To track active spans, the context is still shared between TracerProviders. This may lead to broken trace trees.
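Two of the arguments above carry value constraints: environment must be lowercase alphanumeric with hyphens and underscores and must not start with 'langfuse', and sample_rate must lie between 0.0 and 1.0. The sketch below shows one way to pre-check such values before constructing the client. The regular expression is only a reading of the rule as worded here, not the SDK's own validation, and both function names are hypothetical.

```python
import re

# Assumption: a direct translation of the documented rule, with no length limit.
_ENVIRONMENT_PATTERN = re.compile(r"(?!langfuse)[a-z0-9_-]+")


def is_valid_environment(name: str) -> bool:
    """True if name matches the documented environment naming rule."""
    return _ENVIRONMENT_PATTERN.fullmatch(name) is not None


def validate_sample_rate(rate: float) -> float:
    """Mirror the range check shown in __init__: 0.0 <= rate <= 1.0."""
    if not 0.0 <= rate <= 1.0:
        raise ValueError(f"Sample rate must be between 0.0 and 1.0, got {rate}")
    return rate
```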
Example:
    from langfuse import Langfuse

    # Initialize the client (reads from env vars if not provided)
    langfuse = Langfuse(
        public_key="your-public-key",
        secret_key="your-secret-key",
        host="https://cloud.langfuse.com",  # Optional, default shown
    )

    # Create a trace span
    with langfuse.start_as_current_span(name="process-query") as span:
        # Your application code here

        # Create a nested generation span for an LLM call
        with span.start_as_current_generation(
            name="generate-response",
            model="gpt-4",
            input={"query": "Tell me about AI"},
            model_parameters={"temperature": 0.7, "max_tokens": 500}
        ) as generation:
            # Generate response here
            response = "AI is a field of computer science..."

            generation.update(
                output=response,
                usage_details={"prompt_tokens": 10, "completion_tokens": 50},
                cost_details={"total_cost": 0.0023}
            )

            # Score the generation (supports NUMERIC, BOOLEAN, CATEGORICAL)
            generation.score(name="relevance", value=0.95, data_type="NUMERIC")
    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        timeout: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        debug: bool = False,
        tracing_enabled: Optional[bool] = True,
        flush_at: Optional[int] = None,
        flush_interval: Optional[float] = None,
        environment: Optional[str] = None,
        release: Optional[str] = None,
        media_upload_thread_count: Optional[int] = None,
        sample_rate: Optional[float] = None,
        mask: Optional[MaskFunction] = None,
        blocked_instrumentation_scopes: Optional[List[str]] = None,
        additional_headers: Optional[Dict[str, str]] = None,
        tracer_provider: Optional[TracerProvider] = None,
    ):
        self._host = host or cast(
            str, os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com")
        )
        self._environment = environment or cast(
            str, os.environ.get(LANGFUSE_TRACING_ENVIRONMENT)
        )
        self._project_id: Optional[str] = None
        sample_rate = sample_rate or float(os.environ.get(LANGFUSE_SAMPLE_RATE, 1.0))
        if not 0.0 <= sample_rate <= 1.0:
            raise ValueError(
                f"Sample rate must be between 0.0 and 1.0, got {sample_rate}"
            )

        timeout = timeout or int(os.environ.get(LANGFUSE_TIMEOUT, 5))

        self._tracing_enabled = (
            tracing_enabled
            and os.environ.get(LANGFUSE_TRACING_ENABLED, "true").lower() != "false"
        )
        if not self._tracing_enabled:
            langfuse_logger.info(
                "Configuration: Langfuse tracing is explicitly disabled. No data will be sent to the Langfuse API."
            )

        debug = (
            debug if debug else (os.getenv(LANGFUSE_DEBUG, "false").lower() == "true")
        )
        if debug:
            logging.basicConfig(
                format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            langfuse_logger.setLevel(logging.DEBUG)

        public_key = public_key or os.environ.get(LANGFUSE_PUBLIC_KEY)
        if public_key is None:
            langfuse_logger.warning(
                "Authentication error: Langfuse client initialized without public_key. Client will be disabled. "
                "Provide a public_key parameter or set LANGFUSE_PUBLIC_KEY environment variable. "
            )
            self._otel_tracer = otel_trace_api.NoOpTracer()
            return

        secret_key = secret_key or os.environ.get(LANGFUSE_SECRET_KEY)
        if secret_key is None:
            langfuse_logger.warning(
                "Authentication error: Langfuse client initialized without secret_key. Client will be disabled. "
                "Provide a secret_key parameter or set LANGFUSE_SECRET_KEY environment variable. "
            )
            self._otel_tracer = otel_trace_api.NoOpTracer()
            return

        if os.environ.get("OTEL_SDK_DISABLED", "false").lower() == "true":
            langfuse_logger.warning(
                "OTEL_SDK_DISABLED is set. Langfuse tracing will be disabled and no traces will appear in the UI."
            )

        # Initialize api and tracer if requirements are met
        self._resources = LangfuseResourceManager(
            public_key=public_key,
            secret_key=secret_key,
            host=self._host,
            timeout=timeout,
            environment=self._environment,
            release=release,
            flush_at=flush_at,
            flush_interval=flush_interval,
            httpx_client=httpx_client,
            media_upload_thread_count=media_upload_thread_count,
            sample_rate=sample_rate,
            mask=mask,
            tracing_enabled=self._tracing_enabled,
            blocked_instrumentation_scopes=blocked_instrumentation_scopes,
            additional_headers=additional_headers,
            tracer_provider=tracer_provider,
        )
        self._mask = self._resources.mask

        self._otel_tracer = (
            self._resources.tracer
            if self._tracing_enabled and self._resources.tracer is not None
            else otel_trace_api.NoOpTracer()
        )
        self.api = self._resources.api
        self.async_api = self._resources.async_api
    def start_span(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseSpan:
        """Create a new span for tracing a unit of work.

        This method creates a new span but does not set it as the current span in the
        context. To create and use a span within a context, use start_as_current_span().

        The created span will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the span (e.g., function or operation name)
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span

        Returns:
            A LangfuseSpan object that must be ended with .end() when the operation completes

        Example:
            ```python
            span = langfuse.start_span(name="process-data")
            try:
                # Do work
                span.update(output="result")
            finally:
                span.end()
            ```
        """
        return self.start_observation(
            trace_context=trace_context,
            name=name,
            as_type="span",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
        )
Create a new span for tracing a unit of work.
This method creates a new span but does not set it as the current span in the context. To create and use a span within a context, use start_as_current_span().
The created span will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Returns:
A LangfuseSpan object that must be ended with .end() when the operation completes
Example:
    span = langfuse.start_span(name="process-data")
    try:
        # Do work
        span.update(output="result")
    finally:
        span.end()
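The difference between creating a span and making it current can be sketched with stand-in classes built on `contextvars`. This is not the SDK's implementation (which delegates to OpenTelemetry). `ToySpan` and both functions below are hypothetical and only mimic the documented behaviour: a manually started span becomes a child of the current span but never replaces it.

```python
import contextvars
from contextlib import contextmanager
from typing import Iterator, Optional


class ToySpan:
    """Stand-in for a span: remembers its parent and whether it was ended."""

    def __init__(self, name: str, parent: Optional["ToySpan"]) -> None:
        self.name = name
        self.parent = parent
        self.ended = False

    def end(self) -> None:
        self.ended = True


_current: contextvars.ContextVar = contextvars.ContextVar("current_span", default=None)


def start_span(name: str) -> ToySpan:
    """Create a child of the current span without changing what is current."""
    return ToySpan(name, parent=_current.get())


@contextmanager
def start_as_current_span(name: str) -> Iterator[ToySpan]:
    """Create a span, make it current for the block, then restore and end it."""
    span = ToySpan(name, parent=_current.get())
    token = _current.set(span)
    try:
        yield span
    finally:
        _current.reset(token)
        span.end()
```

With this model, two spans created back to back via `start_span` inside one `start_as_current_span` block are siblings under the same parent, and each still has to be ended by hand.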
    def start_as_current_span(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        end_on_exit: Optional[bool] = None,
    ) -> _AgnosticContextManager[LangfuseSpan]:
        """Create a new span and set it as the current span in a context manager.

        This method creates a new span and sets it as the current span within a context
        manager. Use this method with a 'with' statement to automatically handle span
        lifecycle within a code block.

        The created span will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the span (e.g., function or operation name)
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span
            end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.

        Returns:
            A context manager that yields a LangfuseSpan

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-query") as span:
                # Do work
                result = process_data()
                span.update(output=result)

                # Create a child span automatically
                with span.start_as_current_span(name="sub-operation") as child_span:
                    # Do sub-operation work
                    child_span.update(output="sub-result")
            ```
        """
        return self.start_as_current_observation(
            trace_context=trace_context,
            name=name,
            as_type="span",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            end_on_exit=end_on_exit,
        )
Create a new span and set it as the current span in a context manager.
This method creates a new span and sets it as the current span within a context manager. Use this method with a 'with' statement to automatically handle span lifecycle within a code block.
The created span will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
- end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.
Returns:
A context manager that yields a LangfuseSpan
Example:
    with langfuse.start_as_current_span(name="process-query") as span:
        # Do work
        result = process_data()
        span.update(output=result)

        # Create a child span automatically
        with span.start_as_current_span(name="sub-operation") as child_span:
            # Do sub-operation work
            child_span.update(output="sub-result")
562 def start_observation( 563 self, 564 *, 565 trace_context: Optional[TraceContext] = None, 566 name: str, 567 as_type: ObservationTypeLiteralNoEvent = "span", 568 input: Optional[Any] = None, 569 output: Optional[Any] = None, 570 metadata: Optional[Any] = None, 571 version: Optional[str] = None, 572 level: Optional[SpanLevel] = None, 573 status_message: Optional[str] = None, 574 completion_start_time: Optional[datetime] = None, 575 model: Optional[str] = None, 576 model_parameters: Optional[Dict[str, MapValue]] = None, 577 usage_details: Optional[Dict[str, int]] = None, 578 cost_details: Optional[Dict[str, float]] = None, 579 prompt: Optional[PromptClient] = None, 580 ) -> Union[ 581 LangfuseSpan, 582 LangfuseGeneration, 583 LangfuseAgent, 584 LangfuseTool, 585 LangfuseChain, 586 LangfuseRetriever, 587 LangfuseEvaluator, 588 LangfuseEmbedding, 589 LangfuseGuardrail, 590 ]: 591 """Create a new observation of the specified type. 592 593 This method creates a new observation but does not set it as the current span in the 594 context. To create and use an observation within a context, use start_as_current_observation(). 
595 596 Args: 597 trace_context: Optional context for connecting to an existing trace 598 name: Name of the observation 599 as_type: Type of observation to create (defaults to "span") 600 input: Input data for the operation 601 output: Output data from the operation 602 metadata: Additional metadata to associate with the observation 603 version: Version identifier for the code or component 604 level: Importance level of the observation 605 status_message: Optional status message for the observation 606 completion_start_time: When the model started generating (for generation types) 607 model: Name/identifier of the AI model used (for generation types) 608 model_parameters: Parameters used for the model (for generation types) 609 usage_details: Token usage information (for generation types) 610 cost_details: Cost information (for generation types) 611 prompt: Associated prompt template (for generation types) 612 613 Returns: 614 An observation object of the appropriate type that must be ended with .end() 615 """ 616 if trace_context: 617 trace_id = trace_context.get("trace_id", None) 618 parent_span_id = trace_context.get("parent_span_id", None) 619 620 if trace_id: 621 remote_parent_span = self._create_remote_parent_span( 622 trace_id=trace_id, parent_span_id=parent_span_id 623 ) 624 625 with otel_trace_api.use_span( 626 cast(otel_trace_api.Span, remote_parent_span) 627 ): 628 otel_span = self._otel_tracer.start_span(name=name) 629 otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True) 630 631 return self._create_observation_from_otel_span( 632 otel_span=otel_span, 633 as_type=as_type, 634 input=input, 635 output=output, 636 metadata=metadata, 637 version=version, 638 level=level, 639 status_message=status_message, 640 completion_start_time=completion_start_time, 641 model=model, 642 model_parameters=model_parameters, 643 usage_details=usage_details, 644 cost_details=cost_details, 645 prompt=prompt, 646 ) 647 648 otel_span = 
self._otel_tracer.start_span(name=name) 649 650 return self._create_observation_from_otel_span( 651 otel_span=otel_span, 652 as_type=as_type, 653 input=input, 654 output=output, 655 metadata=metadata, 656 version=version, 657 level=level, 658 status_message=status_message, 659 completion_start_time=completion_start_time, 660 model=model, 661 model_parameters=model_parameters, 662 usage_details=usage_details, 663 cost_details=cost_details, 664 prompt=prompt, 665 )
Create a new observation of the specified type.
This method creates a new observation but does not set it as the current span in the context. To create and use an observation within a context, use start_as_current_observation().
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the observation
- as_type: Type of observation to create (defaults to "span")
- input: Input data for the operation
- output: Output data from the operation
- metadata: Additional metadata to associate with the observation
- version: Version identifier for the code or component
- level: Importance level of the observation
- status_message: Optional status message for the observation
- completion_start_time: When the model started generating (for generation types)
- model: Name/identifier of the AI model used (for generation types)
- model_parameters: Parameters used for the model (for generation types)
- usage_details: Token usage information (for generation types)
- cost_details: Cost information (for generation types)
- prompt: Associated prompt template (for generation types)
Returns:
An observation object of the appropriate type that must be ended with .end()
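The way `trace_context` decides where a new observation attaches can be summarised in a few lines. This is a sketch of the branch order visible in the source above, assuming nothing beyond the two keys it reads. `TraceContextSketch` and `resolve_parent` are hypothetical names, and the returned tuple is purely illustrative.

```python
from typing import Optional, Tuple, TypedDict


class TraceContextSketch(TypedDict, total=False):
    # The two keys read via trace_context.get(...) in the source above.
    trace_id: str
    parent_span_id: str


def resolve_parent(
    trace_context: Optional[TraceContextSketch],
) -> Tuple[str, Optional[str], Optional[str]]:
    """Return (mode, trace_id, parent_span_id) following start_observation's branches."""
    if trace_context:
        trace_id = trace_context.get("trace_id")
        parent_span_id = trace_context.get("parent_span_id")
        if trace_id:
            # Here the SDK creates a remote parent span and starts the new
            # span under it, setting the AS_ROOT attribute on the new span.
            return ("remote-parent", trace_id, parent_span_id)
    # No usable trace_id: the span is started in the ambient context.
    return ("ambient-context", None, None)


print(resolve_parent({"trace_id": "t1", "parent_span_id": "s1"}))
print(resolve_parent({"parent_span_id": "s1"}))
```

Note that a `parent_span_id` without a `trace_id` has no effect in this model, because the remote-parent branch is only taken when `trace_id` is present.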
737 def start_generation( 738 self, 739 *, 740 trace_context: Optional[TraceContext] = None, 741 name: str, 742 input: Optional[Any] = None, 743 output: Optional[Any] = None, 744 metadata: Optional[Any] = None, 745 version: Optional[str] = None, 746 level: Optional[SpanLevel] = None, 747 status_message: Optional[str] = None, 748 completion_start_time: Optional[datetime] = None, 749 model: Optional[str] = None, 750 model_parameters: Optional[Dict[str, MapValue]] = None, 751 usage_details: Optional[Dict[str, int]] = None, 752 cost_details: Optional[Dict[str, float]] = None, 753 prompt: Optional[PromptClient] = None, 754 ) -> LangfuseGeneration: 755 """Create a new generation span for model generations. 756 757 DEPRECATED: This method is deprecated and will be removed in a future version. 758 Use start_observation(as_type='generation') instead. 759 760 This method creates a specialized span for tracking model generations. 761 It includes additional fields specific to model generations such as model name, 762 token usage, and cost details. 763 764 The created generation span will be the child of the current span in the context. 
765 766 Args: 767 trace_context: Optional context for connecting to an existing trace 768 name: Name of the generation operation 769 input: Input data for the model (e.g., prompts) 770 output: Output from the model (e.g., completions) 771 metadata: Additional metadata to associate with the generation 772 version: Version identifier for the model or component 773 level: Importance level of the generation (info, warning, error) 774 status_message: Optional status message for the generation 775 completion_start_time: When the model started generating the response 776 model: Name/identifier of the AI model used (e.g., "gpt-4") 777 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 778 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 779 cost_details: Cost information for the model call 780 prompt: Associated prompt template from Langfuse prompt management 781 782 Returns: 783 A LangfuseGeneration object that must be ended with .end() when complete 784 785 Example: 786 ```python 787 generation = langfuse.start_generation( 788 name="answer-generation", 789 model="gpt-4", 790 input={"prompt": "Explain quantum computing"}, 791 model_parameters={"temperature": 0.7} 792 ) 793 try: 794 # Call model API 795 response = llm.generate(...) 796 797 generation.update( 798 output=response.text, 799 usage_details={ 800 "prompt_tokens": response.usage.prompt_tokens, 801 "completion_tokens": response.usage.completion_tokens 802 } 803 ) 804 finally: 805 generation.end() 806 ``` 807 """ 808 warnings.warn( 809 "start_generation is deprecated and will be removed in a future version. 
" 810 "Use start_observation(as_type='generation') instead.", 811 DeprecationWarning, 812 stacklevel=2, 813 ) 814 return self.start_observation( 815 trace_context=trace_context, 816 name=name, 817 as_type="generation", 818 input=input, 819 output=output, 820 metadata=metadata, 821 version=version, 822 level=level, 823 status_message=status_message, 824 completion_start_time=completion_start_time, 825 model=model, 826 model_parameters=model_parameters, 827 usage_details=usage_details, 828 cost_details=cost_details, 829 prompt=prompt, 830 )
Create a new generation span for model generations.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_observation(as_type='generation') instead.
This method creates a specialized span for tracking model generations. It includes additional fields specific to model generations such as model name, token usage, and cost details.
The created generation span will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the generation operation
- input: Input data for the model (e.g., prompts)
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Returns:
A LangfuseGeneration object that must be ended with .end() when complete
Example:
    generation = langfuse.start_generation(
        name="answer-generation",
        model="gpt-4",
        input={"prompt": "Explain quantum computing"},
        model_parameters={"temperature": 0.7}
    )
    try:
        # Call model API
        response = llm.generate(...)

        generation.update(
            output=response.text,
            usage_details={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        )
    finally:
        generation.end()
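The deprecated method is a thin forwarding wrapper: it emits a `DeprecationWarning` and then delegates with `as_type="generation"`. The sketch below reproduces that pattern with the standard library and shows one way to observe the warning. The `*_sketch` functions are hypothetical stand-ins, not SDK functions.

```python
import warnings


def start_observation_sketch(*, name: str, as_type: str = "span") -> dict:
    # Hypothetical stand-in that only records what it was asked to create.
    return {"name": name, "as_type": as_type}


def start_generation_sketch(*, name: str) -> dict:
    """Deprecated alias: warn, then forward with as_type='generation'."""
    warnings.warn(
        "start_generation_sketch is deprecated; use "
        "start_observation_sketch(as_type='generation') instead.",
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller, as the source does
    )
    return start_observation_sketch(name=name, as_type="generation")


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # DeprecationWarning is often filtered out
    observation = start_generation_sketch(name="answer-generation")

assert observation["as_type"] == "generation"
assert caught[0].category is DeprecationWarning
```

Because Python hides many `DeprecationWarning`s by default, running a test suite with `-W error::DeprecationWarning` is one way to find remaining call sites before the method is removed.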
832 def start_as_current_generation( 833 self, 834 *, 835 trace_context: Optional[TraceContext] = None, 836 name: str, 837 input: Optional[Any] = None, 838 output: Optional[Any] = None, 839 metadata: Optional[Any] = None, 840 version: Optional[str] = None, 841 level: Optional[SpanLevel] = None, 842 status_message: Optional[str] = None, 843 completion_start_time: Optional[datetime] = None, 844 model: Optional[str] = None, 845 model_parameters: Optional[Dict[str, MapValue]] = None, 846 usage_details: Optional[Dict[str, int]] = None, 847 cost_details: Optional[Dict[str, float]] = None, 848 prompt: Optional[PromptClient] = None, 849 end_on_exit: Optional[bool] = None, 850 ) -> _AgnosticContextManager[LangfuseGeneration]: 851 """Create a new generation span and set it as the current span in a context manager. 852 853 DEPRECATED: This method is deprecated and will be removed in a future version. 854 Use start_as_current_observation(as_type='generation') instead. 855 856 This method creates a specialized span for model generations and sets it as the 857 current span within a context manager. Use this method with a 'with' statement to 858 automatically handle the generation span lifecycle within a code block. 859 860 The created generation span will be the child of the current span in the context. 
861 862 Args: 863 trace_context: Optional context for connecting to an existing trace 864 name: Name of the generation operation 865 input: Input data for the model (e.g., prompts) 866 output: Output from the model (e.g., completions) 867 metadata: Additional metadata to associate with the generation 868 version: Version identifier for the model or component 869 level: Importance level of the generation (info, warning, error) 870 status_message: Optional status message for the generation 871 completion_start_time: When the model started generating the response 872 model: Name/identifier of the AI model used (e.g., "gpt-4") 873 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 874 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 875 cost_details: Cost information for the model call 876 prompt: Associated prompt template from Langfuse prompt management 877 end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks. 878 879 Returns: 880 A context manager that yields a LangfuseGeneration 881 882 Example: 883 ```python 884 with langfuse.start_as_current_generation( 885 name="answer-generation", 886 model="gpt-4", 887 input={"prompt": "Explain quantum computing"} 888 ) as generation: 889 # Call model API 890 response = llm.generate(...) 891 892 # Update with results 893 generation.update( 894 output=response.text, 895 usage_details={ 896 "prompt_tokens": response.usage.prompt_tokens, 897 "completion_tokens": response.usage.completion_tokens 898 } 899 ) 900 ``` 901 """ 902 warnings.warn( 903 "start_as_current_generation is deprecated and will be removed in a future version. 
" 904 "Use start_as_current_observation(as_type='generation') instead.", 905 DeprecationWarning, 906 stacklevel=2, 907 ) 908 return self.start_as_current_observation( 909 trace_context=trace_context, 910 name=name, 911 as_type="generation", 912 input=input, 913 output=output, 914 metadata=metadata, 915 version=version, 916 level=level, 917 status_message=status_message, 918 completion_start_time=completion_start_time, 919 model=model, 920 model_parameters=model_parameters, 921 usage_details=usage_details, 922 cost_details=cost_details, 923 prompt=prompt, 924 end_on_exit=end_on_exit, 925 )
Create a new generation span and set it as the current span in a context manager.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_as_current_observation(as_type='generation') instead.
This method creates a specialized span for model generations and sets it as the current span within a context manager. Use this method with a 'with' statement to automatically handle the generation span lifecycle within a code block.
The created generation span will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the generation operation
- input: Input data for the model (e.g., prompts)
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
- end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.
Returns:
A context manager that yields a LangfuseGeneration
Example:
    with langfuse.start_as_current_generation(
        name="answer-generation",
        model="gpt-4",
        input={"prompt": "Explain quantum computing"}
    ) as generation:
        # Call model API
        response = llm.generate(...)

        # Update with results
        generation.update(
            output=response.text,
            usage_details={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        )
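The `end_on_exit` flag changes who is responsible for ending the observation. Below is a minimal model of that contract, assuming that `None` behaves like the documented default of `True`. `SketchObservation` and `start_as_current_sketch` are hypothetical and only illustrate the lifecycle, not the SDK's internals.

```python
from contextlib import contextmanager
from typing import Iterator, Optional


class SketchObservation:
    def __init__(self, name: str) -> None:
        self.name = name
        self.ended = False

    def end(self) -> None:
        self.ended = True


@contextmanager
def start_as_current_sketch(
    name: str, end_on_exit: Optional[bool] = None
) -> Iterator[SketchObservation]:
    obs = SketchObservation(name)
    try:
        yield obs
    finally:
        # Assumption: None is treated like True, matching the documented default.
        if end_on_exit is None or end_on_exit:
            obs.end()


with start_as_current_sketch("auto") as auto:
    pass
assert auto.ended  # ended automatically on exit

with start_as_current_sketch("manual", end_on_exit=False) as manual:
    pass
assert not manual.ended  # still open: the caller owns the lifecycle now
manual.end()
```

Passing `end_on_exit=False` is mainly useful when the observation has to outlive the `with` block, for example while a streamed response is still being consumed. The explicit `end()` call is then mandatory.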
1083 def start_as_current_observation( 1084 self, 1085 *, 1086 trace_context: Optional[TraceContext] = None, 1087 name: str, 1088 as_type: ObservationTypeLiteralNoEvent = "span", 1089 input: Optional[Any] = None, 1090 output: Optional[Any] = None, 1091 metadata: Optional[Any] = None, 1092 version: Optional[str] = None, 1093 level: Optional[SpanLevel] = None, 1094 status_message: Optional[str] = None, 1095 completion_start_time: Optional[datetime] = None, 1096 model: Optional[str] = None, 1097 model_parameters: Optional[Dict[str, MapValue]] = None, 1098 usage_details: Optional[Dict[str, int]] = None, 1099 cost_details: Optional[Dict[str, float]] = None, 1100 prompt: Optional[PromptClient] = None, 1101 end_on_exit: Optional[bool] = None, 1102 ) -> Union[ 1103 _AgnosticContextManager[LangfuseGeneration], 1104 _AgnosticContextManager[LangfuseSpan], 1105 _AgnosticContextManager[LangfuseAgent], 1106 _AgnosticContextManager[LangfuseTool], 1107 _AgnosticContextManager[LangfuseChain], 1108 _AgnosticContextManager[LangfuseRetriever], 1109 _AgnosticContextManager[LangfuseEvaluator], 1110 _AgnosticContextManager[LangfuseEmbedding], 1111 _AgnosticContextManager[LangfuseGuardrail], 1112 ]: 1113 """Create a new observation and set it as the current span in a context manager. 1114 1115 This method creates a new observation of the specified type and sets it as the 1116 current span within a context manager. Use this method with a 'with' statement to 1117 automatically handle the observation lifecycle within a code block. 1118 1119 The created observation will be the child of the current span in the context. 
1120 1121 Args: 1122 trace_context: Optional context for connecting to an existing trace 1123 name: Name of the observation (e.g., function or operation name) 1124 as_type: Type of observation to create (defaults to "span") 1125 input: Input data for the operation (can be any JSON-serializable object) 1126 output: Output data from the operation (can be any JSON-serializable object) 1127 metadata: Additional metadata to associate with the observation 1128 version: Version identifier for the code or component 1129 level: Importance level of the observation (info, warning, error) 1130 status_message: Optional status message for the observation 1131 end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks. 1132 1133 The following parameters are available when as_type is: "generation" or "embedding". 1134 completion_start_time: When the model started generating the response 1135 model: Name/identifier of the AI model used (e.g., "gpt-4") 1136 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1137 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1138 cost_details: Cost information for the model call 1139 prompt: Associated prompt template from Langfuse prompt management 1140 1141 Returns: 1142 A context manager that yields the appropriate observation type based on as_type 1143 1144 Example: 1145 ```python 1146 # Create a span 1147 with langfuse.start_as_current_observation(name="process-query", as_type="span") as span: 1148 # Do work 1149 result = process_data() 1150 span.update(output=result) 1151 1152 # Create a child span automatically 1153 with span.start_as_current_span(name="sub-operation") as child_span: 1154 # Do sub-operation work 1155 child_span.update(output="sub-result") 1156 1157 # Create a tool observation 1158 with langfuse.start_as_current_observation(name="web-search", as_type="tool") as 
tool: 1159 # Do tool work 1160 results = search_web(query) 1161 tool.update(output=results) 1162 1163 # Create a generation observation 1164 with langfuse.start_as_current_observation( 1165 name="answer-generation", 1166 as_type="generation", 1167 model="gpt-4" 1168 ) as generation: 1169 # Generate answer 1170 response = llm.generate(...) 1171 generation.update(output=response) 1172 ``` 1173 """ 1174 if as_type in get_observation_types_list(ObservationTypeGenerationLike): 1175 if trace_context: 1176 trace_id = trace_context.get("trace_id", None) 1177 parent_span_id = trace_context.get("parent_span_id", None) 1178 1179 if trace_id: 1180 remote_parent_span = self._create_remote_parent_span( 1181 trace_id=trace_id, parent_span_id=parent_span_id 1182 ) 1183 1184 return cast( 1185 Union[ 1186 _AgnosticContextManager[LangfuseGeneration], 1187 _AgnosticContextManager[LangfuseEmbedding], 1188 ], 1189 self._create_span_with_parent_context( 1190 as_type=as_type, 1191 name=name, 1192 remote_parent_span=remote_parent_span, 1193 parent=None, 1194 end_on_exit=end_on_exit, 1195 input=input, 1196 output=output, 1197 metadata=metadata, 1198 version=version, 1199 level=level, 1200 status_message=status_message, 1201 completion_start_time=completion_start_time, 1202 model=model, 1203 model_parameters=model_parameters, 1204 usage_details=usage_details, 1205 cost_details=cost_details, 1206 prompt=prompt, 1207 ), 1208 ) 1209 1210 return cast( 1211 Union[ 1212 _AgnosticContextManager[LangfuseGeneration], 1213 _AgnosticContextManager[LangfuseEmbedding], 1214 ], 1215 self._start_as_current_otel_span_with_processed_media( 1216 as_type=as_type, 1217 name=name, 1218 end_on_exit=end_on_exit, 1219 input=input, 1220 output=output, 1221 metadata=metadata, 1222 version=version, 1223 level=level, 1224 status_message=status_message, 1225 completion_start_time=completion_start_time, 1226 model=model, 1227 model_parameters=model_parameters, 1228 usage_details=usage_details, 1229 
cost_details=cost_details, 1230 prompt=prompt, 1231 ), 1232 ) 1233 1234 if as_type in get_observation_types_list(ObservationTypeSpanLike): 1235 if trace_context: 1236 trace_id = trace_context.get("trace_id", None) 1237 parent_span_id = trace_context.get("parent_span_id", None) 1238 1239 if trace_id: 1240 remote_parent_span = self._create_remote_parent_span( 1241 trace_id=trace_id, parent_span_id=parent_span_id 1242 ) 1243 1244 return cast( 1245 Union[ 1246 _AgnosticContextManager[LangfuseSpan], 1247 _AgnosticContextManager[LangfuseAgent], 1248 _AgnosticContextManager[LangfuseTool], 1249 _AgnosticContextManager[LangfuseChain], 1250 _AgnosticContextManager[LangfuseRetriever], 1251 _AgnosticContextManager[LangfuseEvaluator], 1252 _AgnosticContextManager[LangfuseGuardrail], 1253 ], 1254 self._create_span_with_parent_context( 1255 as_type=as_type, 1256 name=name, 1257 remote_parent_span=remote_parent_span, 1258 parent=None, 1259 end_on_exit=end_on_exit, 1260 input=input, 1261 output=output, 1262 metadata=metadata, 1263 version=version, 1264 level=level, 1265 status_message=status_message, 1266 ), 1267 ) 1268 1269 return cast( 1270 Union[ 1271 _AgnosticContextManager[LangfuseSpan], 1272 _AgnosticContextManager[LangfuseAgent], 1273 _AgnosticContextManager[LangfuseTool], 1274 _AgnosticContextManager[LangfuseChain], 1275 _AgnosticContextManager[LangfuseRetriever], 1276 _AgnosticContextManager[LangfuseEvaluator], 1277 _AgnosticContextManager[LangfuseGuardrail], 1278 ], 1279 self._start_as_current_otel_span_with_processed_media( 1280 as_type=as_type, 1281 name=name, 1282 end_on_exit=end_on_exit, 1283 input=input, 1284 output=output, 1285 metadata=metadata, 1286 version=version, 1287 level=level, 1288 status_message=status_message, 1289 ), 1290 ) 1291 1292 # This should never be reached since all valid types are handled above 1293 langfuse_logger.warning( 1294 f"Unknown observation type: {as_type}, falling back to span" 1295 ) 1296 return 
self._start_as_current_otel_span_with_processed_media( 1297 as_type="span", 1298 name=name, 1299 end_on_exit=end_on_exit, 1300 input=input, 1301 output=output, 1302 metadata=metadata, 1303 version=version, 1304 level=level, 1305 status_message=status_message, 1306 )
Create a new observation and set it as the current span in a context manager.
This method creates a new observation of the specified type and sets it as the current span within a context manager. Use this method with a 'with' statement to automatically handle the observation lifecycle within a code block.
The created observation will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the observation (e.g., function or operation name)
- as_type: Type of observation to create (defaults to "span")
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the observation
- version: Version identifier for the code or component
- level: Importance level of the observation (info, warning, error)
- status_message: Optional status message for the observation
- end_on_exit (default: True): Whether to end the span automatically when leaving the context manager. If False, the span must be manually ended to avoid memory leaks.
- The following parameters apply only when as_type is "generation" or "embedding":
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Returns:
A context manager that yields the appropriate observation type based on as_type
Example:
    # Create a span
    with langfuse.start_as_current_observation(name="process-query", as_type="span") as span:
        # Do work
        result = process_data()
        span.update(output=result)

        # Create a child span automatically
        with span.start_as_current_span(name="sub-operation") as child_span:
            # Do sub-operation work
            child_span.update(output="sub-result")

    # Create a tool observation
    with langfuse.start_as_current_observation(name="web-search", as_type="tool") as tool:
        # Do tool work
        results = search_web(query)
        tool.update(output=results)

    # Create a generation observation
    with langfuse.start_as_current_observation(
        name="answer-generation",
        as_type="generation",
        model="gpt-4"
    ) as generation:
        # Generate answer
        response = llm.generate(...)
        generation.update(output=response)
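The source above forwards the model-related arguments only on the generation-like branch. On the span-like branch they are not passed on, and an unknown type falls back to "span" with a warning. The sketch below models that dispatch. The two type groupings are inferred from the casts in the source (the underlying literal types are not shown here), and `forwarded_fields` is a hypothetical helper.

```python
import logging
from typing import Any, Dict

logger = logging.getLogger("observation_sketch")

# Groupings inferred from the casts in the source above (an assumption).
GENERATION_LIKE = {"generation", "embedding"}
SPAN_LIKE = {"span", "agent", "tool", "chain", "retriever", "evaluator", "guardrail"}
GENERATION_ONLY_FIELDS = {
    "completion_start_time",
    "model",
    "model_parameters",
    "usage_details",
    "cost_details",
    "prompt",
}


def forwarded_fields(as_type: str, **fields: Any) -> Dict[str, Any]:
    """Return the keyword arguments that would be forwarded for this as_type."""
    if as_type in GENERATION_LIKE:
        return {"as_type": as_type, **fields}
    if as_type not in SPAN_LIKE:
        logger.warning("Unknown observation type: %s, falling back to span", as_type)
        as_type = "span"
    # Span-like observations do not receive the generation-only fields.
    kept = {k: v for k, v in fields.items() if k not in GENERATION_ONLY_FIELDS}
    return {"as_type": as_type, **kept}


print(forwarded_fields("tool", name="web-search", model="gpt-4"))
print(forwarded_fields("generation", name="answer-generation", model="gpt-4"))
```

In this model, passing `model="gpt-4"` together with `as_type="tool"` is silently ignored, which is worth keeping in mind when choosing the observation type.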
1467 def update_current_generation( 1468 self, 1469 *, 1470 name: Optional[str] = None, 1471 input: Optional[Any] = None, 1472 output: Optional[Any] = None, 1473 metadata: Optional[Any] = None, 1474 version: Optional[str] = None, 1475 level: Optional[SpanLevel] = None, 1476 status_message: Optional[str] = None, 1477 completion_start_time: Optional[datetime] = None, 1478 model: Optional[str] = None, 1479 model_parameters: Optional[Dict[str, MapValue]] = None, 1480 usage_details: Optional[Dict[str, int]] = None, 1481 cost_details: Optional[Dict[str, float]] = None, 1482 prompt: Optional[PromptClient] = None, 1483 ) -> None: 1484 """Update the current active generation span with new information. 1485 1486 This method updates the current generation span in the active context with 1487 additional information. It's useful for adding output, usage stats, or other 1488 details that become available during or after model generation. 1489 1490 Args: 1491 name: The generation name 1492 input: Updated input data for the model 1493 output: Output from the model (e.g., completions) 1494 metadata: Additional metadata to associate with the generation 1495 version: Version identifier for the model or component 1496 level: Importance level of the generation (info, warning, error) 1497 status_message: Optional status message for the generation 1498 completion_start_time: When the model started generating the response 1499 model: Name/identifier of the AI model used (e.g., "gpt-4") 1500 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1501 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1502 cost_details: Cost information for the model call 1503 prompt: Associated prompt template from Langfuse prompt management 1504 1505 Example: 1506 ```python 1507 with langfuse.start_as_current_generation(name="answer-query") as generation: 1508 # Initial setup and API call 1509 response = llm.generate(...) 
1510 1511 # Update with results that weren't available at creation time 1512 langfuse.update_current_generation( 1513 output=response.text, 1514 usage_details={ 1515 "prompt_tokens": response.usage.prompt_tokens, 1516 "completion_tokens": response.usage.completion_tokens 1517 } 1518 ) 1519 ``` 1520 """ 1521 if not self._tracing_enabled: 1522 langfuse_logger.debug( 1523 "Operation skipped: update_current_generation - Tracing is disabled or client is in no-op mode." 1524 ) 1525 return 1526 1527 current_otel_span = self._get_current_otel_span() 1528 1529 if current_otel_span is not None: 1530 generation = LangfuseGeneration( 1531 otel_span=current_otel_span, langfuse_client=self 1532 ) 1533 1534 if name: 1535 current_otel_span.update_name(name) 1536 1537 generation.update( 1538 input=input, 1539 output=output, 1540 metadata=metadata, 1541 version=version, 1542 level=level, 1543 status_message=status_message, 1544 completion_start_time=completion_start_time, 1545 model=model, 1546 model_parameters=model_parameters, 1547 usage_details=usage_details, 1548 cost_details=cost_details, 1549 prompt=prompt, 1550 )
Update the current active generation span with new information.
This method updates the current generation span in the active context with additional information. It's useful for adding output, usage stats, or other details that become available during or after model generation.
Arguments:
- name: The generation name
- input: Updated input data for the model
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Example:
    with langfuse.start_as_current_generation(name="answer-query") as generation:
        # Initial setup and API call
        response = llm.generate(...)

        # Update with results that weren't available at creation time
        langfuse.update_current_generation(
            output=response.text,
            usage_details={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        )
    def update_current_span(
        self,
        *,
        name: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> None:
        """Update the current active span with new information.

        This method updates the current span in the active context with
        additional information. It's useful for adding outputs or metadata
        that become available during execution.

        Args:
            name: The span name
            input: Updated input data for the operation
            output: Output data from the operation
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-data") as span:
                # Initial processing
                result = process_first_part()

                # Update with intermediate results
                langfuse.update_current_span(metadata={"intermediate_result": result})

                # Continue processing
                final_result = process_second_part(result)

                # Final update
                langfuse.update_current_span(output=final_result)
            ```
        """
        if not self._tracing_enabled:
            langfuse_logger.debug(
                "Operation skipped: update_current_span - Tracing is disabled or client is in no-op mode."
            )
            return

        current_otel_span = self._get_current_otel_span()

        if current_otel_span is not None:
            span = LangfuseSpan(
                otel_span=current_otel_span,
                langfuse_client=self,
                environment=self._environment,
            )

            if name:
                current_otel_span.update_name(name)

            span.update(
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
            )
Update the current active span with new information.
This method updates the current span in the active context with additional information. It's useful for adding outputs or metadata that become available during execution.
Arguments:
- name: The span name
- input: Updated input data for the operation
- output: Output data from the operation
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Example:
with langfuse.start_as_current_span(name="process-data") as span:
    # Initial processing
    result = process_first_part()

    # Update with intermediate results
    langfuse.update_current_span(metadata={"intermediate_result": result})

    # Continue processing
    final_result = process_second_part(result)

    # Final update
    langfuse.update_current_span(output=final_result)
    def update_current_trace(
        self,
        *,
        name: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ) -> None:
        """Update the current trace with additional information.

        This method updates the Langfuse trace that the current span belongs to. It's useful for
        adding trace-level metadata like user ID, session ID, or tags that apply to
        the entire Langfuse trace rather than just a single observation.

        Args:
            name: Updated name for the Langfuse trace
            user_id: ID of the user who initiated the Langfuse trace
            session_id: Session identifier for grouping related Langfuse traces
            version: Version identifier for the application or service
            input: Input data for the overall Langfuse trace
            output: Output data from the overall Langfuse trace
            metadata: Additional metadata to associate with the Langfuse trace
            tags: List of tags to categorize the Langfuse trace
            public: Whether the Langfuse trace should be publicly accessible

        Example:
            ```python
            with langfuse.start_as_current_span(name="handle-request") as span:
                # Get user information
                user = authenticate_user(request)

                # Update trace with user context
                langfuse.update_current_trace(
                    user_id=user.id,
                    session_id=request.session_id,
                    tags=["production", "web-app"]
                )

                # Continue processing
                response = process_request(request)

                # Update span with results
                span.update(output=response)
            ```
        """
        if not self._tracing_enabled:
            langfuse_logger.debug(
                "Operation skipped: update_current_trace - Tracing is disabled or client is in no-op mode."
            )
            return

        current_otel_span = self._get_current_otel_span()

        if current_otel_span is not None:
            existing_observation_type = current_otel_span.attributes.get(  # type: ignore[attr-defined]
                LangfuseOtelSpanAttributes.OBSERVATION_TYPE, "span"
            )
            # We need to preserve the class to keep the correct observation type
            span_class = self._get_span_class(existing_observation_type)
            span = span_class(
                otel_span=current_otel_span,
                langfuse_client=self,
                environment=self._environment,
            )

            span.update_trace(
                name=name,
                user_id=user_id,
                session_id=session_id,
                version=version,
                input=input,
                output=output,
                metadata=metadata,
                tags=tags,
                public=public,
            )
Update the current trace with additional information.
This method updates the Langfuse trace that the current span belongs to. It's useful for adding trace-level metadata like user ID, session ID, or tags that apply to the entire Langfuse trace rather than just a single observation.
Arguments:
- name: Updated name for the Langfuse trace
- user_id: ID of the user who initiated the Langfuse trace
- session_id: Session identifier for grouping related Langfuse traces
- version: Version identifier for the application or service
- input: Input data for the overall Langfuse trace
- output: Output data from the overall Langfuse trace
- metadata: Additional metadata to associate with the Langfuse trace
- tags: List of tags to categorize the Langfuse trace
- public: Whether the Langfuse trace should be publicly accessible
Example:
with langfuse.start_as_current_span(name="handle-request") as span:
    # Get user information
    user = authenticate_user(request)

    # Update trace with user context
    langfuse.update_current_trace(
        user_id=user.id,
        session_id=request.session_id,
        tags=["production", "web-app"]
    )

    # Continue processing
    response = process_request(request)

    # Update span with results
    span.update(output=response)
    def create_event(
        self,
        *,
        trace_context: Optional[TraceContext] = None,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> LangfuseEvent:
        """Create a new Langfuse observation of type 'EVENT'.

        The created Langfuse Event observation will be the child of the current span in the context.

        Args:
            trace_context: Optional context for connecting to an existing trace
            name: Name of the span (e.g., function or operation name)
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span

        Returns:
            The Langfuse Event object

        Example:
            ```python
            event = langfuse.create_event(name="process-event")
            ```
        """
        timestamp = time_ns()

        if trace_context:
            trace_id = trace_context.get("trace_id", None)
            parent_span_id = trace_context.get("parent_span_id", None)

            if trace_id:
                remote_parent_span = self._create_remote_parent_span(
                    trace_id=trace_id, parent_span_id=parent_span_id
                )

                with otel_trace_api.use_span(
                    cast(otel_trace_api.Span, remote_parent_span)
                ):
                    otel_span = self._otel_tracer.start_span(
                        name=name, start_time=timestamp
                    )
                    otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True)

                    return cast(
                        LangfuseEvent,
                        LangfuseEvent(
                            otel_span=otel_span,
                            langfuse_client=self,
                            environment=self._environment,
                            input=input,
                            output=output,
                            metadata=metadata,
                            version=version,
                            level=level,
                            status_message=status_message,
                        ).end(end_time=timestamp),
                    )

        otel_span = self._otel_tracer.start_span(name=name, start_time=timestamp)

        return cast(
            LangfuseEvent,
            LangfuseEvent(
                otel_span=otel_span,
                langfuse_client=self,
                environment=self._environment,
                input=input,
                output=output,
                metadata=metadata,
                version=version,
                level=level,
                status_message=status_message,
            ).end(end_time=timestamp),
        )
Create a new Langfuse observation of type 'EVENT'.
The created Langfuse Event observation will be the child of the current span in the context.
Arguments:
- trace_context: Optional context for connecting to an existing trace
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Returns:
The Langfuse Event object
Example:
event = langfuse.create_event(name="process-event")
    @staticmethod
    def create_trace_id(*, seed: Optional[str] = None) -> str:
        """Create a unique trace ID for use with Langfuse.

        This method generates a unique trace ID for use with various Langfuse APIs.
        It can either generate a random ID or create a deterministic ID based on
        a seed string.

        Trace IDs must be 32 lowercase hexadecimal characters, representing 16 bytes.
        This method ensures the generated ID meets this requirement. If you need to
        correlate an external ID with a Langfuse trace ID, use the external ID as the
        seed to get a valid, deterministic Langfuse trace ID.

        Args:
            seed: Optional string to use as a seed for deterministic ID generation.
                If provided, the same seed will always produce the same ID.
                If not provided, a random ID will be generated.

        Returns:
            A 32-character lowercase hexadecimal string representing the Langfuse trace ID.

        Example:
            ```python
            # Generate a random trace ID
            trace_id = langfuse.create_trace_id()

            # Generate a deterministic ID based on a seed
            session_trace_id = langfuse.create_trace_id(seed="session-456")

            # Correlate an external ID with a Langfuse trace ID
            external_id = "external-system-123456"
            correlated_trace_id = langfuse.create_trace_id(seed=external_id)

            # Use the ID with trace context
            with langfuse.start_as_current_span(
                name="process-request",
                trace_context={"trace_id": trace_id}
            ) as span:
                # Operation will be part of the specific trace
                pass
            ```
        """
        if not seed:
            trace_id_int = RandomIdGenerator().generate_trace_id()

            return Langfuse._format_otel_trace_id(trace_id_int)

        return sha256(seed.encode("utf-8")).digest()[:16].hex()
Create a unique trace ID for use with Langfuse.
This method generates a unique trace ID for use with various Langfuse APIs. It can either generate a random ID or create a deterministic ID based on a seed string.
Trace IDs must be 32 lowercase hexadecimal characters, representing 16 bytes. This method ensures the generated ID meets this requirement. If you need to correlate an external ID with a Langfuse trace ID, use the external ID as the seed to get a valid, deterministic Langfuse trace ID.
Arguments:
- seed: Optional string to use as a seed for deterministic ID generation. If provided, the same seed will always produce the same ID. If not provided, a random ID will be generated.
Returns:
A 32-character lowercase hexadecimal string representing the Langfuse trace ID.
Example:
# Generate a random trace ID
trace_id = langfuse.create_trace_id()

# Generate a deterministic ID based on a seed
session_trace_id = langfuse.create_trace_id(seed="session-456")

# Correlate an external ID with a Langfuse trace ID
external_id = "external-system-123456"
correlated_trace_id = langfuse.create_trace_id(seed=external_id)

# Use the ID with trace context
with langfuse.start_as_current_span(
    name="process-request",
    trace_context={"trace_id": trace_id}
) as span:
    # Operation will be part of the specific trace
    pass
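The seeded branch of `create_trace_id` keeps the first 16 bytes of a SHA-256 digest and hex-encodes them, which is why the same seed always maps to the same 32-character ID. The following is a minimal standard-library sketch of that behavior, not the SDK's code: `sketch_create_trace_id` is a hypothetical name, and `secrets.token_hex` is an assumed stand-in for the OpenTelemetry random ID generator that the SDK uses for the unseeded case.

```python
import re
import secrets
from hashlib import sha256
from typing import Optional

# Format stated in the docstring: 32 lowercase hexadecimal characters (16 bytes).
TRACE_ID_PATTERN = re.compile(r"[0-9a-f]{32}")


def sketch_create_trace_id(seed: Optional[str] = None) -> str:
    """Hypothetical stand-alone version of the ID derivation shown in the source."""
    if not seed:
        # Assumption: any 16 random bytes will do here; the SDK itself
        # delegates this branch to OpenTelemetry's RandomIdGenerator.
        return secrets.token_hex(16)

    # Deterministic branch, as in the source: first 16 bytes of SHA-256(seed).
    return sha256(seed.encode("utf-8")).digest()[:16].hex()


first = sketch_create_trace_id(seed="session-456")
second = sketch_create_trace_id(seed="session-456")
random_id = sketch_create_trace_id()

print(first == second)                             # same seed, same ID
print(bool(TRACE_ID_PATTERN.fullmatch(random_id)))  # random ID has the required shape
```

Because the check is `if not seed`, an empty string is treated like a missing seed and yields a random ID.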
    def create_score(
        self,
        *,
        name: str,
        value: Union[float, str],
        session_id: Optional[str] = None,
        dataset_run_id: Optional[str] = None,
        trace_id: Optional[str] = None,
        observation_id: Optional[str] = None,
        score_id: Optional[str] = None,
        data_type: Optional[ScoreDataType] = None,
        comment: Optional[str] = None,
        config_id: Optional[str] = None,
        metadata: Optional[Any] = None,
    ) -> None:
        """Create a score for a specific trace or observation.

        This method creates a score for evaluating a Langfuse trace or observation. Scores can be
        used to track quality metrics, user feedback, or automated evaluations.

        Args:
            name: Name of the score (e.g., "relevance", "accuracy")
            value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
            session_id: ID of the Langfuse session to associate the score with
            dataset_run_id: ID of the Langfuse dataset run to associate the score with
            trace_id: ID of the Langfuse trace to associate the score with
            observation_id: Optional ID of the specific observation to score. Trace ID must be provided too.
            score_id: Optional custom ID for the score (auto-generated if not provided)
            data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
            comment: Optional comment or explanation for the score
            config_id: Optional ID of a score config defined in Langfuse
            metadata: Optional metadata to be attached to the score

        Example:
            ```python
            # Create a numeric score for accuracy
            langfuse.create_score(
                name="accuracy",
                value=0.92,
                trace_id="abcdef1234567890abcdef1234567890",
                data_type="NUMERIC",
                comment="High accuracy with minor irrelevant details"
            )

            # Create a categorical score for sentiment
            langfuse.create_score(
                name="sentiment",
                value="positive",
                trace_id="abcdef1234567890abcdef1234567890",
                observation_id="abcdef1234567890",
                data_type="CATEGORICAL"
            )
            ```
        """
        if not self._tracing_enabled:
            return

        score_id = score_id or self._create_observation_id()

        try:
            new_body = ScoreBody(
                id=score_id,
                sessionId=session_id,
                datasetRunId=dataset_run_id,
                traceId=trace_id,
                observationId=observation_id,
                name=name,
                value=value,
                dataType=data_type,  # type: ignore
                comment=comment,
                configId=config_id,
                environment=self._environment,
                metadata=metadata,
            )

            event = {
                "id": self.create_trace_id(),
                "type": "score-create",
                "timestamp": _get_timestamp(),
                "body": new_body,
            }

            if self._resources is not None:
                # Force the score to be in sample if it was for a legacy trace ID, i.e. non-32 hexchar
                force_sample = (
                    not self._is_valid_trace_id(trace_id) if trace_id else True
                )

                self._resources.add_score_task(
                    event,
                    force_sample=force_sample,
                )

        except Exception as e:
            langfuse_logger.exception(
                f"Error creating score: Failed to process score event for trace_id={trace_id}, name={name}. Error: {e}"
            )
Create a score for a specific trace or observation.
This method creates a score for evaluating a Langfuse trace or observation. Scores can be used to track quality metrics, user feedback, or automated evaluations.
Arguments:
- name: Name of the score (e.g., "relevance", "accuracy")
- value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
- session_id: ID of the Langfuse session to associate the score with
- dataset_run_id: ID of the Langfuse dataset run to associate the score with
- trace_id: ID of the Langfuse trace to associate the score with
- observation_id: Optional ID of the specific observation to score. Trace ID must be provided too.
- score_id: Optional custom ID for the score (auto-generated if not provided)
- data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
- comment: Optional comment or explanation for the score
- config_id: Optional ID of a score config defined in Langfuse
- metadata: Optional metadata to be attached to the score
Example:
# Create a numeric score for accuracy
langfuse.create_score(
    name="accuracy",
    value=0.92,
    trace_id="abcdef1234567890abcdef1234567890",
    data_type="NUMERIC",
    comment="High accuracy with minor irrelevant details"
)

# Create a categorical score for sentiment
langfuse.create_score(
    name="sentiment",
    value="positive",
    trace_id="abcdef1234567890abcdef1234567890",
    observation_id="abcdef1234567890",
    data_type="CATEGORICAL"
)
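The `create_score` source forces a score into the sample when no trace ID is given or when the trace ID is a legacy, non-32-hex-character value. The sketch below isolates that decision. The body of the private `_is_valid_trace_id` helper is not shown on this page, so the regular expression here is an assumption based on the documented trace ID format, and both function names are hypothetical.

```python
import re
from typing import Optional

# Assumption: "valid" means exactly 32 lowercase hexadecimal characters,
# matching the trace ID format described for create_trace_id.
_TRACE_ID_PATTERN = re.compile(r"[0-9a-f]{32}")


def is_valid_trace_id(trace_id: str) -> bool:
    """Hypothetical stand-in for the SDK's private validity check."""
    return _TRACE_ID_PATTERN.fullmatch(trace_id) is not None


def should_force_sample(trace_id: Optional[str]) -> bool:
    """Mirror the conditional from the source listing."""
    return not is_valid_trace_id(trace_id) if trace_id else True


print(should_force_sample(None))                                # no trace ID
print(should_force_sample("abcdef1234567890abcdef1234567890"))  # well-formed ID
print(should_force_sample("legacy-trace-id"))                   # legacy-style ID
```

Under this assumption, only scores attached to a well-formed trace ID are left to the normal sampling decision.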
    def score_current_span(
        self,
        *,
        name: str,
        value: Union[float, str],
        score_id: Optional[str] = None,
        data_type: Optional[ScoreDataType] = None,
        comment: Optional[str] = None,
        config_id: Optional[str] = None,
    ) -> None:
        """Create a score for the current active span.

        This method scores the currently active span in the context. It's a convenient
        way to score the current operation without needing to know its trace and span IDs.

        Args:
            name: Name of the score (e.g., "relevance", "accuracy")
            value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
            score_id: Optional custom ID for the score (auto-generated if not provided)
            data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
            comment: Optional comment or explanation for the score
            config_id: Optional ID of a score config defined in Langfuse

        Example:
            ```python
            with langfuse.start_as_current_generation(name="answer-query") as generation:
                # Generate answer
                response = generate_answer(...)
                generation.update(output=response)

                # Score the generation
                langfuse.score_current_span(
                    name="relevance",
                    value=0.85,
                    data_type="NUMERIC",
                    comment="Mostly relevant but contains some tangential information"
                )
            ```
        """
        current_span = self._get_current_otel_span()

        if current_span is not None:
            trace_id = self._get_otel_trace_id(current_span)
            observation_id = self._get_otel_span_id(current_span)

            langfuse_logger.info(
                f"Score: Creating score name='{name}' value={value} for current span ({observation_id}) in trace {trace_id}"
            )

            self.create_score(
                trace_id=trace_id,
                observation_id=observation_id,
                name=name,
                value=cast(str, value),
                score_id=score_id,
                data_type=cast(Literal["CATEGORICAL"], data_type),
                comment=comment,
                config_id=config_id,
            )
Create a score for the current active span.
This method scores the currently active span in the context. It's a convenient way to score the current operation without needing to know its trace and span IDs.
Arguments:
- name: Name of the score (e.g., "relevance", "accuracy")
- value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
- score_id: Optional custom ID for the score (auto-generated if not provided)
- data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
- comment: Optional comment or explanation for the score
- config_id: Optional ID of a score config defined in Langfuse
Example:
with langfuse.start_as_current_generation(name="answer-query") as generation:
    # Generate answer
    response = generate_answer(...)
    generation.update(output=response)

    # Score the generation
    langfuse.score_current_span(
        name="relevance",
        value=0.85,
        data_type="NUMERIC",
        comment="Mostly relevant but contains some tangential information"
    )
    def score_current_trace(
        self,
        *,
        name: str,
        value: Union[float, str],
        score_id: Optional[str] = None,
        data_type: Optional[ScoreDataType] = None,
        comment: Optional[str] = None,
        config_id: Optional[str] = None,
    ) -> None:
        """Create a score for the current trace.

        This method scores the trace of the currently active span. Unlike score_current_span,
        this method associates the score with the entire trace rather than a specific span.
        It's useful for scoring overall performance or quality of the entire operation.

        Args:
            name: Name of the score (e.g., "user_satisfaction", "overall_quality")
            value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
            score_id: Optional custom ID for the score (auto-generated if not provided)
            data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
            comment: Optional comment or explanation for the score
            config_id: Optional ID of a score config defined in Langfuse

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-user-request") as span:
                # Process request
                result = process_complete_request()
                span.update(output=result)

                # Score the overall trace
                langfuse.score_current_trace(
                    name="overall_quality",
                    value=0.95,
                    data_type="NUMERIC",
                    comment="High quality end-to-end response"
                )
            ```
        """
        current_span = self._get_current_otel_span()

        if current_span is not None:
            trace_id = self._get_otel_trace_id(current_span)

            langfuse_logger.info(
                f"Score: Creating score name='{name}' value={value} for entire trace {trace_id}"
            )

            self.create_score(
                trace_id=trace_id,
                name=name,
                value=cast(str, value),
                score_id=score_id,
                data_type=cast(Literal["CATEGORICAL"], data_type),
                comment=comment,
                config_id=config_id,
            )
Create a score for the current trace.
This method scores the trace of the currently active span. Unlike score_current_span, this method associates the score with the entire trace rather than a specific span. It's useful for scoring overall performance or quality of the entire operation.
Arguments:
- name: Name of the score (e.g., "user_satisfaction", "overall_quality")
- value: Score value (can be numeric for NUMERIC/BOOLEAN types or string for CATEGORICAL)
- score_id: Optional custom ID for the score (auto-generated if not provided)
- data_type: Type of score (NUMERIC, BOOLEAN, or CATEGORICAL)
- comment: Optional comment or explanation for the score
- config_id: Optional ID of a score config defined in Langfuse
Example:
with langfuse.start_as_current_span(name="process-user-request") as span:
    # Process request
    result = process_complete_request()
    span.update(output=result)

    # Score the overall trace
    langfuse.score_current_trace(
        name="overall_quality",
        value=0.95,
        data_type="NUMERIC",
        comment="High quality end-to-end response"
    )
    def flush(self) -> None:
        """Force flush all pending spans and events to the Langfuse API.

        This method manually flushes any pending spans, scores, and other events to the
        Langfuse API. It's useful in scenarios where you want to ensure all data is sent
        before proceeding, without waiting for the automatic flush interval.

        Example:
            ```python
            # Record some spans and scores
            with langfuse.start_as_current_span(name="operation") as span:
                # Do work...
                pass

            # Ensure all data is sent to Langfuse before proceeding
            langfuse.flush()

            # Continue with other work
            ```
        """
        if self._resources is not None:
            self._resources.flush()
Force flush all pending spans and events to the Langfuse API.
This method manually flushes any pending spans, scores, and other events to the Langfuse API. It's useful in scenarios where you want to ensure all data is sent before proceeding, without waiting for the automatic flush interval.
Example:
# Record some spans and scores
with langfuse.start_as_current_span(name="operation") as span:
    # Do work...
    pass

# Ensure all data is sent to Langfuse before proceeding
langfuse.flush()

# Continue with other work
    def shutdown(self) -> None:
        """Shut down the Langfuse client and flush all pending data.

        This method cleanly shuts down the Langfuse client, ensuring all pending data
        is flushed to the API and all background threads are properly terminated.

        It's important to call this method when your application is shutting down to
        prevent data loss and resource leaks. For most applications, using the client
        as a context manager or relying on the automatic shutdown via atexit is sufficient.

        Example:
            ```python
            # Initialize Langfuse
            langfuse = Langfuse(public_key="...", secret_key="...")

            # Use Langfuse throughout your application
            # ...

            # When application is shutting down
            langfuse.shutdown()
            ```
        """
        if self._resources is not None:
            self._resources.shutdown()
Shut down the Langfuse client and flush all pending data.
This method cleanly shuts down the Langfuse client, ensuring all pending data is flushed to the API and all background threads are properly terminated.
It's important to call this method when your application is shutting down to prevent data loss and resource leaks. For most applications, using the client as a context manager or relying on the automatic shutdown via atexit is sufficient.
Example:
# Initialize Langfuse
langfuse = Langfuse(public_key="...", secret_key="...")

# Use Langfuse throughout your application
# ...

# When application is shutting down
langfuse.shutdown()
    def get_current_trace_id(self) -> Optional[str]:
        """Get the trace ID of the current active span.

        This method retrieves the trace ID from the currently active span in the context.
        It can be used to get the trace ID for referencing in logs, external systems,
        or for creating related operations.

        Returns:
            The current trace ID as a 32-character lowercase hexadecimal string,
            or None if there is no active span.

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-request") as span:
                # Get the current trace ID for reference
                trace_id = langfuse.get_current_trace_id()

                # Use it for external correlation
                log.info(f"Processing request with trace_id: {trace_id}")

                # Or pass to another system
                external_system.process(data, trace_id=trace_id)
            ```
        """
        if not self._tracing_enabled:
            langfuse_logger.debug(
                "Operation skipped: get_current_trace_id - Tracing is disabled or client is in no-op mode."
            )
            return None

        current_otel_span = self._get_current_otel_span()

        return self._get_otel_trace_id(current_otel_span) if current_otel_span else None
Get the trace ID of the current active span.
This method retrieves the trace ID from the currently active span in the context. It can be used to get the trace ID for referencing in logs, external systems, or for creating related operations.
Returns:
The current trace ID as a 32-character lowercase hexadecimal string, or None if there is no active span.
Example:
with langfuse.start_as_current_span(name="process-request") as span:
    # Get the current trace ID for reference
    trace_id = langfuse.get_current_trace_id()

    # Use it for external correlation
    log.info(f"Processing request with trace_id: {trace_id}")

    # Or pass to another system
    external_system.process(data, trace_id=trace_id)
    def get_current_observation_id(self) -> Optional[str]:
        """Get the observation ID (span ID) of the current active span.

        This method retrieves the observation ID from the currently active span in the context.
        It can be used to get the observation ID for referencing in logs, external systems,
        or for creating scores or other related operations.

        Returns:
            The current observation ID as a 16-character lowercase hexadecimal string,
            or None if there is no active span.

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-user-query") as span:
                # Get the current observation ID
                observation_id = langfuse.get_current_observation_id()

                # Store it for later reference
                cache.set(f"query_{query_id}_observation", observation_id)

                # Process the query...
            ```
        """
        if not self._tracing_enabled:
            langfuse_logger.debug(
                "Operation skipped: get_current_observation_id - Tracing is disabled or client is in no-op mode."
            )
            return None

        current_otel_span = self._get_current_otel_span()

        return self._get_otel_span_id(current_otel_span) if current_otel_span else None
Get the observation ID (span ID) of the current active span.
This method retrieves the observation ID from the currently active span in the context. It can be used to get the observation ID for referencing in logs, external systems, or for creating scores or other related operations.
Returns:
The current observation ID as a 16-character lowercase hexadecimal string, or None if there is no active span.
Example:
with langfuse.start_as_current_span(name="process-user-query") as span:
    # Get the current observation ID
    observation_id = langfuse.get_current_observation_id()

    # Store it for later reference
    cache.set(f"query_{query_id}_observation", observation_id)

    # Process the query...
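The two getters return strings of different lengths: 32 hex characters for a trace ID and 16 for an observation ID. OpenTelemetry keeps these identifiers as integers (128-bit trace IDs, 64-bit span IDs), so the lengths follow from zero-padded hexadecimal formatting. The sketch below illustrates that conversion with the standard library only. The helper names are hypothetical, and it is an assumption that the SDK's private `_get_otel_trace_id` and `_get_otel_span_id` helpers format their values the same way.

```python
def format_trace_id(trace_id: int) -> str:
    """Render a 128-bit trace ID as 32 lowercase hex characters."""
    return format(trace_id, "032x")


def format_span_id(span_id: int) -> str:
    """Render a 64-bit span ID as 16 lowercase hex characters."""
    return format(span_id, "016x")


trace_hex = format_trace_id(0x1234567890ABCDEF1234567890ABCDEF)
span_hex = format_span_id(0xABCDEF)

print(trace_hex)  # 1234567890abcdef1234567890abcdef
print(span_hex)   # 0000000000abcdef
```

Small integer values are left-padded with zeros, so the string length stays fixed regardless of the numeric value.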
    def get_trace_url(self, *, trace_id: Optional[str] = None) -> Optional[str]:
        """Get the URL to view a trace in the Langfuse UI.

        This method generates a URL that links directly to a trace in the Langfuse UI.
        It's useful for providing links in logs, notifications, or debugging tools.

        Args:
            trace_id: Optional trace ID to generate a URL for. If not provided,
                the trace ID of the current active span will be used.

        Returns:
            A URL string pointing to the trace in the Langfuse UI,
            or None if the project ID couldn't be retrieved or no trace ID is available.

        Example:
            ```python
            # Get URL for the current trace
            with langfuse.start_as_current_span(name="process-request") as span:
                trace_url = langfuse.get_trace_url()
                log.info(f"Processing trace: {trace_url}")

            # Get URL for a specific trace
            specific_trace_url = langfuse.get_trace_url(trace_id="1234567890abcdef1234567890abcdef")
            send_notification(f"Review needed for trace: {specific_trace_url}")
            ```
        """
        project_id = self._get_project_id()
        final_trace_id = trace_id or self.get_current_trace_id()

        return (
            f"{self._host}/project/{project_id}/traces/{final_trace_id}"
            if project_id and final_trace_id
            else None
        )
Get the URL to view a trace in the Langfuse UI.
This method generates a URL that links directly to a trace in the Langfuse UI. It's useful for providing links in logs, notifications, or debugging tools.
Arguments:
- trace_id: Optional trace ID to generate a URL for. If not provided, the trace ID of the current active span will be used.
Returns:
A URL string pointing to the trace in the Langfuse UI, or None if the project ID couldn't be retrieved or no trace ID is available.
Example:
# Get URL for the current trace
with langfuse.start_as_current_span(name="process-request") as span:
    trace_url = langfuse.get_trace_url()
    log.info(f"Processing trace: {trace_url}")

# Get URL for a specific trace
specific_trace_url = langfuse.get_trace_url(trace_id="1234567890abcdef1234567890abcdef")
send_notification(f"Review needed for trace: {specific_trace_url}")
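The URL returned by `get_trace_url` is assembled from the host, the project ID, and the trace ID, and the method returns None when either ID is missing. A minimal sketch of that rule as a free function follows. `build_trace_url` and the project ID value are hypothetical and used for illustration only; the path layout is taken from the source listing.

```python
from typing import Optional


def build_trace_url(
    host: str, project_id: Optional[str], trace_id: Optional[str]
) -> Optional[str]:
    """Hypothetical helper reproducing the URL pattern from the source listing."""
    if not project_id or not trace_id:
        # Either ID missing: no URL can be built.
        return None

    return f"{host}/project/{project_id}/traces/{trace_id}"


url = build_trace_url(
    "https://cloud.langfuse.com",
    "example-project-id",  # placeholder, not a real project ID
    "1234567890abcdef1234567890abcdef",
)
missing = build_trace_url("https://cloud.langfuse.com", None, "1234567890abcdef1234567890abcdef")

print(url)
print(missing)
```

Callers that log or forward the result should therefore handle None, for example when the project ID lookup fails or no span is active.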
    def get_dataset(
        self, name: str, *, fetch_items_page_size: Optional[int] = 50
    ) -> "DatasetClient":
        """Fetch a dataset by its name.

        Args:
            name (str): The name of the dataset to fetch.
            fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.

        Returns:
            DatasetClient: The dataset with the given name.
        """
        try:
            langfuse_logger.debug(f"Getting datasets {name}")
            dataset = self.api.datasets.get(dataset_name=name)

            dataset_items = []
            page = 1

            while True:
                new_items = self.api.dataset_items.list(
                    dataset_name=self._url_encode(name, is_url_param=True),
                    page=page,
                    limit=fetch_items_page_size,
                )
                dataset_items.extend(new_items.data)

                if new_items.meta.total_pages <= page:
                    break

                page += 1

            items = [DatasetItemClient(i, langfuse=self) for i in dataset_items]

            return DatasetClient(dataset, items=items)

        except Error as e:
            handle_fern_exception(e)
            raise e
Fetch a dataset by its name.
Arguments:
- name (str): The name of the dataset to fetch.
- fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.
Returns:
DatasetClient: The dataset with the given name.
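Fetching "in chunks" means walking a 1-indexed, paginated listing until the last page is reached. The sketch below shows that loop in isolation against an in-memory stand-in; `fetch_all_items`, `fake_fetch_page`, and `FAKE_ITEMS` are hypothetical names introduced for illustration, not part of the SDK.

```python
from typing import Callable, List, Tuple


def fetch_all_items(
    fetch_page: Callable[[int, int], Tuple[List[str], int]], page_size: int = 50
) -> List[str]:
    """Collect every item from a 1-indexed paginated source.

    `fetch_page(page, limit)` is a hypothetical callable that returns
    (items_on_page, total_pages).
    """
    items: List[str] = []
    page = 1
    while True:
        page_items, total_pages = fetch_page(page, page_size)
        items.extend(page_items)
        # Stop once the current page is the last one reported by the source.
        if total_pages <= page:
            break
        page += 1
    return items


# In-memory stand-in for a paginated API: 120 fake items.
FAKE_ITEMS = [f"item-{i}" for i in range(120)]


def fake_fetch_page(page: int, limit: int) -> Tuple[List[str], int]:
    start = (page - 1) * limit
    total_pages = -(-len(FAKE_ITEMS) // limit)  # ceiling division
    return FAKE_ITEMS[start : start + limit], total_pages


all_items = fetch_all_items(fake_fetch_page, page_size=50)
```

A larger page size means fewer requests per dataset, at the cost of larger responses.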
2464 def run_experiment( 2465 self, 2466 *, 2467 name: str, 2468 run_name: Optional[str] = None, 2469 description: Optional[str] = None, 2470 data: ExperimentData, 2471 task: TaskFunction, 2472 evaluators: List[EvaluatorFunction] = [], 2473 run_evaluators: List[RunEvaluatorFunction] = [], 2474 max_concurrency: int = 50, 2475 metadata: Optional[Dict[str, Any]] = None, 2476 ) -> ExperimentResult: 2477 """Run an experiment on a dataset with automatic tracing and evaluation. 2478 2479 This method executes a task function on each item in the provided dataset, 2480 automatically traces all executions with Langfuse for observability, runs 2481 item-level and run-level evaluators on the outputs, and returns comprehensive 2482 results with evaluation metrics. 2483 2484 The experiment system provides: 2485 - Automatic tracing of all task executions 2486 - Concurrent processing with configurable limits 2487 - Comprehensive error handling that isolates failures 2488 - Integration with Langfuse datasets for experiment tracking 2489 - Flexible evaluation framework supporting both sync and async evaluators 2490 2491 Args: 2492 name: Human-readable name for the experiment. Used for identification 2493 in the Langfuse UI. 2494 run_name: Optional exact name for the experiment run. If provided, this will be 2495 used as the exact dataset run name if the `data` contains Langfuse dataset items. 2496 If not provided, this will default to the experiment name appended with an ISO timestamp. 2497 description: Optional description explaining the experiment's purpose, 2498 methodology, or expected outcomes. 2499 data: Array of data items to process. Can be either: 2500 - List of dict-like items with 'input', 'expected_output', 'metadata' keys 2501 - List of Langfuse DatasetItem objects from dataset.items 2502 task: Function that processes each data item and returns output. 2503 Must accept 'item' as keyword argument and can return sync or async results. 
2504 The task function signature should be: task(*, item, **kwargs) -> Any 2505 evaluators: List of functions to evaluate each item's output individually. 2506 Each evaluator receives input, output, expected_output, and metadata. 2507 Can return single Evaluation dict or list of Evaluation dicts. 2508 run_evaluators: List of functions to evaluate the entire experiment run. 2509 Each run evaluator receives all item_results and can compute aggregate metrics. 2510 Useful for calculating averages, distributions, or cross-item comparisons. 2511 max_concurrency: Maximum number of concurrent task executions (default: 50). 2512 Controls the number of items processed simultaneously. Adjust based on 2513 API rate limits and system resources. 2514 metadata: Optional metadata dictionary to attach to all experiment traces. 2515 This metadata will be included in every trace created during the experiment. 2516 If `data` are Langfuse dataset items, the metadata will be attached to the dataset run, too. 2517 2518 Returns: 2519 ExperimentResult containing: 2520 - run_name: The experiment run name. This is equal to the dataset run name if experiment was on Langfuse dataset. 2521 - item_results: List of results for each processed item with outputs and evaluations 2522 - run_evaluations: List of aggregate evaluation results for the entire run 2523 - dataset_run_id: ID of the dataset run (if using Langfuse datasets) 2524 - dataset_run_url: Direct URL to view results in Langfuse UI (if applicable) 2525 2526 Raises: 2527 ValueError: If required parameters are missing or invalid 2528 Exception: If experiment setup fails (individual item failures are handled gracefully) 2529 2530 Examples: 2531 Basic experiment with local data: 2532 ```python 2533 def summarize_text(*, item, **kwargs): 2534 return f"Summary: {item['input'][:50]}..." 
2535 2536 def length_evaluator(*, input, output, expected_output=None, **kwargs): 2537 return { 2538 "name": "output_length", 2539 "value": len(output), 2540 "comment": f"Output contains {len(output)} characters" 2541 } 2542 2543 result = langfuse.run_experiment( 2544 name="Text Summarization Test", 2545 description="Evaluate summarization quality and length", 2546 data=[ 2547 {"input": "Long article text...", "expected_output": "Expected summary"}, 2548 {"input": "Another article...", "expected_output": "Another summary"} 2549 ], 2550 task=summarize_text, 2551 evaluators=[length_evaluator] 2552 ) 2553 2554 print(f"Processed {len(result.item_results)} items") 2555 for item_result in result.item_results: 2556 print(f"Input: {item_result.item['input']}") 2557 print(f"Output: {item_result.output}") 2558 print(f"Evaluations: {item_result.evaluations}") 2559 ``` 2560 2561 Advanced experiment with async task and multiple evaluators: 2562 ```python 2563 async def llm_task(*, item, **kwargs): 2564 # Simulate async LLM call 2565 response = await openai_client.chat.completions.create( 2566 model="gpt-4", 2567 messages=[{"role": "user", "content": item["input"]}] 2568 ) 2569 return response.choices[0].message.content 2570 2571 def accuracy_evaluator(*, input, output, expected_output=None, **kwargs): 2572 if expected_output and expected_output.lower() in output.lower(): 2573 return {"name": "accuracy", "value": 1.0, "comment": "Correct answer"} 2574 return {"name": "accuracy", "value": 0.0, "comment": "Incorrect answer"} 2575 2576 def toxicity_evaluator(*, input, output, expected_output=None, **kwargs): 2577 # Simulate toxicity check 2578 toxicity_score = check_toxicity(output) # Your toxicity checker 2579 return { 2580 "name": "toxicity", 2581 "value": toxicity_score, 2582 "comment": f"Toxicity level: {'high' if toxicity_score > 0.7 else 'low'}" 2583 } 2584 2585 def average_accuracy(*, item_results, **kwargs): 2586 accuracies = [ 2587 eval.value for result in item_results 
2588 for eval in result.evaluations 2589 if eval.name == "accuracy" 2590 ] 2591 return { 2592 "name": "average_accuracy", 2593 "value": sum(accuracies) / len(accuracies) if accuracies else 0, 2594 "comment": f"Average accuracy across {len(accuracies)} items" 2595 } 2596 2597 result = langfuse.run_experiment( 2598 name="LLM Safety and Accuracy Test", 2599 description="Evaluate model accuracy and safety across diverse prompts", 2600 data=test_dataset, # Your dataset items 2601 task=llm_task, 2602 evaluators=[accuracy_evaluator, toxicity_evaluator], 2603 run_evaluators=[average_accuracy], 2604 max_concurrency=5, # Limit concurrent API calls 2605 metadata={"model": "gpt-4", "temperature": 0.7} 2606 ) 2607 ``` 2608 2609 Using with Langfuse datasets: 2610 ```python 2611 # Get dataset from Langfuse 2612 dataset = langfuse.get_dataset("my-eval-dataset") 2613 2614 result = dataset.run_experiment( 2615 name="Production Model Evaluation", 2616 description="Monthly evaluation of production model performance", 2617 task=my_production_task, 2618 evaluators=[accuracy_evaluator, latency_evaluator] 2619 ) 2620 2621 # Results automatically linked to dataset in Langfuse UI 2622 print(f"View results: {result['dataset_run_url']}") 2623 ``` 2624 2625 Note: 2626 - Task and evaluator functions can be either synchronous or asynchronous 2627 - Individual item failures are logged but don't stop the experiment 2628 - All executions are automatically traced and visible in Langfuse UI 2629 - When using Langfuse datasets, results are automatically linked for easy comparison 2630 - This method works in both sync and async contexts (Jupyter notebooks, web apps, etc.) 
2631 - Async execution is handled automatically with smart event loop detection 2632 """ 2633 return cast( 2634 ExperimentResult, 2635 run_async_safely( 2636 self._run_experiment_async( 2637 name=name, 2638 run_name=self._create_experiment_run_name( 2639 name=name, run_name=run_name 2640 ), 2641 description=description, 2642 data=data, 2643 task=task, 2644 evaluators=evaluators or [], 2645 run_evaluators=run_evaluators or [], 2646 max_concurrency=max_concurrency, 2647 metadata=metadata or {}, 2648 ), 2649 ), 2650 )
Run an experiment on a dataset with automatic tracing and evaluation.
This method executes a task function on each item in the provided dataset, automatically traces all executions with Langfuse for observability, runs item-level and run-level evaluators on the outputs, and returns comprehensive results with evaluation metrics.
The experiment system provides:
- Automatic tracing of all task executions
- Concurrent processing with configurable limits
- Comprehensive error handling that isolates failures
- Integration with Langfuse datasets for experiment tracking
- Flexible evaluation framework supporting both sync and async evaluators
Arguments:
- name: Human-readable name for the experiment. Used for identification in the Langfuse UI.
- run_name: Optional exact name for the experiment run. If provided, this will be used as the exact dataset run name if the `data` contains Langfuse dataset items. If not provided, this will default to the experiment name appended with an ISO timestamp.
- description: Optional description explaining the experiment's purpose, methodology, or expected outcomes.
- data: Array of data items to process. Can be either:
- List of dict-like items with 'input', 'expected_output', 'metadata' keys
- List of Langfuse DatasetItem objects from dataset.items
- task: Function that processes each data item and returns output. Must accept 'item' as a keyword argument and can return sync or async results. The task function signature should be: task(*, item, **kwargs) -> Any
- evaluators: List of functions to evaluate each item's output individually. Each evaluator receives input, output, expected_output, and metadata. Can return single Evaluation dict or list of Evaluation dicts.
- run_evaluators: List of functions to evaluate the entire experiment run. Each run evaluator receives all item_results and can compute aggregate metrics. Useful for calculating averages, distributions, or cross-item comparisons.
- max_concurrency: Maximum number of concurrent task executions (default: 50). Controls the number of items processed simultaneously. Adjust based on API rate limits and system resources.
- metadata: Optional metadata dictionary to attach to all experiment traces. This metadata will be included in every trace created during the experiment. If `data` are Langfuse dataset items, the metadata will be attached to the dataset run, too.
Returns:
ExperimentResult containing:
- run_name: The experiment run name. This is equal to the dataset run name if the experiment was run on a Langfuse dataset.
- item_results: List of results for each processed item with outputs and evaluations
- run_evaluations: List of aggregate evaluation results for the entire run
- dataset_run_id: ID of the dataset run (if using Langfuse datasets)
- dataset_run_url: Direct URL to view results in Langfuse UI (if applicable)
Raises:
- ValueError: If required parameters are missing or invalid
- Exception: If experiment setup fails (individual item failures are handled gracefully)
Examples:
Basic experiment with local data:
    def summarize_text(*, item, **kwargs):
        return f"Summary: {item['input'][:50]}..."

    def length_evaluator(*, input, output, expected_output=None, **kwargs):
        return {
            "name": "output_length",
            "value": len(output),
            "comment": f"Output contains {len(output)} characters"
        }

    result = langfuse.run_experiment(
        name="Text Summarization Test",
        description="Evaluate summarization quality and length",
        data=[
            {"input": "Long article text...", "expected_output": "Expected summary"},
            {"input": "Another article...", "expected_output": "Another summary"}
        ],
        task=summarize_text,
        evaluators=[length_evaluator]
    )

    print(f"Processed {len(result.item_results)} items")
    for item_result in result.item_results:
        print(f"Input: {item_result.item['input']}")
        print(f"Output: {item_result.output}")
        print(f"Evaluations: {item_result.evaluations}")
Advanced experiment with async task and multiple evaluators:
    async def llm_task(*, item, **kwargs):
        # Simulate async LLM call
        response = await openai_client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": item["input"]}]
        )
        return response.choices[0].message.content

    def accuracy_evaluator(*, input, output, expected_output=None, **kwargs):
        if expected_output and expected_output.lower() in output.lower():
            return {"name": "accuracy", "value": 1.0, "comment": "Correct answer"}
        return {"name": "accuracy", "value": 0.0, "comment": "Incorrect answer"}

    def toxicity_evaluator(*, input, output, expected_output=None, **kwargs):
        # Simulate toxicity check
        toxicity_score = check_toxicity(output)  # Your toxicity checker
        return {
            "name": "toxicity",
            "value": toxicity_score,
            "comment": f"Toxicity level: {'high' if toxicity_score > 0.7 else 'low'}"
        }

    def average_accuracy(*, item_results, **kwargs):
        accuracies = [
            eval.value for result in item_results
            for eval in result.evaluations
            if eval.name == "accuracy"
        ]
        return {
            "name": "average_accuracy",
            "value": sum(accuracies) / len(accuracies) if accuracies else 0,
            "comment": f"Average accuracy across {len(accuracies)} items"
        }

    result = langfuse.run_experiment(
        name="LLM Safety and Accuracy Test",
        description="Evaluate model accuracy and safety across diverse prompts",
        data=test_dataset,  # Your dataset items
        task=llm_task,
        evaluators=[accuracy_evaluator, toxicity_evaluator],
        run_evaluators=[average_accuracy],
        max_concurrency=5,  # Limit concurrent API calls
        metadata={"model": "gpt-4", "temperature": 0.7}
    )
Using with Langfuse datasets:
    # Get dataset from Langfuse
    dataset = langfuse.get_dataset("my-eval-dataset")

    result = dataset.run_experiment(
        name="Production Model Evaluation",
        description="Monthly evaluation of production model performance",
        task=my_production_task,
        evaluators=[accuracy_evaluator, latency_evaluator]
    )

    # Results automatically linked to dataset in Langfuse UI
    print(f"View results: {result.dataset_run_url}")
Note:
- Task and evaluator functions can be either synchronous or asynchronous
- Individual item failures are logged but don't stop the experiment
- All executions are automatically traced and visible in Langfuse UI
- When using Langfuse datasets, results are automatically linked for easy comparison
- This method works in both sync and async contexts (Jupyter notebooks, web apps, etc.)
- Async execution is handled automatically with smart event loop detection
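Two of the behaviours listed above, bounded concurrency and per-item failure isolation, can be sketched with plain `asyncio`. This is an illustration of the general pattern, not the SDK's implementation; `run_items` and `flaky_task` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def run_items(
    items: List[Dict[str, Any]],
    task: Callable[..., Awaitable[Any]],
    max_concurrency: int = 50,
) -> List[Dict[str, Any]]:
    """Run `task` on every item, with at most `max_concurrency` in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item: Dict[str, Any]) -> Dict[str, Any]:
        async with semaphore:
            try:
                return {"item": item, "output": await task(item=item), "error": None}
            except Exception as exc:
                # Record the failure for this item only; other items keep running.
                return {"item": item, "output": None, "error": str(exc)}

    # gather preserves the input order of the items in its result list.
    return await asyncio.gather(*(run_one(item) for item in items))


async def flaky_task(*, item: Dict[str, Any], **kwargs: Any) -> str:
    if item["input"] == "bad":
        raise ValueError("cannot process item")
    return item["input"].upper()


results = asyncio.run(
    run_items(
        [{"input": "a"}, {"input": "bad"}, {"input": "c"}],
        flaky_task,
        max_concurrency=2,
    )
)
```

Lowering `max_concurrency` is the usual lever when the task calls a rate-limited API.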
    def auth_check(self) -> bool:
        """Check if the provided credentials (public and secret key) are valid.

        Raises:
            Exception: If no projects were found for the provided credentials.

        Note:
            This method is blocking. It is discouraged to use it in production code.
        """
        try:
            projects = self.api.projects.get()
            langfuse_logger.debug(
                f"Auth check successful, found {len(projects.data)} projects"
            )
            if len(projects.data) == 0:
                raise Exception(
                    "Auth check failed, no project found for the keys provided."
                )
            return True

        except AttributeError as e:
            langfuse_logger.warning(
                f"Auth check failed: Client not properly initialized. Error: {e}"
            )
            return False

        except Error as e:
            handle_fern_exception(e)
            raise e
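Check if the provided credentials (public and secret key) are valid. Returns True if at least one project is found for the credentials, and False if the client is not properly initialized.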
Raises:
- Exception: If no projects were found for the provided credentials.
Note:
This method is blocking. Using it in production code is discouraged.
    def create_dataset(
        self,
        *,
        name: str,
        description: Optional[str] = None,
        metadata: Optional[Any] = None,
    ) -> Dataset:
        """Create a dataset with the given name on Langfuse.

        Args:
            name: Name of the dataset to create.
            description: Description of the dataset. Defaults to None.
            metadata: Additional metadata. Defaults to None.

        Returns:
            Dataset: The created dataset as returned by the Langfuse API.
        """
        try:
            body = CreateDatasetRequest(
                name=name, description=description, metadata=metadata
            )
            langfuse_logger.debug(f"Creating datasets {body}")

            return self.api.datasets.create(request=body)

        except Error as e:
            handle_fern_exception(e)
            raise e
Create a dataset with the given name on Langfuse.
Arguments:
- name: Name of the dataset to create.
- description: Description of the dataset. Defaults to None.
- metadata: Additional metadata. Defaults to None.
Returns:
Dataset: The created dataset as returned by the Langfuse API.
2951 def create_dataset_item( 2952 self, 2953 *, 2954 dataset_name: str, 2955 input: Optional[Any] = None, 2956 expected_output: Optional[Any] = None, 2957 metadata: Optional[Any] = None, 2958 source_trace_id: Optional[str] = None, 2959 source_observation_id: Optional[str] = None, 2960 status: Optional[DatasetStatus] = None, 2961 id: Optional[str] = None, 2962 ) -> DatasetItem: 2963 """Create a dataset item. 2964 2965 Upserts if an item with id already exists. 2966 2967 Args: 2968 dataset_name: Name of the dataset in which the dataset item should be created. 2969 input: Input data. Defaults to None. Can contain any dict, list or scalar. 2970 expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar. 2971 metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar. 2972 source_trace_id: Id of the source trace. Defaults to None. 2973 source_observation_id: Id of the source observation. Defaults to None. 2974 status: Status of the dataset item. Defaults to ACTIVE for newly created items. 2975 id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets. 2976 2977 Returns: 2978 DatasetItem: The created dataset item as returned by the Langfuse API. 
2979 2980 Example: 2981 ```python 2982 from langfuse import Langfuse 2983 2984 langfuse = Langfuse() 2985 2986 # Uploading items to the Langfuse dataset named "capital_cities" 2987 langfuse.create_dataset_item( 2988 dataset_name="capital_cities", 2989 input={"input": {"country": "Italy"}}, 2990 expected_output={"expected_output": "Rome"}, 2991 metadata={"foo": "bar"} 2992 ) 2993 ``` 2994 """ 2995 try: 2996 body = CreateDatasetItemRequest( 2997 datasetName=dataset_name, 2998 input=input, 2999 expectedOutput=expected_output, 3000 metadata=metadata, 3001 sourceTraceId=source_trace_id, 3002 sourceObservationId=source_observation_id, 3003 status=status, 3004 id=id, 3005 ) 3006 langfuse_logger.debug(f"Creating dataset item {body}") 3007 return self.api.dataset_items.create(request=body) 3008 except Error as e: 3009 handle_fern_exception(e) 3010 raise e
Create a dataset item.
Upserts if an item with id already exists.
Arguments:
- dataset_name: Name of the dataset in which the dataset item should be created.
- input: Input data. Defaults to None. Can contain any dict, list or scalar.
- expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar.
- metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar.
- source_trace_id: Id of the source trace. Defaults to None.
- source_observation_id: Id of the source observation. Defaults to None.
- status: Status of the dataset item. Defaults to ACTIVE for newly created items.
- id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets.
Returns:
DatasetItem: The created dataset item as returned by the Langfuse API.
Example:
    from langfuse import Langfuse

    langfuse = Langfuse()

    # Uploading items to the Langfuse dataset named "capital_cities"
    langfuse.create_dataset_item(
        dataset_name="capital_cities",
        input={"input": {"country": "Italy"}},
        expected_output={"expected_output": "Rome"},
        metadata={"foo": "bar"}
    )
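Because an item with an existing id is upserted, a repeatable id lets an upload script run more than once without creating duplicates. One possible way to derive such an id is sketched below; `stable_item_id` is a hypothetical helper, and hashing the dataset name together with the input is an assumption made here so that the same input in two datasets does not produce the same id.

```python
import hashlib
import json
from typing import Any


def stable_item_id(dataset_name: str, item_input: Any) -> str:
    """Hypothetical helper: derive a repeatable id from dataset name and input."""
    # sort_keys makes the JSON text independent of dict insertion order.
    payload = json.dumps({"dataset": dataset_name, "input": item_input}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:32]


id_a = stable_item_id("capital_cities", {"country": "Italy"})
id_b = stable_item_id("capital_cities", {"country": "Italy"})
id_c = stable_item_id("other_dataset", {"country": "Italy"})
```

The resulting string would be passed as the `id` argument of `create_dataset_item`; the input must be JSON-serializable for this particular scheme to work.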
3012 def resolve_media_references( 3013 self, 3014 *, 3015 obj: Any, 3016 resolve_with: Literal["base64_data_uri"], 3017 max_depth: int = 10, 3018 content_fetch_timeout_seconds: int = 5, 3019 ) -> Any: 3020 """Replace media reference strings in an object with base64 data URIs. 3021 3022 This method recursively traverses an object (up to max_depth) looking for media reference strings 3023 in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using 3024 the provided Langfuse client and replaces the reference string with a base64 data URI. 3025 3026 If fetching media content fails for a reference string, a warning is logged and the reference 3027 string is left unchanged. 3028 3029 Args: 3030 obj: The object to process. Can be a primitive value, array, or nested object. 3031 If the object has a __dict__ attribute, a dict will be returned instead of the original object type. 3032 resolve_with: The representation of the media content to replace the media reference string with. 3033 Currently only "base64_data_uri" is supported. 3034 max_depth: int: The maximum depth to traverse the object. Default is 10. 3035 content_fetch_timeout_seconds: int: The timeout in seconds for fetching media content. Default is 5. 3036 3037 Returns: 3038 A deep copy of the input object with all media references replaced with base64 data URIs where possible. 3039 If the input object has a __dict__ attribute, a dict will be returned instead of the original object type. 3040 3041 Example: 3042 obj = { 3043 "image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@", 3044 "nested": { 3045 "pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@" 3046 } 3047 } 3048 3049 result = await LangfuseMedia.resolve_media_references(obj, langfuse_client) 3050 3051 # Result: 3052 # { 3053 # "image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...", 3054 # "nested": { 3055 # "pdf": "data:application/pdf;base64,JVBERi0xLjcK..." 
3056 # } 3057 # } 3058 """ 3059 return LangfuseMedia.resolve_media_references( 3060 langfuse_client=self, 3061 obj=obj, 3062 resolve_with=resolve_with, 3063 max_depth=max_depth, 3064 content_fetch_timeout_seconds=content_fetch_timeout_seconds, 3065 )
Replace media reference strings in an object with base64 data URIs.
This method recursively traverses an object (up to max_depth) looking for media reference strings in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using the provided Langfuse client and replaces the reference string with a base64 data URI.
If fetching media content fails for a reference string, a warning is logged and the reference string is left unchanged.
Arguments:
- obj: The object to process. Can be a primitive value, array, or nested object. If the object has a __dict__ attribute, a dict will be returned instead of the original object type.
- resolve_with: The representation of the media content to replace the media reference string with. Currently only "base64_data_uri" is supported.
- max_depth: int: The maximum depth to traverse the object. Default is 10.
- content_fetch_timeout_seconds: int: The timeout in seconds for fetching media content. Default is 5.
Returns:
A deep copy of the input object with all media references replaced with base64 data URIs where possible. If the input object has a __dict__ attribute, a dict will be returned instead of the original object type.
Example:
    obj = {
        "image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
        "nested": {
            "pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@"
        }
    }

    result = langfuse.resolve_media_references(obj=obj, resolve_with="base64_data_uri")

    # Result:
    # {
    #     "image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...",
    #     "nested": {
    #         "pdf": "data:application/pdf;base64,JVBERi0xLjcK..."
    #     }
    # }
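The recursive traversal described above can be sketched independently of the SDK. In this illustration the regular expression is an assumption based only on the "@@@langfuseMedia:...@@@" format mentioned in the text, and `replace_media_refs` and `fake_resolve` are hypothetical names; the real method fetches media content from Langfuse rather than calling a stub.

```python
import re
from typing import Any, Callable

# Assumed pattern, based only on the "@@@langfuseMedia:...@@@" format described above.
MEDIA_REF = re.compile(r"@@@langfuseMedia:.+?@@@")


def replace_media_refs(
    obj: Any, resolve: Callable[[str], str], max_depth: int = 10, _depth: int = 0
) -> Any:
    """Return a copy of `obj` with each media reference passed through `resolve`."""
    if _depth > max_depth:
        return obj  # stop descending; deeper values are returned unchanged
    if isinstance(obj, str):
        return MEDIA_REF.sub(lambda m: resolve(m.group(0)), obj)
    if isinstance(obj, list):
        return [replace_media_refs(v, resolve, max_depth, _depth + 1) for v in obj]
    if isinstance(obj, dict):
        return {
            k: replace_media_refs(v, resolve, max_depth, _depth + 1)
            for k, v in obj.items()
        }
    return obj


def fake_resolve(reference: str) -> str:
    # Stand-in for fetching the media content; always returns the same data URI.
    return "data:text/plain;base64,aGk="


doc = {
    "image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
    "nested": {"note": "plain text", "count": 3},
}
resolved = replace_media_refs(doc, fake_resolve)
```

The input object is left untouched, and values that contain no reference string come back unchanged.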
3095 def get_prompt( 3096 self, 3097 name: str, 3098 *, 3099 version: Optional[int] = None, 3100 label: Optional[str] = None, 3101 type: Literal["chat", "text"] = "text", 3102 cache_ttl_seconds: Optional[int] = None, 3103 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None, 3104 max_retries: Optional[int] = None, 3105 fetch_timeout_seconds: Optional[int] = None, 3106 ) -> PromptClient: 3107 """Get a prompt. 3108 3109 This method attempts to fetch the requested prompt from the local cache. If the prompt is not found 3110 in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again 3111 and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will 3112 return the expired prompt as a fallback. 3113 3114 Args: 3115 name (str): The name of the prompt to retrieve. 3116 3117 Keyword Args: 3118 version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both. 3119 label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both. 3120 cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a 3121 keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0. 3122 type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text". 3123 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None. 3124 max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. 
Retries have an exponential backoff with a maximum delay of 10 seconds. 3125 fetch_timeout_seconds: Optional[int]: The timeout in milliseconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 5 seconds per default. 3126 3127 Returns: 3128 The prompt object retrieved from the cache or directly fetched if not cached or expired of type 3129 - TextPromptClient, if type argument is 'text'. 3130 - ChatPromptClient, if type argument is 'chat'. 3131 3132 Raises: 3133 Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an 3134 expired prompt in the cache, in which case it logs a warning and returns the expired prompt. 3135 """ 3136 if self._resources is None: 3137 raise Error( 3138 "SDK is not correctly initialized. Check the init logs for more details." 3139 ) 3140 if version is not None and label is not None: 3141 raise ValueError("Cannot specify both version and label at the same time.") 3142 3143 if not name: 3144 raise ValueError("Prompt name cannot be empty.") 3145 3146 cache_key = PromptCache.generate_cache_key(name, version=version, label=label) 3147 bounded_max_retries = self._get_bounded_max_retries( 3148 max_retries, default_max_retries=2, max_retries_upper_bound=4 3149 ) 3150 3151 langfuse_logger.debug(f"Getting prompt '{cache_key}'") 3152 cached_prompt = self._resources.prompt_cache.get(cache_key) 3153 3154 if cached_prompt is None or cache_ttl_seconds == 0: 3155 langfuse_logger.debug( 3156 f"Prompt '{cache_key}' not found in cache or caching disabled." 
3157 ) 3158 try: 3159 return self._fetch_prompt_and_update_cache( 3160 name, 3161 version=version, 3162 label=label, 3163 ttl_seconds=cache_ttl_seconds, 3164 max_retries=bounded_max_retries, 3165 fetch_timeout_seconds=fetch_timeout_seconds, 3166 ) 3167 except Exception as e: 3168 if fallback: 3169 langfuse_logger.warning( 3170 f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}" 3171 ) 3172 3173 fallback_client_args: Dict[str, Any] = { 3174 "name": name, 3175 "prompt": fallback, 3176 "type": type, 3177 "version": version or 0, 3178 "config": {}, 3179 "labels": [label] if label else [], 3180 "tags": [], 3181 } 3182 3183 if type == "text": 3184 return TextPromptClient( 3185 prompt=Prompt_Text(**fallback_client_args), 3186 is_fallback=True, 3187 ) 3188 3189 if type == "chat": 3190 return ChatPromptClient( 3191 prompt=Prompt_Chat(**fallback_client_args), 3192 is_fallback=True, 3193 ) 3194 3195 raise e 3196 3197 if cached_prompt.is_expired(): 3198 langfuse_logger.debug(f"Stale prompt '{cache_key}' found in cache.") 3199 try: 3200 # refresh prompt in background thread, refresh_prompt deduplicates tasks 3201 langfuse_logger.debug(f"Refreshing prompt '{cache_key}' in background.") 3202 3203 def refresh_task() -> None: 3204 self._fetch_prompt_and_update_cache( 3205 name, 3206 version=version, 3207 label=label, 3208 ttl_seconds=cache_ttl_seconds, 3209 max_retries=bounded_max_retries, 3210 fetch_timeout_seconds=fetch_timeout_seconds, 3211 ) 3212 3213 self._resources.prompt_cache.add_refresh_prompt_task( 3214 cache_key, 3215 refresh_task, 3216 ) 3217 langfuse_logger.debug( 3218 f"Returning stale prompt '{cache_key}' from cache." 3219 ) 3220 # return stale prompt 3221 return cached_prompt.value 3222 3223 except Exception as e: 3224 langfuse_logger.warning( 3225 f"Error when refreshing cached prompt '{cache_key}', returning cached version. 
Error: {e}" 3226 ) 3227 # creation of refresh prompt task failed, return stale prompt 3228 return cached_prompt.value 3229 3230 return cached_prompt.value
Get a prompt.
This method first looks up the requested prompt in the local cache. If the prompt is not in the cache, or caching is disabled with cache_ttl_seconds=0, it is fetched from the server and the cache is updated; if that fetch fails and a fallback is provided, the fallback is returned instead. If the cached prompt has expired, the stale prompt is returned immediately and a refresh is scheduled in a background thread.
Arguments:
- name (str): The name of the prompt to retrieve.
Keyword Args:
- version (Optional[int]): The version of the prompt to retrieve. If neither label nor version is specified, the `production` label is returned. Specify either version or label, not both.
- label (Optional[str]): The label of the prompt to retrieve. If neither label nor version is specified, the `production` label is returned. Specify either version or label, not both.
- cache_ttl_seconds (Optional[int]): Time-to-live in seconds for caching the prompt. Must be specified as a keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0.
- type (Literal["chat", "text"]): The type of the prompt to retrieve. Defaults to "text".
- fallback (Union[Optional[List[ChatMessageDict]], Optional[str]]): The prompt to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None.
- max_retries (Optional[int]): The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries have an exponential backoff with a maximum delay of 10 seconds.
- fetch_timeout_seconds (Optional[int]): The timeout in seconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 5 seconds.
Returns:
The prompt object retrieved from the cache or directly fetched if not cached or expired of type
- TextPromptClient, if type argument is 'text'.
- ChatPromptClient, if type argument is 'chat'.
Raises:
- Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an expired prompt in the cache, in which case it logs a warning and returns the expired prompt.
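The caching behaviour (fetch on a miss, keep serving an expired entry while it is refreshed) can be sketched as a small toy cache. This is not the SDK's `PromptCache`: `TtlCache` and its methods are hypothetical, and refreshes are run explicitly here instead of in a background thread so the example stays deterministic.

```python
import time
from typing import Callable, Dict, Tuple


class TtlCache:
    """Toy cache that serves stale entries while a refresh is pending."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[str, float]] = {}  # key -> (value, expiry)
        self._pending: Dict[str, Callable[[], str]] = {}  # key -> deferred fetch

    def get(self, key: str, fetch: Callable[[], str]) -> str:
        entry = self._entries.get(key)
        if entry is None:
            # Cache miss: fetch right away; errors propagate to the caller.
            return self._store(key, fetch())
        value, expiry = entry
        if self._clock() >= expiry:
            # Expired: queue at most one refresh per key and return the stale value.
            self._pending.setdefault(key, fetch)
        return value

    def run_pending_refreshes(self) -> None:
        while self._pending:
            key, fetch = self._pending.popitem()
            self._store(key, fetch())

    def _store(self, key: str, value: str) -> str:
        self._entries[key] = (value, self._clock() + self._ttl)
        return value


now = [0.0]  # controllable fake clock
versions = iter(["v1", "v2"])
cache = TtlCache(ttl_seconds=60, clock=lambda: now[0])

first = cache.get("greeting", lambda: next(versions))  # miss: fetches immediately
now[0] = 61.0
stale = cache.get("greeting", lambda: next(versions))  # expired: stale value returned
cache.run_pending_refreshes()                           # deferred refresh runs now
fresh = cache.get("greeting", lambda: next(versions))  # served from the refreshed entry
```

Serving the stale value keeps the caller's latency flat; the trade-off is that a prompt change may be observed one call later than the TTL suggests.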
    def create_prompt(
        self,
        *,
        name: str,
        prompt: Union[
            str, List[Union[ChatMessageDict, ChatMessageWithPlaceholdersDict]]
        ],
        labels: List[str] = [],
        tags: Optional[List[str]] = None,
        type: Optional[Literal["chat", "text"]] = "text",
        config: Optional[Any] = None,
        commit_message: Optional[str] = None,
    ) -> PromptClient:
        """Create a new prompt in Langfuse.

        Keyword Args:
            name : The name of the prompt to be created.
            prompt : The content of the prompt to be created.
            is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead.
            labels: The labels of the prompt. Defaults to []. To create a default-served prompt, add the 'production' label.
            tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt.
            config: Additional structured data to be saved with the prompt. Defaults to None.
            type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text".
            commit_message: Optional string describing the change.

        Returns:
            TextPromptClient: The prompt if type argument is 'text'.
            ChatPromptClient: The prompt if type argument is 'chat'.
        """
        try:
            langfuse_logger.debug(f"Creating prompt {name=}, {labels=}")

            if type == "chat":
                if not isinstance(prompt, list):
                    raise ValueError(
                        "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes."
                    )
                request: Union[CreatePromptRequest_Chat, CreatePromptRequest_Text] = (
                    CreatePromptRequest_Chat(
                        name=name,
                        prompt=cast(Any, prompt),
                        labels=labels,
                        tags=tags,
                        config=config or {},
                        commitMessage=commit_message,
                        type="chat",
                    )
                )
                server_prompt = self.api.prompts.create(request=request)

                if self._resources is not None:
                    self._resources.prompt_cache.invalidate(name)

                return ChatPromptClient(prompt=cast(Prompt_Chat, server_prompt))

            if not isinstance(prompt, str):
                raise ValueError("For 'text' type, 'prompt' must be a string.")

            request = CreatePromptRequest_Text(
                name=name,
                prompt=prompt,
                labels=labels,
                tags=tags,
                config=config or {},
                commitMessage=commit_message,
                type="text",
            )

            server_prompt = self.api.prompts.create(request=request)

            if self._resources is not None:
                self._resources.prompt_cache.invalidate(name)

            return TextPromptClient(prompt=cast(Prompt_Text, server_prompt))

        except Error as e:
            handle_fern_exception(e)
            raise e
Create a new prompt in Langfuse.
Keyword Args:
- name: The name of the prompt to be created.
- prompt: The content of the prompt to be created.
- is_active [DEPRECATED]: A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead.
- labels: The labels of the prompt. Defaults to an empty list. To create a default-served prompt, add the 'production' label.
- tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt.
- config: Additional structured data to be saved with the prompt. Defaults to None.
- type: The type of the prompt to be created, either "chat" or "text". Defaults to "text".
- commit_message: Optional string describing the change.
Returns:
- TextPromptClient: The prompt if type argument is 'text'.
- ChatPromptClient: The prompt if type argument is 'chat'.
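The source listing for create_prompt checks the shape of `prompt` against `type` before building the request. A minimal sketch of that check, using a hypothetical `build_prompt_request` helper and plain dictionaries in place of the SDK's request classes, might look like this:

```python
from typing import Any, Dict, List, Union

ChatMessages = List[Dict[str, Any]]


def build_prompt_request(
    name: str, prompt: Union[str, ChatMessages], type: str = "text"
) -> Dict[str, Any]:
    """Hypothetical helper mirroring the type checks; not part of the SDK."""
    if type == "chat":
        # Chat prompts must be a list of message dictionaries.
        if not isinstance(prompt, list):
            raise ValueError("For 'chat' type, 'prompt' must be a list of chat messages.")
        return {"name": name, "prompt": prompt, "type": "chat"}
    # Any other value falls through to the text branch, as in the listing.
    if not isinstance(prompt, str):
        raise ValueError("For 'text' type, 'prompt' must be a string.")
    return {"name": name, "prompt": prompt, "type": "text"}
```

Passing a string with `type="chat"`, or a message list with the default `type="text"`, raises `ValueError` before any request is built.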
    def update_prompt(
        self,
        *,
        name: str,
        version: int,
        new_labels: List[str] = [],
    ) -> Any:
        """Update an existing prompt version in Langfuse. The Langfuse SDK prompt cache is invalidated for all prompts with the specified name.

        Args:
            name (str): The name of the prompt to update.
            version (int): The version number of the prompt to update.
            new_labels (List[str], optional): New labels to assign to the prompt version. Labels are unique across versions. The "latest" label is reserved and managed by Langfuse. Defaults to [].

        Returns:
            Prompt: The updated prompt from the Langfuse API.

        """
        updated_prompt = self.api.prompt_version.update(
            name=self._url_encode(name),
            version=version,
            new_labels=new_labels,
        )

        if self._resources is not None:
            self._resources.prompt_cache.invalidate(name)

        return updated_prompt
Update an existing prompt version in Langfuse. The Langfuse SDK prompt cache is invalidated for all prompts with the specified name.
Arguments:
- name (str): The name of the prompt to update.
- version (int): The version number of the prompt to update.
- new_labels (List[str], optional): New labels to assign to the prompt version. Labels are unique across versions. The "latest" label is reserved and managed by Langfuse. Defaults to [].
Returns:
Prompt: The updated prompt from the Langfuse API.
    def clear_prompt_cache(self) -> None:
        """Clear the entire prompt cache, removing all cached prompts.

        This method is useful when you want to force a complete refresh of all
        cached prompts, for example after major updates or when you need to
        ensure the latest versions are fetched from the server.
        """
        if self._resources is not None:
            self._resources.prompt_cache.clear()
Clear the entire prompt cache, removing all cached prompts.
This method is useful when you want to force a complete refresh of all cached prompts, for example after major updates or when you need to ensure the latest versions are fetched from the server.
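The difference between invalidating one prompt name (as create_prompt and update_prompt do) and clearing the whole cache can be sketched with a toy cache. `ToyPromptCache` and its key layout are assumptions made for illustration and do not describe the SDK's internal cache.

```python
from typing import Dict, Optional, Tuple


class ToyPromptCache:
    """Hypothetical stand-in for an SDK-side prompt cache."""

    def __init__(self) -> None:
        # Keys pair a prompt name with a version or label selector.
        self._entries: Dict[Tuple[str, str], str] = {}

    def set(self, name: str, selector: str, value: str) -> None:
        self._entries[(name, selector)] = value

    def get(self, name: str, selector: str) -> Optional[str]:
        return self._entries.get((name, selector))

    def invalidate(self, name: str) -> None:
        """Drop every cached entry for a single prompt name."""
        for key in [k for k in self._entries if k[0] == name]:
            del self._entries[key]

    def clear(self) -> None:
        """Drop every cached entry for every prompt."""
        self._entries.clear()


cache = ToyPromptCache()
cache.set("greeting", "production", "Hello")
cache.set("greeting", "v2", "Hi")
cache.set("farewell", "production", "Bye")

cache.invalidate("greeting")  # only "greeting" entries are removed
remaining_after_invalidate = cache.get("farewell", "production")

cache.clear()  # everything is removed
remaining_after_clear = cache.get("farewell", "production")
```

Invalidation is the narrower tool: entries for other prompt names survive it, whereas `clear` forces every subsequent lookup to go back to the server.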
def get_client(*, public_key: Optional[str] = None) -> Langfuse:
    """Get or create a Langfuse client instance.

    Returns an existing Langfuse client or creates a new one if none exists. In multi-project setups,
    providing a public_key is required. Multi-project support is experimental - see Langfuse docs.

    Behavior:
    - Single project: Returns existing client or creates new one
    - Multi-project: Requires public_key to return specific client
    - No public_key in multi-project: Returns disabled client to prevent data leakage

    The function uses a singleton pattern per public_key to conserve resources and maintain state.

    Args:
        public_key (Optional[str]): Project identifier
            - With key: Returns client for that project
            - Without key: Returns single client or disabled client if multiple exist

    Returns:
        Langfuse: Client instance in one of three states:
            1. Client for specified public_key
            2. Default client for single-project setup
            3. Disabled client when multiple projects exist without key

    Security:
        Disables tracing when multiple projects exist without explicit key to prevent
        cross-project data leakage. Multi-project setups are experimental.

    Example:
        ```python
        # Single project
        client = get_client()  # Default client

        # In multi-project usage:
        client_a = get_client(public_key="project_a_key")  # Returns project A's client
        client_b = get_client(public_key="project_b_key")  # Returns project B's client

        # Without specific key in multi-project setup:
        client = get_client()  # Returns disabled client for safety
        ```
    """
    with LangfuseResourceManager._lock:
        active_instances = LangfuseResourceManager._instances

        # If no explicit public_key provided, check execution context
        if not public_key:
            public_key = _current_public_key.get(None)

        if not public_key:
            if len(active_instances) == 0:
                # No clients initialized yet, create default instance
                return Langfuse()

            if len(active_instances) == 1:
                # Only one client exists, safe to use without specifying key
                instance = list(active_instances.values())[0]

                # Initialize with the credentials bound to the instance
                # This is important if the original instance was instantiated
                # via constructor arguments
                return _create_client_from_instance(instance)

            else:
                # Multiple clients exist but no key specified - disable tracing
                # to prevent cross-project data leakage
                langfuse_logger.warning(
                    "No 'langfuse_public_key' passed to decorated function, but multiple langfuse clients are instantiated in current process. Skipping tracing for this function to avoid cross-project leakage."
                )
                return Langfuse(
                    tracing_enabled=False, public_key="fake", secret_key="fake"
                )

        else:
            # Specific key provided, look up existing instance
            target_instance: Optional[LangfuseResourceManager] = active_instances.get(
                public_key, None
            )

            if target_instance is None:
                # No instance found with this key - client not initialized properly
                langfuse_logger.warning(
                    f"No Langfuse client with public key {public_key} has been initialized. Skipping tracing for decorated function."
                )
                return Langfuse(
                    tracing_enabled=False, public_key="fake", secret_key="fake"
                )

            # target_instance is guaranteed to be not None at this point
            return _create_client_from_instance(target_instance, public_key)
Get or create a Langfuse client instance.
Returns an existing Langfuse client or creates a new one if none exists. In multi-project setups, providing a public_key is required. Multi-project support is experimental - see Langfuse docs.
Behavior:
- Single project: Returns existing client or creates new one
- Multi-project: Requires public_key to return specific client
- No public_key in multi-project: Returns disabled client to prevent data leakage
The function uses a singleton pattern per public_key to conserve resources and maintain state.
Arguments:
- public_key (Optional[str]): Project identifier
  - With key: Returns client for that project
  - Without key: Returns single client or disabled client if multiple exist
Returns:
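Langfuse: Client instance in one of three states:
1. Client for the specified public_key, if a client with that key has already been initialized
2. Default client for single-project setup
3. Disabled client when multiple projects exist without key, or when the specified public_key matches no initialized client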
Security:
Disables tracing when multiple projects exist without explicit key to prevent cross-project data leakage. Multi-project setups are experimental.
Example:
    # Single project
    client = get_client()  # Default client

    # In multi-project usage:
    client_a = get_client(public_key="project_a_key")  # Returns project A's client
    client_b = get_client(public_key="project_b_key")  # Returns project B's client

    # Without specific key in multi-project setup:
    client = get_client()  # Returns disabled client for safety
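The lookup rules in the get_client source can be condensed into a small decision function. The sketch below is an illustration only: `resolve_client` is a hypothetical name, the registry is a plain dictionary, and the execution-context lookup that the real function performs before falling back is omitted.

```python
from typing import Dict, Optional


def resolve_client(instances: Dict[str, str], public_key: Optional[str] = None) -> str:
    """Describe which client a lookup would yield for a registry of initialized clients."""
    if not public_key:
        if len(instances) == 0:
            return "new default client"  # nothing initialized yet
        if len(instances) == 1:
            return f"client for {next(iter(instances))}"  # unambiguous
        return "disabled client"  # ambiguous: avoid cross-project leakage
    if public_key not in instances:
        return "disabled client"  # key was never initialized
    return f"client for {public_key}"


single = {"pk-a": "project A"}
multi = {"pk-a": "project A", "pk-b": "project B"}

no_key_single = resolve_client(single)
no_key_multi = resolve_client(multi)
explicit_key = resolve_client(multi, "pk-b")
unknown_key = resolve_client(single, "pk-x")
```

The two "disabled client" branches are the ones worth remembering: an ambiguous call and an unknown key both yield a client that does not trace, rather than raising.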
    def observe(
        self,
        func: Optional[F] = None,
        *,
        name: Optional[str] = None,
        as_type: Optional[ObservationTypeLiteralNoEvent] = None,
        capture_input: Optional[bool] = None,
        capture_output: Optional[bool] = None,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> Union[F, Callable[[F], F]]:
        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

        This decorator provides seamless integration of Langfuse observability into your codebase. It automatically creates
        spans or generations around function execution, capturing timing, inputs/outputs, and error states. The decorator
        intelligently handles both synchronous and asynchronous functions, preserving function signatures and type hints.

        Using OpenTelemetry's distributed tracing system, it maintains proper trace context propagation throughout your application,
        enabling you to see hierarchical traces of function calls with detailed performance metrics and function-specific details.

        Args:
            func (Optional[Callable]): The function to decorate. When used with parentheses @observe(), this will be None.
            name (Optional[str]): Custom name for the created trace or span. If not provided, the function name is used.
            as_type (Optional[Literal]): Set the observation type. Supported values:
                    "generation", "span", "agent", "tool", "chain", "retriever", "embedding", "evaluator", "guardrail".
                    Observation types are highlighted in the Langfuse UI for filtering and visualization.
                    The types "generation" and "embedding" create a span on which additional attributes such as model metrics
                    can be set.

        Returns:
            Callable: A wrapped version of the original function that automatically creates and manages Langfuse spans.

        Example:
            For general function tracing with automatic naming:
            ```python
            @observe()
            def process_user_request(user_id, query):
                # Function is automatically traced with name "process_user_request"
                return get_response(query)
            ```

            For language model generation tracking:
            ```python
            @observe(name="answer-generation", as_type="generation")
            async def generate_answer(query):
                # Creates a generation-type span with extended LLM metrics
                response = await openai.chat.completions.create(
                    model="gpt-4",
                    messages=[{"role": "user", "content": query}]
                )
                return response.choices[0].message.content
            ```

            For trace context propagation between functions:
            ```python
            @observe()
            def main_process():
                # Parent span is created
                return sub_process()  # Child span automatically connected to parent

            @observe()
            def sub_process():
                # Automatically becomes a child span of main_process
                return "result"
            ```

        Raises:
            Exception: Propagates any exceptions from the wrapped function after logging them in the trace.

        Notes:
            - The decorator preserves the original function's signature, docstring, and return type.
            - Proper parent-child relationships between spans are automatically maintained.
            - Special keyword arguments can be passed to control tracing:
              - langfuse_trace_id: Explicitly set the trace ID for this function call
              - langfuse_parent_observation_id: Explicitly set the parent span ID
              - langfuse_public_key: Use a specific Langfuse project (when multiple clients exist)
            - For async functions, the decorator returns an async function wrapper.
            - For sync functions, the decorator returns a synchronous wrapper.
        """
        valid_types = set(get_observation_types_list(ObservationTypeLiteralNoEvent))
        if as_type is not None and as_type not in valid_types:
            self._log.warning(
                f"Invalid as_type '{as_type}'. Valid types are: {', '.join(sorted(valid_types))}. Defaulting to 'span'."
            )
            as_type = "span"

        function_io_capture_enabled = os.environ.get(
            LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED, "True"
        ).lower() not in ("false", "0")

        should_capture_input = (
            capture_input if capture_input is not None else function_io_capture_enabled
        )

        should_capture_output = (
            capture_output
            if capture_output is not None
            else function_io_capture_enabled
        )

        def decorator(func: F) -> F:
            return (
                self._async_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=should_capture_input,
                    capture_output=should_capture_output,
                    transform_to_string=transform_to_string,
                )
                if asyncio.iscoroutinefunction(func)
                else self._sync_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=should_capture_input,
                    capture_output=should_capture_output,
                    transform_to_string=transform_to_string,
                )
            )

        """Handle decorator with or without parentheses.

        This logic enables the decorator to work both with and without parentheses:
        - @observe - Python passes the function directly to the decorator
        - @observe() - Python calls the decorator first, which must return a function decorator

        When called without arguments (@observe), the func parameter contains the function to decorate,
        so we directly apply the decorator to it. When called with parentheses (@observe()),
        func is None, so we return the decorator function itself for Python to apply in the next step.
        """
        if func is None:
            return decorator
        else:
            return decorator(func)
Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
This decorator provides seamless integration of Langfuse observability into your codebase. It automatically creates spans or generations around function execution, capturing timing, inputs/outputs, and error states. The decorator intelligently handles both synchronous and asynchronous functions, preserving function signatures and type hints.
Using OpenTelemetry's distributed tracing system, it maintains proper trace context propagation throughout your application, enabling you to see hierarchical traces of function calls with detailed performance metrics and function-specific details.
Arguments:
- func (Optional[Callable]): The function to decorate. When used with parentheses @observe(), this will be None.
- name (Optional[str]): Custom name for the created trace or span. If not provided, the function name is used.
- as_type (Optional[Literal]): Set the observation type. Supported values: "generation", "span", "agent", "tool", "chain", "retriever", "embedding", "evaluator", "guardrail". Observation types are highlighted in the Langfuse UI for filtering and visualization. The types "generation" and "embedding" create a span on which additional attributes such as model metrics can be set.
- capture_input (Optional[bool]): Whether to capture the function's input on the observation. If None, the setting falls back to the environment variable named by the LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED constant, which enables capture unless it is set to "false" or "0".
- capture_output (Optional[bool]): Whether to capture the function's output on the observation. Uses the same fallback as capture_input.
Returns:
Callable: A wrapped version of the original function that automatically creates and manages Langfuse spans.
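The observe source resolves `capture_input` and `capture_output` by preferring an explicit argument and otherwise consulting an environment variable. The following sketch reproduces that precedence in isolation; the variable name `SKETCH_IO_CAPTURE_ENABLED` and the `resolve_capture` helper are hypothetical and stand in for the SDK's own constant.

```python
import os
from typing import Mapping, Optional

# Hypothetical variable name used only in this sketch.
ENV_VAR = "SKETCH_IO_CAPTURE_ENABLED"


def resolve_capture(
    explicit: Optional[bool], env: Optional[Mapping[str, str]] = None
) -> bool:
    """Explicit argument wins; otherwise capture is on unless the variable is 'false' or '0'."""
    source = os.environ if env is None else env
    env_enabled = source.get(ENV_VAR, "True").lower() not in ("false", "0")
    return explicit if explicit is not None else env_enabled
```

Because only an explicit `None` defers to the environment, a decorator-level `capture_input=True` stays in force even when the variable disables capture globally.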
Example:
For general function tracing with automatic naming:
    @observe()
    def process_user_request(user_id, query):
        # Function is automatically traced with name "process_user_request"
        return get_response(query)
For language model generation tracking:
    @observe(name="answer-generation", as_type="generation")
    async def generate_answer(query):
        # Creates a generation-type span with extended LLM metrics
        response = await openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": query}]
        )
        return response.choices[0].message.content
For trace context propagation between functions:
    @observe()
    def main_process():
        # Parent span is created
        return sub_process()  # Child span automatically connected to parent

    @observe()
    def sub_process():
        # Automatically becomes a child span of main_process
        return "result"
Raises:
- Exception: Propagates any exceptions from the wrapped function after logging them in the trace.
Notes:
- The decorator preserves the original function's signature, docstring, and return type.
- Proper parent-child relationships between spans are automatically maintained.
- Special keyword arguments can be passed to control tracing:
  - langfuse_trace_id: Explicitly set the trace ID for this function call
  - langfuse_parent_observation_id: Explicitly set the parent span ID
  - langfuse_public_key: Use a specific Langfuse project (when multiple clients exist)
- For async functions, the decorator returns an async function wrapper.
- For sync functions, the decorator returns a synchronous wrapper.
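Two of the notes above, that the decorator works with or without parentheses and that it returns a wrapper matching the sync or async nature of the function, follow a common Python pattern. The sketch below shows that pattern with a hypothetical `traced` decorator that records start and end markers in a list instead of creating spans; it uses `inspect.iscoroutinefunction` where the listing uses the `asyncio` equivalent.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable, List, Optional

CALLS: List[str] = []  # stands in for a tracing backend


def traced(func: Optional[Callable[..., Any]] = None, *, name: Optional[str] = None):
    """Hypothetical decorator usable as @traced or @traced(name=...)."""

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        span_name = name or fn.__name__

        if inspect.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                CALLS.append(f"start:{span_name}")
                try:
                    return await fn(*args, **kwargs)
                finally:
                    CALLS.append(f"end:{span_name}")  # runs even if fn raises

            return async_wrapper

        @functools.wraps(fn)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            CALLS.append(f"start:{span_name}")
            try:
                return fn(*args, **kwargs)
            finally:
                CALLS.append(f"end:{span_name}")

        return sync_wrapper

    # Bare @traced passes the function directly; @traced(...) passes None first.
    return decorator if func is None else decorator(func)


@traced
def add(a: int, b: int) -> int:
    return a + b


@traced(name="fetch")
async def fetch() -> str:
    return "ok"
```

`functools.wraps` is what keeps the wrapped function's name and docstring intact, and the `finally` blocks ensure the end marker is recorded before an exception propagates to the caller.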
class LangfuseSpan(LangfuseObservationWrapper):
    """Standard span implementation for general operations in Langfuse.

    This class represents a general-purpose span that can be used to trace
    any operation in your application. It extends the base LangfuseObservationWrapper
    with specific methods for creating child spans, generations, and updating
    span-specific attributes. If possible, use a more specific type for
    better observability and insights.
    """

    def __init__(
        self,
        *,
        otel_span: otel_trace_api.Span,
        langfuse_client: "Langfuse",
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        environment: Optional[str] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ):
        """Initialize a new LangfuseSpan.

        Args:
            otel_span: The OpenTelemetry span to wrap
            langfuse_client: Reference to the parent Langfuse client
            input: Input data for the span (any JSON-serializable object)
            output: Output data from the span (any JSON-serializable object)
            metadata: Additional metadata to associate with the span
            environment: The tracing environment
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span
        """
        super().__init__(
            otel_span=otel_span,
            as_type="span",
            langfuse_client=langfuse_client,
            input=input,
            output=output,
            metadata=metadata,
            environment=environment,
            version=version,
            level=level,
            status_message=status_message,
        )

    def start_span(
        self,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> "LangfuseSpan":
        """Create a new child span.

        This method creates a new child span with this span as the parent.
        Unlike start_as_current_span(), this method does not set the new span
        as the current span in the context.

        Args:
            name: Name of the span (e.g., function or operation name)
            input: Input data for the operation
            output: Output data from the operation
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span

        Returns:
            A new LangfuseSpan that must be ended with .end() when complete

        Example:
            ```python
            parent_span = langfuse.start_span(name="process-request")
            try:
                # Create a child span
                child_span = parent_span.start_span(name="validate-input")
                try:
                    # Do validation work
                    validation_result = validate(request_data)
                    child_span.update(output=validation_result)
                finally:
                    child_span.end()

                # Continue with parent span
                result = process_validated_data(validation_result)
                parent_span.update(output=result)
            finally:
                parent_span.end()
            ```
        """
        return self.start_observation(
            name=name,
            as_type="span",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
        )

    def start_as_current_span(
        self,
        *,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> _AgnosticContextManager["LangfuseSpan"]:
        """[DEPRECATED] Create a new child span and set it as the current span in a context manager.

        DEPRECATED: This method is deprecated and will be removed in a future version.
        Use start_as_current_observation(as_type='span') instead.

        This method creates a new child span and sets it as the current span within
        a context manager. It should be used with a 'with' statement to automatically
        manage the span's lifecycle.

        Args:
            name: Name of the span (e.g., function or operation name)
            input: Input data for the operation
            output: Output data from the operation
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span

        Returns:
            A context manager that yields a new LangfuseSpan

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-request") as parent_span:
                # Parent span is active here

                # Create a child span with context management
                with parent_span.start_as_current_span(name="validate-input") as child_span:
                    # Child span is active here
                    validation_result = validate(request_data)
                    child_span.update(output=validation_result)

                # Back to parent span context
                result = process_validated_data(validation_result)
                parent_span.update(output=result)
            ```
        """
        warnings.warn(
            "start_as_current_span is deprecated and will be removed in a future version. "
            "Use start_as_current_observation(as_type='span') instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.start_as_current_observation(
            name=name,
            as_type="span",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
        )

    def start_generation(
        self,
        *,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> "LangfuseGeneration":
        """[DEPRECATED] Create a new child generation span.

        DEPRECATED: This method is deprecated and will be removed in a future version.
        Use start_observation(as_type='generation') instead.

        This method creates a new child generation span with this span as the parent.
        Generation spans are specialized for AI/LLM operations and include additional
        fields for model information, usage stats, and costs.

        Unlike start_as_current_generation(), this method does not set the new span
        as the current span in the context.

        Args:
            name: Name of the generation operation
            input: Input data for the model (e.g., prompts)
            output: Output from the model (e.g., completions)
            metadata: Additional metadata to associate with the generation
            version: Version identifier for the model or component
            level: Importance level of the generation (info, warning, error)
            status_message: Optional status message for the generation
            completion_start_time: When the model started generating the response
            model: Name/identifier of the AI model used (e.g., "gpt-4")
            model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
            usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
            cost_details: Cost information for the model call
            prompt: Associated prompt template from Langfuse prompt management

        Returns:
            A new LangfuseGeneration that must be ended with .end() when complete

        Example:
            ```python
            span = langfuse.start_span(name="process-query")
            try:
                # Create a generation child span
                generation = span.start_generation(
                    name="generate-answer",
                    model="gpt-4",
                    input={"prompt": "Explain quantum computing"}
                )
                try:
                    # Call model API
                    response = llm.generate(...)

                    generation.update(
                        output=response.text,
                        usage_details={
                            "prompt_tokens": response.usage.prompt_tokens,
                            "completion_tokens": response.usage.completion_tokens
                        }
                    )
                finally:
                    generation.end()

                # Continue with parent span
                span.update(output={"answer": response.text, "source": "gpt-4"})
            finally:
                span.end()
            ```
        """
        warnings.warn(
            "start_generation is deprecated and will be removed in a future version. "
            "Use start_observation(as_type='generation') instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.start_observation(
            name=name,
            as_type="generation",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            usage_details=usage_details,
            cost_details=cost_details,
            prompt=prompt,
        )

    def start_as_current_generation(
        self,
        *,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage_details: Optional[Dict[str, int]] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
    ) -> _AgnosticContextManager["LangfuseGeneration"]:
        """[DEPRECATED] Create a new child generation span and set it as the current span in a context manager.

        DEPRECATED: This method is deprecated and will be removed in a future version.
        Use start_as_current_observation(as_type='generation') instead.

        This method creates a new child generation span and sets it as the current span
        within a context manager. Generation spans are specialized for AI/LLM operations
        and include additional fields for model information, usage stats, and costs.

        Args:
            name: Name of the generation operation
            input: Input data for the model (e.g., prompts)
            output: Output from the model (e.g., completions)
            metadata: Additional metadata to associate with the generation
            version: Version identifier for the model or component
            level: Importance level of the generation (info, warning, error)
            status_message: Optional status message for the generation
            completion_start_time: When the model started generating the response
            model: Name/identifier of the AI model used (e.g., "gpt-4")
            model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
            usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
            cost_details: Cost information for the model call
            prompt: Associated prompt template from Langfuse prompt management

        Returns:
            A context manager that yields a new LangfuseGeneration

        Example:
            ```python
            with langfuse.start_as_current_span(name="process-request") as span:
                # Prepare data
                query = preprocess_user_query(user_input)

                # Create a generation span with context management
                with span.start_as_current_generation(
                    name="generate-answer",
                    model="gpt-4",
                    input={"query": query}
                ) as generation:
                    # Generation span is active here
                    response = llm.generate(query)

                    # Update with results
                    generation.update(
                        output=response.text,
                        usage_details={
                            "prompt_tokens": response.usage.prompt_tokens,
                            "completion_tokens": response.usage.completion_tokens
                        }
                    )

                # Back to parent span context
                span.update(output={"answer": response.text, "source": "gpt-4"})
            ```
        """
        warnings.warn(
            "start_as_current_generation is deprecated and will be removed in a future version. "
            "Use start_as_current_observation(as_type='generation') instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.start_as_current_observation(
            name=name,
            as_type="generation",
            input=input,
            output=output,
            metadata=metadata,
            version=version,
            level=level,
            status_message=status_message,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            usage_details=usage_details,
            cost_details=cost_details,
            prompt=prompt,
        )

    def create_event(
        self,
        *,
        name: str,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        metadata: Optional[Any] = None,
        version: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
    ) -> "LangfuseEvent":
        """Create a new Langfuse observation of type 'EVENT'.

        Args:
            name: Name of the span (e.g., function or operation name)
            input: Input data for the operation (can be any JSON-serializable object)
            output: Output data from the operation (can be any JSON-serializable object)
            metadata: Additional metadata to associate with the span
            version: Version identifier for the code or component
            level: Importance level of the span (info, warning, error)
            status_message: Optional status message for the span

        Returns:
            The LangfuseEvent object

        Example:
            ```python
            event = langfuse.create_event(name="process-event")
            ```
        """
        timestamp = time_ns()

        with otel_trace_api.use_span(self._otel_span):
            new_otel_span = self._langfuse_client._otel_tracer.start_span(
                name=name, start_time=timestamp
            )

        return cast(
            "LangfuseEvent",
            LangfuseEvent(
                otel_span=new_otel_span,
                langfuse_client=self._langfuse_client,
                input=input,
                output=output,
                metadata=metadata,
                environment=self._environment,
                version=version,
                level=level,
                status_message=status_message,
            ).end(end_time=timestamp),
        )
Standard span implementation for general operations in Langfuse.
This class represents a general-purpose span that can be used to trace any operation in your application. It extends the base LangfuseObservationWrapper with methods for creating child spans and generations and for updating span-specific attributes. Where a more specific observation type fits (for example LangfuseGeneration, LangfuseTool, or LangfuseRetriever), prefer it for better observability and insights.
```python
def __init__(
    self,
    *,
    otel_span: otel_trace_api.Span,
    langfuse_client: "Langfuse",
    input: Optional[Any] = None,
    output: Optional[Any] = None,
    metadata: Optional[Any] = None,
    environment: Optional[str] = None,
    version: Optional[str] = None,
    level: Optional[SpanLevel] = None,
    status_message: Optional[str] = None,
):
    """Initialize a new LangfuseSpan.

    Args:
        otel_span: The OpenTelemetry span to wrap
        langfuse_client: Reference to the parent Langfuse client
        input: Input data for the span (any JSON-serializable object)
        output: Output data from the span (any JSON-serializable object)
        metadata: Additional metadata to associate with the span
        environment: The tracing environment
        version: Version identifier for the code or component
        level: Importance level of the span (info, warning, error)
        status_message: Optional status message for the span
    """
    super().__init__(
        otel_span=otel_span,
        as_type="span",
        langfuse_client=langfuse_client,
        input=input,
        output=output,
        metadata=metadata,
        environment=environment,
        version=version,
        level=level,
        status_message=status_message,
    )
```
Initialize a new LangfuseSpan.
Arguments:
- otel_span: The OpenTelemetry span to wrap
- langfuse_client: Reference to the parent Langfuse client
- input: Input data for the span (any JSON-serializable object)
- output: Output data from the span (any JSON-serializable object)
- metadata: Additional metadata to associate with the span
- environment: The tracing environment
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
1167 def start_span( 1168 self, 1169 name: str, 1170 input: Optional[Any] = None, 1171 output: Optional[Any] = None, 1172 metadata: Optional[Any] = None, 1173 version: Optional[str] = None, 1174 level: Optional[SpanLevel] = None, 1175 status_message: Optional[str] = None, 1176 ) -> "LangfuseSpan": 1177 """Create a new child span. 1178 1179 This method creates a new child span with this span as the parent. 1180 Unlike start_as_current_span(), this method does not set the new span 1181 as the current span in the context. 1182 1183 Args: 1184 name: Name of the span (e.g., function or operation name) 1185 input: Input data for the operation 1186 output: Output data from the operation 1187 metadata: Additional metadata to associate with the span 1188 version: Version identifier for the code or component 1189 level: Importance level of the span (info, warning, error) 1190 status_message: Optional status message for the span 1191 1192 Returns: 1193 A new LangfuseSpan that must be ended with .end() when complete 1194 1195 Example: 1196 ```python 1197 parent_span = langfuse.start_span(name="process-request") 1198 try: 1199 # Create a child span 1200 child_span = parent_span.start_span(name="validate-input") 1201 try: 1202 # Do validation work 1203 validation_result = validate(request_data) 1204 child_span.update(output=validation_result) 1205 finally: 1206 child_span.end() 1207 1208 # Continue with parent span 1209 result = process_validated_data(validation_result) 1210 parent_span.update(output=result) 1211 finally: 1212 parent_span.end() 1213 ``` 1214 """ 1215 return self.start_observation( 1216 name=name, 1217 as_type="span", 1218 input=input, 1219 output=output, 1220 metadata=metadata, 1221 version=version, 1222 level=level, 1223 status_message=status_message, 1224 )
Create a new child span.
This method creates a new child span with this span as the parent. Unlike start_as_current_span(), this method does not set the new span as the current span in the context.
Arguments:
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation
- output: Output data from the operation
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Returns:
A new LangfuseSpan that must be ended with .end() when complete
Example:
```python
parent_span = langfuse.start_span(name="process-request")
try:
    # Create a child span
    child_span = parent_span.start_span(name="validate-input")
    try:
        # Do validation work
        validation_result = validate(request_data)
        child_span.update(output=validation_result)
    finally:
        child_span.end()

    # Continue with parent span
    result = process_validated_data(validation_result)
    parent_span.update(output=result)
finally:
    parent_span.end()
```
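Because a span returned by start_span() must be ended explicitly, the try/finally pairing can be wrapped in a small helper. The following is a minimal sketch, not part of the SDK: `ending` and `FakeSpan` are hypothetical names, and `FakeSpan` only mimics the `.end()` method so the example runs without a Langfuse client.

```python
from contextlib import contextmanager


class FakeSpan:
    """Hypothetical stand-in exposing only the .end() method used below."""

    def __init__(self, name):
        self.name = name
        self.ended = False

    def end(self):
        self.ended = True
        return self


@contextmanager
def ending(observation):
    """Yield the observation and call .end() on exit, even if the body raises."""
    try:
        yield observation
    finally:
        observation.end()


span = FakeSpan("process-request")
try:
    with ending(span):
        raise ValueError("validation failed")
except ValueError:
    pass

print(span.ended)  # True: .end() ran although the body raised
```

Like start_span() itself, this helper does not make the span the current span in the context; it only guarantees that `.end()` is called.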
1226 def start_as_current_span( 1227 self, 1228 *, 1229 name: str, 1230 input: Optional[Any] = None, 1231 output: Optional[Any] = None, 1232 metadata: Optional[Any] = None, 1233 version: Optional[str] = None, 1234 level: Optional[SpanLevel] = None, 1235 status_message: Optional[str] = None, 1236 ) -> _AgnosticContextManager["LangfuseSpan"]: 1237 """[DEPRECATED] Create a new child span and set it as the current span in a context manager. 1238 1239 DEPRECATED: This method is deprecated and will be removed in a future version. 1240 Use start_as_current_observation(as_type='span') instead. 1241 1242 This method creates a new child span and sets it as the current span within 1243 a context manager. It should be used with a 'with' statement to automatically 1244 manage the span's lifecycle. 1245 1246 Args: 1247 name: Name of the span (e.g., function or operation name) 1248 input: Input data for the operation 1249 output: Output data from the operation 1250 metadata: Additional metadata to associate with the span 1251 version: Version identifier for the code or component 1252 level: Importance level of the span (info, warning, error) 1253 status_message: Optional status message for the span 1254 1255 Returns: 1256 A context manager that yields a new LangfuseSpan 1257 1258 Example: 1259 ```python 1260 with langfuse.start_as_current_span(name="process-request") as parent_span: 1261 # Parent span is active here 1262 1263 # Create a child span with context management 1264 with parent_span.start_as_current_span(name="validate-input") as child_span: 1265 # Child span is active here 1266 validation_result = validate(request_data) 1267 child_span.update(output=validation_result) 1268 1269 # Back to parent span context 1270 result = process_validated_data(validation_result) 1271 parent_span.update(output=result) 1272 ``` 1273 """ 1274 warnings.warn( 1275 "start_as_current_span is deprecated and will be removed in a future version. 
" 1276 "Use start_as_current_observation(as_type='span') instead.", 1277 DeprecationWarning, 1278 stacklevel=2, 1279 ) 1280 return self.start_as_current_observation( 1281 name=name, 1282 as_type="span", 1283 input=input, 1284 output=output, 1285 metadata=metadata, 1286 version=version, 1287 level=level, 1288 status_message=status_message, 1289 )
[DEPRECATED] Create a new child span and set it as the current span in a context manager.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_as_current_observation(as_type='span') instead.
This method creates a new child span and sets it as the current span within a context manager. It should be used with a 'with' statement to automatically manage the span's lifecycle.
Arguments:
- name: Name of the span (e.g., function or operation name)
- input: Input data for the operation
- output: Output data from the operation
- metadata: Additional metadata to associate with the span
- version: Version identifier for the code or component
- level: Importance level of the span (info, warning, error)
- status_message: Optional status message for the span
Returns:
A context manager that yields a new LangfuseSpan
Example:
```python
with langfuse.start_as_current_span(name="process-request") as parent_span:
    # Parent span is active here

    # Create a child span with context management
    with parent_span.start_as_current_span(name="validate-input") as child_span:
        # Child span is active here
        validation_result = validate(request_data)
        child_span.update(output=validation_result)

    # Back to parent span context
    result = process_validated_data(validation_result)
    parent_span.update(output=result)
```
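The deprecated methods emit a DeprecationWarning with `stacklevel=2`, which attributes the warning to the calling code. The sketch below shows one way to find remaining call sites using only the standard `warnings` module. The `start_as_current_span` function here is a stand-in that reproduces just the warning call from the source, and `legacy_call_site` is a hypothetical caller.

```python
import warnings


def start_as_current_span(name):
    """Stand-in that only reproduces the warning call shown in the source."""
    warnings.warn(
        "start_as_current_span is deprecated and will be removed in a future version. "
        "Use start_as_current_observation(as_type='span') instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return name


def legacy_call_site():
    # Hypothetical application code that still uses the deprecated method.
    return start_as_current_span("process-request")


# Record warnings instead of printing them so they can be inspected.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    legacy_call_site()

deprecations = [w for w in caught if issubclass(w.category, DeprecationWarning)]
print(len(deprecations))

# Escalate the warning to an exception, for example in a test suite,
# so that any remaining deprecated call fails loudly.
with warnings.catch_warnings():
    warnings.simplefilter("error", DeprecationWarning)
    try:
        legacy_call_site()
        escalated = False
    except DeprecationWarning:
        escalated = True
```

Running the test suite with DeprecationWarning escalated to an error is a common way to make sure a migration to start_as_current_observation(as_type='span') is complete before the deprecated methods are removed.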
1291 def start_generation( 1292 self, 1293 *, 1294 name: str, 1295 input: Optional[Any] = None, 1296 output: Optional[Any] = None, 1297 metadata: Optional[Any] = None, 1298 version: Optional[str] = None, 1299 level: Optional[SpanLevel] = None, 1300 status_message: Optional[str] = None, 1301 completion_start_time: Optional[datetime] = None, 1302 model: Optional[str] = None, 1303 model_parameters: Optional[Dict[str, MapValue]] = None, 1304 usage_details: Optional[Dict[str, int]] = None, 1305 cost_details: Optional[Dict[str, float]] = None, 1306 prompt: Optional[PromptClient] = None, 1307 ) -> "LangfuseGeneration": 1308 """[DEPRECATED] Create a new child generation span. 1309 1310 DEPRECATED: This method is deprecated and will be removed in a future version. 1311 Use start_observation(as_type='generation') instead. 1312 1313 This method creates a new child generation span with this span as the parent. 1314 Generation spans are specialized for AI/LLM operations and include additional 1315 fields for model information, usage stats, and costs. 1316 1317 Unlike start_as_current_generation(), this method does not set the new span 1318 as the current span in the context. 
1319 1320 Args: 1321 name: Name of the generation operation 1322 input: Input data for the model (e.g., prompts) 1323 output: Output from the model (e.g., completions) 1324 metadata: Additional metadata to associate with the generation 1325 version: Version identifier for the model or component 1326 level: Importance level of the generation (info, warning, error) 1327 status_message: Optional status message for the generation 1328 completion_start_time: When the model started generating the response 1329 model: Name/identifier of the AI model used (e.g., "gpt-4") 1330 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1331 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1332 cost_details: Cost information for the model call 1333 prompt: Associated prompt template from Langfuse prompt management 1334 1335 Returns: 1336 A new LangfuseGeneration that must be ended with .end() when complete 1337 1338 Example: 1339 ```python 1340 span = langfuse.start_span(name="process-query") 1341 try: 1342 # Create a generation child span 1343 generation = span.start_generation( 1344 name="generate-answer", 1345 model="gpt-4", 1346 input={"prompt": "Explain quantum computing"} 1347 ) 1348 try: 1349 # Call model API 1350 response = llm.generate(...) 1351 1352 generation.update( 1353 output=response.text, 1354 usage_details={ 1355 "prompt_tokens": response.usage.prompt_tokens, 1356 "completion_tokens": response.usage.completion_tokens 1357 } 1358 ) 1359 finally: 1360 generation.end() 1361 1362 # Continue with parent span 1363 span.update(output={"answer": response.text, "source": "gpt-4"}) 1364 finally: 1365 span.end() 1366 ``` 1367 """ 1368 warnings.warn( 1369 "start_generation is deprecated and will be removed in a future version. 
" 1370 "Use start_observation(as_type='generation') instead.", 1371 DeprecationWarning, 1372 stacklevel=2, 1373 ) 1374 return self.start_observation( 1375 name=name, 1376 as_type="generation", 1377 input=input, 1378 output=output, 1379 metadata=metadata, 1380 version=version, 1381 level=level, 1382 status_message=status_message, 1383 completion_start_time=completion_start_time, 1384 model=model, 1385 model_parameters=model_parameters, 1386 usage_details=usage_details, 1387 cost_details=cost_details, 1388 prompt=prompt, 1389 )
[DEPRECATED] Create a new child generation span.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_observation(as_type='generation') instead.
This method creates a new child generation span with this span as the parent. Generation spans are specialized for AI/LLM operations and include additional fields for model information, usage stats, and costs.
Unlike start_as_current_generation(), this method does not set the new span as the current span in the context.
Arguments:
- name: Name of the generation operation
- input: Input data for the model (e.g., prompts)
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Returns:
A new LangfuseGeneration that must be ended with .end() when complete
Example:
```python
span = langfuse.start_span(name="process-query")
try:
    # Create a generation child span
    generation = span.start_generation(
        name="generate-answer",
        model="gpt-4",
        input={"prompt": "Explain quantum computing"}
    )
    try:
        # Call model API
        response = llm.generate(...)

        generation.update(
            output=response.text,
            usage_details={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        )
    finally:
        generation.end()

    # Continue with parent span
    span.update(output={"answer": response.text, "source": "gpt-4"})
finally:
    span.end()
```
1391 def start_as_current_generation( 1392 self, 1393 *, 1394 name: str, 1395 input: Optional[Any] = None, 1396 output: Optional[Any] = None, 1397 metadata: Optional[Any] = None, 1398 version: Optional[str] = None, 1399 level: Optional[SpanLevel] = None, 1400 status_message: Optional[str] = None, 1401 completion_start_time: Optional[datetime] = None, 1402 model: Optional[str] = None, 1403 model_parameters: Optional[Dict[str, MapValue]] = None, 1404 usage_details: Optional[Dict[str, int]] = None, 1405 cost_details: Optional[Dict[str, float]] = None, 1406 prompt: Optional[PromptClient] = None, 1407 ) -> _AgnosticContextManager["LangfuseGeneration"]: 1408 """[DEPRECATED] Create a new child generation span and set it as the current span in a context manager. 1409 1410 DEPRECATED: This method is deprecated and will be removed in a future version. 1411 Use start_as_current_observation(as_type='generation') instead. 1412 1413 This method creates a new child generation span and sets it as the current span 1414 within a context manager. Generation spans are specialized for AI/LLM operations 1415 and include additional fields for model information, usage stats, and costs. 
1416 1417 Args: 1418 name: Name of the generation operation 1419 input: Input data for the model (e.g., prompts) 1420 output: Output from the model (e.g., completions) 1421 metadata: Additional metadata to associate with the generation 1422 version: Version identifier for the model or component 1423 level: Importance level of the generation (info, warning, error) 1424 status_message: Optional status message for the generation 1425 completion_start_time: When the model started generating the response 1426 model: Name/identifier of the AI model used (e.g., "gpt-4") 1427 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1428 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1429 cost_details: Cost information for the model call 1430 prompt: Associated prompt template from Langfuse prompt management 1431 1432 Returns: 1433 A context manager that yields a new LangfuseGeneration 1434 1435 Example: 1436 ```python 1437 with langfuse.start_as_current_span(name="process-request") as span: 1438 # Prepare data 1439 query = preprocess_user_query(user_input) 1440 1441 # Create a generation span with context management 1442 with span.start_as_current_generation( 1443 name="generate-answer", 1444 model="gpt-4", 1445 input={"query": query} 1446 ) as generation: 1447 # Generation span is active here 1448 response = llm.generate(query) 1449 1450 # Update with results 1451 generation.update( 1452 output=response.text, 1453 usage_details={ 1454 "prompt_tokens": response.usage.prompt_tokens, 1455 "completion_tokens": response.usage.completion_tokens 1456 } 1457 ) 1458 1459 # Back to parent span context 1460 span.update(output={"answer": response.text, "source": "gpt-4"}) 1461 ``` 1462 """ 1463 warnings.warn( 1464 "start_as_current_generation is deprecated and will be removed in a future version. 
" 1465 "Use start_as_current_observation(as_type='generation') instead.", 1466 DeprecationWarning, 1467 stacklevel=2, 1468 ) 1469 return self.start_as_current_observation( 1470 name=name, 1471 as_type="generation", 1472 input=input, 1473 output=output, 1474 metadata=metadata, 1475 version=version, 1476 level=level, 1477 status_message=status_message, 1478 completion_start_time=completion_start_time, 1479 model=model, 1480 model_parameters=model_parameters, 1481 usage_details=usage_details, 1482 cost_details=cost_details, 1483 prompt=prompt, 1484 )
[DEPRECATED] Create a new child generation span and set it as the current span in a context manager.
DEPRECATED: This method is deprecated and will be removed in a future version. Use start_as_current_observation(as_type='generation') instead.
This method creates a new child generation span and sets it as the current span within a context manager. Generation spans are specialized for AI/LLM operations and include additional fields for model information, usage stats, and costs.
Arguments:
- name: Name of the generation operation
- input: Input data for the model (e.g., prompts)
- output: Output from the model (e.g., completions)
- metadata: Additional metadata to associate with the generation
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
Returns:
A context manager that yields a new LangfuseGeneration
Example:
```python
with langfuse.start_as_current_span(name="process-request") as span:
    # Prepare data
    query = preprocess_user_query(user_input)

    # Create a generation span with context management
    with span.start_as_current_generation(
        name="generate-answer",
        model="gpt-4",
        input={"query": query}
    ) as generation:
        # Generation span is active here
        response = llm.generate(query)

        # Update with results
        generation.update(
            output=response.text,
            usage_details={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        )

    # Back to parent span context
    span.update(output={"answer": response.text, "source": "gpt-4"})
```
1486 def create_event( 1487 self, 1488 *, 1489 name: str, 1490 input: Optional[Any] = None, 1491 output: Optional[Any] = None, 1492 metadata: Optional[Any] = None, 1493 version: Optional[str] = None, 1494 level: Optional[SpanLevel] = None, 1495 status_message: Optional[str] = None, 1496 ) -> "LangfuseEvent": 1497 """Create a new Langfuse observation of type 'EVENT'. 1498 1499 Args: 1500 name: Name of the span (e.g., function or operation name) 1501 input: Input data for the operation (can be any JSON-serializable object) 1502 output: Output data from the operation (can be any JSON-serializable object) 1503 metadata: Additional metadata to associate with the span 1504 version: Version identifier for the code or component 1505 level: Importance level of the span (info, warning, error) 1506 status_message: Optional status message for the span 1507 1508 Returns: 1509 The LangfuseEvent object 1510 1511 Example: 1512 ```python 1513 event = langfuse.create_event(name="process-event") 1514 ``` 1515 """ 1516 timestamp = time_ns() 1517 1518 with otel_trace_api.use_span(self._otel_span): 1519 new_otel_span = self._langfuse_client._otel_tracer.start_span( 1520 name=name, start_time=timestamp 1521 ) 1522 1523 return cast( 1524 "LangfuseEvent", 1525 LangfuseEvent( 1526 otel_span=new_otel_span, 1527 langfuse_client=self._langfuse_client, 1528 input=input, 1529 output=output, 1530 metadata=metadata, 1531 environment=self._environment, 1532 version=version, 1533 level=level, 1534 status_message=status_message, 1535 ).end(end_time=timestamp), 1536 )
Create a new Langfuse observation of type 'EVENT'.
Arguments:
- name: Name of the event (e.g., function or operation name)
- input: Input data for the operation (can be any JSON-serializable object)
- output: Output data from the operation (can be any JSON-serializable object)
- metadata: Additional metadata to associate with the event
- version: Version identifier for the code or component
- level: Importance level of the event (info, warning, error)
- status_message: Optional status message for the event
Returns:
The LangfuseEvent object
Example:
event = langfuse.create_event(name="process-event")
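In the source of create_event, a single `time_ns()` timestamp is used as both the start time and the end time, so an event is a point-in-time observation that is already ended when it is returned. The sketch below illustrates that idea with a hypothetical `PointInTimeRecord` dataclass; it is not an SDK or OpenTelemetry class.

```python
from dataclasses import dataclass
from time import time_ns
from typing import Optional


@dataclass
class PointInTimeRecord:
    """Hypothetical record type used only for this illustration."""

    name: str
    start_time: int
    end_time: Optional[int] = None

    def end(self, end_time: int) -> "PointInTimeRecord":
        self.end_time = end_time
        return self


def create_point_in_time(name: str) -> PointInTimeRecord:
    # One timestamp is captured and reused for both start and end,
    # mirroring what create_event does with time_ns().
    timestamp = time_ns()
    return PointInTimeRecord(name=name, start_time=timestamp).end(end_time=timestamp)


event = create_point_in_time("process-event")
duration_ns = event.end_time - event.start_time
print(duration_ns)  # 0
```

Since the returned object is already ended, there is no `.end()` call to pair with it.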
1539class LangfuseGeneration(LangfuseObservationWrapper): 1540 """Specialized span implementation for AI model generations in Langfuse. 1541 1542 This class represents a generation span specifically designed for tracking 1543 AI/LLM operations. It extends the base LangfuseObservationWrapper with specialized 1544 attributes for model details, token usage, and costs. 1545 """ 1546 1547 def __init__( 1548 self, 1549 *, 1550 otel_span: otel_trace_api.Span, 1551 langfuse_client: "Langfuse", 1552 input: Optional[Any] = None, 1553 output: Optional[Any] = None, 1554 metadata: Optional[Any] = None, 1555 environment: Optional[str] = None, 1556 version: Optional[str] = None, 1557 level: Optional[SpanLevel] = None, 1558 status_message: Optional[str] = None, 1559 completion_start_time: Optional[datetime] = None, 1560 model: Optional[str] = None, 1561 model_parameters: Optional[Dict[str, MapValue]] = None, 1562 usage_details: Optional[Dict[str, int]] = None, 1563 cost_details: Optional[Dict[str, float]] = None, 1564 prompt: Optional[PromptClient] = None, 1565 ): 1566 """Initialize a new LangfuseGeneration span. 
1567 1568 Args: 1569 otel_span: The OpenTelemetry span to wrap 1570 langfuse_client: Reference to the parent Langfuse client 1571 input: Input data for the generation (e.g., prompts) 1572 output: Output from the generation (e.g., completions) 1573 metadata: Additional metadata to associate with the generation 1574 environment: The tracing environment 1575 version: Version identifier for the model or component 1576 level: Importance level of the generation (info, warning, error) 1577 status_message: Optional status message for the generation 1578 completion_start_time: When the model started generating the response 1579 model: Name/identifier of the AI model used (e.g., "gpt-4") 1580 model_parameters: Parameters used for the model (e.g., temperature, max_tokens) 1581 usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) 1582 cost_details: Cost information for the model call 1583 prompt: Associated prompt template from Langfuse prompt management 1584 """ 1585 super().__init__( 1586 as_type="generation", 1587 otel_span=otel_span, 1588 langfuse_client=langfuse_client, 1589 input=input, 1590 output=output, 1591 metadata=metadata, 1592 environment=environment, 1593 version=version, 1594 level=level, 1595 status_message=status_message, 1596 completion_start_time=completion_start_time, 1597 model=model, 1598 model_parameters=model_parameters, 1599 usage_details=usage_details, 1600 cost_details=cost_details, 1601 prompt=prompt, 1602 )
Specialized span implementation for AI model generations in Langfuse.
This class represents a generation span specifically designed for tracking AI/LLM operations. It extends the base LangfuseObservationWrapper with specialized attributes for model details, token usage, and costs.
```python
def __init__(
    self,
    *,
    otel_span: otel_trace_api.Span,
    langfuse_client: "Langfuse",
    input: Optional[Any] = None,
    output: Optional[Any] = None,
    metadata: Optional[Any] = None,
    environment: Optional[str] = None,
    version: Optional[str] = None,
    level: Optional[SpanLevel] = None,
    status_message: Optional[str] = None,
    completion_start_time: Optional[datetime] = None,
    model: Optional[str] = None,
    model_parameters: Optional[Dict[str, MapValue]] = None,
    usage_details: Optional[Dict[str, int]] = None,
    cost_details: Optional[Dict[str, float]] = None,
    prompt: Optional[PromptClient] = None,
):
    """Initialize a new LangfuseGeneration span.

    Args:
        otel_span: The OpenTelemetry span to wrap
        langfuse_client: Reference to the parent Langfuse client
        input: Input data for the generation (e.g., prompts)
        output: Output from the generation (e.g., completions)
        metadata: Additional metadata to associate with the generation
        environment: The tracing environment
        version: Version identifier for the model or component
        level: Importance level of the generation (info, warning, error)
        status_message: Optional status message for the generation
        completion_start_time: When the model started generating the response
        model: Name/identifier of the AI model used (e.g., "gpt-4")
        model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
        usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
        cost_details: Cost information for the model call
        prompt: Associated prompt template from Langfuse prompt management
    """
    super().__init__(
        as_type="generation",
        otel_span=otel_span,
        langfuse_client=langfuse_client,
        input=input,
        output=output,
        metadata=metadata,
        environment=environment,
        version=version,
        level=level,
        status_message=status_message,
        completion_start_time=completion_start_time,
        model=model,
        model_parameters=model_parameters,
        usage_details=usage_details,
        cost_details=cost_details,
        prompt=prompt,
    )
```
Initialize a new LangfuseGeneration span.
Arguments:
- otel_span: The OpenTelemetry span to wrap
- langfuse_client: Reference to the parent Langfuse client
- input: Input data for the generation (e.g., prompts)
- output: Output from the generation (e.g., completions)
- metadata: Additional metadata to associate with the generation
- environment: The tracing environment
- version: Version identifier for the model or component
- level: Importance level of the generation (info, warning, error)
- status_message: Optional status message for the generation
- completion_start_time: When the model started generating the response
- model: Name/identifier of the AI model used (e.g., "gpt-4")
- model_parameters: Parameters used for the model (e.g., temperature, max_tokens)
- usage_details: Token usage information (e.g., prompt_tokens, completion_tokens)
- cost_details: Cost information for the model call
- prompt: Associated prompt template from Langfuse prompt management
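The signature types `usage_details` as `Dict[str, int]` and `cost_details` as `Dict[str, float]`. As a rough sketch of how the two might be prepared together, the helper below derives a cost dictionary from a usage dictionary. The prices in `PRICE_PER_1K` are made up for illustration, `compute_cost_details` is a hypothetical helper, and reusing the same keys in both dictionaries is an assumption rather than something the source requires.

```python
from typing import Dict

# Hypothetical prices in USD per 1,000 tokens; replace with your provider's rates.
PRICE_PER_1K: Dict[str, float] = {
    "prompt_tokens": 0.03,
    "completion_tokens": 0.06,
}


def compute_cost_details(usage_details: Dict[str, int]) -> Dict[str, float]:
    """Return a per-key cost for every usage key that has a known price."""
    return {
        key: count / 1000 * PRICE_PER_1K[key]
        for key, count in usage_details.items()
        if key in PRICE_PER_1K
    }


usage_details = {"prompt_tokens": 1200, "completion_tokens": 300}
cost_details = compute_cost_details(usage_details)

# 1200 / 1000 * 0.03 is about 0.036 and 300 / 1000 * 0.06 is about 0.018
for key, cost in cost_details.items():
    print(f"{key}: {cost:.3f}")
```

Both dictionaries could then be passed as the `usage_details` and `cost_details` arguments described above.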
1605class LangfuseEvent(LangfuseObservationWrapper): 1606 """Specialized span implementation for Langfuse Events.""" 1607 1608 def __init__( 1609 self, 1610 *, 1611 otel_span: otel_trace_api.Span, 1612 langfuse_client: "Langfuse", 1613 input: Optional[Any] = None, 1614 output: Optional[Any] = None, 1615 metadata: Optional[Any] = None, 1616 environment: Optional[str] = None, 1617 version: Optional[str] = None, 1618 level: Optional[SpanLevel] = None, 1619 status_message: Optional[str] = None, 1620 ): 1621 """Initialize a new LangfuseEvent span. 1622 1623 Args: 1624 otel_span: The OpenTelemetry span to wrap 1625 langfuse_client: Reference to the parent Langfuse client 1626 input: Input data for the event 1627 output: Output from the event 1628 metadata: Additional metadata to associate with the generation 1629 environment: The tracing environment 1630 version: Version identifier for the model or component 1631 level: Importance level of the generation (info, warning, error) 1632 status_message: Optional status message for the generation 1633 """ 1634 super().__init__( 1635 otel_span=otel_span, 1636 as_type="event", 1637 langfuse_client=langfuse_client, 1638 input=input, 1639 output=output, 1640 metadata=metadata, 1641 environment=environment, 1642 version=version, 1643 level=level, 1644 status_message=status_message, 1645 ) 1646 1647 def update( 1648 self, 1649 *, 1650 name: Optional[str] = None, 1651 input: Optional[Any] = None, 1652 output: Optional[Any] = None, 1653 metadata: Optional[Any] = None, 1654 version: Optional[str] = None, 1655 level: Optional[SpanLevel] = None, 1656 status_message: Optional[str] = None, 1657 completion_start_time: Optional[datetime] = None, 1658 model: Optional[str] = None, 1659 model_parameters: Optional[Dict[str, MapValue]] = None, 1660 usage_details: Optional[Dict[str, int]] = None, 1661 cost_details: Optional[Dict[str, float]] = None, 1662 prompt: Optional[PromptClient] = None, 1663 **kwargs: Any, 1664 ) -> "LangfuseEvent": 1665 
"""Update is not allowed for LangfuseEvent because events cannot be updated. 1666 1667 This method logs a warning and returns self without making changes. 1668 1669 Returns: 1670 self: Returns the unchanged LangfuseEvent instance 1671 """ 1672 langfuse_logger.warning( 1673 "Attempted to update LangfuseEvent observation. Events cannot be updated after creation." 1674 ) 1675 return self
Specialized span implementation for Langfuse Events.
```python
def __init__(
    self,
    *,
    otel_span: otel_trace_api.Span,
    langfuse_client: "Langfuse",
    input: Optional[Any] = None,
    output: Optional[Any] = None,
    metadata: Optional[Any] = None,
    environment: Optional[str] = None,
    version: Optional[str] = None,
    level: Optional[SpanLevel] = None,
    status_message: Optional[str] = None,
):
    """Initialize a new LangfuseEvent span.

    Args:
        otel_span: The OpenTelemetry span to wrap
        langfuse_client: Reference to the parent Langfuse client
        input: Input data for the event
        output: Output from the event
        metadata: Additional metadata to associate with the event
        environment: The tracing environment
        version: Version identifier for the code or component
        level: Importance level of the event (info, warning, error)
        status_message: Optional status message for the event
    """
    super().__init__(
        otel_span=otel_span,
        as_type="event",
        langfuse_client=langfuse_client,
        input=input,
        output=output,
        metadata=metadata,
        environment=environment,
        version=version,
        level=level,
        status_message=status_message,
    )
```
Initialize a new LangfuseEvent span.
Arguments:
- otel_span: The OpenTelemetry span to wrap
- langfuse_client: Reference to the parent Langfuse client
- input: Input data for the event
- output: Output from the event
- metadata: Additional metadata to associate with the event
- environment: The tracing environment
- version: Version identifier for the code or component
- level: Importance level of the event (info, warning, error)
- status_message: Optional status message for the event
Update is not allowed for LangfuseEvent because events cannot be updated.
This method logs a warning and returns self without making changes.
Returns:
self: Returns the unchanged LangfuseEvent instance
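The no-op behavior described above can be illustrated without the SDK. The following is a minimal sketch that assumes a hypothetical stand-in class, `ImmutableEvent`, and a hypothetical logger name; it is not the SDK's `LangfuseEvent`, only the same "warn, ignore arguments, return self" pattern:

```python
import logging
from typing import Any

logger = logging.getLogger("event_sketch")  # hypothetical logger name


class ImmutableEvent:
    """Hypothetical stand-in for an event-like observation; not the SDK class."""

    def __init__(self, *, name: str, input: Any = None) -> None:
        self.name = name
        self.input = input

    def update(self, **kwargs: Any) -> "ImmutableEvent":
        # Mirror the documented behavior: warn, ignore every argument, return self.
        logger.warning("Events cannot be updated after creation.")
        return self


event = ImmutableEvent(name="cache-miss", input={"key": "user:42"})
same = event.update(name="renamed", input=None)

# The call is chainable but leaves the event untouched.
print(same is event, event.name, event.input)
```

Because `update` returns the same instance, code written for updatable observations keeps running against events; the only visible effect is the warning in the log.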
class LangfuseOtelSpanAttributes:
    # Langfuse-Trace attributes
    TRACE_NAME = "langfuse.trace.name"
    TRACE_USER_ID = "user.id"
    TRACE_SESSION_ID = "session.id"
    TRACE_TAGS = "langfuse.trace.tags"
    TRACE_PUBLIC = "langfuse.trace.public"
    TRACE_METADATA = "langfuse.trace.metadata"
    TRACE_INPUT = "langfuse.trace.input"
    TRACE_OUTPUT = "langfuse.trace.output"

    # Langfuse-observation attributes
    OBSERVATION_TYPE = "langfuse.observation.type"
    OBSERVATION_METADATA = "langfuse.observation.metadata"
    OBSERVATION_LEVEL = "langfuse.observation.level"
    OBSERVATION_STATUS_MESSAGE = "langfuse.observation.status_message"
    OBSERVATION_INPUT = "langfuse.observation.input"
    OBSERVATION_OUTPUT = "langfuse.observation.output"

    # Langfuse-observation of type Generation attributes
    OBSERVATION_COMPLETION_START_TIME = "langfuse.observation.completion_start_time"
    OBSERVATION_MODEL = "langfuse.observation.model.name"
    OBSERVATION_MODEL_PARAMETERS = "langfuse.observation.model.parameters"
    OBSERVATION_USAGE_DETAILS = "langfuse.observation.usage_details"
    OBSERVATION_COST_DETAILS = "langfuse.observation.cost_details"
    OBSERVATION_PROMPT_NAME = "langfuse.observation.prompt.name"
    OBSERVATION_PROMPT_VERSION = "langfuse.observation.prompt.version"

    # General
    ENVIRONMENT = "langfuse.environment"
    RELEASE = "langfuse.release"
    VERSION = "langfuse.version"

    # Internal
    AS_ROOT = "langfuse.internal.as_root"
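As an illustration of how these keys might be used, the sketch below builds a flat attribute dictionary from a few of the constants listed above. The helper `build_attributes` is hypothetical, and JSON-encoding the metadata is an assumption of this sketch (OpenTelemetry attribute values must be primitives or sequences of primitives), not a statement about how the SDK serializes values:

```python
import json
from typing import Any, Dict, Optional

# Keys copied from the listing above; only the subset used in this sketch.
OBSERVATION_TYPE = "langfuse.observation.type"
OBSERVATION_LEVEL = "langfuse.observation.level"
OBSERVATION_METADATA = "langfuse.observation.metadata"
ENVIRONMENT = "langfuse.environment"


def build_attributes(
    *,
    as_type: str,
    level: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    environment: Optional[str] = None,
) -> Dict[str, str]:
    """Hypothetical helper: map keyword arguments to span attribute keys."""
    candidates = {
        OBSERVATION_TYPE: as_type,
        OBSERVATION_LEVEL: level,
        # Assumption: nested metadata is stored as a JSON string.
        OBSERVATION_METADATA: json.dumps(metadata) if metadata is not None else None,
        ENVIRONMENT: environment,
    }
    # Drop unset values so only meaningful attributes reach the span.
    return {key: value for key, value in candidates.items() if value is not None}


attrs = build_attributes(as_type="event", level="WARNING", metadata={"retries": 2})
print(attrs)
```

Keeping the key strings in one place, as the class above does, avoids typos in attribute names scattered across instrumentation code.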
class LangfuseAgent(LangfuseObservationWrapper):
    """Agent observation for reasoning blocks that act on tools using LLM guidance."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseAgent span."""
        kwargs["as_type"] = "agent"
        super().__init__(**kwargs)
Agent observation for reasoning blocks that act on tools using LLM guidance.
class LangfuseTool(LangfuseObservationWrapper):
    """Tool observation representing external tool calls, e.g., calling a weather API."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseTool span."""
        kwargs["as_type"] = "tool"
        super().__init__(**kwargs)
Tool observation representing external tool calls, e.g., calling a weather API.
class LangfuseChain(LangfuseObservationWrapper):
    """Chain observation for connecting LLM application steps, e.g. passing context from retriever to LLM."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseChain span."""
        kwargs["as_type"] = "chain"
        super().__init__(**kwargs)
Chain observation for connecting LLM application steps, e.g. passing context from retriever to LLM.
class LangfuseEmbedding(LangfuseObservationWrapper):
    """Embedding observation for LLM embedding calls, typically used before retrieval."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseEmbedding span."""
        kwargs["as_type"] = "embedding"
        super().__init__(**kwargs)
Embedding observation for LLM embedding calls, typically used before retrieval.
class LangfuseEvaluator(LangfuseObservationWrapper):
    """Evaluator observation for assessing relevance, correctness, or helpfulness of LLM outputs."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseEvaluator span."""
        kwargs["as_type"] = "evaluator"
        super().__init__(**kwargs)
Evaluator observation for assessing relevance, correctness, or helpfulness of LLM outputs.
class LangfuseRetriever(LangfuseObservationWrapper):
    """Retriever observation for data retrieval steps, e.g. vector store or database queries."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseRetriever span."""
        kwargs["as_type"] = "retriever"
        super().__init__(**kwargs)
Retriever observation for data retrieval steps, e.g. vector store or database queries.
class LangfuseGuardrail(LangfuseObservationWrapper):
    """Guardrail observation for protection e.g. against jailbreaks or offensive content."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a new LangfuseGuardrail span."""
        kwargs["as_type"] = "guardrail"
        super().__init__(**kwargs)
Guardrail observation for protection e.g. against jailbreaks or offensive content.
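The typed wrappers above all share one pattern: the constructor overwrites `as_type` in `kwargs` before delegating to the base class, so the observation type cannot be overridden by the caller. A minimal sketch of that pattern, using hypothetical stand-in classes (`ObservationBase`, `ToolObservation`, `RetrieverObservation`) rather than the SDK's own:

```python
from typing import Any


class ObservationBase:
    """Hypothetical stand-in for the wrapper base class; not the SDK class."""

    def __init__(self, *, as_type: str, name: str = "") -> None:
        self.as_type = as_type
        self.name = name


class ToolObservation(ObservationBase):
    def __init__(self, **kwargs: Any) -> None:
        # Overwrite whatever the caller passed so the type is always "tool".
        kwargs["as_type"] = "tool"
        super().__init__(**kwargs)


class RetrieverObservation(ObservationBase):
    def __init__(self, **kwargs: Any) -> None:
        # Same pattern with a different fixed type.
        kwargs["as_type"] = "retriever"
        super().__init__(**kwargs)


# A conflicting as_type from the caller is silently replaced.
tool = ToolObservation(name="weather-api", as_type="span")
retriever = RetrieverObservation(name="vector-search")
print(tool.as_type, retriever.as_type)
```

Assigning into `kwargs` rather than passing `as_type=` explicitly avoids a "multiple values for keyword argument" `TypeError` when the caller also supplies `as_type`.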
class Evaluation:
    """Represents an evaluation result for an experiment item or an entire experiment run.

    This class provides a strongly-typed way to create evaluation results in evaluator functions.
    Users must use keyword arguments when instantiating this class.

    Attributes:
        name: Unique identifier for the evaluation metric. Should be descriptive
            and consistent across runs (e.g., "accuracy", "bleu_score", "toxicity").
            Used for aggregation and comparison across experiment runs.
        value: The evaluation score or result. Can be:
            - Numeric (int/float): For quantitative metrics like accuracy (0.85), BLEU (0.42)
            - String: For categorical results like "positive", "negative", "neutral"
            - Boolean: For binary assessments like "passes_safety_check"
            - None: When evaluation cannot be computed (missing data, API errors, etc.)
        comment: Optional human-readable explanation of the evaluation result.
            Useful for providing context, explaining scoring rationale, or noting
            special conditions. Displayed in Langfuse UI for interpretability.
        metadata: Optional structured metadata about the evaluation process.
            Can include confidence scores, intermediate calculations, model versions,
            or any other relevant technical details.
        data_type: Optional score data type. Required if value is not NUMERIC.
            One of NUMERIC, CATEGORICAL, or BOOLEAN. Defaults to NUMERIC.
        config_id: Optional Langfuse score config ID.

    Examples:
        Basic accuracy evaluation:
        ```python
        from langfuse import Evaluation

        def accuracy_evaluator(*, input, output, expected_output=None, **kwargs):
            if not expected_output:
                return Evaluation(name="accuracy", value=None, comment="No expected output")

            is_correct = output.strip().lower() == expected_output.strip().lower()
            return Evaluation(
                name="accuracy",
                value=1.0 if is_correct else 0.0,
                comment="Correct answer" if is_correct else "Incorrect answer"
            )
        ```

        Multi-metric evaluator:
        ```python
        def comprehensive_evaluator(*, input, output, expected_output=None, **kwargs):
            return [
                Evaluation(name="length", value=len(output), comment=f"Output length: {len(output)} chars"),
                Evaluation(name="has_greeting", value="hello" in output.lower(), comment="Contains greeting"),
                Evaluation(
                    name="quality",
                    value=0.85,
                    comment="High quality response",
                    metadata={"confidence": 0.92, "model": "gpt-4"}
                )
            ]
        ```

        Categorical evaluation:
        ```python
        def sentiment_evaluator(*, input, output, **kwargs):
            sentiment = analyze_sentiment(output)  # Returns "positive", "negative", or "neutral"
            return Evaluation(
                name="sentiment",
                value=sentiment,
                comment=f"Response expresses {sentiment} sentiment",
                data_type="CATEGORICAL"
            )
        ```

        Failed evaluation with error handling:
        ```python
        def external_api_evaluator(*, input, output, **kwargs):
            try:
                score = external_api.evaluate(output)
                return Evaluation(name="external_score", value=score)
            except Exception as e:
                return Evaluation(
                    name="external_score",
                    value=None,
                    comment=f"API unavailable: {e}",
                    metadata={"error": str(e), "retry_count": 3}
                )
        ```

    Note:
        All arguments must be passed as keywords. Positional arguments are not allowed
        to ensure code clarity and prevent errors from argument reordering.
    """

    def __init__(
        self,
        *,
        name: str,
        value: Union[int, float, str, bool, None],
        comment: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        data_type: Optional[ScoreDataType] = None,
        config_id: Optional[str] = None,
    ):
        """Initialize an Evaluation with the provided data.

        Args:
            name: Unique identifier for the evaluation metric.
            value: The evaluation score or result.
            comment: Optional human-readable explanation of the result.
            metadata: Optional structured metadata about the evaluation process.
            data_type: Optional score data type (NUMERIC, CATEGORICAL, or BOOLEAN).
            config_id: Optional Langfuse score config ID.

        Note:
            All arguments must be provided as keywords. Positional arguments will raise a TypeError.
        """
        self.name = name
        self.value = value
        self.comment = comment
        self.metadata = metadata
        self.data_type = data_type
        self.config_id = config_id
Represents an evaluation result for an experiment item or an entire experiment run.
This class provides a strongly-typed way to create evaluation results in evaluator functions. Users must use keyword arguments when instantiating this class.
Attributes:
- name: Unique identifier for the evaluation metric. Should be descriptive and consistent across runs (e.g., "accuracy", "bleu_score", "toxicity"). Used for aggregation and comparison across experiment runs.
- value: The evaluation score or result. Can be:
  - Numeric (int/float): For quantitative metrics like accuracy (0.85), BLEU (0.42)
  - String: For categorical results like "positive", "negative", "neutral"
  - Boolean: For binary assessments like "passes_safety_check"
  - None: When evaluation cannot be computed (missing data, API errors, etc.)
- comment: Optional human-readable explanation of the evaluation result. Useful for providing context, explaining scoring rationale, or noting special conditions. Displayed in Langfuse UI for interpretability.
- metadata: Optional structured metadata about the evaluation process. Can include confidence scores, intermediate calculations, model versions, or any other relevant technical details.
- data_type: Optional score data type. Required if value is not NUMERIC. One of NUMERIC, CATEGORICAL, or BOOLEAN. Defaults to NUMERIC.
- config_id: Optional Langfuse score config ID.
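Since `data_type` should be set whenever the value is not numeric, a caller-side helper can pick it from the Python type of the value. The sketch below is an assumption about how one might do this in application code; `infer_data_type` is a hypothetical name, and nothing here describes what the SDK itself infers:

```python
from typing import Optional, Union

Value = Union[int, float, str, bool, None]


def infer_data_type(value: Value) -> Optional[str]:
    """Hypothetical helper: choose a score data type from the value's Python type."""
    # bool must be tested before int/float because bool is a subclass of int.
    if isinstance(value, bool):
        return "BOOLEAN"
    if isinstance(value, (int, float)):
        return "NUMERIC"
    if isinstance(value, str):
        return "CATEGORICAL"
    # None means the evaluation could not be computed; leave the type unset.
    return None


for sample in (0.85, True, "positive", None):
    print(repr(sample), "->", infer_data_type(sample))
```

The ordering of the checks matters: testing `int` first would classify `True` and `False` as numeric scores.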
Examples:
Basic accuracy evaluation:
from langfuse import Evaluation

def accuracy_evaluator(*, input, output, expected_output=None, **kwargs):
    if not expected_output:
        return Evaluation(name="accuracy", value=None, comment="No expected output")

    is_correct = output.strip().lower() == expected_output.strip().lower()
    return Evaluation(
        name="accuracy",
        value=1.0 if is_correct else 0.0,
        comment="Correct answer" if is_correct else "Incorrect answer"
    )
Multi-metric evaluator:
def comprehensive_evaluator(*, input, output, expected_output=None, **kwargs):
    return [
        Evaluation(name="length", value=len(output), comment=f"Output length: {len(output)} chars"),
        Evaluation(name="has_greeting", value="hello" in output.lower(), comment="Contains greeting"),
        Evaluation(
            name="quality",
            value=0.85,
            comment="High quality response",
            metadata={"confidence": 0.92, "model": "gpt-4"}
        )
    ]
Categorical evaluation:
def sentiment_evaluator(*, input, output, **kwargs):
    sentiment = analyze_sentiment(output)  # Returns "positive", "negative", or "neutral"
    return Evaluation(
        name="sentiment",
        value=sentiment,
        comment=f"Response expresses {sentiment} sentiment",
        data_type="CATEGORICAL"
    )
Failed evaluation with error handling:
def external_api_evaluator(*, input, output, **kwargs):
    try:
        score = external_api.evaluate(output)
        return Evaluation(name="external_score", value=score)
    except Exception as e:
        return Evaluation(
            name="external_score",
            value=None,
            comment=f"API unavailable: {e}",
            metadata={"error": str(e), "retry_count": 3}
        )
Note:
All arguments must be passed as keywords. Positional arguments are not allowed to ensure code clarity and prevent errors from argument reordering.
Initialize an Evaluation with the provided data.
Arguments:
- name: Unique identifier for the evaluation metric.
- value: The evaluation score or result.
- comment: Optional human-readable explanation of the result.
- metadata: Optional structured metadata about the evaluation process.
- data_type: Optional score data type (NUMERIC, CATEGORICAL, or BOOLEAN).
- config_id: Optional Langfuse score config ID.
Note:
All arguments must be provided as keywords. Positional arguments will raise a TypeError.
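The keyword-only rule comes from the bare `*` in the constructor signature. The sketch below reproduces that mechanism with a hypothetical stand-in class, `EvaluationSketch`, which mirrors part of the signature shown above so the example runs without the SDK installed:

```python
from typing import Any, Dict, Optional, Union


class EvaluationSketch:
    """Hypothetical stand-in mirroring the keyword-only signature; not the SDK class."""

    def __init__(
        self,
        *,  # everything after the bare * must be passed by keyword
        name: str,
        value: Union[int, float, str, bool, None],
        comment: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.name = name
        self.value = value
        self.comment = comment
        self.metadata = metadata


# Keyword arguments work as expected.
ok = EvaluationSketch(name="accuracy", value=1.0)

# Positional arguments are rejected by Python before __init__ runs any code.
try:
    EvaluationSketch("accuracy", 1.0)
    raised = False
except TypeError:
    raised = True

print(ok.name, ok.value, raised)
```

With keyword-only parameters, swapping `name` and `value` by accident is impossible, which is the reordering risk the note refers to.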