langfuse.decorators

Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the @observe() decorator.

Simple example (decorator + openai integration)

from langfuse.decorators import observe
from langfuse.openai import openai # OpenAI integration

@observe()
def story():
    return openai.chat.completions.create(
        model="gpt-3.5-turbo",
        max_tokens=100,
        messages=[
          {"role": "system", "content": "You are a great storyteller."},
          {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
        ],
    ).choices[0].message.content

@observe()
def main():
    return story()

main()

See docs for more information.
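Under the hood, nesting works by keeping the currently open observations on a stack in a `contextvars.ContextVar` (see `_observation_stack_context` in the source): the outermost decorated call becomes the trace, and nested decorated calls become spans. A minimal, stdlib-only sketch of that pattern — the names `trace`, `calls`, and `_stack` here are illustrative, not part of the Langfuse API:

```python
import contextvars
import functools

# Illustrative stand-in for Langfuse's internal observation stack.
_stack: contextvars.ContextVar = contextvars.ContextVar("stack", default=())

calls = []  # records (kind, function name) so the nesting is visible

def trace(func):
    """Toy decorator: the outermost call is a 'trace', nested calls are 'spans'."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        stack = _stack.get()
        kind = "span" if stack else "trace"
        calls.append((kind, func.__name__))
        token = _stack.set(stack + (func.__name__,))  # push onto the stack
        try:
            return func(*args, **kwargs)
        finally:
            _stack.reset(token)  # pop when the call finishes

    return wrapper

@trace
def story():
    return "once upon a time"

@trace
def main():
    return story()

result = main()
print(calls)  # [('trace', 'main'), ('span', 'story')]
```

The real decorator additionally records timings, inputs, and outputs on each stack entry and flushes them to Langfuse, but the trace/span hierarchy comes from exactly this push/pop discipline.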

"""Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the `@observe()` decorator.

*Simple example (decorator + openai integration)*

```python
from langfuse.decorators import observe
from langfuse.openai import openai # OpenAI integration

@observe()
def story():
    return openai.chat.completions.create(
        model="gpt-3.5-turbo",
        max_tokens=100,
        messages=[
          {"role": "system", "content": "You are a great storyteller."},
          {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
        ],
    ).choices[0].message.content

@observe()
def main():
    return story()

main()
```

See [docs](https://langfuse.com/docs/sdk/python/decorators) for more information.
"""

from .langfuse_decorator import langfuse_context, observe, LangfuseDecorator

__all__ = ["langfuse_context", "observe", "LangfuseDecorator"]
langfuse_context = <LangfuseDecorator object>
def observe(
    *,
    name: Optional[str] = None,
    as_type: Optional[Literal["generation"]] = None,
    capture_input: bool = True,
    capture_output: bool = True,
    transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> Callable[[F], F]:

Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Attributes:
  • name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
  • as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
  • capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
  • capture_output (bool): If True, captures the return value of the function as output. Default is True.
  • transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

Returns:

Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

@observe()
def your_function(args):
    # Your implementation here

For observing language model generations:

@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here

Raises:
  • Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:

  • Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the langfuse_observation_id keyword when calling the wrapped function.
  • To update observation or trace parameters (e.g., metadata, session_id), use langfuse.update_current_observation and langfuse.update_current_trace methods within the wrapped function.
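As the `decorator` closure in the source shows, `observe()` picks the wrapper at decoration time: `asyncio.iscoroutinefunction` routes coroutine functions to an async wrapper and everything else to a sync one, so `await` semantics are preserved. A stripped-down, stdlib-only sketch of that dispatch — the `observe_like` name and the no-op wrapper bodies are illustrative, not the real implementation:

```python
import asyncio
import functools

def observe_like(func):
    """Toy version of observe()'s dispatch: choose the wrapper that
    matches the decorated function's sync/async nature at decoration time."""
    if asyncio.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            # the real decorator opens/finalizes an observation around this call
            return await func(*args, **kwargs)

        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    return sync_wrapper

@observe_like
def add(a, b):
    return a + b

@observe_like
async def async_add(a, b):
    return a + b

print(add(1, 2))                     # 3
print(asyncio.run(async_add(3, 4)))  # 7
```

Because the check happens once, at decoration time, the wrapped function keeps its original calling convention: sync functions stay directly callable and async functions still return awaitable coroutines.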
class LangfuseDecorator:
class LangfuseDecorator:
    _log = logging.getLogger("langfuse")

    def observe(
        self,
        *,
        name: Optional[str] = None,
        as_type: Optional[Literal["generation"]] = None,
        capture_input: bool = True,
        capture_output: bool = True,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> Callable[[F], F]:
        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

        Attributes:
            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
            capture_output (bool): If True, captures the return value of the function as output. Default is True.
            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

        Returns:
            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

        Example:
            For general tracing (functions/methods):
            ```python
            @observe()
            def your_function(args):
                # Your implementation here
            ```
            For observing language model generations:
            ```python
            @observe(as_type="generation")
            def your_LLM_function(args):
                # Your LLM invocation here
            ```

        Raises:
            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

        Note:
        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
        """

        def decorator(func: F) -> F:
            return (
                self._async_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=capture_input,
                    capture_output=capture_output,
                    transform_to_string=transform_to_string,
                )
                if asyncio.iscoroutinefunction(func)
                else self._sync_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=capture_input,
                    capture_output=capture_output,
                    transform_to_string=transform_to_string,
                )
            )

        return decorator

    def _async_observe(
        self,
        func: F,
        *,
        name: Optional[str],
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> F:
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            observation = self._prepare_call(
                name=name or func.__name__,
                as_type=as_type,
                capture_input=capture_input,
                is_method=self._is_method(func),
                func_args=args,
                func_kwargs=kwargs,
            )
            result = None

            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                self._handle_exception(observation, e)
            finally:
                result = self._finalize_call(
                    observation, result, capture_output, transform_to_string
                )

                # Returning from finally block may swallow errors, so only return if result is not None
                if result is not None:
                    return result

        return cast(F, async_wrapper)

    def _sync_observe(
        self,
        func: F,
        *,
        name: Optional[str],
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> F:
        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            observation = self._prepare_call(
                name=name or func.__name__,
                as_type=as_type,
                capture_input=capture_input,
                is_method=self._is_method(func),
                func_args=args,
                func_kwargs=kwargs,
            )
            result = None

            try:
                result = func(*args, **kwargs)
            except Exception as e:
                self._handle_exception(observation, e)
            finally:
                result = self._finalize_call(
                    observation, result, capture_output, transform_to_string
                )

                # Returning from finally block may swallow errors, so only return if result is not None
                if result is not None:
                    return result

        return cast(F, sync_wrapper)

    @staticmethod
    def _is_method(func: Callable) -> bool:
        """Check if a callable is likely a class or instance method based on its signature.

        This method inspects the given callable's signature for the presence of a 'cls' or 'self' parameter, which is conventionally used for class and instance methods in Python classes. It returns True if 'cls' or 'self' is found among the parameters, suggesting the callable is a method.

        Note: This method relies on naming conventions and may not accurately identify instance methods if unconventional parameter names are used or if static or class methods incorrectly include a 'self' or 'cls' parameter. Additionally, during decorator execution, inspect.ismethod does not work as expected because the function has not yet been bound to an instance; it is still a function, not a method. This check attempts to infer method status based on signature, which can be useful in decorator contexts where traditional method identification techniques fail.

        Returns:
        bool: True if 'cls' or 'self' is in the callable's parameters, False otherwise.
        """
        return (
            "self" in inspect.signature(func).parameters
            or "cls" in inspect.signature(func).parameters
        )

    def _prepare_call(
        self,
        *,
        name: str,
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        is_method: bool = False,
        func_args: Tuple = (),
        func_kwargs: Dict = {},
    ) -> Optional[
        Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
    ]:
        try:
            langfuse = self._get_langfuse()
            stack = _observation_stack_context.get().copy()
            parent = stack[-1] if stack else None

            # Collect default observation data
            observation_id = func_kwargs.pop("langfuse_observation_id", None)
            id = str(observation_id) if observation_id else None
            start_time = _get_timestamp()

            input = (
                self._get_input_from_func_args(
                    is_method=is_method,
                    func_args=func_args,
                    func_kwargs=func_kwargs,
                )
                if capture_input
                else None
            )

            params = {
                "id": id,
                "name": name,
                "start_time": start_time,
                "input": input,
            }

            # Create observation
            if parent and as_type == "generation":
                observation = parent.generation(**params)
            elif as_type == "generation":
                # Create wrapper trace if generation is top-level
                # Do not add wrapper trace to stack, as it does not have a corresponding end that will pop it off again
                trace = langfuse.trace(id=id, name=name, start_time=start_time)
                observation = langfuse.generation(
                    name=name, start_time=start_time, input=input, trace_id=trace.id
                )
            elif parent:
                observation = parent.span(**params)
            else:
                params["id"] = self._get_context_trace_id() or params["id"]
                observation = langfuse.trace(**params)

            _observation_stack_context.set(stack + [observation])

            return observation
        except Exception as e:
            self._log.error(f"Failed to prepare observation: {e}")

    def _get_input_from_func_args(
        self,
        *,
        is_method: bool = False,
        func_args: Tuple = (),
        func_kwargs: Dict = {},
    ) -> Any:
        # Remove implicitly passed "self" or "cls" argument for instance or class methods
        logged_args = func_args[1:] if is_method else func_args
        raw_input = {
            "args": logged_args,
            "kwargs": func_kwargs,
        }

        # Serialize and deserialize to ensure proper JSON serialization.
        # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
        return json.loads(json.dumps(raw_input, cls=EventSerializer))

    def _get_context_trace_id(self):
        context_trace_id = _root_trace_id_context.get()

        if context_trace_id is not None:
            # Clear the context trace ID to avoid leaking it to other traces
            _root_trace_id_context.set(None)

            return context_trace_id

        return None

    def _finalize_call(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        result: Any,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ):
        if inspect.isgenerator(result):
            return self._wrap_sync_generator_result(
                observation, result, capture_output, transform_to_string
            )
        elif inspect.isasyncgen(result):
            return self._wrap_async_generator_result(
                observation, result, capture_output, transform_to_string
            )

        else:
            return self._handle_call_result(observation, result, capture_output)

    def _handle_call_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        result: Any,
        capture_output: bool,
    ):
        try:
            if observation is None:
                raise ValueError("No observation found in the current context")

            # Collect final observation data
            observation_params = _observation_params_context.get()[
                observation.id
            ].copy()
            del _observation_params_context.get()[
                observation.id
            ]  # Remove observation params to avoid leaking

            end_time = observation_params["end_time"] or _get_timestamp()
            raw_output = observation_params["output"] or (
                result if result and capture_output else None
            )

            # Serialize and deserialize to ensure proper JSON serialization.
            # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
            output = json.loads(json.dumps(raw_output, cls=EventSerializer))
            observation_params.update(end_time=end_time, output=output)

            if isinstance(observation, (StatefulSpanClient, StatefulGenerationClient)):
                observation.end(**observation_params)
            elif isinstance(observation, StatefulTraceClient):
                observation.update(**observation_params)

            # Remove observation from top of stack
            stack = _observation_stack_context.get()
            _observation_stack_context.set(stack[:-1])

        except Exception as e:
            self._log.error(f"Failed to finalize observation: {e}")

        finally:
            return result

    def _handle_exception(
        self,
        observation: Optional[
            Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
        ],
        e: Exception,
    ):
        if observation:
            _observation_params_context.get()[observation.id].update(
                level="ERROR", status_message=str(e)
            )
        raise e

    def _wrap_sync_generator_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        generator: Generator,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ):
        items = []

        try:
            for item in generator:
                items.append(item)

                yield item

        finally:
            output = items

            if transform_to_string is not None:
                output = transform_to_string(items)

            elif all(isinstance(item, str) for item in items):
                output = "".join(items)

            self._handle_call_result(observation, output, capture_output)

    async def _wrap_async_generator_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        generator: AsyncGenerator,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> AsyncGenerator:
        items = []

        try:
            async for item in generator:
                items.append(item)

                yield item

        finally:
            output = items

            if transform_to_string is not None:
                output = transform_to_string(items)

            elif all(isinstance(item, str) for item in items):
                output = "".join(items)

            self._handle_call_result(observation, output, capture_output)

 490    def get_current_llama_index_handler(self):
 491        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
 492
 493        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
 494        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
 495
 496        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
 497
 498        Returns:
 499            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
 500
 501        Note:
 502            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
 503            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
 504        """
 505        try:
 506            from langfuse.llama_index import LlamaIndexCallbackHandler
 507        except ImportError:
 508            self._log.error(
 509                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
 510            )
 511
 512            return None
 513
 514        observation = _observation_stack_context.get()[-1]
 515
 516        if observation is None:
 517            self._log.warn("No observation found in the current context")
 518
 519            return None
 520
 521        if isinstance(observation, StatefulGenerationClient):
 522            self._log.warn(
 523                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
 524            )
 525
 526            return None
 527
 528        callback_handler = LlamaIndexCallbackHandler()
 529        callback_handler.set_root(observation)
 530
 531        return callback_handler
 532
 533    def get_current_langchain_handler(self):
 534        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
 535
 536        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
 537        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
 538
 539        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
 540
 541        Returns:
 542            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
 543
 544        Note:
 545            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
 546            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
 547        """
 548        observation = _observation_stack_context.get()[-1]
 549
 550        if observation is None:
 551            self._log.warn("No observation found in the current context")
 552
 553            return None
 554
 555        if isinstance(observation, StatefulGenerationClient):
 556            self._log.warn(
 557                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
 558            )
 559
 560            return None
 561
 562        return observation.get_langchain_handler()
 563
 564    def get_current_trace_id(self):
 565        """Retrieve the ID of the current trace from the observation stack context.
 566
 567        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
 568        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
 569        representing the entry point of the traced execution context.
 570
 571        Returns:
 572            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
 573            possibly due to the method being called outside of any @observe-decorated function execution.
 574
 575        Note:
 576            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
 577            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 578        """
 579        stack = _observation_stack_context.get()
 580        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
 581
 582        if not stack:
 583            if should_log_warning:
 584                self._log.warn("No trace found in the current context")
 585
 586            return None
 587
 588        return stack[0].id
 589
 590    def _get_caller_module_name(self):
 591        try:
 592            caller_module = inspect.getmodule(inspect.stack()[2][0])
 593        except Exception as e:
 594            self._log.warn(f"Failed to get caller module: {e}")
 595
 596            return None
 597
 598        return caller_module.__name__ if caller_module else None
 599
 600    def get_current_trace_url(self) -> Optional[str]:
 601        """Retrieve the URL of the current trace in context.
 602
 603        Returns:
 604            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
 605            possibly due to the method being called outside of any @observe-decorated function execution.
 606
 607        Note:
 608            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
 609            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 610        """
 611        try:
 612            trace_id = self.get_current_trace_id()
 613            langfuse = self._get_langfuse()
 614
 615            if not trace_id:
 616                raise ValueError("No trace found in the current context")
 617
 618            return f"{langfuse.client._client_wrapper._base_url}/trace/{trace_id}"
 619
 620        except Exception as e:
 621            self._log.error(f"Failed to get current trace URL: {e}")
 622
 623            return None
 624
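The URL returned here is simply the client's base URL joined with the trace path. A minimal sketch of that assembly, using the default cloud host as an illustrative base URL (the real value comes from the configured client):

```python
from typing import Optional

def build_trace_url(base_url: str, trace_id: Optional[str]) -> Optional[str]:
    # Mirrors the f-string in the source: no trace in context -> no URL.
    if not trace_id:
        return None
    return f"{base_url}/trace/{trace_id}"

print(build_trace_url("https://cloud.langfuse.com", "abc123"))
# https://cloud.langfuse.com/trace/abc123
```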
 625    def get_current_observation_id(self):
 626        """Retrieve the ID of the current observation in context.
 627
 628        Returns:
 629            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
 630            possibly due to the method being called outside of any @observe-decorated function execution.
 631
 632        Note:
 633            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
 634            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 635            - If called at the top level of a trace, it will return the trace ID.
 636        """
 637        stack = _observation_stack_context.get()
 638        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
 639
 640        if not stack:
 641            if should_log_warning:
 642                self._log.warn("No observation found in the current context")
 643
 644            return None
 645
 646        return stack[-1].id
 647
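The difference between `get_current_trace_id` and `get_current_observation_id` comes down to which end of the observation stack is read: the root of the stack is the trace, the top is the innermost observation. A self-contained sketch with a stand-in stack (the `Obs` class is illustrative, not the SDK's client types):

```python
from dataclasses import dataclass

@dataclass
class Obs:
    id: str

# Stack as it looks inside a nested @observe call chain:
# the root trace sits at the bottom, the innermost span on top.
stack = [Obs("trace-1"), Obs("span-1"), Obs("span-2")]

trace_id = stack[0].id if stack else None         # root of the trace
observation_id = stack[-1].id if stack else None  # innermost observation

print(trace_id, observation_id)  # trace-1 span-2
```

At the top level of a trace the stack has a single entry, so both lookups return the same ID, which is why `get_current_observation_id` returns the trace ID there.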
 648    def update_current_trace(
 649        self,
 650        name: Optional[str] = None,
 651        user_id: Optional[str] = None,
 652        session_id: Optional[str] = None,
 653        version: Optional[str] = None,
 654        release: Optional[str] = None,
 655        metadata: Optional[Any] = None,
 656        tags: Optional[List[str]] = None,
 657        public: Optional[bool] = None,
 658    ):
 659        """Set parameters for the current trace, updating the trace's metadata and context information.
 660
 661        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
 662        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
 663        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
 664
 665        Arguments:
 666            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
 667            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
 668            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
 669            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
 670            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
 671            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
 672            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
                 public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
 673
 674        Returns:
 675            None
 676
 677        Note:
 678            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
 679            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
 680            - If called outside of an active trace context, a warning is logged, and a ValueError is raised to indicate the absence of a traceable context.
 681        """
 682        trace_id = self.get_current_trace_id()
 683
 684        if trace_id is None:
 685            self._log.warn("No trace found in the current context")
 686
 687            return
 688
 689        params_to_update = {
 690            k: v
 691            for k, v in {
 692                "name": name,
 693                "user_id": user_id,
 694                "session_id": session_id,
 695                "version": version,
 696                "release": release,
 697                "metadata": metadata,
 698                "tags": tags,
 699                "public": public,
 700            }.items()
 701            if v is not None
 702        }
 703
 704        _observation_params_context.get()[trace_id].update(params_to_update)
 705
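Only arguments that are explicitly set are forwarded; anything left as `None` is dropped before the update, which is what makes partial updates safe. The same filtering pattern in isolation:

```python
def filter_params(**kwargs):
    # Drop unset (None) values so they cannot clobber existing ones.
    return {k: v for k, v in kwargs.items() if v is not None}

update = filter_params(name="my-trace", user_id=None, tags=["prod"])
print(update)  # {'name': 'my-trace', 'tags': ['prod']}
```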
 706    def update_current_observation(
 707        self,
 708        *,
 709        input: Optional[Any] = None,
 710        output: Optional[Any] = None,
 711        name: Optional[str] = None,
 712        version: Optional[str] = None,
 713        metadata: Optional[Any] = None,
 714        start_time: Optional[datetime] = None,
 715        end_time: Optional[datetime] = None,
 716        release: Optional[str] = None,
 717        tags: Optional[List[str]] = None,
 718        user_id: Optional[str] = None,
 719        session_id: Optional[str] = None,
 720        level: Optional[SpanLevel] = None,
 721        status_message: Optional[str] = None,
 722        completion_start_time: Optional[datetime] = None,
 723        model: Optional[str] = None,
 724        model_parameters: Optional[Dict[str, MapValue]] = None,
 725        usage: Optional[Union[BaseModel, ModelUsage]] = None,
 726        prompt: Optional[PromptClient] = None,
 727        public: Optional[bool] = None,
 728    ):
 729        """Update parameters for the current observation within an active trace context.
 730
 731        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
 732        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
 733        enhancing the observability and traceability of the execution context.
 734
 735        Note that if a param is not available on a specific observation type, it will be ignored.
 736
 737        Shared params:
 738            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
 739            - `output` (Optional[Any]): The output or result of the trace or observation
 740            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
 741            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
 742            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
 743            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
 744            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
 745
 746        Trace-specific params:
 747            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
 748            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
 749            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
 750            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
 751            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
 752
 753        Span-specific params:
 754            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
 755            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
 756
 757        Generation-specific params:
 758            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
 759            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
 760            - `usage` (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
 761            - `prompt`(Optional[PromptClient]): The prompt object used for the generation.
 762
 763        Returns:
 764            None
 765
 766        Raises:
 767            ValueError: If no current observation is found in the context, indicating that this method was called outside of an observation's execution scope.
 768
 769        Note:
 770            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
 771            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
 772            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
 773        """
 774        stack = _observation_stack_context.get()
 775        observation = stack[-1] if stack else None
 776
 777        if not observation:
 778            self._log.warn("No observation found in the current context")
 779
 780            return
 781
 782        update_params = {
 783            k: v
 784            for k, v in {
 785                "input": input,
 786                "output": output,
 787                "name": name,
 788                "version": version,
 789                "metadata": metadata,
 790                "start_time": start_time,
 791                "end_time": end_time,
 792                "release": release,
 793                "tags": tags,
 794                "user_id": user_id,
 795                "session_id": session_id,
 796                "level": level,
 797                "status_message": status_message,
 798                "completion_start_time": completion_start_time,
 799                "model": model,
 800                "model_parameters": model_parameters,
 801                "usage": usage,
 802                "prompt": prompt,
 803                "public": public,
 804            }.items()
 805            if v is not None
 806        }
 807
 808        _observation_params_context.get()[observation.id].update(update_params)
 809
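Because `None` values are filtered out before the merge, repeated calls only touch the keys they actually set. A sketch of two successive updates against the same params dict, mirroring the comprehension in the source above:

```python
params = {}

def apply_update(store, **kwargs):
    # None means "leave the existing value untouched".
    store.update({k: v for k, v in kwargs.items() if v is not None})

apply_update(params, input={"q": "hello"}, model="gpt-3.5-turbo")
apply_update(params, output="hi there", model=None)  # model survives

print(params)
# {'input': {'q': 'hello'}, 'model': 'gpt-3.5-turbo', 'output': 'hi there'}
```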
 810    def score_current_observation(
 811        self,
 812        *,
 813        name: str,
 814        value: Union[float, str],
 815        data_type: Optional[ScoreDataType] = None,
 816        comment: Optional[str] = None,
 817        id: Optional[str] = None,
 818        config_id: Optional[str] = None,
 819    ):
 820        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
 821
 822        Arguments:
 823            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 824            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
 825            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
 826              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
 827            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 828            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 829            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
 830
 831        Returns:
 832            None
 833
 834        Note:
 835            This method is intended to be used within the context of an active trace or observation.
 836        """
 837        try:
 838            langfuse = self._get_langfuse()
 839            trace_id = self.get_current_trace_id()
 840            current_observation_id = self.get_current_observation_id()
 841
 842            observation_id = (
 843                current_observation_id if current_observation_id != trace_id else None
 844            )
 845
 846            if trace_id:
 847                langfuse.score(
 848                    trace_id=trace_id,
 849                    observation_id=observation_id,
 850                    name=name,
 851                    value=value,
 852                    data_type=data_type,
 853                    comment=comment,
 854                    id=id,
 855                    config_id=config_id,
 856                )
 857            else:
 858                raise ValueError("No trace or observation found in the current context")
 859
 860        except Exception as e:
 861            self._log.error(f"Failed to score observation: {e}")
 862
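When called at the top level of a trace, the current observation ID equals the trace ID, and the score is attached to the trace itself by passing `observation_id=None`. The resolution rule in isolation:

```python
def resolve_observation_id(trace_id, current_observation_id):
    # At the trace root both IDs coincide; score the trace, not an observation.
    return current_observation_id if current_observation_id != trace_id else None

print(resolve_observation_id("t1", "t1"))  # None (top level -> trace score)
print(resolve_observation_id("t1", "o7"))  # o7   (nested -> observation score)
```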
 863    def score_current_trace(
 864        self,
 865        *,
 866        name: str,
 867        value: Union[float, str],
 868        data_type: Optional[ScoreDataType] = None,
 869        comment: Optional[str] = None,
 870        id: Optional[str] = None,
 871        config_id: Optional[str] = None,
 872    ):
 873        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
 874
 875        Arguments:
 876            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 877            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
 878            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
 879              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
 880            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 881            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 882            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
 883
 884        Returns:
 885            None
 886
 887        Note:
 888            This method is intended to be used within the context of an active trace or observation.
 889        """
 890        try:
 891            langfuse = self._get_langfuse()
 892            trace_id = self.get_current_trace_id()
 893
 894            if trace_id:
 895                langfuse.score(
 896                    trace_id=trace_id,
 897                    name=name,
 898                    value=value,
 899                    data_type=data_type,
 900                    comment=comment,
 901                    id=id,
 902                    config_id=config_id,
 903                )
 904            else:
 905                raise ValueError("No trace found in the current context")
 906
 907        except Exception as e:
 908            self._log.error(f"Failed to score trace: {e}")
 909
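The docstring's inference rule, float values become numeric scores and string values categorical scores when no config is set, can be sketched as follows; the function name and return labels are illustrative, not the SDK's internals:

```python
def infer_score_data_type(value):
    # Illustrative only: mirrors the rule described in the docstring.
    if isinstance(value, float):
        return "NUMERIC"
    if isinstance(value, str):
        return "CATEGORICAL"
    raise TypeError("score value must be float or str")

print(infer_score_data_type(0.87))    # NUMERIC
print(infer_score_data_type("good"))  # CATEGORICAL
```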
 910    @catch_and_log_errors
 911    def flush(self):
 912        """Force immediate flush of all buffered observations to the Langfuse backend.
 913
 914        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
 915        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
 916
 917        Usage:
 918            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
 919            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
 920
 921        Returns:
 922            None
 923
 924        Raises:
 925            ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues.
 926
 927        Note:
 928            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
 929            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
 930            However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
 931        """
 932        langfuse = self._get_langfuse()
 933        if langfuse:
 934            langfuse.flush()
 935        else:
 936            self._log.warn("No langfuse object found in the current context")
 937
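Conceptually, flushing drains a buffer of pending events and sends them as one batch. A toy model of the buffer/flush cycle (not the SDK's actual consumer threads or network layer):

```python
class ToyBuffer:
    def __init__(self):
        self.pending = []
        self.sent = []

    def add(self, event):
        self.pending.append(event)

    def flush(self):
        # Send everything accumulated so far, then clear the buffer.
        self.sent.extend(self.pending)
        self.pending.clear()

buf = ToyBuffer()
buf.add({"type": "trace", "id": "t1"})
buf.add({"type": "span", "id": "s1"})
buf.flush()
print(len(buf.pending), len(buf.sent))  # 0 2
```

In the real client the same draining happens automatically in the background; an explicit `flush()` is mainly useful right before process exit.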
 938    def configure(
 939        self,
 940        *,
 941        public_key: Optional[str] = None,
 942        secret_key: Optional[str] = None,
 943        host: Optional[str] = None,
 944        release: Optional[str] = None,
 945        debug: Optional[bool] = None,
 946        threads: Optional[int] = None,
 947        flush_at: Optional[int] = None,
 948        flush_interval: Optional[int] = None,
 949        max_retries: Optional[int] = None,
 950        timeout: Optional[int] = None,
 951        httpx_client: Optional[httpx.Client] = None,
 952        enabled: Optional[bool] = None,
 953    ):
 954        """Configure the Langfuse client.
 955
 956        If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings.
 957
 958        Args:
 959            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
 960            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
 961            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
 962            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
 963            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
 964            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
 965            flush_at: Max batch size that's sent to the API.
 966            flush_interval: Max delay until a new batch is sent to the API.
 967            max_retries: Max number of retries in case of API/network errors.
 968            timeout: Timeout of API requests in seconds. Default is 20 seconds.
 969            httpx_client: Pass your own httpx client for more customizability of requests.
 970            enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
 971        """
 972        langfuse_singleton = LangfuseSingleton()
 973        langfuse_singleton.reset()
 974
 975        langfuse_singleton.get(
 976            public_key=public_key,
 977            secret_key=secret_key,
 978            host=host,
 979            release=release,
 980            debug=debug,
 981            threads=threads,
 982            flush_at=flush_at,
 983            flush_interval=flush_interval,
 984            max_retries=max_retries,
 985            timeout=timeout,
 986            httpx_client=httpx_client,
 987            enabled=enabled,
 988        )
 989
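Per the docstring, settings fall back to environment variables (`LANGFUSE_PUBLIC_KEY`, `LANGFUSE_SECRET_KEY`, `LANGFUSE_HOST`, ...) when not passed explicitly. A sketch of that resolution order; the helper name is hypothetical:

```python
import os

def resolve_setting(explicit, env_var, default=None):
    # Explicit argument wins, then the environment, then the default.
    if explicit is not None:
        return explicit
    return os.environ.get(env_var, default)

os.environ["LANGFUSE_HOST"] = "https://my-langfuse.example.com"
print(resolve_setting(None, "LANGFUSE_HOST", "https://cloud.langfuse.com"))
# https://my-langfuse.example.com
print(resolve_setting("https://override.example.com", "LANGFUSE_HOST"))
# https://override.example.com
```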
 990    def _get_langfuse(self) -> Langfuse:
 991        return LangfuseSingleton().get()
 992
 993    def _set_root_trace_id(self, trace_id: str):
 994        if _observation_stack_context.get():
 995            self._log.warn(
 996                "Root Trace ID cannot be set on an already running trace. Skipping root trace ID assignment."
 997            )
 998            return
 999
1000        _root_trace_id_context.set(trace_id)
1001
1002    def auth_check(self) -> bool:
1003        """Check if the current Langfuse client is authenticated.
1004
1005        Returns:
1006            bool: True if the client is authenticated, False otherwise
1007        """
1008        try:
1009            langfuse = self._get_langfuse()
1010
1011            return langfuse.auth_check()
1012        except Exception as e:
1013            self._log.error(f"No Langfuse object found in the current context: {e}")
1014
1015            return False
def observe( self, *, name: Optional[str] = None, as_type: Optional[Literal['generation']] = None, capture_input: bool = True, capture_output: bool = True, transform_to_string: Optional[Callable[[Iterable], str]] = None) -> Callable[[~F], ~F]:
 90    def observe(
 91        self,
 92        *,
 93        name: Optional[str] = None,
 94        as_type: Optional[Literal["generation"]] = None,
 95        capture_input: bool = True,
 96        capture_output: bool = True,
 97        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 98    ) -> Callable[[F], F]:
 99        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
100
101        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
102        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
103
104        Attributes:
105            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
106            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
107            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
108            capture_output (bool): If True, captures the return value of the function as output. Default is True.
109            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.
110
111        Returns:
112            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
113
114        Example:
115            For general tracing (functions/methods):
116            ```python
117            @observe()
118            def your_function(args):
119                # Your implementation here
120            ```
121            For observing language model generations:
122            ```python
123            @observe(as_type="generation")
124            def your_LLM_function(args):
125                # Your LLM invocation here
126            ```
127
128        Raises:
129            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
130
131        Note:
132        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
133        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
134        """
135
136        def decorator(func: F) -> F:
137            return (
138                self._async_observe(
139                    func,
140                    name=name,
141                    as_type=as_type,
142                    capture_input=capture_input,
143                    capture_output=capture_output,
144                    transform_to_string=transform_to_string,
145                )
146                if asyncio.iscoroutinefunction(func)
147                else self._sync_observe(
148                    func,
149                    name=name,
150                    as_type=as_type,
151                    capture_input=capture_input,
152                    capture_output=capture_output,
153                    transform_to_string=transform_to_string,
154                )
155            )
156
157        return decorator
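The decorator body above dispatches on whether the wrapped function is a coroutine function and picks the matching wrapper. The same dispatch in a self-contained sketch, with the wrapper internals reduced to a pass-through (the real wrappers open and close an observation around the call):

```python
import asyncio
import functools

def observe_sketch(func):
    # Choose the wrapper based on sync vs. async, as the source does.
    if asyncio.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            # Real SDK: create observation, run, capture output, finalize.
            return await func(*args, **kwargs)
        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return sync_wrapper

@observe_sketch
def sync_fn():
    return "sync"

@observe_sketch
async def async_fn():
    return "async"

print(sync_fn())                # sync
print(asyncio.run(async_fn()))  # async
```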

def get_current_llama_index_handler(self):
490    def get_current_llama_index_handler(self):
491        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
492
493        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
494        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
495
496        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
497
498        Returns:
499            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
500
501        Note:
502            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
503            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
504        """
505        try:
506            from langfuse.llama_index import LlamaIndexCallbackHandler
507        except ImportError:
508            self._log.error(
509                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
510            )
511
512            return None
513
514        observation = _observation_stack_context.get()[-1]
515
516        if observation is None:
517            self._log.warn("No observation found in the current context")
518
519            return None
520
521        if isinstance(observation, StatefulGenerationClient):
522            self._log.warn(
523                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
524            )
525
526            return None
527
528        callback_handler = LlamaIndexCallbackHandler()
529        callback_handler.set_root(observation)
530
531        return callback_handler

Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.

This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.

See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.

Returns:

LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

Note:
  • This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
  • If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
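A minimal usage sketch (assumes `langfuse` and `llama-index` are installed and Langfuse credentials are configured via environment variables; `index` is a placeholder for a previously built LlamaIndex index, and the `Settings`/`CallbackManager` wiring follows the common LlamaIndex global-callback pattern):

```python
from langfuse.decorators import langfuse_context, observe
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager

@observe()
def ask(index, question: str):
    # Route LlamaIndex events through a handler scoped to this observation
    handler = langfuse_context.get_current_llama_index_handler()
    Settings.callback_manager = CallbackManager([handler])
    return index.as_query_engine().query(question)
```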
def get_current_langchain_handler(self):
533    def get_current_langchain_handler(self):
534        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
535
536        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
537        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
538
539        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
540
541        Returns:
542            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
543
544        Note:
545            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
546            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
547        """
548        observation = _observation_stack_context.get()[-1]
549
550        if observation is None:
551            self._log.warn("No observation found in the current context")
552
553            return None
554
555        if isinstance(observation, StatefulGenerationClient):
556            self._log.warn(
557                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
558            )
559
560            return None
561
562        return observation.get_langchain_handler()

Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.

This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.

See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.

Returns:

LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

Note:
  • This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
  • If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
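A minimal usage sketch (assumes a configured Langfuse client; `chain` is a placeholder for any Langchain runnable):

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def summarize(chain, text: str):
    # Pass the handler as a callback so Langchain runs nest under this trace
    handler = langfuse_context.get_current_langchain_handler()
    return chain.invoke({"text": text}, config={"callbacks": [handler]})
```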
def get_current_trace_id(self):
564    def get_current_trace_id(self):
565        """Retrieve the ID of the current trace from the observation stack context.
566
567        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
568        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
569        representing the entry point of the traced execution context.
570
571        Returns:
572            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
573            possibly due to the method being called outside of any @observe-decorated function execution.
574
575        Note:
576            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
577            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
578        """
579        stack = _observation_stack_context.get()
580        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
581
582        if not stack:
583            if should_log_warning:
584                self._log.warn("No trace found in the current context")
585
586            return None
587
588        return stack[0].id

Retrieve the ID of the current trace from the observation stack context.

This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID, such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack, representing the entry point of the traced execution context.

Returns:

str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
  • If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
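For example, the trace ID can be captured inside a decorated function to correlate the trace with external systems (sketch, assuming a configured Langfuse client):

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def process_request():
    trace_id = langfuse_context.get_current_trace_id()
    # e.g. return or log the id to attach scores or user feedback later
    return trace_id

process_request()
```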
def get_current_trace_url(self) -> Optional[str]:
600    def get_current_trace_url(self) -> Optional[str]:
601        """Retrieve the URL of the current trace in context.
602
603        Returns:
604            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
605            possibly due to the method being called outside of any @observe-decorated function execution.
606
607        Note:
608            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
609            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
610        """
611        try:
612            trace_id = self.get_current_trace_id()
613            langfuse = self._get_langfuse()
614
615            if not trace_id:
616                raise ValueError("No trace found in the current context")
617
618            return f"{langfuse.client._client_wrapper._base_url}/trace/{trace_id}"
619
620        except Exception as e:
621            self._log.error(f"Failed to get current trace URL: {e}")
622
623            return None

Retrieve the URL of the current trace in context.

Returns:

str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
  • If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
def get_current_observation_id(self):
625    def get_current_observation_id(self):
626        """Retrieve the ID of the current observation in context.
627
628        Returns:
629            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
630            possibly due to the method being called outside of any @observe-decorated function execution.
631
632        Note:
633            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
634            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
635            - If called at the top level of a trace, it will return the trace ID.
636        """
637        stack = _observation_stack_context.get()
638        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
639
640        if not stack:
641            if should_log_warning:
642                self._log.warn("No observation found in the current context")
643
644            return None
645
646        return stack[-1].id

Retrieve the ID of the current observation in context.

Returns:

str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
  • If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
  • If called at the top level of a trace, it will return the trace ID.
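A sketch of the nesting behavior described in the notes above (assuming a configured Langfuse client):

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def inner():
    # Returns the id of the span created for `inner`
    return langfuse_context.get_current_observation_id()

@observe()
def outer():
    # At the top level of the trace this returns the trace id itself
    top_level_id = langfuse_context.get_current_observation_id()
    return top_level_id, inner()
```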
def update_current_trace( self, name: Optional[str] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, version: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, public: Optional[bool] = None):
648    def update_current_trace(
649        self,
650        name: Optional[str] = None,
651        user_id: Optional[str] = None,
652        session_id: Optional[str] = None,
653        version: Optional[str] = None,
654        release: Optional[str] = None,
655        metadata: Optional[Any] = None,
656        tags: Optional[List[str]] = None,
657        public: Optional[bool] = None,
658    ):
659        """Set parameters for the current trace, updating the trace's metadata and context information.
660
661        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
662        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
663        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
664
665        Arguments:
666            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
667            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
668            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
669            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
670            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
671            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
672            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
673
674        Returns:
675            None
676
677        Note:
678            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
679            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
680            - If called outside of an active trace context, a warning is logged and the update is skipped.
681        """
682        trace_id = self.get_current_trace_id()
683
684        if trace_id is None:
685            self._log.warn("No trace found in the current context")
686
687            return
688
689        params_to_update = {
690            k: v
691            for k, v in {
692                "name": name,
693                "user_id": user_id,
694                "session_id": session_id,
695                "version": version,
696                "release": release,
697                "metadata": metadata,
698                "tags": tags,
699                "public": public,
700            }.items()
701            if v is not None
702        }
703
704        _observation_params_context.get()[trace_id].update(params_to_update)

Set parameters for the current trace, updating the trace's metadata and context information.

This method allows for dynamically updating the trace parameters at any point during the execution of a trace. It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information, and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.

Arguments:
  • name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
  • release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
  • public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
Returns:

None

Note:
  • This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
  • The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
  • If called outside of an active trace context, a warning is logged and the update is skipped.
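A typical usage sketch (assuming a configured Langfuse client; the trace name and tag are placeholders):

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def handle_request(user_id: str, session_id: str):
    langfuse_context.update_current_trace(
        name="handle-request",   # placeholder trace name
        user_id=user_id,
        session_id=session_id,
        tags=["production"],     # placeholder tag
    )
    # ... application logic ...
```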
def update_current_observation( self, *, input: Optional[Any] = None, output: Optional[Any] = None, name: Optional[str] = None, version: Optional[str] = None, metadata: Optional[Any] = None, start_time: Optional[datetime.datetime] = None, end_time: Optional[datetime.datetime] = None, release: Optional[str] = None, tags: Optional[List[str]] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, level: Optional[Literal['DEBUG', 'DEFAULT', 'WARNING', 'ERROR']] = None, status_message: Optional[str] = None, completion_start_time: Optional[datetime.datetime] = None, model: Optional[str] = None, model_parameters: Optional[Dict[str, Union[str, NoneType, int, bool, List[str]]]] = None, usage: Union[pydantic.main.BaseModel, langfuse.model.ModelUsage, NoneType] = None, prompt: Union[langfuse.model.TextPromptClient, langfuse.model.ChatPromptClient, NoneType] = None, public: Optional[bool] = None):
706    def update_current_observation(
707        self,
708        *,
709        input: Optional[Any] = None,
710        output: Optional[Any] = None,
711        name: Optional[str] = None,
712        version: Optional[str] = None,
713        metadata: Optional[Any] = None,
714        start_time: Optional[datetime] = None,
715        end_time: Optional[datetime] = None,
716        release: Optional[str] = None,
717        tags: Optional[List[str]] = None,
718        user_id: Optional[str] = None,
719        session_id: Optional[str] = None,
720        level: Optional[SpanLevel] = None,
721        status_message: Optional[str] = None,
722        completion_start_time: Optional[datetime] = None,
723        model: Optional[str] = None,
724        model_parameters: Optional[Dict[str, MapValue]] = None,
725        usage: Optional[Union[BaseModel, ModelUsage]] = None,
726        prompt: Optional[PromptClient] = None,
727        public: Optional[bool] = None,
728    ):
729        """Update parameters for the current observation within an active trace context.
730
731        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
732        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
733        enhancing the observability and traceability of the execution context.
734
735        Note that if a param is not available on a specific observation type, it will be ignored.
736
737        Shared params:
738            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
739            - `output` (Optional[Any]): The output or result of the trace or observation
740            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
741            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
742            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
743            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
744            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
745
746        Trace-specific params:
747            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
748            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
749            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
750            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
751            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
752
753        Span-specific params:
754            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
755            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
756
757        Generation-specific params:
758            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
759            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
760            - `usage` (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
761            - `prompt` (Optional[PromptClient]): The prompt object used for the generation.
762
763        Returns:
764            None
765
766        Warns:
767            If no current observation is found in the context (i.e., this method was called outside of an observation's execution scope), a warning is logged and the update is skipped.
768
769        Note:
770            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
771            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
772            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
773        """
774        stack = _observation_stack_context.get()
775        observation = stack[-1] if stack else None
776
777        if not observation:
778            self._log.warn("No observation found in the current context")
779
780            return
781
782        update_params = {
783            k: v
784            for k, v in {
785                "input": input,
786                "output": output,
787                "name": name,
788                "version": version,
789                "metadata": metadata,
790                "start_time": start_time,
791                "end_time": end_time,
792                "release": release,
793                "tags": tags,
794                "user_id": user_id,
795                "session_id": session_id,
796                "level": level,
797                "status_message": status_message,
798                "completion_start_time": completion_start_time,
799                "model": model,
800                "model_parameters": model_parameters,
801                "usage": usage,
802                "prompt": prompt,
803                "public": public,
804            }.items()
805            if v is not None
806        }
807
808        _observation_params_context.get()[observation.id].update(update_params)

Update parameters for the current observation within an active trace context.

This method dynamically adjusts the parameters of the most recent observation on the observation stack. It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more, enhancing the observability and traceability of the execution context.

Note that if a param is not available on a specific observation type, it will be ignored.

Shared params:
  • input (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
  • output (Optional[Any]): The output or result of the trace or observation
  • name (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • start_time (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
  • end_time (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.

Trace-specific params:
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
  • public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

Span-specific params:
  • level (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
  • status_message (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.

Generation-specific params:
  • completion_start_time (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
  • model_parameters (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
  • usage (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
  • prompt (Optional[PromptClient]): The prompt object used for the generation.

Returns:

None

Warns:
  • If no current observation is found in the context (i.e., this method was called outside of an observation's execution scope), a warning is logged and the update is skipped.
Note:
  • This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
  • It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
  • Parameters set to None will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
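The selective-update behavior in the last note (values set to `None` never overwrite) follows from filtering the keyword arguments before merging, as the implementation above does. A standalone sketch of that pattern (plain Python, not the Langfuse API; `merge_params` is a hypothetical helper for illustration):

```python
def merge_params(existing: dict, **updates) -> dict:
    # Drop None values so only explicitly provided parameters overwrite
    filtered = {k: v for k, v in updates.items() if v is not None}
    merged = dict(existing)
    merged.update(filtered)
    return merged

params = {"name": "retrieval", "level": "DEFAULT"}
params = merge_params(params, level="WARNING", output=None)
# output=None is ignored; level is overwritten with "WARNING"
```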
def score_current_observation( self, *, name: str, value: Union[float, str], data_type: Optional[Literal['NUMERIC', 'CATEGORICAL', 'BOOLEAN']] = None, comment: Optional[str] = None, id: Optional[str] = None, config_id: Optional[str] = None):
810    def score_current_observation(
811        self,
812        *,
813        name: str,
814        value: Union[float, str],
815        data_type: Optional[ScoreDataType] = None,
816        comment: Optional[str] = None,
817        id: Optional[str] = None,
818        config_id: Optional[str] = None,
819    ):
820        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
821
822        Arguments:
823            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
824            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
825            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
826              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
827            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
828            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
829            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
830
831        Returns:
832            None
833
834        Note:
835            This method is intended to be used within the context of an active trace or observation.
836        """
837        try:
838            langfuse = self._get_langfuse()
839            trace_id = self.get_current_trace_id()
840            current_observation_id = self.get_current_observation_id()
841
842            observation_id = (
843                current_observation_id if current_observation_id != trace_id else None
844            )
845
846            if trace_id:
847                langfuse.score(
848                    trace_id=trace_id,
849                    observation_id=observation_id,
850                    name=name,
851                    value=value,
852                    data_type=data_type,
853                    comment=comment,
854                    id=id,
855                    config_id=config_id,
856                )
857            else:
858                raise ValueError("No trace or observation found in the current context")
859
860        except Exception as e:
861            self._log.error(f"Failed to score observation: {e}")

Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.

Arguments:
  • name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
  • value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
  • data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
  • comment (Optional[str]): An optional comment or description providing context or additional details about the score.
  • id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
  • config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
Returns:

None

Note:

This method is intended to be used within the context of an active trace or observation.
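A usage sketch (assuming a configured Langfuse client; the score name, value, and comment are placeholders):

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def answer(question: str):
    result = "..."  # placeholder for the actual model call
    langfuse_context.score_current_observation(
        name="relevance",  # placeholder metric name
        value=0.9,         # float value -> numeric score is inferred
        comment="high overlap with reference answer",
    )
    return result
```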

def score_current_trace( self, *, name: str, value: Union[float, str], data_type: Optional[Literal['NUMERIC', 'CATEGORICAL', 'BOOLEAN']] = None, comment: Optional[str] = None, id: Optional[str] = None, config_id: Optional[str] = None):
863    def score_current_trace(
864        self,
865        *,
866        name: str,
867        value: Union[float, str],
868        data_type: Optional[ScoreDataType] = None,
869        comment: Optional[str] = None,
870        id: Optional[str] = None,
871        config_id: Optional[str] = None,
872    ):
873        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
874
875        Arguments:
876            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
877            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
878            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
879              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
880            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
881            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
882            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
883
884        Returns:
885            None
886
887        Note:
888            This method is intended to be used within the context of an active trace or observation.
889        """
890        try:
891            langfuse = self._get_langfuse()
892            trace_id = self.get_current_trace_id()
893
894            if trace_id:
895                langfuse.score(
896                    trace_id=trace_id,
897                    name=name,
898                    value=value,
899                    data_type=data_type,
900                    comment=comment,
901                    id=id,
902                    config_id=config_id,
903                )
904            else:
905                raise ValueError("No trace found in the current context")
906
907        except Exception as e:
908            self._log.error(f"Failed to score trace: {e}")

Score the current trace in context. This can be called anywhere in the nested trace to score the trace.

Arguments:
  • name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
  • value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
  • data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
  • comment (Optional[str]): An optional comment or description providing context or additional details about the score.
  • id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
  • config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
Returns:

None

Note:

This method is intended to be used within the context of an active trace or observation.

@catch_and_log_errors
def flush(self):
910    @catch_and_log_errors
911    def flush(self):
912        """Force immediate flush of all buffered observations to the Langfuse backend.
913
914        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
915        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
916
917        Usage:
918            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
919            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
920
921        Returns:
922            None
923
 924        Warns:
 925            Logs a warning if no Langfuse client object is found in the current context, indicating potential misconfiguration or initialization issues; no exception is raised.
926
927        Note:
928            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
929            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
930            However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
931        """
932        langfuse = self._get_langfuse()
933        if langfuse:
934            langfuse.flush()
935        else:
936            self._log.warning("No langfuse object found in the current context")

Force immediate flush of all buffered observations to the Langfuse backend.

This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers. It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.

Usage:
  • This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
  • It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
Returns:

None

Warns:
  • Logs a warning if no Langfuse client object is found in the current context, indicating potential misconfiguration or initialization issues; no exception is raised.
Note:
  • The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
  • In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client. However, explicit calls to flush can be beneficial in certain edge cases or for debugging purposes.
def configure( self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, release: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, enabled: Optional[bool] = None):
938    def configure(
939        self,
940        *,
941        public_key: Optional[str] = None,
942        secret_key: Optional[str] = None,
943        host: Optional[str] = None,
944        release: Optional[str] = None,
945        debug: Optional[bool] = None,
946        threads: Optional[int] = None,
947        flush_at: Optional[int] = None,
948        flush_interval: Optional[int] = None,
949        max_retries: Optional[int] = None,
950        timeout: Optional[int] = None,
951        httpx_client: Optional[httpx.Client] = None,
952        enabled: Optional[bool] = None,
953    ):
954        """Configure the Langfuse client.
955
956        If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings.
957
958        Args:
959            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
960            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
961            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
962            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
963            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
964            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
965            flush_at: Max batch size that's sent to the API.
966            flush_interval: Max delay until a new batch is sent to the API.
967            max_retries: Max number of retries in case of API/network errors.
968            timeout: Timeout of API requests in seconds. Default is 20 seconds.
969            httpx_client: Pass your own httpx client for more customizability of requests.
970            enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
971        """
972        langfuse_singleton = LangfuseSingleton()
973        langfuse_singleton.reset()
974
975        langfuse_singleton.get(
976            public_key=public_key,
977            secret_key=secret_key,
978            host=host,
979            release=release,
980            debug=debug,
981            threads=threads,
982            flush_at=flush_at,
983            flush_interval=flush_interval,
984            max_retries=max_retries,
985            timeout=timeout,
986            httpx_client=httpx_client,
987            enabled=enabled,
988        )

Configure the Langfuse client.

If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings.

Arguments:
  • public_key: Public API key of Langfuse project. Can be set via LANGFUSE_PUBLIC_KEY environment variable.
  • secret_key: Secret API key of Langfuse project. Can be set via LANGFUSE_SECRET_KEY environment variable.
  • host: Host of Langfuse API. Can be set via LANGFUSE_HOST environment variable. Defaults to https://cloud.langfuse.com.
  • release: Release number/hash of the application to provide analytics grouped by release. Can be set via LANGFUSE_RELEASE environment variable.
  • debug: Enables debug mode for more verbose logging. Can be set via LANGFUSE_DEBUG environment variable.
  • threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
  • flush_at: Max batch size that's sent to the API.
  • flush_interval: Max delay until a new batch is sent to the API.
  • max_retries: Max number of retries in case of API/network errors.
  • timeout: Timeout of API requests in seconds. Default is 20 seconds.
  • httpx_client: Pass your own httpx client for more customizability of requests.
  • enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
def auth_check(self) -> bool:
1002    def auth_check(self) -> bool:
1003        """Check if the current Langfuse client is authenticated.
1004
1005        Returns:
1006            bool: True if the client is authenticated, False otherwise
1007        """
1008        try:
1009            langfuse = self._get_langfuse()
1010
1011            return langfuse.auth_check()
1012        except Exception as e:
1013            self._log.error(f"No Langfuse object found in the current context: {e}")
1014
1015            return False

Check if the current Langfuse client is authenticated.

Returns:

bool: True if the client is authenticated, False otherwise