langfuse.decorators

Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the @observe() decorator.

Simple example (decorator + openai integration)

from langfuse.decorators import observe
from langfuse.openai import openai # OpenAI integration

@observe()
def story():
    return openai.chat.completions.create(
        model="gpt-3.5-turbo",
        max_tokens=100,
        messages=[
          {"role": "system", "content": "You are a great storyteller."},
          {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
        ],
    ).choices[0].message.content

@observe()
def main():
    return story()

main()

See docs for more information.
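
The decorator also supports coroutines. A minimal async sketch, assuming Langfuse credentials are configured via environment variables (e.g. LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY):

import asyncio
from langfuse.decorators import observe

@observe()
async def fetch_answer(question: str):
    # Stand-in for an async LLM or API call; nested @observe functions become child spans
    await asyncio.sleep(0.1)
    return f"Answer to: {question}"

@observe()
async def main():
    return await fetch_answer("What is Langfuse?")

asyncio.run(main())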

 1"""Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the `@observe()` decorator.
 2
 3*Simple example (decorator + openai integration)*
 4
 5```python
 6from langfuse.decorators import observe
 7from langfuse.openai import openai # OpenAI integration
 8
 9@observe()
10def story():
11    return openai.chat.completions.create(
12        model="gpt-3.5-turbo",
13        max_tokens=100,
14        messages=[
15          {"role": "system", "content": "You are a great storyteller."},
16          {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
17        ],
18    ).choices[0].message.content
19
20@observe()
21def main():
22    return story()
23
24main()
25```
26
27See [docs](https://langfuse.com/docs/sdk/python/decorators) for more information.
28"""
29
30from .langfuse_decorator import langfuse_context, observe, LangfuseDecorator
31
32__all__ = ["langfuse_context", "observe", "LangfuseDecorator"]
langfuse_context = <LangfuseDecorator object>
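
langfuse_context is the module-level LangfuseDecorator instance through which the helper methods documented below (update_current_trace, flush, etc.) are invoked. A minimal sketch of flushing buffered events in a short-lived script:

from langfuse.decorators import langfuse_context, observe

@observe()
def job():
    return "done"

job()
langfuse_context.flush()  # send buffered observations before the process exits
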
def observe( *, as_type: Optional[Literal['generation']] = None, capture_input: bool = True, capture_output: bool = True, transform_to_string: Optional[Callable[[Iterable], str]] = None) -> Callable[[~F], ~F]:
 84    def observe(
 85        self,
 86        *,
 87        as_type: Optional[Literal["generation"]] = None,
 88        capture_input: bool = True,
 89        capture_output: bool = True,
 90        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 91    ) -> Callable[[F], F]:
 92        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
 93
 94        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
 95        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
 96
 97        Arguments:
 98            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
 99            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
100            capture_output (bool): If True, captures the return value of the function as output. Default is True.
101            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.
102
103        Returns:
104            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
105
106        Example:
107            For general tracing (functions/methods):
108            ```python
109            @observe()
110            def your_function(args):
111                # Your implementation here
112            ```
113            For observing language model generations:
114            ```python
115            @observe(as_type="generation")
116            def your_LLM_function(args):
117                # Your LLM invocation here
118            ```
119
120        Raises:
121            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
122
123        Note:
124        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
125        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
126        """
127
128        def decorator(func: F) -> F:
129            return (
130                self._async_observe(
131                    func,
132                    as_type=as_type,
133                    capture_input=capture_input,
134                    capture_output=capture_output,
135                    transform_to_string=transform_to_string,
136                )
137                if asyncio.iscoroutinefunction(func)
138                else self._sync_observe(
139                    func,
140                    as_type=as_type,
141                    capture_input=capture_input,
142                    capture_output=capture_output,
143                    transform_to_string=transform_to_string,
144                )
145            )
146
147        return decorator

Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Arguments:
  • as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
  • capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
  • capture_output (bool): If True, captures the return value of the function as output. Default is True.
  • transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

Returns:

Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

@observe()
def your_function(args):
    # Your implementation here

For observing language model generations:

@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here

Raises:
  • Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:

  • Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the langfuse_observation_id keyword when calling the wrapped function.
  • To update observation or trace parameters (e.g., metadata, session_id), use langfuse.update_current_observation and langfuse.update_current_trace methods within the wrapped function.
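
Putting the notes above together, a sketch of a streaming generation: transform_to_string joins the yielded chunks for output capture, and langfuse_observation_id sets a custom observation ID at call time (the chunk values and ID are illustrative):

from langfuse.decorators import observe

@observe(as_type="generation", transform_to_string=lambda chunks: "".join(chunks))
def stream_story():
    # Each yielded chunk reaches the caller unchanged; all chunks are joined for output capture
    yield "Once upon "
    yield "a time..."

print("".join(stream_story(langfuse_observation_id="my-custom-id")))
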
class LangfuseDecorator:
 81class LangfuseDecorator:
 82    _log = logging.getLogger("langfuse")
 83
 84    def observe(
 85        self,
 86        *,
 87        as_type: Optional[Literal["generation"]] = None,
 88        capture_input: bool = True,
 89        capture_output: bool = True,
 90        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 91    ) -> Callable[[F], F]:
 92        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
 93
 94        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
 95        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
 96
 97        Arguments:
 98            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
 99            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
100            capture_output (bool): If True, captures the return value of the function as output. Default is True.
101            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.
102
103        Returns:
104            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
105
106        Example:
107            For general tracing (functions/methods):
108            ```python
109            @observe()
110            def your_function(args):
111                # Your implementation here
112            ```
113            For observing language model generations:
114            ```python
115            @observe(as_type="generation")
116            def your_LLM_function(args):
117                # Your LLM invocation here
118            ```
119
120        Raises:
121            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
122
123        Note:
124        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
125        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
126        """
127
128        def decorator(func: F) -> F:
129            return (
130                self._async_observe(
131                    func,
132                    as_type=as_type,
133                    capture_input=capture_input,
134                    capture_output=capture_output,
135                    transform_to_string=transform_to_string,
136                )
137                if asyncio.iscoroutinefunction(func)
138                else self._sync_observe(
139                    func,
140                    as_type=as_type,
141                    capture_input=capture_input,
142                    capture_output=capture_output,
143                    transform_to_string=transform_to_string,
144                )
145            )
146
147        return decorator
148
149    def _async_observe(
150        self,
151        func: F,
152        as_type: Optional[Literal["generation"]],
153        capture_input: bool,
154        capture_output: bool,
155        transform_to_string: Optional[Callable[[Iterable], str]] = None,
156    ) -> F:
157        @wraps(func)
158        async def async_wrapper(*args, **kwargs):
159            observation = self._prepare_call(
160                func_name=func.__name__,
161                as_type=as_type,
162                capture_input=capture_input,
163                is_instance_method=self._is_instance_method(func),
164                func_args=args,
165                func_kwargs=kwargs,
166            )
167            result = None
168
169            try:
170                result = await func(*args, **kwargs)
171            except Exception as e:
172                self._handle_exception(observation, e)
173            finally:
174                result = self._finalize_call(
175                    observation, result, capture_output, transform_to_string
176                )
177
178                # Returning from finally block may swallow errors, so only return if result is not None
179                if result is not None:
180                    return result
181
182        return cast(F, async_wrapper)
183
184    def _sync_observe(
185        self,
186        func: F,
187        as_type: Optional[Literal["generation"]],
188        capture_input: bool,
189        capture_output: bool,
190        transform_to_string: Optional[Callable[[Iterable], str]] = None,
191    ) -> F:
192        @wraps(func)
193        def sync_wrapper(*args, **kwargs):
194            observation = self._prepare_call(
195                func_name=func.__name__,
196                as_type=as_type,
197                capture_input=capture_input,
198                is_instance_method=self._is_instance_method(func),
199                func_args=args,
200                func_kwargs=kwargs,
201            )
202            result = None
203
204            try:
205                result = func(*args, **kwargs)
206            except Exception as e:
207                self._handle_exception(observation, e)
208            finally:
209                result = self._finalize_call(
210                    observation, result, capture_output, transform_to_string
211                )
212
213                # Returning from finally block may swallow errors, so only return if result is not None
214                if result is not None:
215                    return result
216
217        return cast(F, sync_wrapper)
218
219    @staticmethod
220    def _is_instance_method(func: Callable) -> bool:
221        """Check if a callable is likely an instance method based on its signature.
222
223        This method inspects the given callable's signature for the presence of a 'self' parameter, which is conventionally used for instance methods in Python classes. It returns True if 'self' is found among the parameters, suggesting the callable is an instance method.
224
225        Note: This method relies on naming conventions and may not accurately identify instance methods if unconventional parameter names are used or if static or class methods incorrectly include a 'self' parameter. Additionally, during decorator execution, inspect.ismethod does not work as expected because the function has not yet been bound to an instance; it is still a function, not a method. This check attempts to infer method status based on signature, which can be useful in decorator contexts where traditional method identification techniques fail.
226
227        Returns:
228        bool: True if 'self' is in the callable's parameters, False otherwise.
229        """
230        return "self" in inspect.signature(func).parameters
231
232    def _prepare_call(
233        self,
234        *,
235        func_name: str,
236        as_type: Optional[Literal["generation"]],
237        capture_input: bool,
238        is_instance_method: bool = False,
239        func_args: Tuple = (),
240        func_kwargs: Dict = {},
241    ) -> Optional[
242        Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
243    ]:
244        try:
245            langfuse = self._get_langfuse()
246            stack = _observation_stack_context.get().copy()
247            parent = stack[-1] if stack else None
248
249            # Collect default observation data
250            name = func_name
251            observation_id = func_kwargs.pop("langfuse_observation_id", None)
252            id = str(observation_id) if observation_id else None
253            start_time = _get_timestamp()
254
255            # Remove implicitly passed "self" argument for instance methods
256            if is_instance_method:
257                logged_args = func_args[1:]
258            else:
259                logged_args = func_args
260
261            input = (
262                {"args": logged_args, "kwargs": func_kwargs} if capture_input else None
263            )
264
265            params = {
266                "id": id,
267                "name": name,
268                "start_time": start_time,
269                "input": input,
270            }
271
272            # Create observation
273            if parent and as_type == "generation":
274                observation = parent.generation(**params)
275            elif as_type == "generation":
276                # Create wrapper trace if generation is top-level
277                # Do not add wrapper trace to stack, as it does not have a corresponding end that will pop it off again
278                trace = langfuse.trace(id=id, name=name, start_time=start_time)
279                observation = langfuse.generation(
280                    name=name, start_time=start_time, input=input, trace_id=trace.id
281                )
282            elif parent:
283                observation = parent.span(**params)
284            else:
285                observation = langfuse.trace(**params)
286
287            _observation_stack_context.set(stack + [observation])
288
289            return observation
290        except Exception as e:
291            self._log.error(f"Failed to prepare observation: {e}")
292
293    def _finalize_call(
294        self,
295        observation: Optional[
296            Union[
297                StatefulSpanClient,
298                StatefulTraceClient,
299                StatefulGenerationClient,
300            ]
301        ],
302        result: Any,
303        capture_output: bool,
304        transform_to_string: Optional[Callable[[Iterable], str]] = None,
305    ):
306        if inspect.isgenerator(result):
307            return self._wrap_sync_generator_result(
308                observation, result, capture_output, transform_to_string
309            )
310        elif inspect.isasyncgen(result):
311            return self._wrap_async_generator_result(
312                observation, result, capture_output, transform_to_string
313            )
314
315        else:
316            return self._handle_call_result(observation, result, capture_output)
317
318    def _handle_call_result(
319        self,
320        observation: Optional[
321            Union[
322                StatefulSpanClient,
323                StatefulTraceClient,
324                StatefulGenerationClient,
325            ]
326        ],
327        result: Any,
328        capture_output: bool,
329    ):
330        try:
331            if observation is None:
332                raise ValueError("No observation found in the current context")
333
334            # Collect final observation data
335            observation_params = _observation_params_context.get()[
336                observation.id
337            ].copy()
338            del _observation_params_context.get()[
339                observation.id
340            ]  # Remove observation params to avoid leaking
341
342            end_time = observation_params["end_time"] or _get_timestamp()
343            output = observation_params["output"] or (
344                str(result) if result and capture_output else None
345            )
346            observation_params.update(end_time=end_time, output=output)
347
348            if isinstance(observation, (StatefulSpanClient, StatefulGenerationClient)):
349                observation.end(**observation_params)
350            elif isinstance(observation, StatefulTraceClient):
351                observation.update(**observation_params)
352
353            # Remove observation from top of stack
354            stack = _observation_stack_context.get()
355            _observation_stack_context.set(stack[:-1])
356
357        except Exception as e:
358            self._log.error(f"Failed to finalize observation: {e}")
359
360        finally:
361            return result
362
363    def _handle_exception(
364        self,
365        observation: Optional[
366            Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
367        ],
368        e: Exception,
369    ):
370        if observation:
371            _observation_params_context.get()[observation.id].update(
372                level="ERROR", status_message=str(e)
373            )
374        raise e
375
376    def _wrap_sync_generator_result(
377        self,
378        observation: Optional[
379            Union[
380                StatefulSpanClient,
381                StatefulTraceClient,
382                StatefulGenerationClient,
383            ]
384        ],
385        generator: Generator,
386        capture_output: bool,
387        transform_to_string: Optional[Callable[[Iterable], str]] = None,
388    ):
389        items = []
390
391        try:
392            for item in generator:
393                items.append(item)
394
395                yield item
396
397        finally:
398            output = items
399
400            if transform_to_string is not None:
401                output = transform_to_string(items)
402
403            elif all(isinstance(item, str) for item in items):
404                output = "".join(items)
405
406            self._handle_call_result(observation, output, capture_output)
407
408    async def _wrap_async_generator_result(
409        self,
410        observation: Optional[
411            Union[
412                StatefulSpanClient,
413                StatefulTraceClient,
414                StatefulGenerationClient,
415            ]
416        ],
417        generator: AsyncGenerator,
418        capture_output: bool,
419        transform_to_string: Optional[Callable[[Iterable], str]] = None,
420    ) -> AsyncGenerator:
421        items = []
422
423        try:
424            async for item in generator:
425                items.append(item)
426
427                yield item
428
429        finally:
430            output = items
431
432            if transform_to_string is not None:
433                output = transform_to_string(items)
434
435            elif all(isinstance(item, str) for item in items):
436                output = "".join(items)
437
438            self._handle_call_result(observation, output, capture_output)
439
440    def get_current_llama_index_handler(self):
441        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
442
443        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
444        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
445
446        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
447
448        Returns:
449            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
450
451        Note:
452            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
453            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
454        """
455        try:
456            from langfuse.llama_index import LlamaIndexCallbackHandler
457        except ImportError:
458            self._log.error(
459                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
460            )
461
462            return None
463
464        observation = _observation_stack_context.get()[-1]
465
466        if observation is None:
467            self._log.warn("No observation found in the current context")
468
469            return None
470
471        if isinstance(observation, StatefulGenerationClient):
472            self._log.warn(
473                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
474            )
475
476            return None
477
478        callback_handler = LlamaIndexCallbackHandler()
479        callback_handler.set_root(observation)
480
481        return callback_handler
482
483    def get_current_langchain_handler(self):
484        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
485
486        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
487        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
488
489        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
490
491        Returns:
492            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
493
494        Note:
495            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
496            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
497        """
498        observation = _observation_stack_context.get()[-1]
499
500        if observation is None:
501            self._log.warn("No observation found in the current context")
502
503            return None
504
505        if isinstance(observation, StatefulGenerationClient):
506            self._log.warn(
507                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
508            )
509
510            return None
511
512        return observation.get_langchain_handler()
513
514    def get_current_trace_id(self):
515        """Retrieve the ID of the current trace from the observation stack context.
516
517        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
518        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
519        representing the entry point of the traced execution context.
520
521        Returns:
522            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
523            possibly due to the method being called outside of any @observe-decorated function execution.
524
525        Note:
526            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
527            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
528        """
529        stack = _observation_stack_context.get()
530        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
531
532        if not stack:
533            if should_log_warning:
534                self._log.warn("No trace found in the current context")
535
536            return None
537
538        return stack[0].id
539
540    def _get_caller_module_name(self):
541        try:
542            caller_module = inspect.getmodule(inspect.stack()[2][0])
543        except Exception as e:
544            self._log.warn(f"Failed to get caller module: {e}")
545
546            return None
547
548        return caller_module.__name__ if caller_module else None
549
550    def get_current_trace_url(self) -> Optional[str]:
551        """Retrieve the URL of the current trace in context.
552
553        Returns:
554            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
555            possibly due to the method being called outside of any @observe-decorated function execution.
556
557        Note:
558            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
559            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
560        """
561        try:
562            trace_id = self.get_current_trace_id()
563            langfuse = self._get_langfuse()
564
565            if not trace_id:
566                raise ValueError("No trace found in the current context")
567
568            return f"{langfuse.client._client_wrapper._base_url}/trace/{trace_id}"
569
570        except Exception as e:
571            self._log.error(f"Failed to get current trace URL: {e}")
572
573            return None
574
575    def get_current_observation_id(self):
576        """Retrieve the ID of the current observation in context.
577
578        Returns:
579            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
580            possibly due to the method being called outside of any @observe-decorated function execution.
581
582        Note:
583            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
584            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
585            - If called at the top level of a trace, it will return the trace ID.
586        """
587        stack = _observation_stack_context.get()
588        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
589
590        if not stack:
591            if should_log_warning:
592                self._log.warn("No observation found in the current context")
593
594            return None
595
596        return stack[-1].id
597
598    def update_current_trace(
599        self,
600        name: Optional[str] = None,
601        user_id: Optional[str] = None,
602        session_id: Optional[str] = None,
603        version: Optional[str] = None,
604        release: Optional[str] = None,
605        metadata: Optional[Any] = None,
606        tags: Optional[List[str]] = None,
607        public: Optional[bool] = None,
608    ):
609        """Set parameters for the current trace, updating the trace's metadata and context information.
610
611        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
612        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
613        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
614
615        Arguments:
616            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
617            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
618            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
619            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
620            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
621            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
622            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
623
624        Returns:
625            None
626
627        Note:
628            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
629            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
630            - If called outside of an active trace context, a warning is logged, and a ValueError is raised to indicate the absence of a traceable context.
631        """
632        trace_id = self.get_current_trace_id()
633
634        if trace_id is None:
635            self._log.warn("No trace found in the current context")
636
637            return
638
639        params_to_update = {
640            k: v
641            for k, v in {
642                "name": name,
643                "user_id": user_id,
644                "session_id": session_id,
645                "version": version,
646                "release": release,
647                "metadata": metadata,
648                "tags": tags,
649                "public": public,
650            }.items()
651            if v is not None
652        }
653
654        _observation_params_context.get()[trace_id].update(params_to_update)
655
656    def update_current_observation(
657        self,
658        *,
659        input: Optional[Any] = None,
660        output: Optional[Any] = None,
661        name: Optional[str] = None,
662        version: Optional[str] = None,
663        metadata: Optional[Any] = None,
664        start_time: Optional[datetime] = None,
665        end_time: Optional[datetime] = None,
666        release: Optional[str] = None,
667        tags: Optional[List[str]] = None,
668        user_id: Optional[str] = None,
669        session_id: Optional[str] = None,
670        level: Optional[SpanLevel] = None,
671        status_message: Optional[str] = None,
672        completion_start_time: Optional[datetime] = None,
673        model: Optional[str] = None,
674        model_parameters: Optional[Dict[str, MapValue]] = None,
675        usage: Optional[Union[BaseModel, ModelUsage]] = None,
676        prompt: Optional[PromptClient] = None,
677        public: Optional[bool] = None,
678    ):
679        """Update parameters for the current observation within an active trace context.
680
681        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
682        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
683        enhancing the observability and traceability of the execution context.
684
685        Note that if a param is not available on a specific observation type, it will be ignored.
686
687        Shared params:
688            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
689            - `output` (Optional[Any]): The output or result of the trace or observation
690            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
691            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
692            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
693            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
694            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
695
696        Trace-specific params:
697            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
698            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
699            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
700            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
701            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
702
703        Span-specific params:
704            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
705            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
706
707        Generation-specific params:
708            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
709            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
710            - `usage` (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
711            - `prompt` (Optional[PromptClient]): The prompt object used for the generation.
712
713        Returns:
714            None
715
716        Raises:
717            ValueError: If no current observation is found in the context, indicating that this method was called outside of an observation's execution scope.
718
719        Note:
720            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
721            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
722            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
723        """
724        stack = _observation_stack_context.get()
725        observation = stack[-1] if stack else None
726
727        if not observation:
728            self._log.warn("No observation found in the current context")
729
730            return
731
732        update_params = {
733            k: v
734            for k, v in {
735                "input": input,
736                "output": output,
737                "name": name,
738                "version": version,
739                "metadata": metadata,
740                "start_time": start_time,
741                "end_time": end_time,
742                "release": release,
743                "tags": tags,
744                "user_id": user_id,
745                "session_id": session_id,
746                "level": level,
747                "status_message": status_message,
748                "completion_start_time": completion_start_time,
749                "model": model,
750                "model_parameters": model_parameters,
751                "usage": usage,
752                "prompt": prompt,
753                "public": public,
754            }.items()
755            if v is not None
756        }
757
758        _observation_params_context.get()[observation.id].update(update_params)
759
760    def score_current_observation(
761        self,
762        *,
763        name: str,
764        value: float,
765        comment: Optional[str] = None,
766        id: Optional[str] = None,
767    ):
768        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
769
770        Arguments:
771            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
772            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
773            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
774            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
775
776        Returns:
777            None
778
779        Note:
780            This method is intended to be used within the context of an active trace or observation.
781        """
782        try:
783            langfuse = self._get_langfuse()
784            trace_id = self.get_current_trace_id()
785            current_observation_id = self.get_current_observation_id()
786
787            observation_id = (
788                current_observation_id if current_observation_id != trace_id else None
789            )
790
791            if trace_id:
792                langfuse.score(
793                    trace_id=trace_id,
794                    observation_id=observation_id,
795                    name=name,
796                    value=value,
797                    comment=comment,
798                    id=id,
799                )
800            else:
801                raise ValueError("No trace or observation found in the current context")
802
803        except Exception as e:
804            self._log.error(f"Failed to score observation: {e}")
805
806    def score_current_trace(
807        self,
808        *,
809        name: str,
810        value: float,
811        comment: Optional[str] = None,
812        id: Optional[str] = None,
813    ):
814        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
815
816        Arguments:
817            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
818            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
819            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
820            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
821
822        Returns:
823            None
824
825        Note:
826            This method is intended to be used within the context of an active trace or observation.
827        """
828        try:
829            langfuse = self._get_langfuse()
830            trace_id = self.get_current_trace_id()
831
832            if trace_id:
833                langfuse.score(
834                    trace_id=trace_id,
835                    name=name,
836                    value=value,
837                    comment=comment,
838                    id=id,
839                )
840            else:
841                raise ValueError("No trace found in the current context")
842
843        except Exception as e:
844            self._log.error(f"Failed to score observation: {e}")
845
846    @catch_and_log_errors
847    def flush(self):
848        """Force immediate flush of all buffered observations to the Langfuse backend.
849
850        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
851        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
852
853        Usage:
854            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
855            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
856
857        Returns:
858            None
859
860        Raises:
861            ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues.
862
863        Note:
864            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
865            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
866            However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
867        """
868        langfuse = self._get_langfuse()
869        if langfuse:
870            langfuse.flush()
871        else:
872            self._log.warn("No langfuse object found in the current context")
873
874    def _get_langfuse(self) -> Langfuse:
875        return LangfuseSingleton().get()
876
877    def auth_check(self) -> bool:
878        """Check if the current Langfuse client is authenticated.
879
880        Returns:
881            bool: True if the client is authenticated, False otherwise
882        """
883        try:
884            langfuse = self._get_langfuse()
885
886            return langfuse.auth_check()
887        except Exception as e:
888            self._log.error(f"No Langfuse object found in the current context: {e}")
889
890            return False
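
The update and score helpers above are meant to be called inside @observe-decorated functions. A hedged sketch combining them (model name, metadata, and score values are illustrative):

from langfuse.decorators import langfuse_context, observe

@observe(as_type="generation")
def generate():
    langfuse_context.update_current_observation(
        model="gpt-3.5-turbo",             # generation-specific param
        metadata={"variant": "baseline"},  # merged into the observation
    )
    return "some completion"

@observe()
def pipeline():
    result = generate()
    langfuse_context.score_current_trace(name="quality", value=1.0)
    return result

pipeline()
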
def observe( self, *, as_type: Optional[Literal['generation']] = None, capture_input: bool = True, capture_output: bool = True, transform_to_string: Optional[Callable[[Iterable], str]] = None) -> Callable[[~F], ~F]:
 84    def observe(
 85        self,
 86        *,
 87        as_type: Optional[Literal["generation"]] = None,
 88        capture_input: bool = True,
 89        capture_output: bool = True,
 90        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 91    ) -> Callable[[F], F]:
 92        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
 93
 94        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
 95        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
 96
 97        Arguments:
 98            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
 99            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
100            capture_output (bool): If True, captures the return value of the function as output. Default is True.
101            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.
102
103        Returns:
104            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
105
106        Example:
107            For general tracing (functions/methods):
108            ```python
109            @observe()
110            def your_function(args):
111                # Your implementation here
112            ```
113            For observing language model generations:
114            ```python
115            @observe(as_type="generation")
116            def your_LLM_function(args):
117                # Your LLM invocation here
118            ```
119
120        Raises:
121            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
122
123        Note:
124        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
125        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
126        """
127
128        def decorator(func: F) -> F:
129            return (
130                self._async_observe(
131                    func,
132                    as_type=as_type,
133                    capture_input=capture_input,
134                    capture_output=capture_output,
135                    transform_to_string=transform_to_string,
136                )
137                if asyncio.iscoroutinefunction(func)
138                else self._sync_observe(
139                    func,
140                    as_type=as_type,
141                    capture_input=capture_input,
142                    capture_output=capture_output,
143                    transform_to_string=transform_to_string,
144                )
145            )
146
147        return decorator

Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Arguments:
  • as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
  • capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
  • capture_output (bool): If True, captures the return value of the function as output. Default is True.
  • transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

Returns:

Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

@observe()
def your_function(args):
    # Your implementation here

For observing language model generations:

@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here

Raises:
  • Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:

  • Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the langfuse_observation_id keyword when calling the wrapped function.
  • To update observation or trace parameters (e.g., metadata, session_id), use langfuse.update_current_observation and langfuse.update_current_trace methods within the wrapped function.
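
When inputs or outputs are sensitive, capture can be disabled per function. A minimal sketch:

from langfuse.decorators import observe

@observe(capture_input=False, capture_output=False)
def handle_pii(user_record: dict):
    # Neither user_record nor the return value is sent to Langfuse
    return {"status": "processed"}

handle_pii({"name": "redacted"})
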
def get_current_llama_index_handler(self):
440    def get_current_llama_index_handler(self):
441        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
442
443        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
444        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
445
446        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
447
448        Returns:
449            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
450
451        Note:
452            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
453            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
454        """
455        try:
456            from langfuse.llama_index import LlamaIndexCallbackHandler
457        except ImportError:
458            self._log.error(
459                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
460            )
461
462            return None
463
464        observation = _observation_stack_context.get()[-1]
465
466        if observation is None:
467            self._log.warn("No observation found in the current context")
468
469            return None
470
471        if isinstance(observation, StatefulGenerationClient):
472            self._log.warn(
473                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
474            )
475
476            return None
477
478        callback_handler = LlamaIndexCallbackHandler()
479        callback_handler.set_root(observation)
480
481        return callback_handler

Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.

This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.

See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.

Returns:

LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

Note:
  • This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
  • If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
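
A sketch of wiring the returned handler into a LlamaIndex query. The registration step is version-dependent; the imports below assume llama-index >= 0.10, and `index` is assumed to be built elsewhere:

from langfuse.decorators import langfuse_context, observe

@observe()
def query_index(index, question: str):
    handler = langfuse_context.get_current_llama_index_handler()

    # Register the handler globally; exact wiring depends on the llama-index version
    from llama_index.core import Settings
    from llama_index.core.callbacks import CallbackManager
    Settings.callback_manager = CallbackManager([handler])

    return index.as_query_engine().query(question)
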
def get_current_langchain_handler(self):
483    def get_current_langchain_handler(self):
484        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
485
486        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
487        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
488
489        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
490
491        Returns:
492            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
493
494        Note:
495            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
496            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
497        """
498        observation = _observation_stack_context.get()[-1]
499
500        if observation is None:
501            self._log.warn("No observation found in the current context")
502
503            return None
504
505        if isinstance(observation, StatefulGenerationClient):
506            self._log.warn(
507                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
508            )
509
510            return None
511
512        return observation.get_langchain_handler()

Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.

This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.

See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.

Returns:

LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

Note:
  • This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
  • If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
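
Example (a minimal sketch; assumes `langchain-core` and `langchain-openai` are installed, and the prompt/model choice is illustrative):

```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langfuse.decorators import langfuse_context, observe

prompt = ChatPromptTemplate.from_template("Summarize in one sentence: {text}")
chain = prompt | ChatOpenAI(model="gpt-3.5-turbo")

@observe()
def summarize(text: str) -> str:
    # All LangChain runs invoked with this handler are nested under
    # the current observation in the trace
    handler = langfuse_context.get_current_langchain_handler()
    return chain.invoke({"text": text}, config={"callbacks": [handler]}).content
```
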
def get_current_trace_id(self):
514    def get_current_trace_id(self):
515        """Retrieve the ID of the current trace from the observation stack context.
516
517        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
518        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
519        representing the entry point of the traced execution context.
520
521        Returns:
522            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
523            possibly due to the method being called outside of any @observe-decorated function execution.
524
525        Note:
526            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
527            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
528        """
529        stack = _observation_stack_context.get()
530        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
531
532        if not stack:
533            if should_log_warning:
534                self._log.warn("No trace found in the current context")
535
536            return None
537
538        return stack[0].id

Retrieve the ID of the current trace from the observation stack context.

This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID, such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack, representing the entry point of the traced execution context.

Returns:

str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
  • If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
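
Example (a minimal sketch; `do_work` is a hypothetical helper used only for illustration):

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def do_work(payload: dict) -> str:
    return f"processed {len(payload)} fields"

@observe()
def handle_request(payload: dict) -> dict:
    # Return the trace ID to the caller so feedback or scores can be
    # attached to this exact trace later
    trace_id = langfuse_context.get_current_trace_id()
    return {"result": do_work(payload), "trace_id": trace_id}
```
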
def get_current_trace_url(self) -> Optional[str]:
550    def get_current_trace_url(self) -> Optional[str]:
551        """Retrieve the URL of the current trace in context.
552
553        Returns:
554            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
555            possibly due to the method being called outside of any @observe-decorated function execution.
556
557        Note:
558            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
559            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
560        """
561        try:
562            trace_id = self.get_current_trace_id()
563            langfuse = self._get_langfuse()
564
565            if not trace_id:
566                raise ValueError("No trace found in the current context")
567
568            return f"{langfuse.client._client_wrapper._base_url}/trace/{trace_id}"
569
570        except Exception as e:
571            self._log.error(f"Failed to get current trace URL: {e}")
572
573            return None

Retrieve the URL of the current trace in context.

Returns:

str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
  • If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
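
Example (a minimal sketch for logging a deep link to the current trace):

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def main():
    # ... application logic ...
    url = langfuse_context.get_current_trace_url()
    if url:  # None when called outside of an active trace
        print(f"Inspect this run in Langfuse: {url}")

main()
```
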
def get_current_observation_id(self):
575    def get_current_observation_id(self):
576        """Retrieve the ID of the current observation in context.
577
578        Returns:
579            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
580            possibly due to the method being called outside of any @observe-decorated function execution.
581
582        Note:
583            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
584            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
585            - If called at the top level of a trace, it will return the trace ID.
586        """
587        stack = _observation_stack_context.get()
588        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
589
590        if not stack:
591            if should_log_warning:
592                self._log.warn("No observation found in the current context")
593
594            return None
595
596        return stack[-1].id

Retrieve the ID of the current observation in context.

Returns:

str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
  • If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
  • If called at the top level of a trace, it will return the trace ID.
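
Example (a minimal sketch illustrating the nesting behavior):

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def nested_step():
    # Inside a nested call, this returns the current span's ID
    return langfuse_context.get_current_observation_id()

@observe()
def main():
    span_id = nested_step()
    # At the top level of the trace, the trace ID is returned instead
    trace_id = langfuse_context.get_current_observation_id()
    return span_id, trace_id
```
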
def update_current_trace( self, name: Optional[str] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, version: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, public: Optional[bool] = None):
598    def update_current_trace(
599        self,
600        name: Optional[str] = None,
601        user_id: Optional[str] = None,
602        session_id: Optional[str] = None,
603        version: Optional[str] = None,
604        release: Optional[str] = None,
605        metadata: Optional[Any] = None,
606        tags: Optional[List[str]] = None,
607        public: Optional[bool] = None,
608    ):
609        """Set parameters for the current trace, updating the trace's metadata and context information.
610
611        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
612        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
613        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
614
615        Arguments:
616            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
617            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
618            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
619            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
620            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
621            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
622            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
623
624        Returns:
625            None
626
627        Note:
628            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
629            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
630            - If called outside of an active trace context, a warning is logged and no parameters are updated, indicating the absence of a traceable context.
631        """
632        trace_id = self.get_current_trace_id()
633
634        if trace_id is None:
635            self._log.warn("No trace found in the current context")
636
637            return
638
639        params_to_update = {
640            k: v
641            for k, v in {
642                "name": name,
643                "user_id": user_id,
644                "session_id": session_id,
645                "version": version,
646                "release": release,
647                "metadata": metadata,
648                "tags": tags,
649                "public": public,
650            }.items()
651            if v is not None
652        }
653
654        _observation_params_context.get()[trace_id].update(params_to_update)

Set parameters for the current trace, updating the trace's metadata and context information.

This method allows for dynamically updating the trace parameters at any point during the execution of a trace. It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information, and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.

Arguments:
  • name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
  • release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
  • public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
Returns:

None

Note:
  • This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
  • The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
  • If called outside of an active trace context, a warning is logged and no parameters are updated, indicating the absence of a traceable context.
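
Example (a minimal sketch; the parameter values are illustrative):

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def handle_chat_message(user_id: str, session_id: str, message: str):
    # Attach user and session context so traces can be grouped and
    # filtered in the Langfuse UI
    langfuse_context.update_current_trace(
        name="chat-message",
        user_id=user_id,
        session_id=session_id,
        tags=["chat", "production"],
        metadata={"message_length": len(message)},
    )
    # ... handle the message ...
```
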
def update_current_observation( self, *, input: Optional[Any] = None, output: Optional[Any] = None, name: Optional[str] = None, version: Optional[str] = None, metadata: Optional[Any] = None, start_time: Optional[datetime.datetime] = None, end_time: Optional[datetime.datetime] = None, release: Optional[str] = None, tags: Optional[List[str]] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, level: Optional[Literal['DEBUG', 'DEFAULT', 'WARNING', 'ERROR']] = None, status_message: Optional[str] = None, completion_start_time: Optional[datetime.datetime] = None, model: Optional[str] = None, model_parameters: Optional[Dict[str, Union[str, NoneType, int, bool, List[str]]]] = None, usage: Union[pydantic.main.BaseModel, langfuse.model.ModelUsage, NoneType] = None, prompt: Union[langfuse.model.TextPromptClient, langfuse.model.ChatPromptClient, NoneType] = None, public: Optional[bool] = None):
656    def update_current_observation(
657        self,
658        *,
659        input: Optional[Any] = None,
660        output: Optional[Any] = None,
661        name: Optional[str] = None,
662        version: Optional[str] = None,
663        metadata: Optional[Any] = None,
664        start_time: Optional[datetime] = None,
665        end_time: Optional[datetime] = None,
666        release: Optional[str] = None,
667        tags: Optional[List[str]] = None,
668        user_id: Optional[str] = None,
669        session_id: Optional[str] = None,
670        level: Optional[SpanLevel] = None,
671        status_message: Optional[str] = None,
672        completion_start_time: Optional[datetime] = None,
673        model: Optional[str] = None,
674        model_parameters: Optional[Dict[str, MapValue]] = None,
675        usage: Optional[Union[BaseModel, ModelUsage]] = None,
676        prompt: Optional[PromptClient] = None,
677        public: Optional[bool] = None,
678    ):
679        """Update parameters for the current observation within an active trace context.
680
681        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
682        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
683        enhancing the observability and traceability of the execution context.
684
685        Note that if a param is not available on a specific observation type, it will be ignored.
686
687        Shared params:
688            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
689            - `output` (Optional[Any]): The output or result of the trace or observation.
690            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
691            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
692            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
693            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
694            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
695
696        Trace-specific params:
697            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
698            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
699            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
700            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
701            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
702
703        Span-specific params:
704            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
705            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
706
707        Generation-specific params:
708            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
709            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
710            - `usage` (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
711            - `prompt` (Optional[PromptClient]): The prompt object used for the generation.
712
713        Returns:
714            None
715
716        Note:
717            - If no current observation is found in the context (i.e., when called outside of an observation's execution scope), a warning is logged and no update is performed.
720            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
721            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
722            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
723        """
724        stack = _observation_stack_context.get()
725        observation = stack[-1] if stack else None
726
727        if not observation:
728            self._log.warn("No observation found in the current context")
729
730            return
731
732        update_params = {
733            k: v
734            for k, v in {
735                "input": input,
736                "output": output,
737                "name": name,
738                "version": version,
739                "metadata": metadata,
740                "start_time": start_time,
741                "end_time": end_time,
742                "release": release,
743                "tags": tags,
744                "user_id": user_id,
745                "session_id": session_id,
746                "level": level,
747                "status_message": status_message,
748                "completion_start_time": completion_start_time,
749                "model": model,
750                "model_parameters": model_parameters,
751                "usage": usage,
752                "prompt": prompt,
753                "public": public,
754            }.items()
755            if v is not None
756        }
757
758        _observation_params_context.get()[observation.id].update(update_params)

Update parameters for the current observation within an active trace context.

This method dynamically adjusts the parameters of the most recent observation on the observation stack. It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more, enhancing the observability and traceability of the execution context.

Note that if a param is not available on a specific observation type, it will be ignored.

Shared params:
  • input (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
  • output (Optional[Any]): The output or result of the trace or observation.
  • name (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • start_time (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
  • end_time (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.

Trace-specific params:
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
  • public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

Span-specific params:
  • level (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
  • status_message (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.

Generation-specific params:
  • completion_start_time (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
  • model (Optional[str]): The name of the model used for the generation.
  • model_parameters (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
  • usage (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
  • prompt (Optional[PromptClient]): The prompt object used for the generation.

Returns:

None

Note:
  • If no current observation is found in the context (i.e., when called outside of an observation's execution scope), a warning is logged and no update is performed.
  • This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
  • It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
  • Parameters set to None will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
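
Example (a minimal sketch for a generation-type observation; the model call is a placeholder and the usage dict follows the generic structure described above):

```python
from langfuse.decorators import langfuse_context, observe

@observe(as_type="generation")
def call_model(prompt: str) -> str:
    completion = "..."  # placeholder for a real model call

    # Enrich the current generation; parameters left as None elsewhere
    # are not overwritten
    langfuse_context.update_current_observation(
        model="gpt-3.5-turbo",
        model_parameters={"temperature": 0.7, "max_tokens": 100},
        usage={"input": 12, "output": 48, "unit": "TOKENS"},
    )
    return completion
```
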
def score_current_observation( self, *, name: str, value: float, comment: Optional[str] = None, id: Optional[str] = None):
760    def score_current_observation(
761        self,
762        *,
763        name: str,
764        value: float,
765        comment: Optional[str] = None,
766        id: Optional[str] = None,
767    ):
768        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
769
770        Arguments:
771            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
772            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
773            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
774            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
775
776        Returns:
777            None
778
779        Note:
780            This method is intended to be used within the context of an active trace or observation.
781        """
782        try:
783            langfuse = self._get_langfuse()
784            trace_id = self.get_current_trace_id()
785            current_observation_id = self.get_current_observation_id()
786
787            observation_id = (
788                current_observation_id if current_observation_id != trace_id else None
789            )
790
791            if trace_id:
792                langfuse.score(
793                    trace_id=trace_id,
794                    observation_id=observation_id,
795                    name=name,
796                    value=value,
797                    comment=comment,
798                    id=id,
799                )
800            else:
801                raise ValueError("No trace or observation found in the current context")
802
803        except Exception as e:
804            self._log.error(f"Failed to score observation: {e}")

Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.

Arguments:
  • name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
  • value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
  • comment (Optional[str]): An optional comment or description providing context or additional details about the score.
  • id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
Returns:

None

Note:

This method is intended to be used within the context of an active trace or observation.
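
Example (a minimal sketch; the score name, value, and retrieval logic are illustrative):

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def retrieve_documents(query: str) -> list:
    docs = ["..."]  # placeholder retrieval result

    # Scores the current span, not the whole trace, because this is a
    # nested observation
    langfuse_context.score_current_observation(
        name="retrieval-hit-rate",
        value=0.8,
        comment="4 of 5 retrieved documents were relevant",
    )
    return docs
```
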

def score_current_trace( self, *, name: str, value: float, comment: Optional[str] = None, id: Optional[str] = None):
806    def score_current_trace(
807        self,
808        *,
809        name: str,
810        value: float,
811        comment: Optional[str] = None,
812        id: Optional[str] = None,
813    ):
814        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
815
816        Arguments:
817            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
818            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
819            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
820            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
821
822        Returns:
823            None
824
825        Note:
826            This method is intended to be used within the context of an active trace or observation.
827        """
828        try:
829            langfuse = self._get_langfuse()
830            trace_id = self.get_current_trace_id()
831
832            if trace_id:
833                langfuse.score(
834                    trace_id=trace_id,
835                    name=name,
836                    value=value,
837                    comment=comment,
838                    id=id,
839                )
840            else:
841                raise ValueError("No trace found in the current context")
842
843        except Exception as e:
844            self._log.error(f"Failed to score trace: {e}")

Score the current trace in context. This can be called anywhere in the nested trace to score the trace.

Arguments:
  • name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
  • value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
  • comment (Optional[str]): An optional comment or description providing context or additional details about the score.
  • id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
Returns:

None

Note:

This method is intended to be used within the context of an active trace or observation.
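
Example (a minimal sketch; mapping user feedback to a numeric score is illustrative):

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def run_pipeline(feedback_positive: bool):
    # ... pipeline steps ...

    # Can be called at any nesting depth; it always scores the root trace
    langfuse_context.score_current_trace(
        name="user-feedback",
        value=1.0 if feedback_positive else 0.0,
    )
```
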

@catch_and_log_errors
def flush(self):
846    @catch_and_log_errors
847    def flush(self):
848        """Force immediate flush of all buffered observations to the Langfuse backend.
849
850        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
851        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
852
853        Usage:
854            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
855            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
856
857        Returns:
858            None
859
860        Note:
861            - If no Langfuse client object is found in the current context, a warning is logged, indicating potential misconfiguration or initialization issues.
864            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
865            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
866            However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
867        """
868        langfuse = self._get_langfuse()
869        if langfuse:
870            langfuse.flush()
871        else:
872            self._log.warn("No langfuse object found in the current context")

Force immediate flush of all buffered observations to the Langfuse backend.

This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers. It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.

Usage:
  • This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
  • It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
Returns:

None

Note:
  • If no Langfuse client object is found in the current context, a warning is logged, indicating potential misconfiguration or initialization issues.
  • The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
  • In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client. However, explicit calls to flush can be beneficial in certain edge cases or for debugging purposes.
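
Example (a minimal sketch for a short-lived script where buffered data must be sent before exit):

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def main():
    ...  # traced work

if __name__ == "__main__":
    main()
    # In CLIs, batch jobs, or serverless functions, flush before exit so
    # buffered observations are not lost
    langfuse_context.flush()
```
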
def auth_check(self) -> bool:
877    def auth_check(self) -> bool:
878        """Check if the current Langfuse client is authenticated.
879
880        Returns:
881            bool: True if the client is authenticated, False otherwise
882        """
883        try:
884            langfuse = self._get_langfuse()
885
886            return langfuse.auth_check()
887        except Exception as e:
888            self._log.error(f"No Langfuse object found in the current context: {e}")
889
890            return False

Check if the current Langfuse client is authenticated.

Returns:

bool: True if the client is authenticated, False otherwise
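
Example (a minimal sketch for verifying credentials at startup; assumes the standard LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, and optional LANGFUSE_HOST environment variables are set):

```python
from langfuse.decorators import langfuse_context

# Fail fast if the configured API keys are invalid
if not langfuse_context.auth_check():
    raise RuntimeError("Langfuse authentication failed: check your API keys")
```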