langfuse.decorators

Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the `@observe()` decorator.

*Simple example (decorator + openai integration)*

```python
from langfuse.decorators import observe
from langfuse.openai import openai  # OpenAI integration

@observe()
def story():
    return openai.chat.completions.create(
        model="gpt-3.5-turbo",
        max_tokens=100,
        messages=[
            {"role": "system", "content": "You are a great storyteller."},
            {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
        ],
    ).choices[0].message.content

@observe()
def main():
    return story()

main()
```

See [docs](https://langfuse.com/docs/sdk/python/decorators) for more information.
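Under the hood, `observe()` checks whether the decorated function is a coroutine function and wraps it accordingly, so both `def` and `async def` functions can be traced. A minimal, self-contained sketch of that dispatch — not the SDK's actual wrapper, which additionally opens and finalizes an observation around the call:

```python
import asyncio
from functools import wraps

def observe_sketch(func):
    # Mirrors the dispatch in the SDK: coroutine functions get an async
    # wrapper, everything else a sync one, so awaitability is preserved.
    if asyncio.iscoroutinefunction(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            # The real wrapper prepares an observation before the call
            # and finalizes it afterwards; here we only delegate.
            return await func(*args, **kwargs)
        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return sync_wrapper
```

`functools.wraps` keeps the decorated function's name and docstring intact, which is what lets the SDK default the trace/span name to the function name.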
```python
from .langfuse_decorator import langfuse_context, observe, LangfuseDecorator

__all__ = ["langfuse_context", "observe", "LangfuseDecorator"]
```
Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Attributes:
- name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
- as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
- capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
- capture_output (bool): If True, captures the return value of the function as output. Default is True.
- transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

Returns:
Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

```python
@observe()
def your_function(args):
    # Your implementation here
```

For observing language model generations:

```python
@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here
```

Raises:
- Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:
- Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
- To update observation or trace parameters (e.g., metadata, session_id), use the `langfuse_context.update_current_observation` and `langfuse_context.update_current_trace` methods within the wrapped function.
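When the decorated function streams its result as a generator, `transform_to_string` controls how the collected chunks become the captured output string. A minimal example of such a callable — the name `chunks_to_string` is illustrative, not part of the SDK:

```python
def chunks_to_string(chunks):
    # Receives every value the generator yielded, in order, and returns
    # the single string that is stored as the observation's output.
    return "".join(str(chunk) for chunk in chunks)
```

It would be passed as `@observe(transform_to_string=chunks_to_string)`. Without it, yielded chunks that are all strings are concatenated automatically; otherwise the output is captured as a list of the yielded values.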
````python
class LangfuseDecorator:
    _log = logging.getLogger("langfuse")

    def observe(
        self,
        *,
        name: Optional[str] = None,
        as_type: Optional[Literal["generation"]] = None,
        capture_input: bool = True,
        capture_output: bool = True,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> Callable[[F], F]:
        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

        Attributes:
            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
            capture_output (bool): If True, captures the return value of the function as output. Default is True.
            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

        Returns:
            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

        Example:
            For general tracing (functions/methods):
            ```python
            @observe()
            def your_function(args):
                # Your implementation here
            ```
            For observing language model generations:
            ```python
            @observe(as_type="generation")
            def your_LLM_function(args):
                # Your LLM invocation here
            ```

        Raises:
            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

        Note:
            - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
            - To update observation or trace parameters (e.g., metadata, session_id), use the `langfuse_context.update_current_observation` and `langfuse_context.update_current_trace` methods within the wrapped function.
        """

        def decorator(func: F) -> F:
            return (
                self._async_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=capture_input,
                    capture_output=capture_output,
                    transform_to_string=transform_to_string,
                )
                if asyncio.iscoroutinefunction(func)
                else self._sync_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=capture_input,
                    capture_output=capture_output,
                    transform_to_string=transform_to_string,
                )
            )

        return decorator

    def _async_observe(
        self,
        func: F,
        *,
        name: Optional[str],
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> F:
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            observation = self._prepare_call(
                name=name or func.__name__,
                as_type=as_type,
                capture_input=capture_input,
                is_method=self._is_method(func),
                func_args=args,
                func_kwargs=kwargs,
            )
            result = None

            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                self._handle_exception(observation, e)
            finally:
                result = self._finalize_call(
                    observation, result, capture_output, transform_to_string
                )

            # Returning from finally block may swallow errors, so only return if result is not None
            if result is not None:
                return result

        return cast(F, async_wrapper)

    def _sync_observe(
        self,
        func: F,
        *,
        name: Optional[str],
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> F:
        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            observation = self._prepare_call(
                name=name or func.__name__,
                as_type=as_type,
                capture_input=capture_input,
                is_method=self._is_method(func),
                func_args=args,
                func_kwargs=kwargs,
            )
            result = None

            try:
                result = func(*args, **kwargs)
            except Exception as e:
                self._handle_exception(observation, e)
            finally:
                result = self._finalize_call(
                    observation, result, capture_output, transform_to_string
                )

            # Returning from finally block may swallow errors, so only return if result is not None
            if result is not None:
                return result

        return cast(F, sync_wrapper)

    @staticmethod
    def _is_method(func: Callable) -> bool:
        """Check if a callable is likely a class or instance method based on its signature.

        This method inspects the given callable's signature for the presence of a 'cls' or 'self' parameter, which is conventionally used for class and instance methods in Python classes. It returns True if 'cls' or 'self' is found among the parameters, suggesting the callable is a method.

        Note: This method relies on naming conventions and may not accurately identify instance methods if unconventional parameter names are used or if static or class methods incorrectly include a 'self' or 'cls' parameter. Additionally, during decorator execution, inspect.ismethod does not work as expected because the function has not yet been bound to an instance; it is still a function, not a method. This check attempts to infer method status based on signature, which can be useful in decorator contexts where traditional method identification techniques fail.

        Returns:
            bool: True if 'cls' or 'self' is in the callable's parameters, False otherwise.
        """
        return (
            "self" in inspect.signature(func).parameters
            or "cls" in inspect.signature(func).parameters
        )

    def _prepare_call(
        self,
        *,
        name: str,
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        is_method: bool = False,
        func_args: Tuple = (),
        func_kwargs: Dict = {},
    ) -> Optional[
        Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
    ]:
        try:
            langfuse = self._get_langfuse()
            stack = _observation_stack_context.get().copy()
            parent = stack[-1] if stack else None

            # Collect default observation data
            observation_id = func_kwargs.pop("langfuse_observation_id", None)
            id = str(observation_id) if observation_id else None
            start_time = _get_timestamp()

            input = (
                self._get_input_from_func_args(
                    is_method=is_method,
                    func_args=func_args,
                    func_kwargs=func_kwargs,
                )
                if capture_input
                else None
            )

            params = {
                "id": id,
                "name": name,
                "start_time": start_time,
                "input": input,
            }

            # Create observation
            if parent and as_type == "generation":
                observation = parent.generation(**params)
            elif as_type == "generation":
                # Create wrapper trace if generation is top-level
                # Do not add wrapper trace to stack, as it does not have a corresponding end that will pop it off again
                trace = langfuse.trace(id=id, name=name, start_time=start_time)
                observation = langfuse.generation(
                    name=name, start_time=start_time, input=input, trace_id=trace.id
                )
            elif parent:
                observation = parent.span(**params)
            else:
                observation = langfuse.trace(**params)

            _observation_stack_context.set(stack + [observation])

            return observation
        except Exception as e:
            self._log.error(f"Failed to prepare observation: {e}")

    def _get_input_from_func_args(
        self,
        *,
        is_method: bool = False,
        func_args: Tuple = (),
        func_kwargs: Dict = {},
    ) -> Any:
        # Remove implicitly passed "self" or "cls" argument for instance or class methods
        logged_args = func_args[1:] if is_method else func_args
        raw_input = {
            "args": logged_args,
            "kwargs": func_kwargs,
        }

        # Serialize and deserialize to ensure proper JSON serialization.
        # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
        return json.loads(json.dumps(raw_input, cls=EventSerializer))

    def _finalize_call(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        result: Any,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ):
        if inspect.isgenerator(result):
            return self._wrap_sync_generator_result(
                observation, result, capture_output, transform_to_string
            )
        elif inspect.isasyncgen(result):
            return self._wrap_async_generator_result(
                observation, result, capture_output, transform_to_string
            )
        else:
            return self._handle_call_result(observation, result, capture_output)

    def _handle_call_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        result: Any,
        capture_output: bool,
    ):
        try:
            if observation is None:
                raise ValueError("No observation found in the current context")

            # Collect final observation data
            observation_params = _observation_params_context.get()[
                observation.id
            ].copy()
            del _observation_params_context.get()[
                observation.id
            ]  # Remove observation params to avoid leaking

            end_time = observation_params["end_time"] or _get_timestamp()
            raw_output = observation_params["output"] or (
                result if result and capture_output else None
            )

            # Serialize and deserialize to ensure proper JSON serialization.
            # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
            output = json.loads(json.dumps(raw_output, cls=EventSerializer))
            observation_params.update(end_time=end_time, output=output)

            if isinstance(observation, (StatefulSpanClient, StatefulGenerationClient)):
                observation.end(**observation_params)
            elif isinstance(observation, StatefulTraceClient):
                observation.update(**observation_params)

            # Remove observation from top of stack
            stack = _observation_stack_context.get()
            _observation_stack_context.set(stack[:-1])

        except Exception as e:
            self._log.error(f"Failed to finalize observation: {e}")

        finally:
            return result

    def _handle_exception(
        self,
        observation: Optional[
            Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
        ],
        e: Exception,
    ):
        if observation:
            _observation_params_context.get()[observation.id].update(
                level="ERROR", status_message=str(e)
            )
        raise e

    def _wrap_sync_generator_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        generator: Generator,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ):
        items = []

        try:
            for item in generator:
                items.append(item)

                yield item

        finally:
            output = items

            if transform_to_string is not None:
                output = transform_to_string(items)

            elif all(isinstance(item, str) for item in items):
                output = "".join(items)

            self._handle_call_result(observation, output, capture_output)

    async def _wrap_async_generator_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        generator: AsyncGenerator,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> AsyncGenerator:
        items = []

        try:
            async for item in generator:
                items.append(item)

                yield item

        finally:
            output = items

            if transform_to_string is not None:
                output = transform_to_string(items)

            elif all(isinstance(item, str) for item in items):
                output = "".join(items)

            self._handle_call_result(observation, output, capture_output)

    def get_current_llama_index_handler(self):
        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.

        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.

        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.

        Returns:
            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

        Note:
            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
        """
        try:
            from langfuse.llama_index import LlamaIndexCallbackHandler
        except ImportError:
            self._log.error(
                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
            )

            return None

        observation = _observation_stack_context.get()[-1]

        if observation is None:
            self._log.warn("No observation found in the current context")

            return None

        if isinstance(observation, StatefulGenerationClient):
            self._log.warn(
                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
            )

            return None

        callback_handler = LlamaIndexCallbackHandler()
        callback_handler.set_root(observation)

        return callback_handler

    def get_current_langchain_handler(self):
        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.

        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.

        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.

        Returns:
            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

        Note:
            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
        """
        observation = _observation_stack_context.get()[-1]

        if observation is None:
            self._log.warn("No observation found in the current context")

            return None

        if isinstance(observation, StatefulGenerationClient):
            self._log.warn(
                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
            )

            return None

        return observation.get_langchain_handler()

    def get_current_trace_id(self):
        """Retrieve the ID of the current trace from the observation stack context.

        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
        representing the entry point of the traced execution context.

        Returns:
            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
            possibly due to the method being called outside of any @observe-decorated function execution.

        Note:
            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
        """
        stack = _observation_stack_context.get()
        should_log_warning = self._get_caller_module_name() != "langfuse.openai"

        if not stack:
            if should_log_warning:
                self._log.warn("No trace found in the current context")

            return None

        return stack[0].id

    def _get_caller_module_name(self):
        try:
            caller_module = inspect.getmodule(inspect.stack()[2][0])
        except Exception as e:
            self._log.warn(f"Failed to get caller module: {e}")

            return None

        return caller_module.__name__ if caller_module else None

    def get_current_trace_url(self) -> Optional[str]:
        """Retrieve the URL of the current trace in context.

        Returns:
            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
            possibly due to the method being called outside of any @observe-decorated function execution.

        Note:
            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
        """
        try:
            trace_id = self.get_current_trace_id()
            langfuse = self._get_langfuse()

            if not trace_id:
                raise ValueError("No trace found in the current context")

            return f"{langfuse.client._client_wrapper._base_url}/trace/{trace_id}"

        except Exception as e:
            self._log.error(f"Failed to get current trace URL: {e}")

            return None

    def get_current_observation_id(self):
        """Retrieve the ID of the current observation in context.

        Returns:
            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
            possibly due to the method being called outside of any @observe-decorated function execution.

        Note:
            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
            - If called at the top level of a trace, it will return the trace ID.
        """
        stack = _observation_stack_context.get()
        should_log_warning = self._get_caller_module_name() != "langfuse.openai"

        if not stack:
            if should_log_warning:
                self._log.warn("No observation found in the current context")

            return None

        return stack[-1].id

    def update_current_trace(
        self,
        name: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Any] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ):
        """Set parameters for the current trace, updating the trace's metadata and context information.

        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.

        Arguments:
            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.

        Returns:
            None

        Note:
            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
            - If called outside of an active trace context, a warning is logged, and a ValueError is raised to indicate the absence of a traceable context.
        """
        trace_id = self.get_current_trace_id()

        if trace_id is None:
            self._log.warn("No trace found in the current context")

            return

        params_to_update = {
            k: v
            for k, v in {
                "name": name,
                "user_id": user_id,
                "session_id": session_id,
                "version": version,
                "release": release,
                "metadata": metadata,
                "tags": tags,
                "public": public,
            }.items()
            if v is not None
        }

        _observation_params_context.get()[trace_id].update(params_to_update)

    def update_current_observation(
        self,
        *,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        name: Optional[str] = None,
        version: Optional[str] = None,
        metadata: Optional[Any] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        release: Optional[str] = None,
        tags: Optional[List[str]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage: Optional[Union[BaseModel, ModelUsage]] = None,
        prompt: Optional[PromptClient] = None,
        public: Optional[bool] = None,
    ):
        """Update parameters for the current observation within an active trace context.

        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
        enhancing the observability and traceability of the execution context.

        Note that if a param is not available on a specific observation type, it will be ignored.

        Shared params:
            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
            - `output` (Optional[Any]): The output or result of the trace or observation.
            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.

        Trace-specific params:
            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

        Span-specific params:
            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.

        Generation-specific params:
            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
            - `usage` (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAi structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
            - `prompt` (Optional[PromptClient]): The prompt object used for the generation.

        Returns:
            None

        Raises:
            ValueError: If no current observation is found in the context, indicating that this method was called outside of an observation's execution scope.

        Note:
            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
        """
        stack = _observation_stack_context.get()
        observation = stack[-1] if stack else None

        if not observation:
            self._log.warn("No observation found in the current context")

            return

        update_params = {
            k: v
            for k, v in {
                "input": input,
                "output": output,
                "name": name,
                "version": version,
                "metadata": metadata,
                "start_time": start_time,
                "end_time": end_time,
                "release": release,
                "tags": tags,
                "user_id": user_id,
                "session_id": session_id,
                "level": level,
                "status_message": status_message,
                "completion_start_time": completion_start_time,
                "model": model,
                "model_parameters": model_parameters,
                "usage": usage,
                "prompt": prompt,
                "public": public,
            }.items()
            if v is not None
        }

        _observation_params_context.get()[observation.id].update(update_params)

    def score_current_observation(
        self,
        *,
        name: str,
        value: float,
        comment: Optional[str] = None,
        id: Optional[str] = None,
    ):
        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.

        Arguments:
            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.

        Returns:
            None

        Note:
            This method is intended to be used within the context of an active trace or observation.
````
815 """ 816 try: 817 langfuse = self._get_langfuse() 818 trace_id = self.get_current_trace_id() 819 current_observation_id = self.get_current_observation_id() 820 821 observation_id = ( 822 current_observation_id if current_observation_id != trace_id else None 823 ) 824 825 if trace_id: 826 langfuse.score( 827 trace_id=trace_id, 828 observation_id=observation_id, 829 name=name, 830 value=value, 831 comment=comment, 832 id=id, 833 ) 834 else: 835 raise ValueError("No trace or observation found in the current context") 836 837 except Exception as e: 838 self._log.error(f"Failed to score observation: {e}") 839 840 def score_current_trace( 841 self, 842 *, 843 name: str, 844 value: float, 845 comment: Optional[str] = None, 846 id: Optional[str] = None, 847 ): 848 """Score the current trace in context. This can be called anywhere in the nested trace to score the trace. 849 850 Arguments: 851 name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded. 852 value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure. 853 comment (Optional[str]): An optional comment or description providing context or additional details about the score. 854 id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking. 855 856 Returns: 857 None 858 859 Note: 860 This method is intended to be used within the context of an active trace or observation. 
861 """ 862 try: 863 langfuse = self._get_langfuse() 864 trace_id = self.get_current_trace_id() 865 866 if trace_id: 867 langfuse.score( 868 trace_id=trace_id, 869 name=name, 870 value=value, 871 comment=comment, 872 id=id, 873 ) 874 else: 875 raise ValueError("No trace found in the current context") 876 877 except Exception as e: 878 self._log.error(f"Failed to score observation: {e}") 879 880 @catch_and_log_errors 881 def flush(self): 882 """Force immediate flush of all buffered observations to the Langfuse backend. 883 884 This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers. 885 It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits. 886 887 Usage: 888 - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform. 889 - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data. 890 891 Returns: 892 None 893 894 Raises: 895 ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues. 896 897 Note: 898 - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts. 899 - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client. 900 However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes. 
901 """ 902 langfuse = self._get_langfuse() 903 if langfuse: 904 langfuse.flush() 905 else: 906 self._log.warn("No langfuse object found in the current context") 907 908 def configure( 909 self, 910 *, 911 public_key: Optional[str] = None, 912 secret_key: Optional[str] = None, 913 host: Optional[str] = None, 914 release: Optional[str] = None, 915 debug: Optional[bool] = None, 916 threads: Optional[int] = None, 917 flush_at: Optional[int] = None, 918 flush_interval: Optional[int] = None, 919 max_retries: Optional[int] = None, 920 timeout: Optional[int] = None, 921 httpx_client: Optional[httpx.Client] = None, 922 enabled: Optional[bool] = None, 923 ): 924 """Configure the Langfuse client. 925 926 If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings. 927 928 Args: 929 public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable. 930 secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable. 931 host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`. 932 release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable. 933 debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable. 934 threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues. 935 flush_at: Max batch size that's sent to the API. 936 flush_interval: Max delay until a new batch is sent to the API. 937 max_retries: Max number of retries in case of API/network errors. 938 timeout: Timeout of API requests in seconds. Default is 20 seconds. 
939 httpx_client: Pass your own httpx client for more customizability of requests. 940 enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised. 941 """ 942 langfuse_singleton = LangfuseSingleton() 943 langfuse_singleton.reset() 944 945 langfuse_singleton.get( 946 public_key=public_key, 947 secret_key=secret_key, 948 host=host, 949 release=release, 950 debug=debug, 951 threads=threads, 952 flush_at=flush_at, 953 flush_interval=flush_interval, 954 max_retries=max_retries, 955 timeout=timeout, 956 httpx_client=httpx_client, 957 enabled=enabled, 958 ) 959 960 def _get_langfuse(self) -> Langfuse: 961 return LangfuseSingleton().get() 962 963 def auth_check(self) -> bool: 964 """Check if the current Langfuse client is authenticated. 965 966 Returns: 967 bool: True if the client is authenticated, False otherwise 968 """ 969 try: 970 langfuse = self._get_langfuse() 971 972 return langfuse.auth_check() 973 except Exception as e: 974 self._log.error("No Langfuse object found in the current context", e) 975 976 return False
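The scoring helpers above share one subtlety: when `score_current_observation` is called at the top level of a trace, the current observation ID equals the trace ID, so no `observation_id` is sent and the score is attached to the trace itself. A minimal plain-Python stand-in for that resolution step (not the SDK itself):

```python
def resolve_observation_id(current_observation_id, trace_id):
    # At the top level of a trace the "current observation" is the trace,
    # so observation_id is omitted and the score lands on the trace.
    return current_observation_id if current_observation_id != trace_id else None

# Nested inside a span or generation: the score targets that observation
assert resolve_observation_id("span-1", "trace-1") == "span-1"

# At the trace's top level: the score targets the trace
assert resolve_observation_id("trace-1", "trace-1") is None
```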
    def observe(
        self,
        *,
        name: Optional[str] = None,
        as_type: Optional[Literal["generation"]] = None,
        capture_input: bool = True,
        capture_output: bool = True,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> Callable[[F], F]:
        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

        Attributes:
            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
            capture_output (bool): If True, captures the return value of the function as output. Default is True.
            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture

        Returns:
            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

        Example:
            For general tracing (functions/methods):
            ```python
            @observe()
            def your_function(args):
                # Your implementation here
            ```
            For observing language model generations:
            ```python
            @observe(as_type="generation")
            def your_LLM_function(args):
                # Your LLM invocation here
            ```

        Raises:
            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

        Note:
            - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
            - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
        """

        def decorator(func: F) -> F:
            return (
                self._async_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=capture_input,
                    capture_output=capture_output,
                    transform_to_string=transform_to_string,
                )
                if asyncio.iscoroutinefunction(func)
                else self._sync_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=capture_input,
                    capture_output=capture_output,
                    transform_to_string=transform_to_string,
                )
            )

        return decorator
Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
Attributes:
- name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
- as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
- capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
- capture_output (bool): If True, captures the return value of the function as output. Default is True.
- transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.
Returns:
Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
Example:
For general tracing (functions/methods):
@observe()
def your_function(args):
    # Your implementation here
For observing language model generations:
@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here
Raises:
- Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
Note:
- Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
- To update observation or trace parameters (e.g., metadata, session_id), use the `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
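As the source shows, the decorator checks `asyncio.iscoroutinefunction` to pick a sync or async wrapper. A simplified stand-in of that dispatch (illustrative only; the real wrappers also create observations and capture input/output):

```python
import asyncio
import functools

def observe_sketch(func):
    # Simplified dispatch mirroring _async_observe / _sync_observe:
    # coroutine functions get an async wrapper, everything else a sync one.
    if asyncio.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            # real decorator: push an observation and capture input here
            return await func(*args, **kwargs)
        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        # real decorator: same bookkeeping for synchronous functions
        return func(*args, **kwargs)
    return sync_wrapper

@observe_sketch
def add(a, b):
    return a + b

@observe_sketch
async def add_async(a, b):
    return a + b

assert add(1, 2) == 3
assert asyncio.run(add_async(1, 2)) == 3
```

The same pattern is why a single `@observe()` works unchanged on both `def` and `async def` functions.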
    def get_current_llama_index_handler(self):
        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.

        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.

        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.

        Returns:
            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

        Note:
            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
        """
        try:
            from langfuse.llama_index import LlamaIndexCallbackHandler
        except ImportError:
            self._log.error(
                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
            )

            return None

        observation = _observation_stack_context.get()[-1]

        if observation is None:
            self._log.warn("No observation found in the current context")

            return None

        if isinstance(observation, StatefulGenerationClient):
            self._log.warn(
                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
            )

            return None

        callback_handler = LlamaIndexCallbackHandler()
        callback_handler.set_root(observation)

        return callback_handler
Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
Returns:
LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
Note:
- This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
- If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
    def get_current_langchain_handler(self):
        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.

        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.

        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.

        Returns:
            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

        Note:
            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
        """
        observation = _observation_stack_context.get()[-1]

        if observation is None:
            self._log.warn("No observation found in the current context")

            return None

        if isinstance(observation, StatefulGenerationClient):
            self._log.warn(
                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
            )

            return None

        return observation.get_langchain_handler()
Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
Returns:
LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
Note:
- This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
- If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
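Both callback-handler getters apply the same two guards: no handler outside an observation context, and no handler when the current observation is a generation. A minimal stand-in of that guard logic (plain Python, with strings in place of real observation objects):

```python
def current_handler(stack, generation_types=("generation",)):
    # Outside a trace the observation stack is empty: warn (omitted here)
    # and return None.
    if not stack:
        return None
    current = stack[-1]
    # GENERATION-type observations do not support child callback handlers.
    if current in generation_types:
        return None
    # Real getters return a handler rooted at the current observation.
    return f"handler-for-{current}"

assert current_handler([]) is None
assert current_handler(["trace", "generation"]) is None
assert current_handler(["trace", "span"]) == "handler-for-span"
```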
    def get_current_trace_id(self):
        """Retrieve the ID of the current trace from the observation stack context.

        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
        representing the entry point of the traced execution context.

        Returns:
            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
            possibly due to the method being called outside of any @observe-decorated function execution.

        Note:
            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
        """
        stack = _observation_stack_context.get()
        should_log_warning = self._get_caller_module_name() != "langfuse.openai"

        if not stack:
            if should_log_warning:
                self._log.warn("No trace found in the current context")

            return None

        return stack[0].id
Retrieve the ID of the current trace from the observation stack context.
This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID, such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack, representing the entry point of the traced execution context.
Returns:
str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.
Note:
- This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
- If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
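The trace ID is simply the ID of the root (first) entry on the context-local observation stack. A stand-in sketch, with strings in place of observation objects:

```python
def current_trace_id(stack):
    # The root of the observation stack is the trace itself
    return stack[0] if stack else None

assert current_trace_id(["trace-1", "span-1", "gen-1"]) == "trace-1"
assert current_trace_id([]) is None  # outside any @observe context
```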
    def get_current_trace_url(self) -> Optional[str]:
        """Retrieve the URL of the current trace in context.

        Returns:
            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
            possibly due to the method being called outside of any @observe-decorated function execution.

        Note:
            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
        """
        try:
            trace_id = self.get_current_trace_id()
            langfuse = self._get_langfuse()

            if not trace_id:
                raise ValueError("No trace found in the current context")

            return f"{langfuse.client._client_wrapper._base_url}/trace/{trace_id}"

        except Exception as e:
            self._log.error(f"Failed to get current trace URL: {e}")

            return None
Retrieve the URL of the current trace in context.
Returns:
str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.
Note:
- This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
- If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
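As the source shows, the URL is assembled from the client's base URL and the trace ID in the form `<host>/trace/<trace_id>`. A sketch of that formatting (the host value below is just an example):

```python
def trace_url(base_url, trace_id):
    # Documented format: <host>/trace/<trace_id>; None outside a trace context
    return f"{base_url}/trace/{trace_id}" if trace_id else None

assert trace_url("https://cloud.langfuse.com", "abc123") == "https://cloud.langfuse.com/trace/abc123"
assert trace_url("https://cloud.langfuse.com", None) is None
```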
    def get_current_observation_id(self):
        """Retrieve the ID of the current observation in context.

        Returns:
            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
            possibly due to the method being called outside of any @observe-decorated function execution.

        Note:
            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
            - If called at the top level of a trace, it will return the trace ID.
        """
        stack = _observation_stack_context.get()
        should_log_warning = self._get_caller_module_name() != "langfuse.openai"

        if not stack:
            if should_log_warning:
                self._log.warn("No observation found in the current context")

            return None

        return stack[-1].id
Retrieve the ID of the current observation in context.
Returns:
str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context, possibly due to the method being called outside of any @observe-decorated function execution.
Note:
- This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
- If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
- If called at the top level of a trace, it will return the trace ID.
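In contrast to the trace ID (the first stack entry), the observation ID is the most recent entry; at the top level of a trace the stack holds only the root, so the two coincide. A stand-in sketch:

```python
def current_observation_id(stack):
    # The most recently created observation sits at the top of the stack
    return stack[-1] if stack else None

assert current_observation_id(["trace-1", "span-1", "gen-1"]) == "gen-1"

# At the top level of a trace, the observation ID equals the trace ID
assert current_observation_id(["trace-1"]) == "trace-1"
assert current_observation_id([]) is None
```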
    def update_current_trace(
        self,
        name: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Any] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ):
        """Set parameters for the current trace, updating the trace's metadata and context information.

        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.

        Arguments:
            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.

        Returns:
            None

        Note:
            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
            - If called outside of an active trace context, a warning is logged, and a ValueError is raised to indicate the absence of a traceable context.
        """
        trace_id = self.get_current_trace_id()

        if trace_id is None:
            self._log.warn("No trace found in the current context")

            return

        params_to_update = {
            k: v
            for k, v in {
                "name": name,
                "user_id": user_id,
                "session_id": session_id,
                "version": version,
                "release": release,
                "metadata": metadata,
                "tags": tags,
                "public": public,
            }.items()
            if v is not None
        }

        _observation_params_context.get()[trace_id].update(params_to_update)
Set parameters for the current trace, updating the trace's metadata and context information.
This method allows for dynamically updating the trace parameters at any point during the execution of a trace. It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information, and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
Arguments:
- name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
- user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
- session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
- version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
- release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
- metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
- tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
Returns:
None
Note:
- This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
- The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
- If called outside of an active trace context, a warning is logged and no update is performed.
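The selective-update behavior described above follows from how the SDK filters the supplied arguments before applying them: `None` values are dropped, so they never overwrite parameters set earlier. A minimal sketch of that semantics in plain Python (`merge_trace_params` is an illustrative helper, not part of the SDK):

```python
def merge_trace_params(existing: dict, **updates) -> dict:
    # Mirrors the SDK's filtering step: None values are dropped before the
    # update, so they never clear previously set trace parameters.
    # (merge_trace_params is a hypothetical helper for illustration only.)
    existing.update({k: v for k, v in updates.items() if v is not None})
    return existing

params = merge_trace_params({}, name="qa-run", user_id="user-123")
params = merge_trace_params(params, session_id="sess-1", name=None)
# name stays "qa-run": passing None left the earlier value untouched
```

This is why you can call the update method several times during a trace, each time touching only the fields you pass explicitly.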
```python
    def update_current_observation(
        self,
        *,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        name: Optional[str] = None,
        version: Optional[str] = None,
        metadata: Optional[Any] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        release: Optional[str] = None,
        tags: Optional[List[str]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage: Optional[Union[BaseModel, ModelUsage]] = None,
        prompt: Optional[PromptClient] = None,
        public: Optional[bool] = None,
    ):
        """Update parameters for the current observation within an active trace context.

        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
        enhancing the observability and traceability of the execution context.

        Note that if a param is not available on a specific observation type, it will be ignored.

        Shared params:
            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
            - `output` (Optional[Any]): The output or result of the trace or observation.
            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.

        Trace-specific params:
            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

        Span-specific params:
            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.

        Generation-specific params:
            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
            - `usage` (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
            - `prompt` (Optional[PromptClient]): The prompt object used for the generation.

        Returns:
            None

        Note:
            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
            - If no current observation is found in the context (i.e., the method was called outside of an observation's execution scope), a warning is logged and no update is performed.
        """
        stack = _observation_stack_context.get()
        observation = stack[-1] if stack else None

        if not observation:
            self._log.warn("No observation found in the current context")

            return

        update_params = {
            k: v
            for k, v in {
                "input": input,
                "output": output,
                "name": name,
                "version": version,
                "metadata": metadata,
                "start_time": start_time,
                "end_time": end_time,
                "release": release,
                "tags": tags,
                "user_id": user_id,
                "session_id": session_id,
                "level": level,
                "status_message": status_message,
                "completion_start_time": completion_start_time,
                "model": model,
                "model_parameters": model_parameters,
                "usage": usage,
                "prompt": prompt,
                "public": public,
            }.items()
            if v is not None
        }

        _observation_params_context.get()[observation.id].update(update_params)
```
Update parameters for the current observation within an active trace context.

This method dynamically adjusts the parameters of the most recent observation on the observation stack. It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more, enhancing the observability and traceability of the execution context.

Note that if a param is not available on a specific observation type, it will be ignored.

Shared params:
- `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
- `output` (Optional[Any]): The output or result of the trace or observation.
- `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
- `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
- `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
- `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
- `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.

Trace-specific params:
- `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
- `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
- `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
- `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
- `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

Span-specific params:
- `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
- `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.

Generation-specific params:
- `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
- `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
- `usage` (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
- `prompt` (Optional[PromptClient]): The prompt object used for the generation.

Returns:
None

Note:
- This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
- It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
- Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
- If no current observation is found in the context (i.e., the method was called outside of an observation's execution scope), a warning is logged and no update is performed.
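Why do updates target the innermost observation in nested contexts? The SDK keeps a per-execution-context stack of observations and writes to its top. A self-contained sketch of that mechanism (the `Obs` class and `update_top` helper are illustrative stand-ins, not SDK types):

```python
from dataclasses import dataclass, field

@dataclass
class Obs:
    # Stand-in for an observation entry on the context stack (illustrative).
    id: str
    params: dict = field(default_factory=dict)

# A trace with a nested span and a generation: innermost is last on the stack.
stack = [Obs("trace"), Obs("span-1"), Obs("generation-2")]

def update_top(stack, **updates):
    if not stack:
        return  # the SDK logs a warning here rather than raising
    # Only the top-of-stack (most recent) observation is touched; None values
    # are filtered out, matching the selective-update behavior documented above.
    stack[-1].params.update({k: v for k, v in updates.items() if v is not None})

update_top(stack, model="gpt-3.5-turbo", output=None)
# Only the innermost observation ("generation-2") received the update.
```

This explains the note on nested observation contexts: an update issued inside a nested function call lands on that function's own observation, not on the enclosing trace.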
```python
    def score_current_observation(
        self,
        *,
        name: str,
        value: float,
        comment: Optional[str] = None,
        id: Optional[str] = None,
    ):
        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.

        Arguments:
            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.

        Returns:
            None

        Note:
            This method is intended to be used within the context of an active trace or observation.
        """
        try:
            langfuse = self._get_langfuse()
            trace_id = self.get_current_trace_id()
            current_observation_id = self.get_current_observation_id()

            observation_id = (
                current_observation_id if current_observation_id != trace_id else None
            )

            if trace_id:
                langfuse.score(
                    trace_id=trace_id,
                    observation_id=observation_id,
                    name=name,
                    value=value,
                    comment=comment,
                    id=id,
                )
            else:
                raise ValueError("No trace or observation found in the current context")

        except Exception as e:
            self._log.error(f"Failed to score observation: {e}")
```
Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
Arguments:
- name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
- value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
- comment (Optional[str]): An optional comment or description providing context or additional details about the score.
- id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
Returns:
None
Note:
This method is intended to be used within the context of an active trace or observation.
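The "top level of a trace" behavior comes from how the score target is resolved: at the trace root, the current observation id equals the trace id, and the SDK then passes `observation_id=None` so the score attaches to the trace itself. A minimal sketch of that resolution step (`resolve_observation_id` is a hypothetical helper mirroring the logic in the source above):

```python
def resolve_observation_id(trace_id: str, current_observation_id: str):
    # At the trace root the current observation id equals the trace id;
    # returning None there makes the score attach to the trace itself,
    # otherwise the nested span/generation is scored.
    return current_observation_id if current_observation_id != trace_id else None

assert resolve_observation_id("trace-1", "trace-1") is None      # top level
assert resolve_observation_id("trace-1", "obs-9") == "obs-9"     # nested call
```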
```python
    def score_current_trace(
        self,
        *,
        name: str,
        value: float,
        comment: Optional[str] = None,
        id: Optional[str] = None,
    ):
        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.

        Arguments:
            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.

        Returns:
            None

        Note:
            This method is intended to be used within the context of an active trace or observation.
        """
        try:
            langfuse = self._get_langfuse()
            trace_id = self.get_current_trace_id()

            if trace_id:
                langfuse.score(
                    trace_id=trace_id,
                    name=name,
                    value=value,
                    comment=comment,
                    id=id,
                )
            else:
                raise ValueError("No trace found in the current context")

        except Exception as e:
            self._log.error(f"Failed to score trace: {e}")
```
Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
Arguments:
- name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
- value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
- comment (Optional[str]): An optional comment or description providing context or additional details about the score.
- id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
Returns:
None
Note:
This method is intended to be used within the context of an active trace or observation.
```python
    @catch_and_log_errors
    def flush(self):
        """Force immediate flush of all buffered observations to the Langfuse backend.

        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.

        Usage:
            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.

        Returns:
            None

        Note:
            - If no Langfuse client object is found in the current context, a warning is logged, indicating potential misconfiguration or initialization issues.
            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
              However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
        """
        langfuse = self._get_langfuse()
        if langfuse:
            langfuse.flush()
        else:
            self._log.warn("No langfuse object found in the current context")
```
Force immediate flush of all buffered observations to the Langfuse backend.
This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers. It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
Usage:
- This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
- It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
Returns:
None
Note:
- If no Langfuse client object is found in the current context, a warning is logged, indicating potential misconfiguration or initialization issues.
- The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
- In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client. However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
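The periodic-flushing pattern for batch jobs can be sketched with a toy buffer; `Buffer` here is a stand-in for the SDK's internal event queue, not a real Langfuse class, and the point is only the shape of the loop: flush at intervals, then once more before exit so nothing buffered is lost.

```python
class Buffer:
    """Toy stand-in for the SDK's internal event buffer (illustrative only)."""
    def __init__(self):
        self.pending, self.sent = [], []

    def add(self, event):
        self.pending.append(event)

    def flush(self):
        # In the real SDK this sends batched events over the network.
        self.sent.extend(self.pending)
        self.pending.clear()

buf = Buffer()
for i in range(5):
    buf.add(f"trace-{i}")
    if (i + 1) % 2 == 0:   # flush every 2 items, as a batch job might
        buf.flush()
buf.flush()  # final flush before exit so the last partial batch is sent
```

In a real application the equivalent final step is calling `langfuse_context.flush()` before the process terminates.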
```python
    def configure(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        release: Optional[str] = None,
        debug: Optional[bool] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        enabled: Optional[bool] = None,
    ):
        """Configure the Langfuse client.

        If used, this method must be called before any other `langfuse_context` or `observe`-decorated function to configure the Langfuse client with the necessary credentials and settings.

        Args:
            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
            flush_at: Max batch size that's sent to the API.
            flush_interval: Max delay until a new batch is sent to the API.
            max_retries: Max number of retries in case of API/network errors.
            timeout: Timeout of API requests in seconds. Default is 20 seconds.
            httpx_client: Pass your own httpx client for more customizability of requests.
            enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
        """
        langfuse_singleton = LangfuseSingleton()
        langfuse_singleton.reset()

        langfuse_singleton.get(
            public_key=public_key,
            secret_key=secret_key,
            host=host,
            release=release,
            debug=debug,
            threads=threads,
            flush_at=flush_at,
            flush_interval=flush_interval,
            max_retries=max_retries,
            timeout=timeout,
            httpx_client=httpx_client,
            enabled=enabled,
        )
```
Configure the Langfuse client.
If used, this method must be called before any other `langfuse_context` or `observe`-decorated function to configure the Langfuse client with the necessary credentials and settings.

Arguments:
- `public_key`: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
- `secret_key`: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
- `host`: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
- `release`: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
- `debug`: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
- `threads`: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
- `flush_at`: Max batch size that's sent to the API.
- `flush_interval`: Max delay until a new batch is sent to the API.
- `max_retries`: Max number of retries in case of API/network errors.
- `timeout`: Timeout of API requests in seconds. Default is 20 seconds.
- `httpx_client`: Pass your own httpx client for more customizability of requests.
- `enabled`: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
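Each credential follows the same precedence: an explicit argument wins over its environment variable, which wins over the default. A small sketch of that lookup order (`resolve_setting` is a hypothetical helper illustrating the pattern, not an SDK function):

```python
import os

def resolve_setting(explicit, env_var, default=None):
    # Precedence sketch: explicit argument > environment variable > default.
    # (Hypothetical helper; the SDK performs an equivalent lookup internally.)
    if explicit is not None:
        return explicit
    return os.environ.get(env_var, default)

os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"
host = resolve_setting(None, "LANGFUSE_HOST")              # from the env var
host = resolve_setting("https://self-hosted.example", "LANGFUSE_HOST")  # explicit wins
```

In practice this means you can keep keys in `LANGFUSE_PUBLIC_KEY`/`LANGFUSE_SECRET_KEY` for deployment and only call `configure()` when you need to override them, e.g. in tests.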
```python
    def auth_check(self) -> bool:
        """Check if the current Langfuse client is authenticated.

        Returns:
            bool: True if the client is authenticated, False otherwise
        """
        try:
            langfuse = self._get_langfuse()

            return langfuse.auth_check()
        except Exception as e:
            self._log.error(f"No Langfuse object found in the current context: {e}")

            return False
```
Check if the current Langfuse client is authenticated.
Returns:
bool: True if the client is authenticated, False otherwise