langfuse.decorators

Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the `@observe()` decorator.

*Simple example (decorator + openai integration)*

```python
from langfuse.decorators import observe
from langfuse.openai import openai  # OpenAI integration

@observe()
def story():
    return openai.chat.completions.create(
        model="gpt-3.5-turbo",
        max_tokens=100,
        messages=[
            {"role": "system", "content": "You are a great storyteller."},
            {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
        ],
    ).choices[0].message.content

@observe()
def main():
    return story()

main()
```

See [docs](https://langfuse.com/docs/sdk/python/decorators) for more information.
The module body (its docstring is rendered above):

```python
from .langfuse_decorator import langfuse_context, observe, LangfuseDecorator

__all__ = ["langfuse_context", "observe", "LangfuseDecorator"]
```
```python
def observe(
    self,
    func: Optional[Callable[P, R]] = None,
    *,
    name: Optional[str] = None,
    as_type: Optional[Literal["generation"]] = None,
    capture_input: bool = True,
    capture_output: bool = True,
    transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
```
Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Attributes:
- name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
- as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
- capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
- capture_output (bool): If True, captures the return value of the function as output. Default is True.
- transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

Returns:
Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

```python
@observe()
def your_function(args):
    # Your implementation here
```

For observing language model generations:

```python
@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here
```

Raises:
- Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:
- Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
- To update observation or trace parameters (e.g., metadata, session_id), use the `langfuse_context.update_current_observation` and `langfuse_context.update_current_trace` methods within the wrapped function.
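The decorator can be applied either bare (`@observe`) or with keyword arguments (`@observe(name="...")`): when no function is passed, it returns the inner decorator for Python to apply next. A minimal, self-contained sketch of that dual-mode pattern, independent of Langfuse (`traced` and the `calls` list are illustrative names, not part of the SDK):

```python
from functools import wraps
from typing import Callable, Optional

calls = []  # records (name, result) pairs purely for demonstration

def traced(func: Optional[Callable] = None, *, name: Optional[str] = None):
    """Usable both as @traced and as @traced(name="...")."""
    def decorator(f: Callable) -> Callable:
        @wraps(f)
        def wrapper(*args, **kwargs):
            result = f(*args, **kwargs)
            calls.append((name or f.__name__, result))
            return result
        return wrapper

    # Called with arguments: func is None, so return the decorator itself;
    # Python then calls it with the decorated function.
    if func is None:
        return decorator
    # Called bare: func is the decorated function, apply the decorator directly.
    return decorator(func)

@traced
def add(a, b):
    return a + b

@traced(name="multiply")
def mul(a, b):
    return a * b

add(2, 3)
mul(2, 3)
# calls == [("add", 5), ("multiply", 6)]
```

This is the same shape as the `if func is None: return decorator` branch at the end of `observe` above.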
````python
class LangfuseDecorator:
    _log = logging.getLogger("langfuse")

    # Type overload for observe decorator with no arguments
    @overload
    def observe(self, func: F) -> F: ...

    # Type overload for observe decorator with arguments
    @overload
    def observe(
        self,
        func: None = None,
        *,
        name: Optional[str] = None,
        as_type: Optional[Literal["generation"]] = None,
        capture_input: bool = True,
        capture_output: bool = True,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> Callable[[Callable[P, R]], Callable[P, R]]: ...

    # Implementation of observe decorator
    def observe(
        self,
        func: Optional[Callable[P, R]] = None,
        *,
        name: Optional[str] = None,
        as_type: Optional[Literal["generation"]] = None,
        capture_input: bool = True,
        capture_output: bool = True,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> Callable[[Callable[P, R]], Callable[P, R]]:
        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

        Attributes:
            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
            capture_output (bool): If True, captures the return value of the function as output. Default is True.
            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

        Returns:
            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

        Example:
            For general tracing (functions/methods):
            ```python
            @observe()
            def your_function(args):
                # Your implementation here
            ```
            For observing language model generations:
            ```python
            @observe(as_type="generation")
            def your_LLM_function(args):
                # Your LLM invocation here
            ```

        Raises:
            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

        Note:
            - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
            - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse_context.update_current_observation` and `langfuse_context.update_current_trace` methods within the wrapped function.
        """

        def decorator(func: Callable[P, R]) -> Callable[P, R]:
            return (
                self._async_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=capture_input,
                    capture_output=capture_output,
                    transform_to_string=transform_to_string,
                )
                if asyncio.iscoroutinefunction(func)
                else self._sync_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=capture_input,
                    capture_output=capture_output,
                    transform_to_string=transform_to_string,
                )
            )

        """
        If the decorator is called without arguments, return the decorator function itself.
        This allows the decorator to be used with or without arguments.
        Python calls the decorator function with the decorated function as an argument when the decorator is used without arguments.
        """
        if func is None:
            return decorator
        else:
            return decorator(func)

    def _async_observe(
        self,
        func: F,
        *,
        name: Optional[str],
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> F:
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            observation = self._prepare_call(
                name=name or func.__name__,
                as_type=as_type,
                capture_input=capture_input,
                is_method=self._is_method(func),
                func_args=args,
                func_kwargs=kwargs,
            )
            result = None

            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                self._handle_exception(observation, e)
            finally:
                result = self._finalize_call(
                    observation, result, capture_output, transform_to_string
                )

            # Returning from finally block may swallow errors, so only return if result is not None
            if result is not None:
                return result

        return cast(F, async_wrapper)

    def _sync_observe(
        self,
        func: F,
        *,
        name: Optional[str],
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> F:
        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            observation = self._prepare_call(
                name=name or func.__name__,
                as_type=as_type,
                capture_input=capture_input,
                is_method=self._is_method(func),
                func_args=args,
                func_kwargs=kwargs,
            )
            result = None

            try:
                result = func(*args, **kwargs)
            except Exception as e:
                self._handle_exception(observation, e)
            finally:
                result = self._finalize_call(
                    observation, result, capture_output, transform_to_string
                )

            # Returning from finally block may swallow errors, so only return if result is not None
            if result is not None:
                return result

        return cast(F, sync_wrapper)

    @staticmethod
    def _is_method(func: Callable) -> bool:
        """Check if a callable is likely a class or instance method based on its signature.

        This method inspects the given callable's signature for the presence of a 'cls' or 'self' parameter, which is conventionally used for class and instance methods in Python classes. It returns True if 'cls' or 'self' is found among the parameters, suggesting the callable is a method.

        Note: This method relies on naming conventions and may not accurately identify instance methods if unconventional parameter names are used or if static or class methods incorrectly include a 'self' or 'cls' parameter. Additionally, during decorator execution, inspect.ismethod does not work as expected because the function has not yet been bound to an instance; it is still a function, not a method. This check attempts to infer method status based on signature, which can be useful in decorator contexts where traditional method identification techniques fail.

        Returns:
            bool: True if 'cls' or 'self' is in the callable's parameters, False otherwise.
        """
        return (
            "self" in inspect.signature(func).parameters
            or "cls" in inspect.signature(func).parameters
        )

    def _prepare_call(
        self,
        *,
        name: str,
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        is_method: bool = False,
        func_args: Tuple = (),
        func_kwargs: Dict = {},
    ) -> Optional[
        Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
    ]:
        try:
            stack = _observation_stack_context.get().copy()
            parent = stack[-1] if stack else None

            # Collect default observation data
            observation_id = func_kwargs.pop("langfuse_observation_id", None)
            provided_parent_trace_id = func_kwargs.pop("langfuse_parent_trace_id", None)
            provided_parent_observation_id = func_kwargs.pop(
                "langfuse_parent_observation_id", None
            )

            id = str(observation_id) if observation_id else None
            start_time = _get_timestamp()

            input = (
                self._get_input_from_func_args(
                    is_method=is_method,
                    func_args=func_args,
                    func_kwargs=func_kwargs,
                )
                if capture_input
                else None
            )

            params = {
                "id": id,
                "name": name,
                "start_time": start_time,
                "input": input,
            }

            # Handle user-provided parent trace ID and observation ID
            if parent and (provided_parent_trace_id or provided_parent_observation_id):
                self._log.warning(
                    "Ignoring langfuse_parent_trace_id and/or langfuse_parent_observation_id as they can be only set in the top-level decorated function."
                )

            elif provided_parent_observation_id and not provided_parent_trace_id:
                self._log.warning(
                    "Ignoring langfuse_parent_observation_id as langfuse_parent_trace_id is not set."
                )

            elif provided_parent_observation_id and (
                provided_parent_observation_id != provided_parent_trace_id
            ):
                parent = StatefulSpanClient(
                    id=provided_parent_observation_id,
                    trace_id=provided_parent_trace_id,
                    task_manager=self.client_instance.task_manager,
                    client=self.client_instance.client,
                    state_type=StateType.OBSERVATION,
                    environment=self.client_instance.environment,
                )
                self._set_root_trace_id(provided_parent_trace_id)

            elif provided_parent_trace_id:
                parent = StatefulTraceClient(
                    id=provided_parent_trace_id,
                    trace_id=provided_parent_trace_id,
                    task_manager=self.client_instance.task_manager,
                    client=self.client_instance.client,
                    state_type=StateType.TRACE,
                    environment=self.client_instance.environment,
                )
                self._set_root_trace_id(provided_parent_trace_id)

            # Create observation
            if parent and as_type == "generation":
                observation = parent.generation(**params)
            elif as_type == "generation":
                # Create wrapper trace if generation is top-level
                # Do not add wrapper trace to stack, as it does not have a corresponding end that will pop it off again
                trace = self.client_instance.trace(
                    id=_root_trace_id_context.get() or id,
                    name=name,
                    start_time=start_time,
                )
                self._set_root_trace_id(trace.id)

                observation = self.client_instance.generation(
                    name=name, start_time=start_time, input=input, trace_id=trace.id
                )
            elif parent:
                observation = parent.span(**params)
            else:
                params["id"] = _root_trace_id_context.get() or params["id"]
                observation = self.client_instance.trace(**params)

            _observation_stack_context.set(stack + [observation])

            return observation
        except Exception as e:
            self._log.error(f"Failed to prepare observation: {e}")

    def _get_input_from_func_args(
        self,
        *,
        is_method: bool = False,
        func_args: Tuple = (),
        func_kwargs: Dict = {},
    ) -> Any:
        # Remove implicitly passed "self" or "cls" argument for instance or class methods
        logged_args = func_args[1:] if is_method else func_args
        raw_input = {
            "args": logged_args,
            "kwargs": func_kwargs,
        }

        # Serialize and deserialize to ensure proper JSON serialization.
        # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
        return json.loads(json.dumps(raw_input, cls=EventSerializer))

    def _finalize_call(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        result: Any,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ):
        if inspect.isgenerator(result):
            return self._wrap_sync_generator_result(
                observation, result, capture_output, transform_to_string
            )
        elif inspect.isasyncgen(result):
            return self._wrap_async_generator_result(
                observation, result, capture_output, transform_to_string
            )

        else:
            return self._handle_call_result(observation, result, capture_output)

    def _handle_call_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        result: Any,
        capture_output: bool,
    ):
        try:
            if observation is None:
                raise ValueError("No observation found in the current context")

            # Collect final observation data
            observation_params = self._pop_observation_params_from_context(
                observation.id
            )

            end_time = observation_params["end_time"] or _get_timestamp()

            output = observation_params["output"] or (
                # Serialize and deserialize to ensure proper JSON serialization.
                # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
                json.loads(
                    json.dumps(
                        result if result is not None and capture_output else None,
                        cls=EventSerializer,
                    )
                )
            )

            observation_params.update(end_time=end_time, output=output)

            if isinstance(observation, (StatefulSpanClient, StatefulGenerationClient)):
                observation.end(**observation_params)
            elif isinstance(observation, StatefulTraceClient):
                observation.update(**observation_params)

            # Remove observation from top of stack
            stack = _observation_stack_context.get()
            _observation_stack_context.set(stack[:-1])

            # Update trace that was provided directly and not part of the observation stack
            if not _observation_stack_context.get() and (
                provided_trace_id := _root_trace_id_context.get()
            ):
                observation_params = self._pop_observation_params_from_context(
                    provided_trace_id
                )

                has_updates = any(observation_params.values())

                if has_updates:
                    trace_client = StatefulTraceClient(
                        id=provided_trace_id,
                        trace_id=provided_trace_id,
                        task_manager=self.client_instance.task_manager,
                        client=self.client_instance.client,
                        state_type=StateType.TRACE,
                        environment=self.client_instance.environment,
                    )
                    trace_client.update(**observation_params)

        except Exception as e:
            self._log.error(f"Failed to finalize observation: {e}")

        finally:
            # Clear the context trace ID to avoid leaking to next execution
            if not _observation_stack_context.get():
                _root_trace_id_context.set(None)

        return result

    def _handle_exception(
        self,
        observation: Optional[
            Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
        ],
        e: Exception,
    ):
        if observation:
            _observation_params_context.get()[observation.id].update(
                level="ERROR", status_message=str(e)
            )
        raise e

    def _wrap_sync_generator_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        generator: Generator,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ):
        items = []

        try:
            for item in generator:
                items.append(item)

                yield item

        finally:
            output = items

            if transform_to_string is not None:
                output = transform_to_string(items)

            elif all(isinstance(item, str) for item in items):
                output = "".join(items)

            self._handle_call_result(observation, output, capture_output)

    async def _wrap_async_generator_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        generator: AsyncGenerator,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> AsyncGenerator:
        items = []

        try:
            async for item in generator:
                items.append(item)

                yield item

        finally:
            output = items

            if transform_to_string is not None:
                output = transform_to_string(items)

            elif all(isinstance(item, str) for item in items):
                output = "".join(items)

            self._handle_call_result(observation, output, capture_output)

    def get_current_llama_index_handler(self):
        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.

        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.

        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.

        Returns:
            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

        Note:
            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
        """
        try:
            from langfuse.llama_index import LlamaIndexCallbackHandler
        except ImportError:
            self._log.error(
                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
            )

            return None

        stack = _observation_stack_context.get()
        observation = stack[-1] if stack else None

        if observation is None:
            self._log.warning("No observation found in the current context")

            return None

        if isinstance(observation, StatefulGenerationClient):
            self._log.warning(
                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
            )

            return None

        callback_handler = LlamaIndexCallbackHandler()
        callback_handler.set_root(observation)

        return callback_handler

    def get_current_langchain_handler(self):
        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.

        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.

        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.

        Returns:
            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

        Note:
            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
        """
        stack = _observation_stack_context.get()
        observation = stack[-1] if stack else None

        if observation is None:
            self._log.warning("No observation found in the current context")

            return None

        if isinstance(observation, StatefulGenerationClient):
            self._log.warning(
                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
            )

            return None

        return observation.get_langchain_handler()

    def get_current_trace_id(self):
        """Retrieve the ID of the current trace from the observation stack context.

        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
        representing the entry point of the traced execution context. If you have provided a langfuse_parent_trace_id directly, it will return that instead.

        Returns:
            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
            possibly due to the method being called outside of any @observe-decorated function execution.

        Note:
            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
        """
        context_trace_id = _root_trace_id_context.get()
        if context_trace_id:
            return context_trace_id

        stack = _observation_stack_context.get()

        if not stack:
            return None

        return stack[0].id

    def get_current_trace_url(self) -> Optional[str]:
        """Retrieve the URL of the current trace in context.

        Returns:
            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
            possibly due to the method being called outside of any @observe-decorated function execution.

        Note:
            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
        """
        try:
            trace_id = self.get_current_trace_id()

            if not trace_id:
                raise ValueError("No trace found in the current context")

            project_id = self.client_instance._get_project_id()

            if not project_id:
                return f"{self.client_instance.client._client_wrapper._base_url}/trace/{trace_id}"

            return f"{self.client_instance.client._client_wrapper._base_url}/project/{project_id}/traces/{trace_id}"

        except Exception as e:
            self._log.error(f"Failed to get current trace URL: {e}")

            return None

    def get_current_observation_id(self):
        """Retrieve the ID of the current observation in context.

        Returns:
            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
            possibly due to the method being called outside of any @observe-decorated function execution.

        Note:
            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
            - If called at the top level of a trace, it will return the trace ID.
        """
        stack = _observation_stack_context.get()

        if not stack:
            return None

        return stack[-1].id

    def update_current_trace(
        self,
        name: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Any] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ):
        """Set parameters for the current trace, updating the trace's metadata and context information.

        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.

        Arguments:
            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
            input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
            output (Optional[Any]): The output or result of the trace.
            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.

        Returns:
            None

        Note:
            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
            - If called outside of an active trace context, a warning is logged, and a ValueError is raised to indicate the absence of a traceable context.
        """
        trace_id = self.get_current_trace_id()

        if trace_id is None:
            self._log.warning("No trace found in the current context")

            return

        params_to_update = {
            k: v
            for k, v in {
                "name": name,
                "input": input,
                "output": output,
                "user_id": user_id,
                "session_id": session_id,
                "version": version,
                "release": release,
                "metadata": metadata,
                "tags": tags,
                "public": public,
            }.items()
            if v is not None
        }

        # metadata and tags are merged server side. Send separate update event to avoid merging them SDK side
        server_merged_attributes = ["metadata", "tags"]
        if any(attribute in params_to_update for attribute in server_merged_attributes):
            self.client_instance.trace(
                id=trace_id,
                **{
                    k: v
                    for k, v in params_to_update.items()
                    if k in server_merged_attributes
                },
            )

        _observation_params_context.get()[trace_id].update(params_to_update)

    def update_current_observation(
        self,
        *,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        name: Optional[str] = None,
        version: Optional[str] = None,
        metadata: Optional[Any] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        release: Optional[str] = None,
        tags: Optional[List[str]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage: Optional[Union[BaseModel, ModelUsage]] = None,
        usage_details: Optional[UsageDetails] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        public: Optional[bool] = None,
    ):
        """Update parameters for the current observation within an active trace context.

        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
        enhancing the observability and traceability of the execution context.

        Note that if a param is not available on a specific observation type, it will be ignored.
````
845 846 Shared params: 847 - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call. 848 - `output` (Optional[Any]): The output or result of the trace or observation 849 - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI. 850 - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API. 851 - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification. 852 - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration. 853 - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging. 854 855 Trace-specific params: 856 - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics. 857 - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier. 858 - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging. 859 - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API. 860 - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project. 861 862 Span-specific params: 863 - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR". 
864 - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting. 865 866 Generation-specific params: 867 - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration. 868 - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs. 869 - `usage` (Optional[Union[BaseModel, ModelUsage]]): (Deprecated. Use `usage_details` and `cost_details` instead.) The usage object supports the OpenAi structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse. 870 - `usage_details` (Optional[Dict[str, int]]): The usage details of the observation. Reflects the number of units consumed per usage type. All keys must sum up to the total key value. The total key holds the total number of units consumed. 871 - `cost_details` (Optional[Dict[str, float]]): The cost details of the observation. Reflects the USD cost of the observation per cost type. All keys must sum up to the total key value. The total key holds the total cost of the observation. 872 - `prompt`(Optional[PromptClient]): The prompt object used for the generation. 873 874 Returns: 875 None 876 877 Raises: 878 ValueError: If no current observation is found in the context, indicating that this method was called outside of an observation's execution scope. 879 880 Note: 881 - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator. 
882 - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended. 883 - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information. 884 """ 885 stack = _observation_stack_context.get() 886 observation = stack[-1] if stack else None 887 888 if not observation: 889 self._log.warning("No observation found in the current context") 890 891 return 892 893 update_params = { 894 k: v 895 for k, v in { 896 "input": input, 897 "output": output, 898 "name": name, 899 "version": version, 900 "metadata": metadata, 901 "start_time": start_time, 902 "end_time": end_time, 903 "release": release, 904 "tags": tags, 905 "user_id": user_id, 906 "session_id": session_id, 907 "level": level, 908 "status_message": status_message, 909 "completion_start_time": completion_start_time, 910 "model": model, 911 "model_parameters": model_parameters, 912 "usage": usage, 913 "usage_details": usage_details, 914 "cost_details": cost_details, 915 "prompt": prompt, 916 "public": public, 917 }.items() 918 if v is not None 919 } 920 921 _observation_params_context.get()[observation.id].update(update_params) 922 923 def score_current_observation( 924 self, 925 *, 926 name: str, 927 value: Union[float, str], 928 data_type: Optional[ScoreDataType] = None, 929 comment: Optional[str] = None, 930 id: Optional[str] = None, 931 config_id: Optional[str] = None, 932 ): 933 """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace. 934 935 Arguments: 936 name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded. 937 value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure. 
938 data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. 939 When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores. 940 comment (Optional[str]): An optional comment or description providing context or additional details about the score. 941 id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking. 942 config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None. 943 944 Returns: 945 None 946 947 Note: 948 This method is intended to be used within the context of an active trace or observation. 949 """ 950 try: 951 trace_id = self.get_current_trace_id() 952 current_observation_id = self.get_current_observation_id() 953 954 observation_id = ( 955 current_observation_id if current_observation_id != trace_id else None 956 ) 957 958 if trace_id: 959 self.client_instance.score( 960 trace_id=trace_id, 961 observation_id=observation_id, 962 name=name, 963 value=value, 964 data_type=data_type, 965 comment=comment, 966 id=id, 967 config_id=config_id, 968 ) 969 else: 970 raise ValueError("No trace or observation found in the current context") 971 972 except Exception as e: 973 self._log.error(f"Failed to score observation: {e}") 974 975 def score_current_trace( 976 self, 977 *, 978 name: str, 979 value: Union[float, str], 980 data_type: Optional[ScoreDataType] = None, 981 comment: Optional[str] = None, 982 id: Optional[str] = None, 983 config_id: Optional[str] = None, 984 ): 985 """Score the current trace in context. This can be called anywhere in the nested trace to score the trace. 986 987 Arguments: 988 name (str): The name of the score metric. 
This should be a clear and concise identifier for the metric being recorded. 989 value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure. 990 data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. 991 When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores. 992 comment (Optional[str]): An optional comment or description providing context or additional details about the score. 993 id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking. 994 config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None. 995 996 Returns: 997 None 998 999 Note: 1000 This method is intended to be used within the context of an active trace or observation. 1001 """ 1002 try: 1003 trace_id = self.get_current_trace_id() 1004 1005 if trace_id: 1006 self.client_instance.score( 1007 trace_id=trace_id, 1008 name=name, 1009 value=value, 1010 data_type=data_type, 1011 comment=comment, 1012 id=id, 1013 config_id=config_id, 1014 ) 1015 else: 1016 raise ValueError("No trace found in the current context") 1017 1018 except Exception as e: 1019 self._log.error(f"Failed to score observation: {e}") 1020 1021 @catch_and_log_errors 1022 def flush(self): 1023 """Force immediate flush of all buffered observations to the Langfuse backend. 1024 1025 This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers. 
1026 It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits. 1027 1028 Usage: 1029 - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform. 1030 - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data. 1031 1032 Returns: 1033 None 1034 1035 Raises: 1036 ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues. 1037 1038 Note: 1039 - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts. 1040 - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client. 1041 However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes. 1042 """ 1043 if self.client_instance: 1044 self.client_instance.flush() 1045 else: 1046 self._log.warning("No langfuse object found in the current context") 1047 1048 def configure( 1049 self, 1050 *, 1051 public_key: Optional[str] = None, 1052 secret_key: Optional[str] = None, 1053 host: Optional[str] = None, 1054 release: Optional[str] = None, 1055 debug: Optional[bool] = None, 1056 threads: Optional[int] = None, 1057 flush_at: Optional[int] = None, 1058 flush_interval: Optional[int] = None, 1059 max_retries: Optional[int] = None, 1060 timeout: Optional[int] = None, 1061 httpx_client: Optional[httpx.Client] = None, 1062 enabled: Optional[bool] = None, 1063 mask: Optional[Callable] = None, 1064 environment: Optional[str] = None, 1065 ): 1066 """Configure the Langfuse client. 
1067 1068 If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings. 1069 1070 Args: 1071 public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable. 1072 secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable. 1073 host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`. 1074 release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable. 1075 debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable. 1076 threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues. 1077 flush_at: Max batch size that's sent to the API. 1078 flush_interval: Max delay until a new batch is sent to the API. 1079 max_retries: Max number of retries in case of API/network errors. 1080 timeout: Timeout of API requests in seconds. Default is 20 seconds. 1081 httpx_client: Pass your own httpx client for more customizability of requests. 1082 enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised. 1083 mask (Callable): Function that masks sensitive information from input and output in log messages. 1084 environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can bet set via `LANGFUSE_TRACING_ENVIRONMENT` environment variable. 
1085 """ 1086 langfuse_singleton = LangfuseSingleton() 1087 langfuse_singleton.reset() 1088 1089 langfuse_singleton.get( 1090 public_key=public_key, 1091 secret_key=secret_key, 1092 host=host, 1093 release=release, 1094 debug=debug, 1095 threads=threads, 1096 flush_at=flush_at, 1097 flush_interval=flush_interval, 1098 max_retries=max_retries, 1099 timeout=timeout, 1100 httpx_client=httpx_client, 1101 enabled=enabled, 1102 mask=mask, 1103 environment=environment, 1104 ) 1105 1106 @property 1107 def client_instance(self) -> Langfuse: 1108 """Get the Langfuse client instance for the current decorator context.""" 1109 return LangfuseSingleton().get() 1110 1111 def _set_root_trace_id(self, trace_id: str): 1112 if _observation_stack_context.get(): 1113 self._log.warning( 1114 "Root Trace ID cannot be set on a already running trace. Skipping root trace ID assignment." 1115 ) 1116 return 1117 1118 _root_trace_id_context.set(trace_id) 1119 1120 def _pop_observation_params_from_context( 1121 self, observation_id: str 1122 ) -> ObservationParams: 1123 params = _observation_params_context.get()[observation_id].copy() 1124 1125 # Remove observation params to avoid leaking 1126 del _observation_params_context.get()[observation_id] 1127 1128 return params 1129 1130 def auth_check(self) -> bool: 1131 """Check if the current Langfuse client is authenticated. 1132 1133 Returns: 1134 bool: True if the client is authenticated, False otherwise 1135 """ 1136 try: 1137 return self.client_instance.auth_check() 1138 except Exception as e: 1139 self._log.error( 1140 "No Langfuse object found in the current context", exc_info=e 1141 ) 1142 1143 return False
117 def observe( 118 self, 119 func: Optional[Callable[P, R]] = None, 120 *, 121 name: Optional[str] = None, 122 as_type: Optional[Literal["generation"]] = None, 123 capture_input: bool = True, 124 capture_output: bool = True, 125 transform_to_string: Optional[Callable[[Iterable], str]] = None, 126 ) -> Callable[[Callable[P, R]], Callable[P, R]]: 127 """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions. 128 129 It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. 130 In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations. 131 132 Attributes: 133 name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name. 134 as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations. 135 capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True. 136 capture_output (bool): If True, captures the return value of the function as output. Default is True. 137 transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture 138 139 Returns: 140 Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse. 
141 142 Example: 143 For general tracing (functions/methods): 144 ```python 145 @observe() 146 def your_function(args): 147 # Your implementation here 148 ``` 149 For observing language model generations: 150 ```python 151 @observe(as_type="generation") 152 def your_LLM_function(args): 153 # Your LLM invocation here 154 ``` 155 156 Raises: 157 Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details. 158 159 Note: 160 - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function. 161 - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function. 162 """ 163 164 def decorator(func: Callable[P, R]) -> Callable[P, R]: 165 return ( 166 self._async_observe( 167 func, 168 name=name, 169 as_type=as_type, 170 capture_input=capture_input, 171 capture_output=capture_output, 172 transform_to_string=transform_to_string, 173 ) 174 if asyncio.iscoroutinefunction(func) 175 else self._sync_observe( 176 func, 177 name=name, 178 as_type=as_type, 179 capture_input=capture_input, 180 capture_output=capture_output, 181 transform_to_string=transform_to_string, 182 ) 183 ) 184 185 """ 186 If the decorator is called without arguments, return the decorator function itself. 187 This allows the decorator to be used with or without arguments. 188 Python calls the decorator function with the decorated function as an argument when the decorator is used without arguments. 189 """ 190 if func is None: 191 return decorator 192 else: 193 return decorator(func)
Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
Attributes:
- name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
- as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
- capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
- capture_output (bool): If True, captures the return value of the function as output. Default is True.
- transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
Returns:
Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
Example:
For general tracing (functions/methods):
@observe()
def your_function(args):
    # Your implementation here
For observing language model generations:
@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here
Raises:
- Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
Note:
- Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the langfuse_observation_id keyword when calling the wrapped function.
- To update observation or trace parameters (e.g., metadata, session_id), use langfuse.update_current_observation and langfuse.update_current_trace methods within the wrapped function.
587 def get_current_llama_index_handler(self): 588 """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack. 589 590 This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation. 591 It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context. 592 593 See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler. 594 595 Returns: 596 LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found. 597 598 Note: 599 - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists. 600 - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None. 601 """ 602 try: 603 from langfuse.llama_index import LlamaIndexCallbackHandler 604 except ImportError: 605 self._log.error( 606 "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. 
pip install llama-index" 607 ) 608 609 return None 610 611 stack = _observation_stack_context.get() 612 observation = stack[-1] if stack else None 613 614 if observation is None: 615 self._log.warning("No observation found in the current context") 616 617 return None 618 619 if isinstance(observation, StatefulGenerationClient): 620 self._log.warning( 621 "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation" 622 ) 623 624 return None 625 626 callback_handler = LlamaIndexCallbackHandler() 627 callback_handler.set_root(observation) 628 629 return callback_handler
Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
Returns:
LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
Note:
- This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
- If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
631 def get_current_langchain_handler(self): 632 """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack. 633 634 This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation. 635 It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context. 636 637 See the Langfuse documentation for more information on integrating the LangchainCallbackHandler. 638 639 Returns: 640 LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found. 641 642 Note: 643 - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists. 644 - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None. 645 """ 646 stack = _observation_stack_context.get() 647 observation = stack[-1] if stack else None 648 649 if observation is None: 650 self._log.warning("No observation found in the current context") 651 652 return None 653 654 if isinstance(observation, StatefulGenerationClient): 655 self._log.warning( 656 "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation" 657 ) 658 659 return None 660 661 return observation.get_langchain_handler()
Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
Returns:
LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
Note:
- This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
- If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
663 def get_current_trace_id(self): 664 """Retrieve the ID of the current trace from the observation stack context. 665 666 This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID, 667 such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack, 668 representing the entry point of the traced execution context. If you have provided a langfuse_parent_trace_id directly, it will return that instead. 669 670 Returns: 671 str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, 672 possibly due to the method being called outside of any @observe-decorated function execution. 673 674 Note: 675 - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved. 676 - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context. 677 """ 678 context_trace_id = _root_trace_id_context.get() 679 if context_trace_id: 680 return context_trace_id 681 682 stack = _observation_stack_context.get() 683 684 if not stack: 685 return None 686 687 return stack[0].id
Retrieve the ID of the current trace from the observation stack context.
This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID, such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack, representing the entry point of the traced execution context. If you have provided a langfuse_parent_trace_id directly, it will return that instead.
Returns:
str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.
Note:
- This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
- If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
```python
def get_current_trace_url(self) -> Optional[str]:
    """Retrieve the URL of the current trace in context.

    Returns:
        str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
        possibly due to the method being called outside of any @observe-decorated function execution.

    Note:
        - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its URL can be retrieved.
        - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
    """
    try:
        trace_id = self.get_current_trace_id()

        if not trace_id:
            raise ValueError("No trace found in the current context")

        project_id = self.client_instance._get_project_id()

        if not project_id:
            return f"{self.client_instance.client._client_wrapper._base_url}/trace/{trace_id}"

        return f"{self.client_instance.client._client_wrapper._base_url}/project/{project_id}/traces/{trace_id}"

    except Exception as e:
        self._log.error(f"Failed to get current trace URL: {e}")

        return None
```
Retrieve the URL of the current trace in context.
Returns:
str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.
Note:
- This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its URL can be retrieved.
- If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
```python
def get_current_observation_id(self):
    """Retrieve the ID of the current observation in context.

    Returns:
        str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
        possibly due to the method being called outside of any @observe-decorated function execution.

    Note:
        - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
        - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
        - If called at the top level of a trace, it will return the trace ID.
    """
    stack = _observation_stack_context.get()

    if not stack:
        return None

    return stack[-1].id
```
Retrieve the ID of the current observation in context.
Returns:
str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context, possibly due to the method being called outside of any @observe-decorated function execution.
Note:
- This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
- If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
- If called at the top level of a trace, it will return the trace ID.
```python
def update_current_trace(
    self,
    name: Optional[str] = None,
    input: Optional[Any] = None,
    output: Optional[Any] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    version: Optional[str] = None,
    release: Optional[str] = None,
    metadata: Optional[Any] = None,
    tags: Optional[List[str]] = None,
    public: Optional[bool] = None,
):
    """Set parameters for the current trace, updating the trace's metadata and context information.

    This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
    It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
    and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.

    Arguments:
        name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
        input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
        output (Optional[Any]): The output or result of the trace.
        user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
        session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
        version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
        release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
        metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
        tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.

    Returns:
        None

    Note:
        - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
        - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
        - If called outside of an active trace context, a warning is logged and no update is performed.
    """
    trace_id = self.get_current_trace_id()

    if trace_id is None:
        self._log.warning("No trace found in the current context")

        return

    params_to_update = {
        k: v
        for k, v in {
            "name": name,
            "input": input,
            "output": output,
            "user_id": user_id,
            "session_id": session_id,
            "version": version,
            "release": release,
            "metadata": metadata,
            "tags": tags,
            "public": public,
        }.items()
        if v is not None
    }

    # metadata and tags are merged server side. Send separate update event to avoid merging them SDK side
    server_merged_attributes = ["metadata", "tags"]
    if any(attribute in params_to_update for attribute in server_merged_attributes):
        self.client_instance.trace(
            id=trace_id,
            **{
                k: v
                for k, v in params_to_update.items()
                if k in server_merged_attributes
            },
        )

    _observation_params_context.get()[trace_id].update(params_to_update)
```
Set parameters for the current trace, updating the trace's metadata and context information.
This method allows for dynamically updating the trace parameters at any point during the execution of a trace. It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information, and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
Arguments:
- name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
- input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
- output (Optional[Any]): The output or result of the trace.
- user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
- session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
- version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
- release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
- metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
- tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
Returns:
None
Note:
- This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
- The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
- If called outside of an active trace context, a warning is logged and no update is performed.
```python
def update_current_observation(
    self,
    *,
    input: Optional[Any] = None,
    output: Optional[Any] = None,
    name: Optional[str] = None,
    version: Optional[str] = None,
    metadata: Optional[Any] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    release: Optional[str] = None,
    tags: Optional[List[str]] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    level: Optional[SpanLevel] = None,
    status_message: Optional[str] = None,
    completion_start_time: Optional[datetime] = None,
    model: Optional[str] = None,
    model_parameters: Optional[Dict[str, MapValue]] = None,
    usage: Optional[Union[BaseModel, ModelUsage]] = None,
    usage_details: Optional[UsageDetails] = None,
    cost_details: Optional[Dict[str, float]] = None,
    prompt: Optional[PromptClient] = None,
    public: Optional[bool] = None,
):
    """Update parameters for the current observation within an active trace context.

    This method dynamically adjusts the parameters of the most recent observation on the observation stack.
    It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
    enhancing the observability and traceability of the execution context.

    Note that if a param is not available on a specific observation type, it will be ignored.

    Shared params:
        - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
        - `output` (Optional[Any]): The output or result of the trace or observation.
        - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
        - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
        - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
        - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
        - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.

    Trace-specific params:
        - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
        - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
        - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
        - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
        - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

    Span-specific params:
        - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
        - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.

    Generation-specific params:
        - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
        - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
        - `usage` (Optional[Union[BaseModel, ModelUsage]]): (Deprecated. Use `usage_details` and `cost_details` instead.) The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
        - `usage_details` (Optional[Dict[str, int]]): The usage details of the observation. Reflects the number of units consumed per usage type. All keys must sum up to the total key value. The total key holds the total number of units consumed.
        - `cost_details` (Optional[Dict[str, float]]): The cost details of the observation. Reflects the USD cost of the observation per cost type. All keys must sum up to the total key value. The total key holds the total cost of the observation.
        - `prompt` (Optional[PromptClient]): The prompt object used for the generation.

    Returns:
        None

    Note:
        - If no current observation is found in the context (i.e., the method was called outside of an observation's execution scope), a warning is logged and no update is performed.
        - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
        - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
        - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
    """
    stack = _observation_stack_context.get()
    observation = stack[-1] if stack else None

    if not observation:
        self._log.warning("No observation found in the current context")

        return

    update_params = {
        k: v
        for k, v in {
            "input": input,
            "output": output,
            "name": name,
            "version": version,
            "metadata": metadata,
            "start_time": start_time,
            "end_time": end_time,
            "release": release,
            "tags": tags,
            "user_id": user_id,
            "session_id": session_id,
            "level": level,
            "status_message": status_message,
            "completion_start_time": completion_start_time,
            "model": model,
            "model_parameters": model_parameters,
            "usage": usage,
            "usage_details": usage_details,
            "cost_details": cost_details,
            "prompt": prompt,
            "public": public,
        }.items()
        if v is not None
    }

    _observation_params_context.get()[observation.id].update(update_params)
```
Update parameters for the current observation within an active trace context.
This method dynamically adjusts the parameters of the most recent observation on the observation stack. It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more, enhancing the observability and traceability of the execution context.
Note that if a param is not available on a specific observation type, it will be ignored.
Shared params:
- input (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
- output (Optional[Any]): The output or result of the trace or observation.
- name (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
- metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
- start_time (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
- end_time (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
- version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
Trace-specific params:
- user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
- session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
- release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
- tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
- public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
Span-specific params:
- level (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
- status_message (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
Generation-specific params:
- completion_start_time (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
- model_parameters (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
- usage (Optional[Union[BaseModel, ModelUsage]]): (Deprecated. Use usage_details and cost_details instead.) The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
- usage_details (Optional[Dict[str, int]]): The usage details of the observation. Reflects the number of units consumed per usage type. All keys must sum up to the total key value. The total key holds the total number of units consumed.
- cost_details (Optional[Dict[str, float]]): The cost details of the observation. Reflects the USD cost of the observation per cost type. All keys must sum up to the total key value. The total key holds the total cost of the observation.
- prompt (Optional[PromptClient]): The prompt object used for the generation.
Returns:
None
Note:
- If no current observation is found in the context (i.e., the method was called outside of an observation's execution scope), a warning is logged and no update is performed.
- This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
- It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
- Parameters set to None will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
```python
def score_current_observation(
    self,
    *,
    name: str,
    value: Union[float, str],
    data_type: Optional[ScoreDataType] = None,
    comment: Optional[str] = None,
    id: Optional[str] = None,
    config_id: Optional[str] = None,
):
    """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.

    Arguments:
        name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
        value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
        data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
            When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
        comment (Optional[str]): An optional comment or description providing context or additional details about the score.
        id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
        config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.

    Returns:
        None

    Note:
        This method is intended to be used within the context of an active trace or observation.
    """
    try:
        trace_id = self.get_current_trace_id()
        current_observation_id = self.get_current_observation_id()

        observation_id = (
            current_observation_id if current_observation_id != trace_id else None
        )

        if trace_id:
            self.client_instance.score(
                trace_id=trace_id,
                observation_id=observation_id,
                name=name,
                value=value,
                data_type=data_type,
                comment=comment,
                id=id,
                config_id=config_id,
            )
        else:
            raise ValueError("No trace or observation found in the current context")

    except Exception as e:
        self._log.error(f"Failed to score observation: {e}")
```
Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
Arguments:
- name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
- value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
- data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
- comment (Optional[str]): An optional comment or description providing context or additional details about the score.
- id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
- config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
Returns:
None
Note:
This method is intended to be used within the context of an active trace or observation.
```python
def score_current_trace(
    self,
    *,
    name: str,
    value: Union[float, str],
    data_type: Optional[ScoreDataType] = None,
    comment: Optional[str] = None,
    id: Optional[str] = None,
    config_id: Optional[str] = None,
):
    """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.

    Arguments:
        name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
        value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
        data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
            When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
        comment (Optional[str]): An optional comment or description providing context or additional details about the score.
        id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
        config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.

    Returns:
        None

    Note:
        This method is intended to be used within the context of an active trace or observation.
    """
    try:
        trace_id = self.get_current_trace_id()

        if trace_id:
            self.client_instance.score(
                trace_id=trace_id,
                name=name,
                value=value,
                data_type=data_type,
                comment=comment,
                id=id,
                config_id=config_id,
            )
        else:
            raise ValueError("No trace found in the current context")

    except Exception as e:
        self._log.error(f"Failed to score observation: {e}")
```
Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
Arguments:
- name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
- value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
- data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
- comment (Optional[str]): An optional comment or description providing context or additional details about the score.
- id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
- config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
Returns:
None
Note:
This method is intended to be used within the context of an active trace or observation.
```python
@catch_and_log_errors
def flush(self):
    """Force immediate flush of all buffered observations to the Langfuse backend.

    This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
    It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.

    Usage:
        - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
        - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.

    Returns:
        None

    Note:
        - If no Langfuse client is found in the current context, a warning is logged and nothing is flushed.
        - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
        - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
          However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
    """
    if self.client_instance:
        self.client_instance.flush()
    else:
        self._log.warning("No langfuse object found in the current context")
```
Force immediate flush of all buffered observations to the Langfuse backend.
This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers. It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
Usage:
- This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
- It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
Returns:
None
Note:
- If no Langfuse client is found in the current context, a warning is logged and nothing is flushed.
- The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
- In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client. However, explicit calls to flush can be beneficial in certain edge cases or for debugging purposes.
```python
def configure(
    self,
    *,
    public_key: Optional[str] = None,
    secret_key: Optional[str] = None,
    host: Optional[str] = None,
    release: Optional[str] = None,
    debug: Optional[bool] = None,
    threads: Optional[int] = None,
    flush_at: Optional[int] = None,
    flush_interval: Optional[int] = None,
    max_retries: Optional[int] = None,
    timeout: Optional[int] = None,
    httpx_client: Optional[httpx.Client] = None,
    enabled: Optional[bool] = None,
    mask: Optional[Callable] = None,
    environment: Optional[str] = None,
):
    """Configure the Langfuse client.

    If called, this method must be called before any other langfuse_context method or @observe-decorated function to configure the Langfuse client with the necessary credentials and settings.

    Args:
        public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
        secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
        host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
        release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
        debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
        threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
        flush_at: Max batch size that's sent to the API.
        flush_interval: Max delay until a new batch is sent to the API.
        max_retries: Max number of retries in case of API/network errors.
        timeout: Timeout of API requests in seconds. Default is 20 seconds.
        httpx_client: Pass your own httpx client for more customizability of requests.
        enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
        mask (Callable): Function that masks sensitive information from input and output in log messages.
        environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can be set via `LANGFUSE_TRACING_ENVIRONMENT` environment variable.
    """
    langfuse_singleton = LangfuseSingleton()
    langfuse_singleton.reset()

    langfuse_singleton.get(
        public_key=public_key,
        secret_key=secret_key,
        host=host,
        release=release,
        debug=debug,
        threads=threads,
        flush_at=flush_at,
        flush_interval=flush_interval,
        max_retries=max_retries,
        timeout=timeout,
        httpx_client=httpx_client,
        enabled=enabled,
        mask=mask,
        environment=environment,
    )
```
Configure the Langfuse client.

If used, this method must be called before any other langfuse_context method or @observe()-decorated function, so that the Langfuse client is created with the given credentials and settings.

Arguments:
- public_key: Public API key of the Langfuse project. Can be set via the `LANGFUSE_PUBLIC_KEY` environment variable.
- secret_key: Secret API key of the Langfuse project. Can be set via the `LANGFUSE_SECRET_KEY` environment variable.
- host: Host of the Langfuse API. Can be set via the `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
- release: Release number/hash of the application to provide analytics grouped by release. Can be set via the `LANGFUSE_RELEASE` environment variable.
- debug: Enables debug mode for more verbose logging. Can be set via the `LANGFUSE_DEBUG` environment variable.
- threads: Number of consumer threads that execute network requests. Helps scale the SDK for high load. Only increase this if you run into scaling issues.
- flush_at: Max batch size that is sent to the API.
- flush_interval: Max delay until a new batch is sent to the API.
- max_retries: Max number of retries in case of API/network errors.
- timeout: Timeout of API requests in seconds. Default is 20 seconds.
- httpx_client: Pass your own httpx client for more customizability of requests.
- enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
- mask (Callable): Function that masks sensitive information from input and output in log messages.
- environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can be set via the `LANGFUSE_TRACING_ENVIRONMENT` environment variable.
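For instance, a service entry point might call configure() once at startup, before the first decorated function runs. A minimal sketch — the key values are placeholders, not real credentials, and this assumes the langfuse package is installed:

```python
from langfuse.decorators import langfuse_context, observe

# Must run before any @observe()-decorated function executes;
# otherwise the client is initialized from environment variables
# (LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST, ...).
langfuse_context.configure(
    public_key="pk-lf-...",  # placeholder, use your project's public key
    secret_key="sk-lf-...",  # placeholder, use your project's secret key
    host="https://cloud.langfuse.com",
    flush_at=15,             # send batches of up to 15 events
    flush_interval=1,        # or at most every 1 second
)

@observe()
def handler():
    return "ok"
```

Note that configure() resets the client singleton, so calling it after traces have been created discards the previously configured client.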
```python
@property
def client_instance(self) -> Langfuse:
    """Get the Langfuse client instance for the current decorator context."""
    return LangfuseSingleton().get()
```
Get the Langfuse client instance for the current decorator context.
```python
def auth_check(self) -> bool:
    """Check if the current Langfuse client is authenticated.

    Returns:
        bool: True if the client is authenticated, False otherwise
    """
    try:
        return self.client_instance.auth_check()
    except Exception as e:
        self._log.error(
            "No Langfuse object found in the current context", exc_info=e
        )

    return False
```
Check if the current Langfuse client is authenticated.
Returns:
- bool: True if the client is authenticated, False otherwise.
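A typical use is a startup health check that combines auth_check() with the low-level client exposed by client_instance. A sketch, assuming credentials are already configured (e.g. via environment variables) and the langfuse package is installed:

```python
from langfuse.decorators import langfuse_context

# auth_check() validates the configured credentials against the API;
# it returns False (and logs the error) instead of raising.
if not langfuse_context.auth_check():
    raise RuntimeError("Langfuse credentials are missing or invalid")

# The underlying client is also available for direct use,
# e.g. to force-send buffered events before shutdown.
langfuse_context.client_instance.flush()
```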