# langfuse.decorators

Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the `@observe()` decorator.

*Simple example (decorator + openai integration)*

```python
from langfuse.decorators import observe
from langfuse.openai import openai  # OpenAI integration

@observe()
def story():
    return openai.chat.completions.create(
        model="gpt-3.5-turbo",
        max_tokens=100,
        messages=[
            {"role": "system", "content": "You are a great storyteller."},
            {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
        ],
    ).choices[0].message.content

@observe()
def main():
    return story()

main()
```

See [docs](https://langfuse.com/docs/sdk/python/decorators) for more information.
```python
from .langfuse_decorator import langfuse_context, observe, LangfuseDecorator

__all__ = ["langfuse_context", "observe", "LangfuseDecorator"]
```
````python
def observe(
    self,
    func: Optional[Callable[P, R]] = None,
    *,
    name: Optional[str] = None,
    as_type: Optional[Literal["generation"]] = None,
    capture_input: bool = True,
    capture_output: bool = True,
    transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
    """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

    It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
    In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

    Attributes:
        name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
        as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
        capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
        capture_output (bool): If True, captures the return value of the function as output. Default is True.
        transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

    Returns:
        Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

    Example:
        For general tracing (functions/methods):
        ```python
        @observe()
        def your_function(args):
            # Your implementation here
        ```
        For observing language model generations:
        ```python
        @observe(as_type="generation")
        def your_LLM_function(args):
            # Your LLM invocation here
        ```

    Raises:
        Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

    Note:
        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
    """

    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        return (
            self._async_observe(
                func,
                name=name,
                as_type=as_type,
                capture_input=capture_input,
                capture_output=capture_output,
                transform_to_string=transform_to_string,
            )
            if asyncio.iscoroutinefunction(func)
            else self._sync_observe(
                func,
                name=name,
                as_type=as_type,
                capture_input=capture_input,
                capture_output=capture_output,
                transform_to_string=transform_to_string,
            )
        )

    """
    If the decorator is called without arguments, return the decorator function itself.
    This allows the decorator to be used with or without arguments.
    Python calls the decorator function with the decorated function as an argument when the decorator is used without arguments.
    """
    if func is None:
        return decorator
    else:
        return decorator(func)
````
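The `func is None` branch above is the standard pattern for a decorator that works both with and without parentheses. A minimal generic sketch of that pattern, independent of Langfuse (the `label` option and the `calls` attribute are made up for illustration):

```python
from functools import wraps
from typing import Callable, Optional


def logged(func: Optional[Callable] = None, *, label: str = "call"):
    # Generic sketch of the with-or-without-parentheses decorator pattern.
    def decorator(f: Callable) -> Callable:
        @wraps(f)
        def wrapper(*args, **kwargs):
            wrapper.calls.append(label)  # record that the call was observed
            return f(*args, **kwargs)

        wrapper.calls = []
        return wrapper

    # Used as @logged (no parentheses): func is the decorated function itself.
    # Used as @logged(...): func is None, so we return the decorator for Python
    # to apply to the decorated function.
    if func is None:
        return decorator
    return decorator(func)


@logged
def double(x):
    return 2 * x


@logged(label="add")
def add(x, y):
    return x + y
```

Calling `double(3)` returns `6` and records `"call"`; calling `add(1, 2)` returns `3` and records `"add"` — both forms of the decorator behave the same way `@observe` does.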
Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Attributes:

- name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
- as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
- capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
- capture_output (bool): If True, captures the return value of the function as output. Default is True.
- transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

Returns:

- Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

```python
@observe()
def your_function(args):
    # Your implementation here
```

For observing language model generations:

```python
@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here
```
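When the decorated function is a generator, the yielded items are collected before the observation is finalized: `transform_to_string` (described above) takes precedence, all-string chunks are otherwise joined, and anything else is kept as the raw list. A standalone sketch of that capture precedence, assuming it matches the behavior described for `transform_to_string`:

```python
from typing import Callable, Iterable, List, Optional


def capture_generator_output(
    items: List,
    transform_to_string: Optional[Callable[[Iterable], str]] = None,
):
    # An explicit transform wins; otherwise string chunks are concatenated;
    # anything else is kept as the raw list of yielded items.
    if transform_to_string is not None:
        return transform_to_string(items)
    if all(isinstance(item, str) for item in items):
        return "".join(items)
    return items
```

For example, `capture_generator_output(["Hel", "lo"])` yields `"Hello"`, while a list of dict chunks would need a `transform_to_string` callback to be captured as text.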
Raises:

- Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:

- Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
- To update observation or trace parameters (e.g., metadata, session_id), use the `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
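Under the hood, the decorator keeps the open observations on a stack stored in a `ContextVar` (`_observation_stack_context` in the source): the first entry becomes the trace, and nested decorated calls become spans attached to it. A toy sketch of that bookkeeping, independent of the real SDK (`traced` and `events` are illustrative names, not Langfuse APIs):

```python
import contextvars
from functools import wraps

# Stack of currently open observations for this execution context.
_stack: contextvars.ContextVar[tuple] = contextvars.ContextVar("stack", default=())

events = []  # records (kind, name) pairs for inspection


def traced(func):
    # Toy version of @observe(): the top-most call opens a "trace",
    # nested calls open "spans" while the parent is still on the stack.
    @wraps(func)
    def wrapper(*args, **kwargs):
        stack = _stack.get()
        kind = "trace" if not stack else "span"
        events.append((kind, func.__name__))
        token = _stack.set(stack + (func.__name__,))
        try:
            return func(*args, **kwargs)
        finally:
            _stack.reset(token)  # pop the observation on the way out

    return wrapper


@traced
def story():
    return "once upon a time"


@traced
def main():
    return story()
```

Running `main()` records a `("trace", "main")` entry followed by `("span", "story")`, mirroring how the top-most decorated function becomes the trace and nested calls become spans.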
class LangfuseDecorator:
    _log = logging.getLogger("langfuse")

    # Type overload for observe decorator with no arguments
    @overload
    def observe(self, func: F) -> F: ...

    # Type overload for observe decorator with arguments
    @overload
    def observe(
        self,
        func: None = None,
        *,
        name: Optional[str] = None,
        as_type: Optional[Literal["generation"]] = None,
        capture_input: bool = True,
        capture_output: bool = True,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> Callable[[Callable[P, R]], Callable[P, R]]: ...

    # Implementation of observe decorator
    def observe(
        self,
        func: Optional[Callable[P, R]] = None,
        *,
        name: Optional[str] = None,
        as_type: Optional[Literal["generation"]] = None,
        capture_input: bool = True,
        capture_output: bool = True,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> Callable[[Callable[P, R]], Callable[P, R]]:
        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

        Attributes:
            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
            capture_output (bool): If True, captures the return value of the function as output. Default is True.
            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

        Returns:
            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

        Example:
            For general tracing (functions/methods):
            ```python
            @observe()
            def your_function(args):
                # Your implementation here
            ```
            For observing language model generations:
            ```python
            @observe(as_type="generation")
            def your_LLM_function(args):
                # Your LLM invocation here
            ```

        Raises:
            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

        Note:
            - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
            - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
        """

        def decorator(func: Callable[P, R]) -> Callable[P, R]:
            return (
                self._async_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=capture_input,
                    capture_output=capture_output,
                    transform_to_string=transform_to_string,
                )
                if asyncio.iscoroutinefunction(func)
                else self._sync_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=capture_input,
                    capture_output=capture_output,
                    transform_to_string=transform_to_string,
                )
            )

        """
        If the decorator is called without arguments, return the decorator function itself.
        This allows the decorator to be used with or without arguments.
        Python calls the decorator function with the decorated function as an argument when the decorator is used without arguments.
        """
        if func is None:
            return decorator
        else:
            return decorator(func)

    def _async_observe(
        self,
        func: F,
        *,
        name: Optional[str],
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> F:
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            observation = self._prepare_call(
                name=name or func.__name__,
                as_type=as_type,
                capture_input=capture_input,
                is_method=self._is_method(func),
                func_args=args,
                func_kwargs=kwargs,
            )
            result = None

            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                self._handle_exception(observation, e)
            finally:
                result = self._finalize_call(
                    observation, result, capture_output, transform_to_string
                )

            # Returning from finally block may swallow errors, so only return if result is not None
            if result is not None:
                return result

        return cast(F, async_wrapper)

    def _sync_observe(
        self,
        func: F,
        *,
        name: Optional[str],
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> F:
        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            observation = self._prepare_call(
                name=name or func.__name__,
                as_type=as_type,
                capture_input=capture_input,
                is_method=self._is_method(func),
                func_args=args,
                func_kwargs=kwargs,
            )
            result = None

            try:
                result = func(*args, **kwargs)
            except Exception as e:
                self._handle_exception(observation, e)
            finally:
                result = self._finalize_call(
                    observation, result, capture_output, transform_to_string
                )

            # Returning from finally block may swallow errors, so only return if result is not None
            if result is not None:
                return result

        return cast(F, sync_wrapper)

    @staticmethod
    def _is_method(func: Callable) -> bool:
        """Check if a callable is likely a class or instance method based on its signature.

        This method inspects the given callable's signature for the presence of a 'cls' or 'self' parameter, which is conventionally used for class and instance methods in Python classes. It returns True if 'cls' or 'self' is found among the parameters, suggesting the callable is a method.

        Note: This method relies on naming conventions and may not accurately identify instance methods if unconventional parameter names are used or if static or class methods incorrectly include a 'self' or 'cls' parameter. Additionally, during decorator execution, inspect.ismethod does not work as expected because the function has not yet been bound to an instance; it is still a function, not a method. This check attempts to infer method status based on signature, which can be useful in decorator contexts where traditional method identification techniques fail.

        Returns:
            bool: True if 'cls' or 'self' is in the callable's parameters, False otherwise.
        """
        return (
            "self" in inspect.signature(func).parameters
            or "cls" in inspect.signature(func).parameters
        )

    def _prepare_call(
        self,
        *,
        name: str,
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        is_method: bool = False,
        func_args: Tuple = (),
        func_kwargs: Dict = {},
    ) -> Optional[
        Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
    ]:
        try:
            stack = _observation_stack_context.get().copy()
            parent = stack[-1] if stack else None

            # Collect default observation data
            observation_id = func_kwargs.pop("langfuse_observation_id", None)
            provided_parent_trace_id = func_kwargs.pop("langfuse_parent_trace_id", None)
            provided_parent_observation_id = func_kwargs.pop(
                "langfuse_parent_observation_id", None
            )

            id = str(observation_id) if observation_id else None
            start_time = _get_timestamp()

            input = (
                self._get_input_from_func_args(
                    is_method=is_method,
                    func_args=func_args,
                    func_kwargs=func_kwargs,
                )
                if capture_input
                else None
            )

            params = {
                "id": id,
                "name": name,
                "start_time": start_time,
                "input": input,
            }

            # Handle user-provided parent trace ID and observation ID
            if parent and (provided_parent_trace_id or provided_parent_observation_id):
                self._log.warning(
                    "Ignoring langfuse_parent_trace_id and/or langfuse_parent_observation_id as they can be only set in the top-level decorated function."
                )

            elif provided_parent_observation_id and not provided_parent_trace_id:
                self._log.warning(
                    "Ignoring langfuse_parent_observation_id as langfuse_parent_trace_id is not set."
                )

            elif provided_parent_observation_id and (
                provided_parent_observation_id != provided_parent_trace_id
            ):
                parent = StatefulSpanClient(
                    id=provided_parent_observation_id,
                    trace_id=provided_parent_trace_id,
                    task_manager=self.client_instance.task_manager,
                    client=self.client_instance.client,
                    state_type=StateType.OBSERVATION,
                )
                self._set_root_trace_id(provided_parent_trace_id)

            elif provided_parent_trace_id:
                parent = StatefulTraceClient(
                    id=provided_parent_trace_id,
                    trace_id=provided_parent_trace_id,
                    task_manager=self.client_instance.task_manager,
                    client=self.client_instance.client,
                    state_type=StateType.TRACE,
                )
                self._set_root_trace_id(provided_parent_trace_id)

            # Create observation
            if parent and as_type == "generation":
                observation = parent.generation(**params)
            elif as_type == "generation":
                # Create wrapper trace if generation is top-level
                # Do not add wrapper trace to stack, as it does not have a corresponding end that will pop it off again
                trace = self.client_instance.trace(
                    id=_root_trace_id_context.get() or id,
                    name=name,
                    start_time=start_time,
                )
                self._set_root_trace_id(trace.id)

                observation = self.client_instance.generation(
                    name=name, start_time=start_time, input=input, trace_id=trace.id
                )
            elif parent:
                observation = parent.span(**params)
            else:
                params["id"] = _root_trace_id_context.get() or params["id"]
                observation = self.client_instance.trace(**params)

            _observation_stack_context.set(stack + [observation])

            return observation
        except Exception as e:
            self._log.error(f"Failed to prepare observation: {e}")

    def _get_input_from_func_args(
        self,
        *,
        is_method: bool = False,
        func_args: Tuple = (),
        func_kwargs: Dict = {},
    ) -> Any:
        # Remove implicitly passed "self" or "cls" argument for instance or class methods
        logged_args = func_args[1:] if is_method else func_args
        raw_input = {
            "args": logged_args,
            "kwargs": func_kwargs,
        }

        # Serialize and deserialize to ensure proper JSON serialization.
        # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
        return json.loads(json.dumps(raw_input, cls=EventSerializer))

    def _finalize_call(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        result: Any,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ):
        if inspect.isgenerator(result):
            return self._wrap_sync_generator_result(
                observation, result, capture_output, transform_to_string
            )
        elif inspect.isasyncgen(result):
            return self._wrap_async_generator_result(
                observation, result, capture_output, transform_to_string
            )

        else:
            return self._handle_call_result(observation, result, capture_output)

    def _handle_call_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        result: Any,
        capture_output: bool,
    ):
        try:
            if observation is None:
                raise ValueError("No observation found in the current context")

            # Collect final observation data
            observation_params = self._pop_observation_params_from_context(
                observation.id
            )

            end_time = observation_params["end_time"] or _get_timestamp()

            output = observation_params["output"] or (
                # Serialize and deserialize to ensure proper JSON serialization.
                # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
                json.loads(
                    json.dumps(
                        result if result is not None and capture_output else None,
                        cls=EventSerializer,
                    )
                )
            )

            observation_params.update(end_time=end_time, output=output)

            if isinstance(observation, (StatefulSpanClient, StatefulGenerationClient)):
                observation.end(**observation_params)
            elif isinstance(observation, StatefulTraceClient):
                observation.update(**observation_params)

            # Remove observation from top of stack
            stack = _observation_stack_context.get()
            _observation_stack_context.set(stack[:-1])

            # Update trace that was provided directly and not part of the observation stack
            if not _observation_stack_context.get() and (
                provided_trace_id := _root_trace_id_context.get()
            ):
                observation_params = self._pop_observation_params_from_context(
                    provided_trace_id
                )

                has_updates = any(observation_params.values())

                if has_updates:
                    trace_client = StatefulTraceClient(
                        id=provided_trace_id,
                        trace_id=provided_trace_id,
                        task_manager=self.client_instance.task_manager,
                        client=self.client_instance.client,
                        state_type=StateType.TRACE,
                    )
                    trace_client.update(**observation_params)

        except Exception as e:
            self._log.error(f"Failed to finalize observation: {e}")

        finally:
            # Clear the context trace ID to avoid leaking to next execution
            if not _observation_stack_context.get():
                _root_trace_id_context.set(None)

        return result

    def _handle_exception(
        self,
        observation: Optional[
            Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
        ],
        e: Exception,
    ):
        if observation:
            _observation_params_context.get()[observation.id].update(
                level="ERROR", status_message=str(e)
            )
        raise e

    def _wrap_sync_generator_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        generator: Generator,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ):
        items = []

        try:
            for item in generator:
                items.append(item)

                yield item

        finally:
            output = items

            if transform_to_string is not None:
                output = transform_to_string(items)

            elif all(isinstance(item, str) for item in items):
                output = "".join(items)

            self._handle_call_result(observation, output, capture_output)

    async def _wrap_async_generator_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        generator: AsyncGenerator,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> AsyncGenerator:
        items = []

        try:
            async for item in generator:
                items.append(item)

                yield item

        finally:
            output = items

            if transform_to_string is not None:
                output = transform_to_string(items)

            elif all(isinstance(item, str) for item in items):
                output = "".join(items)

            self._handle_call_result(observation, output, capture_output)

    def get_current_llama_index_handler(self):
        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.

        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.

        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.

        Returns:
            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

        Note:
            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
        """
        try:
            from langfuse.llama_index import LlamaIndexCallbackHandler
        except ImportError:
            self._log.error(
                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
            )

            return None

        stack = _observation_stack_context.get()
        observation = stack[-1] if stack else None

        if observation is None:
            self._log.warning("No observation found in the current context")

            return None

        if isinstance(observation, StatefulGenerationClient):
            self._log.warning(
                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
            )

            return None

        callback_handler = LlamaIndexCallbackHandler()
        callback_handler.set_root(observation)

        return callback_handler

    def get_current_langchain_handler(self):
        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.

        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.

        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.

        Returns:
            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

        Note:
            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
        """
        stack = _observation_stack_context.get()
        observation = stack[-1] if stack else None

        if observation is None:
            self._log.warning("No observation found in the current context")

            return None

        if isinstance(observation, StatefulGenerationClient):
            self._log.warning(
                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
            )

            return None

        return observation.get_langchain_handler()

    def get_current_trace_id(self):
        """Retrieve the ID of the current trace from the observation stack context.

        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
        representing the entry point of the traced execution context. If you have provided a langfuse_parent_trace_id directly, it will return that instead.

        Returns:
            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
            possibly due to the method being called outside of any @observe-decorated function execution.

        Note:
            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
        """
        context_trace_id = _root_trace_id_context.get()
        if context_trace_id:
            return context_trace_id

        stack = _observation_stack_context.get()

        if not stack:
            return None

        return stack[0].id

    def get_current_trace_url(self) -> Optional[str]:
        """Retrieve the URL of the current trace in context.

        Returns:
            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
            possibly due to the method being called outside of any @observe-decorated function execution.

        Note:
            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
        """
        try:
            trace_id = self.get_current_trace_id()

            if not trace_id:
                raise ValueError("No trace found in the current context")

            project_id = self.client_instance._get_project_id()

            if not project_id:
                return f"{self.client_instance.client._client_wrapper._base_url}/trace/{trace_id}"

            return f"{self.client_instance.client._client_wrapper._base_url}/project/{project_id}/traces/{trace_id}"

        except Exception as e:
            self._log.error(f"Failed to get current trace URL: {e}")

            return None

    def get_current_observation_id(self):
        """Retrieve the ID of the current observation in context.

        Returns:
            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
            possibly due to the method being called outside of any @observe-decorated function execution.

        Note:
            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
            - If called at the top level of a trace, it will return the trace ID.
        """
        stack = _observation_stack_context.get()

        if not stack:
            return None

        return stack[-1].id

    def update_current_trace(
        self,
        name: Optional[str] = None,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Any] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ):
        """Set parameters for the current trace, updating the trace's metadata and context information.

        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.

        Arguments:
            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
            input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
            output (Optional[Any]): The output or result of the trace.
            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.

        Returns:
            None

        Note:
            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
            - If called outside of an active trace context, a warning is logged, and a ValueError is raised to indicate the absence of a traceable context.
        """
        trace_id = self.get_current_trace_id()

        if trace_id is None:
            self._log.warning("No trace found in the current context")

            return

        params_to_update = {
            k: v
            for k, v in {
                "name": name,
                "input": input,
                "output": output,
                "user_id": user_id,
                "session_id": session_id,
                "version": version,
                "release": release,
                "metadata": metadata,
                "tags": tags,
                "public": public,
            }.items()
            if v is not None
        }

        # metadata and tags are merged server side. Send separate update event to avoid merging them SDK side
        server_merged_attributes = ["metadata", "tags"]
        if any(attribute in params_to_update for attribute in server_merged_attributes):
            self.client_instance.trace(
                id=trace_id,
                **{
                    k: v
                    for k, v in params_to_update.items()
                    if k in server_merged_attributes
                },
            )

        _observation_params_context.get()[trace_id].update(params_to_update)

    def update_current_observation(
        self,
        *,
        input: Optional[Any] = None,
        output: Optional[Any] = None,
        name: Optional[str] = None,
        version: Optional[str] = None,
        metadata: Optional[Any] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        release: Optional[str] = None,
        tags: Optional[List[str]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        level: Optional[SpanLevel] = None,
        status_message: Optional[str] = None,
        completion_start_time: Optional[datetime] = None,
        model: Optional[str] = None,
        model_parameters: Optional[Dict[str, MapValue]] = None,
        usage: Optional[Union[BaseModel, ModelUsage]] = None,
        usage_details: Optional[UsageDetails] = None,
        cost_details: Optional[Dict[str, float]] = None,
        prompt: Optional[PromptClient] = None,
        public: Optional[bool] = None,
    ):
        """Update parameters for the current observation within an active trace context.

        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
        enhancing the observability and traceability of the execution context.

        Note that if a param is not available on a specific observation type, it will be ignored.
842 843 Shared params: 844 - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call. 845 - `output` (Optional[Any]): The output or result of the trace or observation 846 - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI. 847 - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API. 848 - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification. 849 - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration. 850 - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging. 851 852 Trace-specific params: 853 - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics. 854 - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier. 855 - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging. 856 - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API. 857 - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project. 858 859 Span-specific params: 860 - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR". 
861 - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting. 862 863 Generation-specific params: 864 - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration. 865 - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs. 866 - `usage` (Optional[Union[BaseModel, ModelUsage]]): (Deprecated. Use `usage_details` and `cost_details` instead.) The usage object supports the OpenAi structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse. 867 - `usage_details` (Optional[Dict[str, int]]): The usage details of the observation. Reflects the number of units consumed per usage type. All keys must sum up to the total key value. The total key holds the total number of units consumed. 868 - `cost_details` (Optional[Dict[str, float]]): The cost details of the observation. Reflects the USD cost of the observation per cost type. All keys must sum up to the total key value. The total key holds the total cost of the observation. 869 - `prompt`(Optional[PromptClient]): The prompt object used for the generation. 870 871 Returns: 872 None 873 874 Raises: 875 ValueError: If no current observation is found in the context, indicating that this method was called outside of an observation's execution scope. 876 877 Note: 878 - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator. 
879 - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended. 880 - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information. 881 """ 882 stack = _observation_stack_context.get() 883 observation = stack[-1] if stack else None 884 885 if not observation: 886 self._log.warning("No observation found in the current context") 887 888 return 889 890 update_params = { 891 k: v 892 for k, v in { 893 "input": input, 894 "output": output, 895 "name": name, 896 "version": version, 897 "metadata": metadata, 898 "start_time": start_time, 899 "end_time": end_time, 900 "release": release, 901 "tags": tags, 902 "user_id": user_id, 903 "session_id": session_id, 904 "level": level, 905 "status_message": status_message, 906 "completion_start_time": completion_start_time, 907 "model": model, 908 "model_parameters": model_parameters, 909 "usage": usage, 910 "usage_details": usage_details, 911 "cost_details": cost_details, 912 "prompt": prompt, 913 "public": public, 914 }.items() 915 if v is not None 916 } 917 918 _observation_params_context.get()[observation.id].update(update_params) 919 920 def score_current_observation( 921 self, 922 *, 923 name: str, 924 value: Union[float, str], 925 data_type: Optional[ScoreDataType] = None, 926 comment: Optional[str] = None, 927 id: Optional[str] = None, 928 config_id: Optional[str] = None, 929 ): 930 """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace. 931 932 Arguments: 933 name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded. 934 value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure. 
935 data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. 936 When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores. 937 comment (Optional[str]): An optional comment or description providing context or additional details about the score. 938 id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking. 939 config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None. 940 941 Returns: 942 None 943 944 Note: 945 This method is intended to be used within the context of an active trace or observation. 946 """ 947 try: 948 trace_id = self.get_current_trace_id() 949 current_observation_id = self.get_current_observation_id() 950 951 observation_id = ( 952 current_observation_id if current_observation_id != trace_id else None 953 ) 954 955 if trace_id: 956 self.client_instance.score( 957 trace_id=trace_id, 958 observation_id=observation_id, 959 name=name, 960 value=value, 961 data_type=data_type, 962 comment=comment, 963 id=id, 964 config_id=config_id, 965 ) 966 else: 967 raise ValueError("No trace or observation found in the current context") 968 969 except Exception as e: 970 self._log.error(f"Failed to score observation: {e}") 971 972 def score_current_trace( 973 self, 974 *, 975 name: str, 976 value: Union[float, str], 977 data_type: Optional[ScoreDataType] = None, 978 comment: Optional[str] = None, 979 id: Optional[str] = None, 980 config_id: Optional[str] = None, 981 ): 982 """Score the current trace in context. This can be called anywhere in the nested trace to score the trace. 983 984 Arguments: 985 name (str): The name of the score metric. 
This should be a clear and concise identifier for the metric being recorded. 986 value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure. 987 data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. 988 When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores. 989 comment (Optional[str]): An optional comment or description providing context or additional details about the score. 990 id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking. 991 config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None. 992 993 Returns: 994 None 995 996 Note: 997 This method is intended to be used within the context of an active trace or observation. 998 """ 999 try: 1000 trace_id = self.get_current_trace_id() 1001 1002 if trace_id: 1003 self.client_instance.score( 1004 trace_id=trace_id, 1005 name=name, 1006 value=value, 1007 data_type=data_type, 1008 comment=comment, 1009 id=id, 1010 config_id=config_id, 1011 ) 1012 else: 1013 raise ValueError("No trace found in the current context") 1014 1015 except Exception as e: 1016 self._log.error(f"Failed to score observation: {e}") 1017 1018 @catch_and_log_errors 1019 def flush(self): 1020 """Force immediate flush of all buffered observations to the Langfuse backend. 1021 1022 This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers. 
1023 It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits. 1024 1025 Usage: 1026 - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform. 1027 - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data. 1028 1029 Returns: 1030 None 1031 1032 Raises: 1033 ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues. 1034 1035 Note: 1036 - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts. 1037 - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client. 1038 However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes. 1039 """ 1040 if self.client_instance: 1041 self.client_instance.flush() 1042 else: 1043 self._log.warning("No langfuse object found in the current context") 1044 1045 def configure( 1046 self, 1047 *, 1048 public_key: Optional[str] = None, 1049 secret_key: Optional[str] = None, 1050 host: Optional[str] = None, 1051 release: Optional[str] = None, 1052 debug: Optional[bool] = None, 1053 threads: Optional[int] = None, 1054 flush_at: Optional[int] = None, 1055 flush_interval: Optional[int] = None, 1056 max_retries: Optional[int] = None, 1057 timeout: Optional[int] = None, 1058 httpx_client: Optional[httpx.Client] = None, 1059 enabled: Optional[bool] = None, 1060 mask: Optional[Callable] = None, 1061 ): 1062 """Configure the Langfuse client. 
1063 1064 If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings. 1065 1066 Args: 1067 public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable. 1068 secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable. 1069 host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`. 1070 release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable. 1071 debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable. 1072 threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues. 1073 flush_at: Max batch size that's sent to the API. 1074 flush_interval: Max delay until a new batch is sent to the API. 1075 max_retries: Max number of retries in case of API/network errors. 1076 timeout: Timeout of API requests in seconds. Default is 20 seconds. 1077 httpx_client: Pass your own httpx client for more customizability of requests. 1078 enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised. 1079 mask (Callable): Function that masks sensitive information from input and output in log messages. 
1080 1081 """ 1082 langfuse_singleton = LangfuseSingleton() 1083 langfuse_singleton.reset() 1084 1085 langfuse_singleton.get( 1086 public_key=public_key, 1087 secret_key=secret_key, 1088 host=host, 1089 release=release, 1090 debug=debug, 1091 threads=threads, 1092 flush_at=flush_at, 1093 flush_interval=flush_interval, 1094 max_retries=max_retries, 1095 timeout=timeout, 1096 httpx_client=httpx_client, 1097 enabled=enabled, 1098 mask=mask, 1099 ) 1100 1101 @property 1102 def client_instance(self) -> Langfuse: 1103 """Get the Langfuse client instance for the current decorator context.""" 1104 return LangfuseSingleton().get() 1105 1106 def _set_root_trace_id(self, trace_id: str): 1107 if _observation_stack_context.get(): 1108 self._log.warning( 1109 "Root Trace ID cannot be set on a already running trace. Skipping root trace ID assignment." 1110 ) 1111 return 1112 1113 _root_trace_id_context.set(trace_id) 1114 1115 def _pop_observation_params_from_context( 1116 self, observation_id: str 1117 ) -> ObservationParams: 1118 params = _observation_params_context.get()[observation_id].copy() 1119 1120 # Remove observation params to avoid leaking 1121 del _observation_params_context.get()[observation_id] 1122 1123 return params 1124 1125 def auth_check(self) -> bool: 1126 """Check if the current Langfuse client is authenticated. 1127 1128 Returns: 1129 bool: True if the client is authenticated, False otherwise 1130 """ 1131 try: 1132 return self.client_instance.auth_check() 1133 except Exception as e: 1134 self._log.error( 1135 "No Langfuse object found in the current context", exc_info=e 1136 ) 1137 1138 return False
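The update methods above share one idiom: only parameters that were explicitly passed (i.e., not `None`) are collected into the update payload, so unset arguments never overwrite values stored earlier. A minimal, standalone sketch of that idiom (the function name and parameters are illustrative, not part of the SDK):

```python
from typing import Any, Dict, Optional

def build_update_params(
    name: Optional[str] = None,
    user_id: Optional[str] = None,
    metadata: Optional[Any] = None,
) -> Dict[str, Any]:
    # Collect only the arguments the caller actually set; None means "leave unchanged".
    return {
        k: v
        for k, v in {"name": name, "user_id": user_id, "metadata": metadata}.items()
        if v is not None
    }

params = build_update_params(name="my-trace", metadata={"env": "prod"})
# user_id was not passed, so it is absent from the payload entirely.
print(params)  # {'name': 'my-trace', 'metadata': {'env': 'prod'}}
```

This is also why `None` cannot be used to clear a previously set value: the dict comprehension drops it before the update is applied.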
````python
    def observe(
        self,
        func: Optional[Callable[P, R]] = None,
        *,
        name: Optional[str] = None,
        as_type: Optional[Literal["generation"]] = None,
        capture_input: bool = True,
        capture_output: bool = True,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> Callable[[Callable[P, R]], Callable[P, R]]:
        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

        Attributes:
            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
            capture_output (bool): If True, captures the return value of the function as output. Default is True.
            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

        Returns:
            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

        Example:
            For general tracing (functions/methods):
            ```python
            @observe()
            def your_function(args):
                # Your implementation here
            ```
            For observing language model generations:
            ```python
            @observe(as_type="generation")
            def your_LLM_function(args):
                # Your LLM invocation here
            ```

        Raises:
            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

        Note:
            - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
            - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
        """

        def decorator(func: Callable[P, R]) -> Callable[P, R]:
            return (
                self._async_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=capture_input,
                    capture_output=capture_output,
                    transform_to_string=transform_to_string,
                )
                if asyncio.iscoroutinefunction(func)
                else self._sync_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=capture_input,
                    capture_output=capture_output,
                    transform_to_string=transform_to_string,
                )
            )

        """
        If the decorator is called without arguments, return the decorator function itself.
        This allows the decorator to be used with or without arguments.
        Python calls the decorator function with the decorated function as an argument when the decorator is used without arguments.
        """
        if func is None:
            return decorator
        else:
            return decorator(func)
````
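The closing lines of `observe` combine two standard Python patterns: a decorator that works both bare (`@observe`) and with arguments (`@observe(...)`), and a dispatch on `asyncio.iscoroutinefunction` to wrap sync and async functions differently. A simplified, self-contained sketch of the same mechanics, with none of the Langfuse machinery (`observe_sketch` and `calls` are illustrative names only):

```python
import asyncio
import functools
from typing import Callable, Optional

calls = []  # stand-in for the observation bookkeeping

def observe_sketch(func: Optional[Callable] = None, *, name: Optional[str] = None):
    def decorator(fn: Callable) -> Callable:
        if asyncio.iscoroutinefunction(fn):
            @functools.wraps(fn)
            async def async_wrapper(*args, **kwargs):
                calls.append(name or fn.__name__)  # record the "observation"
                return await fn(*args, **kwargs)
            return async_wrapper

        @functools.wraps(fn)
        def sync_wrapper(*args, **kwargs):
            calls.append(name or fn.__name__)
            return fn(*args, **kwargs)
        return sync_wrapper

    # Bare use: Python passes the function itself, so apply the decorator now.
    # Parenthesized use: func is None, so return the decorator for Python to apply.
    return decorator if func is None else decorator(func)

@observe_sketch
def add(a, b):
    return a + b

@observe_sketch(name="async-step")
async def double(x):
    return 2 * x

print(add(1, 2))               # 3
print(asyncio.run(double(5)))  # 10
print(calls)                   # ['add', 'async-step']
```

The real decorator additionally captures inputs/outputs, manages the observation stack, and re-raises exceptions after recording them.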
```python
    def get_current_llama_index_handler(self):
        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.

        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.

        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.

        Returns:
            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

        Note:
            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
        """
        try:
            from langfuse.llama_index import LlamaIndexCallbackHandler
        except ImportError:
            self._log.error(
                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
            )

            return None

        stack = _observation_stack_context.get()
        observation = stack[-1] if stack else None

        if observation is None:
            self._log.warning("No observation found in the current context")

            return None

        if isinstance(observation, StatefulGenerationClient):
            self._log.warning(
                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
            )

            return None

        callback_handler = LlamaIndexCallbackHandler()
        callback_handler.set_root(observation)

        return callback_handler
```
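The import guard at the top of this method is the usual pattern for optional integrations: attempt the import, and fail soft with a log message rather than an exception when the extra package is missing. A generic, standalone sketch of that guard (`load_optional` is an illustrative helper, not an SDK function):

```python
import importlib
import logging

log = logging.getLogger(__name__)

def load_optional(module_name: str):
    """Return the imported module, or None if the optional dependency is absent."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        log.error("%s is not available; install the optional dependency first.", module_name)
        return None

assert load_optional("json") is not None                 # stdlib module: import succeeds
assert load_optional("definitely_not_a_module") is None  # missing: logged and None returned
```

Returning `None` instead of raising keeps tracing code from crashing an application that simply does not use the LlamaIndex integration.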
```python
    def get_current_langchain_handler(self):
        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.

        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.

        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.

        Returns:
            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

        Note:
            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
        """
        stack = _observation_stack_context.get()
        observation = stack[-1] if stack else None

        if observation is None:
            self._log.warning("No observation found in the current context")

            return None

        if isinstance(observation, StatefulGenerationClient):
            self._log.warning(
                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
            )

            return None

        return observation.get_langchain_handler()
```
```python
    def get_current_trace_id(self):
        """Retrieve the ID of the current trace from the observation stack context.

        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
        representing the entry point of the traced execution context. If you have provided a langfuse_parent_trace_id directly, it will return that instead.

        Returns:
            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
            possibly due to the method being called outside of any @observe-decorated function execution.

        Note:
            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
        """
        context_trace_id = _root_trace_id_context.get()
        if context_trace_id:
            return context_trace_id

        stack = _observation_stack_context.get()

        if not stack:
            return None

        return stack[0].id
```
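Under the hood, trace-ID resolution is just two lookups: an explicitly provided root trace ID wins; otherwise the bottom entry of a context-local observation stack is the trace. A self-contained sketch of that resolution logic using `contextvars` (the variable names mirror, but are not, the SDK internals):

```python
from contextvars import ContextVar
from typing import List, Optional

_root_trace_id: ContextVar[Optional[str]] = ContextVar("root_trace_id", default=None)
_observation_stack: ContextVar[List[str]] = ContextVar("observation_stack", default=[])

def get_current_trace_id() -> Optional[str]:
    # An explicitly set root trace ID takes precedence over the stack.
    if _root_trace_id.get():
        return _root_trace_id.get()
    stack = _observation_stack.get()
    return stack[0] if stack else None  # bottom of the stack is the trace

_observation_stack.set(["trace-abc", "span-1", "generation-2"])
print(get_current_trace_id())  # trace-abc

_root_trace_id.set("trace-override")
print(get_current_trace_id())  # trace-override
```

Using `contextvars` rather than module globals keeps each async task or thread on its own observation stack, which is what lets nested `@observe` calls resolve to the right trace.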
```python
def get_current_trace_url(self) -> Optional[str]:
    try:
        trace_id = self.get_current_trace_id()

        if not trace_id:
            raise ValueError("No trace found in the current context")

        project_id = self.client_instance._get_project_id()

        if not project_id:
            return f"{self.client_instance.client._client_wrapper._base_url}/trace/{trace_id}"

        return f"{self.client_instance.client._client_wrapper._base_url}/project/{project_id}/traces/{trace_id}"

    except Exception as e:
        self._log.error(f"Failed to get current trace URL: {e}")

        return None
```
Retrieve the URL of the current trace in context.
Returns:
str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.
Note:
- This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its URL can be retrieved.
- If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
```python
def get_current_observation_id(self):
    stack = _observation_stack_context.get()

    if not stack:
        return None

    return stack[-1].id
```
Retrieve the ID of the current observation in context.
Returns:
str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context, possibly due to the method being called outside of any @observe-decorated function execution.
Note:
- This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
- If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
- If called at the top level of a trace, it will return the trace ID.
```python
def update_current_trace(
    self,
    name: Optional[str] = None,
    input: Optional[Any] = None,
    output: Optional[Any] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    version: Optional[str] = None,
    release: Optional[str] = None,
    metadata: Optional[Any] = None,
    tags: Optional[List[str]] = None,
    public: Optional[bool] = None,
):
    trace_id = self.get_current_trace_id()

    if trace_id is None:
        self._log.warning("No trace found in the current context")

        return

    params_to_update = {
        k: v
        for k, v in {
            "name": name,
            "input": input,
            "output": output,
            "user_id": user_id,
            "session_id": session_id,
            "version": version,
            "release": release,
            "metadata": metadata,
            "tags": tags,
            "public": public,
        }.items()
        if v is not None
    }

    # metadata and tags are merged server side. Send separate update event
    # to avoid merging them SDK side.
    server_merged_attributes = ["metadata", "tags"]
    if any(attribute in params_to_update for attribute in server_merged_attributes):
        self.client_instance.trace(
            id=trace_id,
            **{
                k: v
                for k, v in params_to_update.items()
                if k in server_merged_attributes
            },
        )

    _observation_params_context.get()[trace_id].update(params_to_update)
```
Set parameters for the current trace, updating the trace's metadata and context information.
This method allows for dynamically updating the trace parameters at any point during the execution of a trace. It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information, and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
Arguments:
- name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
- input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
- output (Optional[Any]): The output or result of the trace.
- user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
- session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
- version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
- release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
- metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
- tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
Returns:
None
Note:
- This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
- The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
- If called outside of an active trace context, a warning is logged and the method returns without updating anything.
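The "merged server side" note above means only `metadata` and `tags` are sent as a separate update event; all other parameters replace the previous value, and `None` values are filtered out before updating. A minimal sketch of that filtering, using a hypothetical `merge_trace_params` helper (not part of the SDK):

```python
def merge_trace_params(existing: dict, **updates) -> dict:
    # Only keys whose value is not None are applied; passing None for a
    # parameter therefore leaves the previously set value untouched.
    params_to_update = {k: v for k, v in updates.items() if v is not None}
    return {**existing, **params_to_update}

state = {"name": "checkout", "user_id": "u-1"}
state = merge_trace_params(state, user_id=None, session_id="s-42")
# name and user_id survive; session_id is added.
```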
```python
def update_current_observation(
    self,
    *,
    input: Optional[Any] = None,
    output: Optional[Any] = None,
    name: Optional[str] = None,
    version: Optional[str] = None,
    metadata: Optional[Any] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    release: Optional[str] = None,
    tags: Optional[List[str]] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    level: Optional[SpanLevel] = None,
    status_message: Optional[str] = None,
    completion_start_time: Optional[datetime] = None,
    model: Optional[str] = None,
    model_parameters: Optional[Dict[str, MapValue]] = None,
    usage: Optional[Union[BaseModel, ModelUsage]] = None,
    usage_details: Optional[UsageDetails] = None,
    cost_details: Optional[Dict[str, float]] = None,
    prompt: Optional[PromptClient] = None,
    public: Optional[bool] = None,
):
    stack = _observation_stack_context.get()
    observation = stack[-1] if stack else None

    if not observation:
        self._log.warning("No observation found in the current context")

        return

    update_params = {
        k: v
        for k, v in {
            "input": input,
            "output": output,
            "name": name,
            "version": version,
            "metadata": metadata,
            "start_time": start_time,
            "end_time": end_time,
            "release": release,
            "tags": tags,
            "user_id": user_id,
            "session_id": session_id,
            "level": level,
            "status_message": status_message,
            "completion_start_time": completion_start_time,
            "model": model,
            "model_parameters": model_parameters,
            "usage": usage,
            "usage_details": usage_details,
            "cost_details": cost_details,
            "prompt": prompt,
            "public": public,
        }.items()
        if v is not None
    }

    _observation_params_context.get()[observation.id].update(update_params)
```
Update parameters for the current observation within an active trace context.
This method dynamically adjusts the parameters of the most recent observation on the observation stack. It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more, enhancing the observability and traceability of the execution context.
Note that if a param is not available on a specific observation type, it will be ignored.
Shared params:
- `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
- `output` (Optional[Any]): The output or result of the trace or observation.
- `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
- `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
- `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
- `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
- `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
Trace-specific params:
- `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
- `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
- `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
- `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
- `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
Span-specific params:
- `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
- `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
Generation-specific params:
- `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
- `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
- `usage` (Optional[Union[BaseModel, ModelUsage]]): (Deprecated. Use `usage_details` and `cost_details` instead.) The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
- `usage_details` (Optional[Dict[str, int]]): The usage details of the observation. Reflects the number of units consumed per usage type. All keys must sum up to the total key value. The total key holds the total number of units consumed.
- `cost_details` (Optional[Dict[str, float]]): The cost details of the observation. Reflects the USD cost of the observation per cost type. All keys must sum up to the total key value. The total key holds the total cost of the observation.
- `prompt` (Optional[PromptClient]): The prompt object used for the generation.
Returns:
None
Note:
- If no current observation is found in the context (i.e., this method was called outside of an observation's execution scope), a warning is logged and no update is applied.
- This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
- It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
- Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
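The "all keys must sum up to the total key value" constraint on `usage_details` and `cost_details` can be checked with a small helper. This is an illustrative sketch, not part of the SDK (`totals_consistent` is a hypothetical name):

```python
import math

def totals_consistent(details: dict) -> bool:
    """Return True if the non-'total' keys sum to the 'total' key."""
    if "total" not in details:
        return True  # nothing to check against
    partial = sum(v for k, v in details.items() if k != "total")
    # isclose avoids float-comparison surprises for cost_details.
    return math.isclose(partial, details["total"])

usage_details = {"input": 12, "output": 30, "total": 42}   # consistent
cost_details = {"input": 0.01, "output": 0.02, "total": 0.03}  # consistent
```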
```python
def score_current_observation(
    self,
    *,
    name: str,
    value: Union[float, str],
    data_type: Optional[ScoreDataType] = None,
    comment: Optional[str] = None,
    id: Optional[str] = None,
    config_id: Optional[str] = None,
):
    try:
        trace_id = self.get_current_trace_id()
        current_observation_id = self.get_current_observation_id()

        observation_id = (
            current_observation_id if current_observation_id != trace_id else None
        )

        if trace_id:
            self.client_instance.score(
                trace_id=trace_id,
                observation_id=observation_id,
                name=name,
                value=value,
                data_type=data_type,
                comment=comment,
                id=id,
                config_id=config_id,
            )
        else:
            raise ValueError("No trace or observation found in the current context")

    except Exception as e:
        self._log.error(f"Failed to score observation: {e}")
```
Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
Arguments:
- name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
- value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
- data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
- comment (Optional[str]): An optional comment or description providing context or additional details about the score.
- id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
- config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
Returns:
None
Note:
This method is intended to be used within the context of an active trace or observation.
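The data-type inference described above (explicit `data_type` first, then the score config's type, then the value's Python type) can be sketched as follows. This is illustrative only, not the SDK's actual code; `infer_score_data_type` is a hypothetical helper:

```python
from typing import Optional, Union

def infer_score_data_type(
    value: Union[float, str],
    data_type: Optional[str] = None,
    config_data_type: Optional[str] = None,
) -> str:
    # An explicit data_type wins, then the score config's type,
    # then inference from the Python type of the value.
    if data_type is not None:
        return data_type
    if config_data_type is not None:
        return config_data_type
    return "CATEGORICAL" if isinstance(value, str) else "NUMERIC"

infer_score_data_type(0.87)        # "NUMERIC"
infer_score_data_type("positive")  # "CATEGORICAL"
```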
```python
def score_current_trace(
    self,
    *,
    name: str,
    value: Union[float, str],
    data_type: Optional[ScoreDataType] = None,
    comment: Optional[str] = None,
    id: Optional[str] = None,
    config_id: Optional[str] = None,
):
    try:
        trace_id = self.get_current_trace_id()

        if trace_id:
            self.client_instance.score(
                trace_id=trace_id,
                name=name,
                value=value,
                data_type=data_type,
                comment=comment,
                id=id,
                config_id=config_id,
            )
        else:
            raise ValueError("No trace found in the current context")

    except Exception as e:
        self._log.error(f"Failed to score observation: {e}")
```
Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
Arguments:
- name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
- value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
- data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
- comment (Optional[str]): An optional comment or description providing context or additional details about the score.
- id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
- config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
Returns:
None
Note:
This method is intended to be used within the context of an active trace or observation.
```python
@catch_and_log_errors
def flush(self):
    if self.client_instance:
        self.client_instance.flush()
    else:
        self._log.warning("No langfuse object found in the current context")
```
Force immediate flush of all buffered observations to the Langfuse backend.
This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers. It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
Usage:
- This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
- It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
Returns:
None
Note:
- If no Langfuse client object is found in the current context (indicating potential misconfiguration or initialization issues), a warning is logged and nothing is flushed.
- The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
- In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client. However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
```python
def configure(
    self,
    *,
    public_key: Optional[str] = None,
    secret_key: Optional[str] = None,
    host: Optional[str] = None,
    release: Optional[str] = None,
    debug: Optional[bool] = None,
    threads: Optional[int] = None,
    flush_at: Optional[int] = None,
    flush_interval: Optional[int] = None,
    max_retries: Optional[int] = None,
    timeout: Optional[int] = None,
    httpx_client: Optional[httpx.Client] = None,
    enabled: Optional[bool] = None,
    mask: Optional[Callable] = None,
):
    langfuse_singleton = LangfuseSingleton()
    langfuse_singleton.reset()

    langfuse_singleton.get(
        public_key=public_key,
        secret_key=secret_key,
        host=host,
        release=release,
        debug=debug,
        threads=threads,
        flush_at=flush_at,
        flush_interval=flush_interval,
        max_retries=max_retries,
        timeout=timeout,
        httpx_client=httpx_client,
        enabled=enabled,
        mask=mask,
    )
```
Configure the Langfuse client.

If called, this method must be called before any other `langfuse_context` or `observe`-decorated function to configure the Langfuse client with the necessary credentials and settings.

Arguments:
- public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
- secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
- host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
- release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
- debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
- threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
- flush_at: Max batch size that's sent to the API.
- flush_interval: Max delay until a new batch is sent to the API.
- max_retries: Max number of retries in case of API/network errors.
- timeout: Timeout of API requests in seconds. Default is 20 seconds.
- httpx_client: Pass your own httpx client for more customizability of requests.
- enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
- mask (Callable): Function that masks sensitive information from input and output in log messages.
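A minimal sketch of putting the `mask` parameter to use. The `mask_sensitive` function below is purely illustrative (it is not part of the SDK), and the commented-out `configure` call assumes the `langfuse` package is installed and credentials are available via environment variables:

```python
from typing import Any


def mask_sensitive(data: Any) -> Any:
    # Hypothetical mask: redact values that look like secret API keys
    # before they are recorded as trace input/output.
    if isinstance(data, str) and data.startswith("sk-"):
        return "***REDACTED***"
    return data


# configure must run before the first langfuse_context call or
# @observe()-decorated function (requires langfuse + valid credentials):
#
# from langfuse.decorators import langfuse_context
# langfuse_context.configure(
#     host="https://cloud.langfuse.com",  # the default host
#     mask=mask_sensitive,
# )
```

The mask receives input and output values and returns the (possibly redacted) value that is sent to Langfuse, so it should handle non-string data by passing it through unchanged.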
```python
@property
def client_instance(self) -> Langfuse:
    """Get the Langfuse client instance for the current decorator context."""
    return LangfuseSingleton().get()
```
Get the Langfuse client instance for the current decorator context.
```python
def auth_check(self) -> bool:
    """Check if the current Langfuse client is authenticated.

    Returns:
        bool: True if the client is authenticated, False otherwise
    """
    try:
        return self.client_instance.auth_check()
    except Exception as e:
        self._log.error(
            "No Langfuse object found in the current context", exc_info=e
        )

    return False
```
Check if the current Langfuse client is authenticated.
Returns:
bool: True if the client is authenticated, False otherwise
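Since `auth_check` returns a plain boolean, one common pattern is a fail-fast check at application startup. The `require_auth` helper below is a hypothetical example, not part of the SDK:

```python
from typing import Callable


def require_auth(check: Callable[[], bool]) -> None:
    # Hypothetical fail-fast helper: raise at startup if the supplied
    # auth check (e.g. langfuse_context.auth_check) reports failure.
    if not check():
        raise RuntimeError("Langfuse credentials are missing or invalid")


# With the SDK installed and configured:
#
# from langfuse.decorators import langfuse_context
# require_auth(langfuse_context.auth_check)
```

Note that `auth_check` makes a network request, so it is best called once at startup rather than on every traced invocation.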