langfuse.decorators

Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the @observe() decorator.

*Simple example (decorator + openai integration)*

```python
from langfuse.decorators import observe
from langfuse.openai import openai # OpenAI integration

@observe()
def story():
    return openai.chat.completions.create(
        model="gpt-3.5-turbo",
        max_tokens=100,
        messages=[
          {"role": "system", "content": "You are a great storyteller."},
          {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
        ],
    ).choices[0].message.content

@observe()
def main():
    return story()

main()
```

See [docs](https://langfuse.com/docs/sdk/python/decorators) for more information.

```python
from .langfuse_decorator import langfuse_context, observe, LangfuseDecorator

__all__ = ["langfuse_context", "observe", "LangfuseDecorator"]
```
langfuse_context = <LangfuseDecorator object>
```python
def observe(
    *,
    name: Optional[str] = None,
    as_type: Optional[Literal["generation"]] = None,
    capture_input: bool = True,
    capture_output: bool = True,
    transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]
```

Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Attributes:

- name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
- as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
- capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
- capture_output (bool): If True, captures the return value of the function as output. Default is True.
- transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

Returns:

Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

```python
@observe()
def your_function(args):
    # Your implementation here
```

For observing language model generations:

```python
@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here
```

Raises:

- Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:

- Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
- To update observation or trace parameters (e.g., metadata, session_id), use the `langfuse_context.update_current_observation` and `langfuse_context.update_current_trace` methods within the wrapped function.
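To make this contract concrete without a Langfuse backend, here is a deliberately simplified, hypothetical stand-in for `@observe()` (the names `toy_observe` and `observations` are illustrative only): it derives the observation name from the function, pops the documented `langfuse_observation_id` keyword, and records input and output.

```python
import functools

observations = []  # stand-in for a Langfuse backend

def toy_observe(*, name=None, capture_input=True, capture_output=True):
    # Hypothetical, simplified analogue of @observe(); real tracing,
    # nesting, and async support are omitted.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            obs = {
                # the caller may pass an explicit observation ID, as documented
                "id": kwargs.pop("langfuse_observation_id", None),
                "name": name or func.__name__,
                "input": {"args": args, "kwargs": kwargs} if capture_input else None,
            }
            result = func(*args, **kwargs)
            obs["output"] = result if capture_output else None
            observations.append(obs)
            return result
        return wrapper
    return decorator

@toy_observe()
def add(a, b):
    return a + b

add(1, 2, langfuse_observation_id="custom-id")
print(observations[0]["id"], observations[0]["name"], observations[0]["output"])
# custom-id add 3
```

The real decorator additionally manages the trace/span stack, async functions, and error states, as described in the Note above.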
class LangfuseDecorator:
class LangfuseDecorator:
    _log = logging.getLogger("langfuse")

    def observe(
        self,
        *,
        name: Optional[str] = None,
        as_type: Optional[Literal["generation"]] = None,
        capture_input: bool = True,
        capture_output: bool = True,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> Callable[[Callable[P, R]], Callable[P, R]]:
        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

        Attributes:
            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
            capture_output (bool): If True, captures the return value of the function as output. Default is True.
            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

        Returns:
            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

        Example:
            For general tracing (functions/methods):
            ```python
            @observe()
            def your_function(args):
                # Your implementation here
            ```
            For observing language model generations:
            ```python
            @observe(as_type="generation")
            def your_LLM_function(args):
                # Your LLM invocation here
            ```

        Raises:
            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

        Note:
        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
        - To update observation or trace parameters (e.g., metadata, session_id), use the `langfuse_context.update_current_observation` and `langfuse_context.update_current_trace` methods within the wrapped function.
        """

        def decorator(func: Callable[P, R]) -> Callable[P, R]:
            return (
                self._async_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=capture_input,
                    capture_output=capture_output,
                    transform_to_string=transform_to_string,
                )
                if asyncio.iscoroutinefunction(func)
                else self._sync_observe(
                    func,
                    name=name,
                    as_type=as_type,
                    capture_input=capture_input,
                    capture_output=capture_output,
                    transform_to_string=transform_to_string,
                )
            )

        return decorator

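The conditional inside `decorator` above is the entire sync/async dispatch: `asyncio.iscoroutinefunction` decides which wrapper factory handles the function. A minimal, self-contained sketch of the same pattern (the name `traced` is hypothetical, and no tracing is attached):

```python
import asyncio
from functools import wraps

def traced(func):
    # Coroutine functions get an async wrapper so they stay awaitable;
    # everything else gets a plain sync wrapper.
    if asyncio.iscoroutinefunction(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            return await func(*args, **kwargs)
        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return sync_wrapper

@traced
def add(a, b):
    return a + b

@traced
async def async_add(a, b):
    return a + b

print(add(1, 2))                      # 3
print(asyncio.run(async_add(1, 2)))   # 3
```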
    def _async_observe(
        self,
        func: F,
        *,
        name: Optional[str],
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> F:
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            observation = self._prepare_call(
                name=name or func.__name__,
                as_type=as_type,
                capture_input=capture_input,
                is_method=self._is_method(func),
                func_args=args,
                func_kwargs=kwargs,
            )
            result = None

            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                self._handle_exception(observation, e)
            finally:
                result = self._finalize_call(
                    observation, result, capture_output, transform_to_string
                )

                # Returning from finally block may swallow errors, so only return if result is not None
                if result is not None:
                    return result

        return cast(F, async_wrapper)

    def _sync_observe(
        self,
        func: F,
        *,
        name: Optional[str],
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> F:
        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            observation = self._prepare_call(
                name=name or func.__name__,
                as_type=as_type,
                capture_input=capture_input,
                is_method=self._is_method(func),
                func_args=args,
                func_kwargs=kwargs,
            )
            result = None

            try:
                result = func(*args, **kwargs)
            except Exception as e:
                self._handle_exception(observation, e)
            finally:
                result = self._finalize_call(
                    observation, result, capture_output, transform_to_string
                )

                # Returning from finally block may swallow errors, so only return if result is not None
                if result is not None:
                    return result

        return cast(F, sync_wrapper)

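Both wrappers return from their `finally` block only when the result is not `None`. The reason is a Python subtlety: an unconditional `return` in `finally` suppresses any in-flight exception. A standalone sketch of the pitfall and the guard (not Langfuse code):

```python
# An unconditional `return` in `finally` suppresses the exception.
def swallowing():
    try:
        raise ValueError("boom")
    finally:
        return "swallowed"  # ValueError never reaches the caller

# Guarding on `result is not None` lets the exception propagate.
def guarded():
    result = None
    try:
        raise ValueError("boom")
    finally:
        if result is not None:  # result is None here -> fall through
            return result

print(swallowing())  # swallowed

try:
    guarded()
except ValueError as e:
    print("propagated:", e)  # propagated: boom
```

The trade-off, implicit in the source comment, is that a decorated function that legitimately returns `None` falls through to the wrapper's implicit `None` return instead.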
    @staticmethod
    def _is_method(func: Callable) -> bool:
        """Check if a callable is likely a class or instance method based on its signature.

        This method inspects the given callable's signature for the presence of a 'cls' or 'self' parameter, which is conventionally used for class and instance methods in Python classes. It returns True if 'cls' or 'self' is found among the parameters, suggesting the callable is a method.

        Note: This method relies on naming conventions and may not accurately identify instance methods if unconventional parameter names are used, or if static or class methods incorrectly include a 'self' or 'cls' parameter. Additionally, during decorator execution, inspect.ismethod does not work as expected because the function has not yet been bound to an instance; it is still a function, not a method. This check attempts to infer method status from the signature, which can be useful in decorator contexts where traditional method identification techniques fail.

        Returns:
            bool: True if 'cls' or 'self' is in the callable's parameters, False otherwise.
        """
        return (
            "self" in inspect.signature(func).parameters
            or "cls" in inspect.signature(func).parameters
        )

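The heuristic can be exercised directly with `inspect.signature`; `looks_like_method` below is a hypothetical stand-in for `_is_method`:

```python
import inspect

def looks_like_method(func):
    # True when the signature carries a conventional 'self' or 'cls' parameter.
    params = inspect.signature(func).parameters
    return "self" in params or "cls" in params

class Greeter:
    def hello(self):
        return "hi"

def plain(x):
    return x

# At decoration time Greeter.hello is still an unbound function,
# so its signature still lists 'self' explicitly.
print(looks_like_method(Greeter.hello))  # True
print(looks_like_method(plain))          # False
```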
    def _prepare_call(
        self,
        *,
        name: str,
        as_type: Optional[Literal["generation"]],
        capture_input: bool,
        is_method: bool = False,
        func_args: Tuple = (),
        func_kwargs: Dict = {},
    ) -> Optional[
        Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
    ]:
        try:
            langfuse = self._get_langfuse()
            stack = _observation_stack_context.get().copy()
            parent = stack[-1] if stack else None

            # Collect default observation data
            observation_id = func_kwargs.pop("langfuse_observation_id", None)
            id = str(observation_id) if observation_id else None
            start_time = _get_timestamp()

            input = (
                self._get_input_from_func_args(
                    is_method=is_method,
                    func_args=func_args,
                    func_kwargs=func_kwargs,
                )
                if capture_input
                else None
            )

            params = {
                "id": id,
                "name": name,
                "start_time": start_time,
                "input": input,
            }

            # Create observation
            if parent and as_type == "generation":
                observation = parent.generation(**params)
            elif as_type == "generation":
                # Create wrapper trace if generation is top-level
                # Do not add wrapper trace to stack, as it does not have a corresponding end that will pop it off again
                trace = langfuse.trace(id=id, name=name, start_time=start_time)
                observation = langfuse.generation(
                    name=name, start_time=start_time, input=input, trace_id=trace.id
                )
            elif parent:
                observation = parent.span(**params)
            else:
                params["id"] = self._get_context_trace_id() or params["id"]
                observation = langfuse.trace(**params)

            _observation_stack_context.set(stack + [observation])

            return observation
        except Exception as e:
            self._log.error(f"Failed to prepare observation: {e}")

    def _get_input_from_func_args(
        self,
        *,
        is_method: bool = False,
        func_args: Tuple = (),
        func_kwargs: Dict = {},
    ) -> Any:
        # Remove implicitly passed "self" or "cls" argument for instance or class methods
        logged_args = func_args[1:] if is_method else func_args
        raw_input = {
            "args": logged_args,
            "kwargs": func_kwargs,
        }

        # Serialize and deserialize to ensure proper JSON serialization.
        # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
        return json.loads(json.dumps(raw_input, cls=EventSerializer))

    def _get_context_trace_id(self):
        context_trace_id = _root_trace_id_context.get()

        if context_trace_id is not None:
            # Clear the context trace ID to avoid leaking it to other traces
            _root_trace_id_context.set(None)

            return context_trace_id

        return None

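The serialize/deserialize round trip above normalizes arbitrary argument objects into JSON-safe values once, so a later re-serialization does not escape quotes a second time. A sketch of the idea, with `json.dumps(..., default=str)` standing in for the Langfuse-internal `EventSerializer`:

```python
import json
from datetime import datetime, timezone

# Raw input may contain non-JSON types (tuples, datetimes, ...).
raw = {
    "args": (1, datetime(2024, 1, 1, tzinfo=timezone.utc)),
    "kwargs": {"q": "hi"},
}

# One round trip: non-JSON types become plain strings/lists.
clean = json.loads(json.dumps(raw, default=str))

print(clean["args"])   # [1, '2024-01-01 00:00:00+00:00']
print(json.dumps(clean))  # re-serializes cleanly, no double-escaped quotes
```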
    def _finalize_call(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        result: Any,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ):
        if inspect.isgenerator(result):
            return self._wrap_sync_generator_result(
                observation, result, capture_output, transform_to_string
            )
        elif inspect.isasyncgen(result):
            return self._wrap_async_generator_result(
                observation, result, capture_output, transform_to_string
            )

        else:
            return self._handle_call_result(observation, result, capture_output)

    def _handle_call_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        result: Any,
        capture_output: bool,
    ):
        try:
            if observation is None:
                raise ValueError("No observation found in the current context")

            # Collect final observation data
            observation_params = _observation_params_context.get()[
                observation.id
            ].copy()
            del _observation_params_context.get()[
                observation.id
            ]  # Remove observation params to avoid leaking

            end_time = observation_params["end_time"] or _get_timestamp()
            raw_output = observation_params["output"] or (
                result if result and capture_output else None
            )

            # Serialize and deserialize to ensure proper JSON serialization.
            # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
            output = json.loads(json.dumps(raw_output, cls=EventSerializer))
            observation_params.update(end_time=end_time, output=output)

            if isinstance(observation, (StatefulSpanClient, StatefulGenerationClient)):
                observation.end(**observation_params)
            elif isinstance(observation, StatefulTraceClient):
                observation.update(**observation_params)

            # Remove observation from top of stack
            stack = _observation_stack_context.get()
            _observation_stack_context.set(stack[:-1])

        except Exception as e:
            self._log.error(f"Failed to finalize observation: {e}")

        finally:
            return result

    def _handle_exception(
        self,
        observation: Optional[
            Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
        ],
        e: Exception,
    ):
        if observation:
            _observation_params_context.get()[observation.id].update(
                level="ERROR", status_message=str(e)
            )
        raise e

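`_finalize_call` above picks its strategy from the result's type: sync generators and async generators are wrapped so output can be captured lazily, and everything else is finalized immediately. The same three-way dispatch in isolation (`result_kind` is a hypothetical helper):

```python
import inspect

def result_kind(result):
    # Mirrors the type checks in _finalize_call.
    if inspect.isgenerator(result):
        return "sync-generator"
    if inspect.isasyncgen(result):
        return "async-generator"
    return "plain"

def gen():
    yield 1

async def agen():
    yield 1

print(result_kind(gen()))   # sync-generator
print(result_kind(agen()))  # async-generator
print(result_kind(42))      # plain
```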
    def _wrap_sync_generator_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        generator: Generator,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ):
        items = []

        try:
            for item in generator:
                items.append(item)

                yield item

        finally:
            output = items

            if transform_to_string is not None:
                output = transform_to_string(items)

            elif all(isinstance(item, str) for item in items):
                output = "".join(items)

            self._handle_call_result(observation, output, capture_output)

    async def _wrap_async_generator_result(
        self,
        observation: Optional[
            Union[
                StatefulSpanClient,
                StatefulTraceClient,
                StatefulGenerationClient,
            ]
        ],
        generator: AsyncGenerator,
        capture_output: bool,
        transform_to_string: Optional[Callable[[Iterable], str]] = None,
    ) -> AsyncGenerator:
        items = []

        try:
            async for item in generator:
                items.append(item)

                yield item

        finally:
            output = items

            if transform_to_string is not None:
                output = transform_to_string(items)

            elif all(isinstance(item, str) for item in items):
                output = "".join(items)

            self._handle_call_result(observation, output, capture_output)

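Both wrappers follow the same shape: stream each item through untouched while recording it, then flush a final output from the `finally` block once the consumer stops iterating. A sync-only sketch (`wrap_stream` and `on_finish` are hypothetical names, not Langfuse API):

```python
def wrap_stream(generator, on_finish):
    # Yield items through while recording them; on exhaustion (or early
    # close) join string chunks into one output and hand it to on_finish.
    items = []
    try:
        for item in generator:
            items.append(item)
            yield item
    finally:
        output = items
        if all(isinstance(item, str) for item in items):
            output = "".join(items)
        on_finish(output)

captured = []

def tokens():
    yield "Hel"
    yield "lo"

print(list(wrap_stream(tokens(), captured.append)))  # ['Hel', 'lo']
print(captured)  # ['Hello']
```

This is why streamed LLM responses still show up as a single joined output on the observation, and why `transform_to_string` exists for streams whose chunks are not plain strings.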
    def get_current_llama_index_handler(self):
        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.

        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.

        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.

        Returns:
            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

        Note:
            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
        """
        try:
            from langfuse.llama_index import LlamaIndexCallbackHandler
        except ImportError:
            self._log.error(
                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
            )

            return None

        observation = _observation_stack_context.get()[-1]

        if observation is None:
            self._log.warning("No observation found in the current context")

            return None

        if isinstance(observation, StatefulGenerationClient):
            self._log.warning(
                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
            )

            return None

        callback_handler = LlamaIndexCallbackHandler()
        callback_handler.set_root(observation)

        return callback_handler

    def get_current_langchain_handler(self):
        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.

        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.

        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.

        Returns:
            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

        Note:
            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
        """
        observation = _observation_stack_context.get()[-1]

        if observation is None:
            self._log.warning("No observation found in the current context")

            return None

        if isinstance(observation, StatefulGenerationClient):
            self._log.warning(
                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
            )

            return None

        return observation.get_langchain_handler()

    def get_current_trace_id(self):
        """Retrieve the ID of the current trace from the observation stack context.

        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
        representing the entry point of the traced execution context.

        Returns:
            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
            possibly due to the method being called outside of any @observe-decorated function execution.

        Note:
            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
        """
        stack = _observation_stack_context.get()
        should_log_warning = self._get_caller_module_name() != "langfuse.openai"

        if not stack:
            if should_log_warning:
                self._log.warning("No trace found in the current context")

            return None

        return stack[0].id

    def _get_caller_module_name(self):
        try:
            caller_module = inspect.getmodule(inspect.stack()[2][0])
        except Exception as e:
            self._log.warning(f"Failed to get caller module: {e}")

            return None

        return caller_module.__name__ if caller_module else None

 605    def get_current_trace_url(self) -> Optional[str]:
 606        """Retrieve the URL of the current trace in context.
 607
 608        Returns:
 609            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
 610            possibly due to the method being called outside of any @observe-decorated function execution.
 611
 612        Note:
 613            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
 614            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 615        """
 616        try:
 617            trace_id = self.get_current_trace_id()
 618            langfuse = self._get_langfuse()
 619
 620            if not trace_id:
 621                raise ValueError("No trace found in the current context")
 622
 623            return f"{langfuse.client._client_wrapper._base_url}/trace/{trace_id}"
 624
 625        except Exception as e:
 626            self._log.error(f"Failed to get current trace URL: {e}")
 627
 628            return None
 629
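The URL returned above is plain string formatting over the client's base URL. A quick sketch, with made-up placeholder values for `base_url` and `trace_id`:

```python
# Hypothetical values; the real base URL comes from the configured Langfuse client.
base_url = "https://cloud.langfuse.com"
trace_id = "abc123"

trace_url = f"{base_url}/trace/{trace_id}"
print(trace_url)  # https://cloud.langfuse.com/trace/abc123
```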
 630    def get_current_observation_id(self):
 631        """Retrieve the ID of the current observation in context.
 632
 633        Returns:
 634            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
 635            possibly due to the method being called outside of any @observe-decorated function execution.
 636
 637        Note:
 638            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
 639            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 640            - If called at the top level of a trace, it will return the trace ID.
 641        """
 642        stack = _observation_stack_context.get()
 643        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
 644
 645        if not stack:
 646            if should_log_warning:
 647                self._log.warning("No observation found in the current context")
 648
 649            return None
 650
 651        return stack[-1].id
 652
 653    def update_current_trace(
 654        self,
 655        name: Optional[str] = None,
 656        input: Optional[Any] = None,
 657        output: Optional[Any] = None,
 658        user_id: Optional[str] = None,
 659        session_id: Optional[str] = None,
 660        version: Optional[str] = None,
 661        release: Optional[str] = None,
 662        metadata: Optional[Any] = None,
 663        tags: Optional[List[str]] = None,
 664        public: Optional[bool] = None,
 665    ):
 666        """Set parameters for the current trace, updating the trace's metadata and context information.
 667
 668        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
 669        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
 670        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
 671
 672        Arguments:
 673            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
 674            input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
 675            output (Optional[Any]): The output or result of the trace.
 676            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
 677            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
 678            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
 679            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
 680            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
 681            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
 682            public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
 683
 683        Returns:
 684            None
 685
 686        Note:
 687            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
 688            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
 689            - If called outside of an active trace context, a warning is logged and the method returns without updating any trace.
 690        """
 691        trace_id = self.get_current_trace_id()
 692
 693        if trace_id is None:
 694            self._log.warning("No trace found in the current context")
 695
 696            return
 697
 698        params_to_update = {
 699            k: v
 700            for k, v in {
 701                "name": name,
 702                "input": input,
 703                "output": output,
 704                "user_id": user_id,
 705                "session_id": session_id,
 706                "version": version,
 707                "release": release,
 708                "metadata": metadata,
 709                "tags": tags,
 710                "public": public,
 711            }.items()
 712            if v is not None
 713        }
 714
 715        _observation_params_context.get()[trace_id].update(params_to_update)
 716
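The None-filtering merge in the method above is what makes updates selective: unset arguments never clobber existing values. Isolated as a small helper (`merge_trace_params` is a hypothetical name, not part of the SDK):

```python
def merge_trace_params(existing: dict, **updates) -> dict:
    # Drop None values so unset arguments never overwrite existing fields,
    # mirroring the dict comprehension in update_current_trace.
    existing.update({k: v for k, v in updates.items() if v is not None})
    return existing

params = {"name": "checkout", "user_id": "u-1"}
merge_trace_params(params, user_id=None, session_id="s-9", tags=["beta"])
print(params)
# {'name': 'checkout', 'user_id': 'u-1', 'session_id': 's-9', 'tags': ['beta']}
```

Note that `user_id=None` left the existing `"u-1"` untouched, while `session_id` and `tags` were added.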
 717    def update_current_observation(
 718        self,
 719        *,
 720        input: Optional[Any] = None,
 721        output: Optional[Any] = None,
 722        name: Optional[str] = None,
 723        version: Optional[str] = None,
 724        metadata: Optional[Any] = None,
 725        start_time: Optional[datetime] = None,
 726        end_time: Optional[datetime] = None,
 727        release: Optional[str] = None,
 728        tags: Optional[List[str]] = None,
 729        user_id: Optional[str] = None,
 730        session_id: Optional[str] = None,
 731        level: Optional[SpanLevel] = None,
 732        status_message: Optional[str] = None,
 733        completion_start_time: Optional[datetime] = None,
 734        model: Optional[str] = None,
 735        model_parameters: Optional[Dict[str, MapValue]] = None,
 736        usage: Optional[Union[BaseModel, ModelUsage]] = None,
 737        prompt: Optional[PromptClient] = None,
 738        public: Optional[bool] = None,
 739    ):
 740        """Update parameters for the current observation within an active trace context.
 741
 742        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
 743        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
 744        enhancing the observability and traceability of the execution context.
 745
 746        Note that if a param is not available on a specific observation type, it will be ignored.
 747
 748        Shared params:
 749            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
 750            - `output` (Optional[Any]): The output or result of the trace or observation.
 751            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
 752            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
 753            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
 754            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
 755            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
 756
 757        Trace-specific params:
 758            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
 759            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
 760            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
 761            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
 762            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
 763
 764        Span-specific params:
 765            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
 766            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
 767
 768        Generation-specific params:
 769            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
 770            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
 771            - `usage` (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
 772            - `prompt` (Optional[PromptClient]): The prompt object used for the generation.
 773
 774        Returns:
 775            None
 776
 777        Note:
 778            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
 779            - If no current observation is found in the context (e.g., when called outside of an observation's execution scope), a warning is logged and no update is applied.
 780            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
 781            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
 784        """
 785        stack = _observation_stack_context.get()
 786        observation = stack[-1] if stack else None
 787
 788        if not observation:
 789            self._log.warning("No observation found in the current context")
 790
 791            return
 792
 793        update_params = {
 794            k: v
 795            for k, v in {
 796                "input": input,
 797                "output": output,
 798                "name": name,
 799                "version": version,
 800                "metadata": metadata,
 801                "start_time": start_time,
 802                "end_time": end_time,
 803                "release": release,
 804                "tags": tags,
 805                "user_id": user_id,
 806                "session_id": session_id,
 807                "level": level,
 808                "status_message": status_message,
 809                "completion_start_time": completion_start_time,
 810                "model": model,
 811                "model_parameters": model_parameters,
 812                "usage": usage,
 813                "prompt": prompt,
 814                "public": public,
 815            }.items()
 816            if v is not None
 817        }
 818
 819        _observation_params_context.get()[observation.id].update(update_params)
 820
 821    def score_current_observation(
 822        self,
 823        *,
 824        name: str,
 825        value: Union[float, str],
 826        data_type: Optional[ScoreDataType] = None,
 827        comment: Optional[str] = None,
 828        id: Optional[str] = None,
 829        config_id: Optional[str] = None,
 830    ):
 831        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
 832
 833        Arguments:
 834            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 835            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
 836            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
 837              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
 838            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 839            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 840            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
 841
 842        Returns:
 843            None
 844
 845        Note:
 846            This method is intended to be used within the context of an active trace or observation.
 847        """
 848        try:
 849            langfuse = self._get_langfuse()
 850            trace_id = self.get_current_trace_id()
 851            current_observation_id = self.get_current_observation_id()
 852
 853            observation_id = (
 854                current_observation_id if current_observation_id != trace_id else None
 855            )
 856
 857            if trace_id:
 858                langfuse.score(
 859                    trace_id=trace_id,
 860                    observation_id=observation_id,
 861                    name=name,
 862                    value=value,
 863                    data_type=data_type,
 864                    comment=comment,
 865                    id=id,
 866                    config_id=config_id,
 867                )
 868            else:
 869                raise ValueError("No trace or observation found in the current context")
 870
 871        except Exception as e:
 872            self._log.error(f"Failed to score observation: {e}")
 873
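The trace-versus-observation dispatch above hinges on one comparison: at the top level of a trace, the current observation ID equals the trace ID, and passing `observation_id=None` then attaches the score to the trace itself. A pure-function sketch (`resolve_observation_id` is a hypothetical helper name):

```python
def resolve_observation_id(trace_id: str, current_observation_id: str):
    # At the top level of a trace the current observation ID equals the
    # trace ID; returning None then routes the score to the trace itself.
    return current_observation_id if current_observation_id != trace_id else None

print(resolve_observation_id("t-1", "t-1"))  # None -> score lands on the trace
print(resolve_observation_id("t-1", "o-7"))  # o-7  -> score lands on the observation
```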
 874    def score_current_trace(
 875        self,
 876        *,
 877        name: str,
 878        value: Union[float, str],
 879        data_type: Optional[ScoreDataType] = None,
 880        comment: Optional[str] = None,
 881        id: Optional[str] = None,
 882        config_id: Optional[str] = None,
 883    ):
 884        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
 885
 886        Arguments:
 887            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 888            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
 889            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
 890              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
 891            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 892            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 893            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
 894
 895        Returns:
 896            None
 897
 898        Note:
 899            This method is intended to be used within the context of an active trace or observation.
 900        """
 901        try:
 902            langfuse = self._get_langfuse()
 903            trace_id = self.get_current_trace_id()
 904
 905            if trace_id:
 906                langfuse.score(
 907                    trace_id=trace_id,
 908                    name=name,
 909                    value=value,
 910                    data_type=data_type,
 911                    comment=comment,
 912                    id=id,
 913                    config_id=config_id,
 914                )
 915            else:
 916                raise ValueError("No trace found in the current context")
 917
 918        except Exception as e:
 919            self._log.error(f"Failed to score trace: {e}")
 920
 921    @catch_and_log_errors
 922    def flush(self):
 923        """Force immediate flush of all buffered observations to the Langfuse backend.
 924
 925        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
 926        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
 927
 928        Usage:
 929            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
 930            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
 931
 932        Returns:
 933            None
 934
 935        Raises:
 936            ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues.
 937
 938        Note:
 939            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
 940            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
 941            However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
 942        """
 943        langfuse = self._get_langfuse()
 944        if langfuse:
 945            langfuse.flush()
 946        else:
 947            self._log.warning("No langfuse object found in the current context")
 948
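The need for an explicit `flush()` can be illustrated with a toy batching buffer. `EventBuffer` below is a simplified stand-in for the SDK's background batcher, not its actual implementation; it only models the `flush_at` auto-flush and the explicit flush-before-exit pattern.

```python
class EventBuffer:
    """Toy stand-in for a batching consumer (not the real SDK batcher)."""

    def __init__(self, flush_at: int = 3):
        self.flush_at = flush_at
        self.pending: list = []
        self.sent: list = []

    def add(self, event) -> None:
        self.pending.append(event)
        if len(self.pending) >= self.flush_at:  # auto-flush once the batch is full
            self.flush()

    def flush(self) -> None:
        # Explicit flush, e.g. right before the process exits.
        self.sent.extend(self.pending)
        self.pending.clear()

buf = EventBuffer(flush_at=3)
for e in ["span-1", "span-2"]:
    buf.add(e)
buf.flush()  # without this, the two pending events would be lost on abrupt exit
print(buf.sent)  # ['span-1', 'span-2']
```

This is why short-lived scripts and batch jobs should call `langfuse_context.flush()` before exiting, while long-running services can usually rely on the automatic mechanism.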
 949    def configure(
 950        self,
 951        *,
 952        public_key: Optional[str] = None,
 953        secret_key: Optional[str] = None,
 954        host: Optional[str] = None,
 955        release: Optional[str] = None,
 956        debug: Optional[bool] = None,
 957        threads: Optional[int] = None,
 958        flush_at: Optional[int] = None,
 959        flush_interval: Optional[int] = None,
 960        max_retries: Optional[int] = None,
 961        timeout: Optional[int] = None,
 962        httpx_client: Optional[httpx.Client] = None,
 963        enabled: Optional[bool] = None,
 964    ):
 965        """Configure the Langfuse client.
 966
 967        If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings.
 968
 969        Args:
 970            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
 971            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
 972            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
 973            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
 974            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
 975            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
 976            flush_at: Max batch size that's sent to the API.
 977            flush_interval: Max delay until a new batch is sent to the API.
 978            max_retries: Max number of retries in case of API/network errors.
 979            timeout: Timeout of API requests in seconds. Default is 20 seconds.
 980            httpx_client: Pass your own httpx client for more customizability of requests.
 981            enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
 982        """
 983        langfuse_singleton = LangfuseSingleton()
 984        langfuse_singleton.reset()
 985
 986        langfuse_singleton.get(
 987            public_key=public_key,
 988            secret_key=secret_key,
 989            host=host,
 990            release=release,
 991            debug=debug,
 992            threads=threads,
 993            flush_at=flush_at,
 994            flush_interval=flush_interval,
 995            max_retries=max_retries,
 996            timeout=timeout,
 997            httpx_client=httpx_client,
 998            enabled=enabled,
 999        )
1000
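The documented precedence, where explicit `configure()` arguments win over environment variables, can be sketched as follows. `resolve_key` is a hypothetical helper illustrating the fallback order, not SDK code:

```python
import os
from typing import Optional

def resolve_key(explicit: Optional[str], env_var: str) -> Optional[str]:
    # Explicit configure() arguments take precedence; environment
    # variables such as LANGFUSE_PUBLIC_KEY are the fallback.
    return explicit if explicit is not None else os.environ.get(env_var)

os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"
print(resolve_key(None, "LANGFUSE_HOST"))                         # https://cloud.langfuse.com
print(resolve_key("https://langfuse.internal", "LANGFUSE_HOST"))  # https://langfuse.internal
```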
1001    def _get_langfuse(self) -> Langfuse:
1002        return LangfuseSingleton().get()
1003
1004    def _set_root_trace_id(self, trace_id: str):
1005        if _observation_stack_context.get():
1006            self._log.warning(
1007                "Root trace ID cannot be set on an already running trace. Skipping root trace ID assignment."
1008            )
1009            return
1010
1011        _root_trace_id_context.set(trace_id)
1012
1013    def auth_check(self) -> bool:
1014        """Check if the current Langfuse client is authenticated.
1015
1016        Returns:
1017            bool: True if the client is authenticated, False otherwise
1018        """
1019        try:
1020            langfuse = self._get_langfuse()
1021
1022            return langfuse.auth_check()
1023        except Exception as e:
1024            self._log.error(f"No Langfuse object found in the current context: {e}")
1025
1026            return False
def observe( self, *, name: Optional[str] = None, as_type: Optional[Literal['generation']] = None, capture_input: bool = True, capture_output: bool = True, transform_to_string: Optional[Callable[[Iterable], str]] = None) -> Callable[[Callable[~P, ~R]], Callable[~P, ~R]]:
 95    def observe(
 96        self,
 97        *,
 98        name: Optional[str] = None,
 99        as_type: Optional[Literal["generation"]] = None,
100        capture_input: bool = True,
101        capture_output: bool = True,
102        transform_to_string: Optional[Callable[[Iterable], str]] = None,
103    ) -> Callable[[Callable[P, R]], Callable[P, R]]:
104        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
105
106        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
107        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
108
109        Attributes:
110            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
111            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
112            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
113            capture_output (bool): If True, captures the return value of the function as output. Default is True.
114            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.
115
116        Returns:
117            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
118
119        Example:
120            For general tracing (functions/methods):
121            ```python
122            @observe()
123            def your_function(args):
124                # Your implementation here
125            ```
126            For observing language model generations:
127            ```python
128            @observe(as_type="generation")
129            def your_LLM_function(args):
130                # Your LLM invocation here
131            ```
132
133        Raises:
134            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
135
136        Note:
137        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
138        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
139        """
140
141        def decorator(func: Callable[P, R]) -> Callable[P, R]:
142            return (
143                self._async_observe(
144                    func,
145                    name=name,
146                    as_type=as_type,
147                    capture_input=capture_input,
148                    capture_output=capture_output,
149                    transform_to_string=transform_to_string,
150                )
151                if asyncio.iscoroutinefunction(func)
152                else self._sync_observe(
153                    func,
154                    name=name,
155                    as_type=as_type,
156                    capture_input=capture_input,
157                    capture_output=capture_output,
158                    transform_to_string=transform_to_string,
159                )
160            )
161
162        return decorator
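The sync/async dispatch in the decorator factory above can be reproduced in miniature. `traced` below is a stripped-down sketch that only logs the call, without any of the SDK's context management; it shows how one decorator can transparently wrap both coroutine and regular functions.

```python
import asyncio
import functools

def traced(func):
    # Dispatch on coroutine-ness, as the decorator factory does.
    if asyncio.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            print(f"start {func.__name__}")
            return await func(*args, **kwargs)
        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        print(f"start {func.__name__}")
        return func(*args, **kwargs)
    return sync_wrapper

@traced
def add(a, b):
    return a + b

@traced
async def mul(a, b):
    return a * b

print(add(2, 3))               # prints "start add", then 5
print(asyncio.run(mul(2, 3)))  # prints "start mul", then 6
```

Using `functools.wraps` preserves the wrapped function's name, which matters here because the function name is the default trace/span name.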

def get_current_llama_index_handler(self):
495    def get_current_llama_index_handler(self):
496        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
497
498        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
499        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
500
501        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
502
503        Returns:
504            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
505
506        Note:
507            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
508            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
509        """
510        try:
511            from langfuse.llama_index import LlamaIndexCallbackHandler
512        except ImportError:
513            self._log.error(
514                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. Install it with 'pip install llama-index'."
515            )
516
517            return None
518
519        stack = _observation_stack_context.get()
520        observation = stack[-1] if stack else None
520
521        if observation is None:
522            self._log.warning("No observation found in the current context")
523
524            return None
525
526        if isinstance(observation, StatefulGenerationClient):
527            self._log.warning(
528                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
529            )
530
531            return None
532
533        callback_handler = LlamaIndexCallbackHandler()
534        callback_handler.set_root(observation)
535
536        return callback_handler
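The fail-soft guarded import used above generalizes to any optional integration: log and return `None` instead of raising when the dependency is missing. `load_optional` is a hypothetical helper illustrating the pattern:

```python
import importlib

def load_optional(module_name: str):
    # Guarded import: log and return None instead of raising, mirroring
    # the ImportError handling in get_current_llama_index_handler.
    try:
        return importlib.import_module(module_name)
    except ImportError:
        print(f"{module_name} is not available; install it to enable this integration")
        return None

print(load_optional("json") is not None)              # True (stdlib, always present)
print(load_optional("definitely_not_installed_pkg"))  # None
```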

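The handler degrades gracefully when the optional dependency is missing: the import happens lazily and `None` is returned on failure. A minimal sketch of that pattern (the module name below is a deliberate stand-in, not a real package):

```python
import logging

log = logging.getLogger(__name__)

def get_handler_or_none():
    # Lazy import of an optional dependency. The module name below is a
    # hypothetical stand-in used to illustrate the missing-package path.
    try:
        from some_missing_llama_index_pkg import CallbackHandler  # noqa: F401
    except ImportError:
        log.error("llama-index is not installed. pip install llama-index")
        return None
    return CallbackHandler()

handler = get_handler_or_none()
assert handler is None  # the stand-in package is absent, so we get None
```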
def get_current_langchain_handler(self):
538    def get_current_langchain_handler(self):
539        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
540
541        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
542        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
543
544        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
545
546        Returns:
547            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
548
549        Note:
550            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
551            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
552        """
553        observation = _observation_stack_context.get()[-1]
554
555        if observation is None:
556            self._log.warning("No observation found in the current context")
557
558            return None
559
560        if isinstance(observation, StatefulGenerationClient):
561            self._log.warning(
562                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
563            )
564
565            return None
566
567        return observation.get_langchain_handler()

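As the source shows, a handler is only returned for non-generation observations; a missing context or a generation yields `None` with a warning. A self-contained sketch of that dispatch, with stand-in classes in place of the real Langfuse client types:

```python
class StatefulSpan:
    """Stand-in for a span/trace client that supports the handler."""
    def get_langchain_handler(self):
        return "langchain-handler"

class StatefulGeneration:
    """Stand-in for StatefulGenerationClient (not supported)."""

def current_handler(observation):
    if observation is None:
        return None  # no active observation in the current context
    if isinstance(observation, StatefulGeneration):
        return None  # generations do not support the Langchain handler
    return observation.get_langchain_handler()

assert current_handler(None) is None
assert current_handler(StatefulGeneration()) is None
assert current_handler(StatefulSpan()) == "langchain-handler"
```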
def get_current_trace_id(self):
569    def get_current_trace_id(self):
570        """Retrieve the ID of the current trace from the observation stack context.
571
572        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
573        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
574        representing the entry point of the traced execution context.
575
576        Returns:
577            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
578            possibly due to the method being called outside of any @observe-decorated function execution.
579
580        Note:
581            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
582            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
583        """
584        stack = _observation_stack_context.get()
585        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
586
587        if not stack:
588            if should_log_warning:
589                self._log.warning("No trace found in the current context")
590
591            return None
592
593        return stack[0].id

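The trace ID is simply the ID of the root observation on the stack. The lookup can be sketched with a plain list standing in for `_observation_stack_context` (class and function names below are illustrative):

```python
from dataclasses import dataclass

@dataclass
class Obs:
    """Stand-in for a Langfuse observation client."""
    id: str

def current_trace_id(stack):
    if not stack:
        return None  # no active trace in this context
    return stack[0].id  # the root observation's ID is the trace ID

assert current_trace_id([]) is None
assert current_trace_id([Obs("trace-1"), Obs("span-2")]) == "trace-1"
```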
def get_current_trace_url(self) -> Optional[str]:
605    def get_current_trace_url(self) -> Optional[str]:
606        """Retrieve the URL of the current trace in context.
607
608        Returns:
609            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
610            possibly due to the method being called outside of any @observe-decorated function execution.
611
612        Note:
613            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
614            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
615        """
616        try:
617            trace_id = self.get_current_trace_id()
618            langfuse = self._get_langfuse()
619
620            if not trace_id:
621                raise ValueError("No trace found in the current context")
622
623            return f"{langfuse.client._client_wrapper._base_url}/trace/{trace_id}"
624
625        except Exception as e:
626            self._log.error(f"Failed to get current trace URL: {e}")
627
628            return None

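The URL is assembled from the client's base URL and the trace ID, exactly as in the source above. A sketch of the format (the host below is just an example value):

```python
def trace_url(base_url: str, trace_id: str) -> str:
    # Mirrors the f-string in the source: {base_url}/trace/{trace_id}
    if not trace_id:
        raise ValueError("No trace found in the current context")
    return f"{base_url}/trace/{trace_id}"

url = trace_url("https://cloud.langfuse.com", "abc123")
assert url == "https://cloud.langfuse.com/trace/abc123"
```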
def get_current_observation_id(self):
630    def get_current_observation_id(self):
631        """Retrieve the ID of the current observation in context.
632
633        Returns:
634            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
635            possibly due to the method being called outside of any @observe-decorated function execution.
636
637        Note:
638            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
639            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
640            - If called at the top level of a trace, it will return the trace ID.
641        """
642        stack = _observation_stack_context.get()
643        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
644
645        if not stack:
646            if should_log_warning:
647                self._log.warning("No observation found in the current context")
648
649            return None
650
651        return stack[-1].id

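In contrast to `get_current_trace_id`, which reads the first stack entry, this method reads the last one, so at the top level of a trace both return the same ID. A sketch with illustrative names:

```python
from dataclasses import dataclass

@dataclass
class Obs:
    """Stand-in for a Langfuse observation client."""
    id: str

def current_observation_id(stack):
    return stack[-1].id if stack else None

stack = [Obs("trace-1")]
assert current_observation_id(stack) == "trace-1"  # top level: equals the trace ID
stack.append(Obs("span-2"))
assert current_observation_id(stack) == "span-2"   # nested observation
```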
def update_current_trace( self, name: Optional[str] = None, input: Optional[Any] = None, output: Optional[Any] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, version: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, public: Optional[bool] = None):
653    def update_current_trace(
654        self,
655        name: Optional[str] = None,
656        input: Optional[Any] = None,
657        output: Optional[Any] = None,
658        user_id: Optional[str] = None,
659        session_id: Optional[str] = None,
660        version: Optional[str] = None,
661        release: Optional[str] = None,
662        metadata: Optional[Any] = None,
663        tags: Optional[List[str]] = None,
664        public: Optional[bool] = None,
665    ):
666        """Set parameters for the current trace, updating the trace's metadata and context information.
667
668        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
669        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
670        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
671
672        Arguments:
673            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
674            input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
675            output (Optional[Any]): The output or result of the trace.
676            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
677            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
678            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
679            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
680            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
681            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
682            public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
682
683        Returns:
684            None
685
686        Note:
687            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
688            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
689            - If called outside of an active trace context, a warning is logged and the method returns without making any updates.
690        """
691        trace_id = self.get_current_trace_id()
692
693        if trace_id is None:
694            self._log.warning("No trace found in the current context")
695
696            return
697
698        params_to_update = {
699            k: v
700            for k, v in {
701                "name": name,
702                "input": input,
703                "output": output,
704                "user_id": user_id,
705                "session_id": session_id,
706                "version": version,
707                "release": release,
708                "metadata": metadata,
709                "tags": tags,
710                "public": public,
711            }.items()
712            if v is not None
713        }
714
715        _observation_params_context.get()[trace_id].update(params_to_update)

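Only arguments that were explicitly provided are applied; anything left as `None` is filtered out before the update, which is why previously set values survive. The filtering step is equivalent to this pure-Python sketch (the function name is illustrative):

```python
def filter_params(**kwargs):
    # Drop unset (None) parameters so they cannot clobber existing values
    return {k: v for k, v in kwargs.items() if v is not None}

params = filter_params(name="my-trace", user_id=None, tags=["prod"])
assert params == {"name": "my-trace", "tags": ["prod"]}
```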
def update_current_observation( self, *, input: Optional[Any] = None, output: Optional[Any] = None, name: Optional[str] = None, version: Optional[str] = None, metadata: Optional[Any] = None, start_time: Optional[datetime.datetime] = None, end_time: Optional[datetime.datetime] = None, release: Optional[str] = None, tags: Optional[List[str]] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, level: Optional[Literal['DEBUG', 'DEFAULT', 'WARNING', 'ERROR']] = None, status_message: Optional[str] = None, completion_start_time: Optional[datetime.datetime] = None, model: Optional[str] = None, model_parameters: Optional[Dict[str, Union[str, NoneType, int, bool, List[str]]]] = None, usage: Union[pydantic.main.BaseModel, langfuse.model.ModelUsage, NoneType] = None, prompt: Union[langfuse.model.TextPromptClient, langfuse.model.ChatPromptClient, NoneType] = None, public: Optional[bool] = None):
717    def update_current_observation(
718        self,
719        *,
720        input: Optional[Any] = None,
721        output: Optional[Any] = None,
722        name: Optional[str] = None,
723        version: Optional[str] = None,
724        metadata: Optional[Any] = None,
725        start_time: Optional[datetime] = None,
726        end_time: Optional[datetime] = None,
727        release: Optional[str] = None,
728        tags: Optional[List[str]] = None,
729        user_id: Optional[str] = None,
730        session_id: Optional[str] = None,
731        level: Optional[SpanLevel] = None,
732        status_message: Optional[str] = None,
733        completion_start_time: Optional[datetime] = None,
734        model: Optional[str] = None,
735        model_parameters: Optional[Dict[str, MapValue]] = None,
736        usage: Optional[Union[BaseModel, ModelUsage]] = None,
737        prompt: Optional[PromptClient] = None,
738        public: Optional[bool] = None,
739    ):
740        """Update parameters for the current observation within an active trace context.
741
742        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
743        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
744        enhancing the observability and traceability of the execution context.
745
746        Note that if a param is not available on a specific observation type, it will be ignored.
747
748        Shared params:
749            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
750            - `output` (Optional[Any]): The output or result of the trace or observation
751            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
752            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
753            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
754            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
755            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
756
757        Trace-specific params:
758            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
759            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
760            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
761            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
762            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
763
764        Span-specific params:
765            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
766            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
767
768        Generation-specific params:
769            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
770            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
771            - `usage` (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
772            - `prompt` (Optional[PromptClient]): The prompt object used for the generation.
773
774        Returns:
775            None
776
777        Note:
778            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
779            - If no current observation is found in the context (e.g., when called outside of an observation's execution scope), a warning is logged and no update is made.
780            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
781            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
784        """
785        stack = _observation_stack_context.get()
786        observation = stack[-1] if stack else None
787
788        if not observation:
789            self._log.warning("No observation found in the current context")
790
791            return
792
793        update_params = {
794            k: v
795            for k, v in {
796                "input": input,
797                "output": output,
798                "name": name,
799                "version": version,
800                "metadata": metadata,
801                "start_time": start_time,
802                "end_time": end_time,
803                "release": release,
804                "tags": tags,
805                "user_id": user_id,
806                "session_id": session_id,
807                "level": level,
808                "status_message": status_message,
809                "completion_start_time": completion_start_time,
810                "model": model,
811                "model_parameters": model_parameters,
812                "usage": usage,
813                "prompt": prompt,
814                "public": public,
815            }.items()
816            if v is not None
817        }
818
819        _observation_params_context.get()[observation.id].update(update_params)

def score_current_observation( self, *, name: str, value: Union[float, str], data_type: Optional[Literal['NUMERIC', 'CATEGORICAL', 'BOOLEAN']] = None, comment: Optional[str] = None, id: Optional[str] = None, config_id: Optional[str] = None):
821    def score_current_observation(
822        self,
823        *,
824        name: str,
825        value: Union[float, str],
826        data_type: Optional[ScoreDataType] = None,
827        comment: Optional[str] = None,
828        id: Optional[str] = None,
829        config_id: Optional[str] = None,
830    ):
831        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
832
833        Arguments:
834            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
835            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
836            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
837              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
838            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
839            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
840            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
841
842        Returns:
843            None
844
845        Note:
846            This method is intended to be used within the context of an active trace or observation.
847        """
848        try:
849            langfuse = self._get_langfuse()
850            trace_id = self.get_current_trace_id()
851            current_observation_id = self.get_current_observation_id()
852
853            observation_id = (
854                current_observation_id if current_observation_id != trace_id else None
855            )
856
857            if trace_id:
858                langfuse.score(
859                    trace_id=trace_id,
860                    observation_id=observation_id,
861                    name=name,
862                    value=value,
863                    data_type=data_type,
864                    comment=comment,
865                    id=id,
866                    config_id=config_id,
867                )
868            else:
869                raise ValueError("No trace or observation found in the current context")
870
871        except Exception as e:
872            self._log.error(f"Failed to score observation: {e}")

def score_current_trace( self, *, name: str, value: Union[float, str], data_type: Optional[Literal['NUMERIC', 'CATEGORICAL', 'BOOLEAN']] = None, comment: Optional[str] = None, id: Optional[str] = None, config_id: Optional[str] = None):
874    def score_current_trace(
875        self,
876        *,
877        name: str,
878        value: Union[float, str],
879        data_type: Optional[ScoreDataType] = None,
880        comment: Optional[str] = None,
881        id: Optional[str] = None,
882        config_id: Optional[str] = None,
883    ):
884        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
885
886        Arguments:
887            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
888            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
889            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
890              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
891            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
892            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
893            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
894
895        Returns:
896            None
897
898        Note:
899            This method is intended to be used within the context of an active trace or observation.
900        """
901        try:
902            langfuse = self._get_langfuse()
903            trace_id = self.get_current_trace_id()
904
905            if trace_id:
906                langfuse.score(
907                    trace_id=trace_id,
908                    name=name,
909                    value=value,
910                    data_type=data_type,
911                    comment=comment,
912                    id=id,
913                    config_id=config_id,
914                )
915            else:
916                raise ValueError("No trace found in the current context")
917
918        except Exception as e:
919            self._log.error(f"Failed to score trace: {e}")

Score the current trace in context. This can be called anywhere in the nested trace to score the trace.

Arguments:
  • name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
  • value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
  • data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
  • comment (Optional[str]): An optional comment or description providing context or additional details about the score.
  • id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
  • config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
Returns:

None

Note:

This method is intended to be used within the context of an active trace or observation.

@catch_and_log_errors
def flush(self):
921    @catch_and_log_errors
922    def flush(self):
923        """Force immediate flush of all buffered observations to the Langfuse backend.
924
925        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
926        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
927
928        Usage:
929            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
930            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
931
932        Returns:
933            None
934
935        Raises:
936            ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues.
937
938        Note:
939            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
940            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
941            However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
942        """
943        langfuse = self._get_langfuse()
944        if langfuse:
945            langfuse.flush()
946        else:
947            self._log.warning("No langfuse object found in the current context")

Force immediate flush of all buffered observations to the Langfuse backend.

This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers. It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.

Usage:
  • This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
  • It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
Returns:

None

Raises:
  • ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues.
Note:
  • The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
  • In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client. However, explicit calls to flush can be beneficial in certain edge cases or for debugging purposes.
def configure( self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, release: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, enabled: Optional[bool] = None):
949    def configure(
950        self,
951        *,
952        public_key: Optional[str] = None,
953        secret_key: Optional[str] = None,
954        host: Optional[str] = None,
955        release: Optional[str] = None,
956        debug: Optional[bool] = None,
957        threads: Optional[int] = None,
958        flush_at: Optional[int] = None,
959        flush_interval: Optional[int] = None,
960        max_retries: Optional[int] = None,
961        timeout: Optional[int] = None,
962        httpx_client: Optional[httpx.Client] = None,
963        enabled: Optional[bool] = None,
964    ):
965        """Configure the Langfuse client.
966
967        If used, this method must be called before any other langfuse_context method or observe-decorated function so that the Langfuse client is initialized with the correct credentials and settings.
968
969        Args:
970            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
971            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
972            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
973            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
974            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
975            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
976            flush_at: Max batch size that's sent to the API.
977            flush_interval: Max delay until a new batch is sent to the API.
978            max_retries: Max number of retries in case of API/network errors.
979            timeout: Timeout of API requests in seconds. Default is 20 seconds.
980            httpx_client: Pass your own httpx client for more customizability of requests.
981            enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
982        """
983        langfuse_singleton = LangfuseSingleton()
984        langfuse_singleton.reset()
985
986        langfuse_singleton.get(
987            public_key=public_key,
988            secret_key=secret_key,
989            host=host,
990            release=release,
991            debug=debug,
992            threads=threads,
993            flush_at=flush_at,
994            flush_interval=flush_interval,
995            max_retries=max_retries,
996            timeout=timeout,
997            httpx_client=httpx_client,
998            enabled=enabled,
999        )

Configure the Langfuse client.

If used, this method must be called before any other langfuse_context method or observe-decorated function so that the Langfuse client is initialized with the correct credentials and settings.

Arguments:
  • public_key: Public API key of Langfuse project. Can be set via LANGFUSE_PUBLIC_KEY environment variable.
  • secret_key: Secret API key of Langfuse project. Can be set via LANGFUSE_SECRET_KEY environment variable.
  • host: Host of Langfuse API. Can be set via LANGFUSE_HOST environment variable. Defaults to https://cloud.langfuse.com.
  • release: Release number/hash of the application to provide analytics grouped by release. Can be set via LANGFUSE_RELEASE environment variable.
  • debug: Enables debug mode for more verbose logging. Can be set via LANGFUSE_DEBUG environment variable.
  • threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
  • flush_at: Max batch size that's sent to the API.
  • flush_interval: Max delay until a new batch is sent to the API.
  • max_retries: Max number of retries in case of API/network errors.
  • timeout: Timeout of API requests in seconds. Default is 20 seconds.
  • httpx_client: Pass your own httpx client for more customizability of requests.
  • enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
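Putting the parameters together, a sketch of a typical setup. This reads the keys from the environment rather than hard-coding them; the specific `flush_at`/`flush_interval` values are illustrative, not recommendations:

```python
import os

from langfuse.decorators import langfuse_context, observe

# Must run before the first decorated function executes.
langfuse_context.configure(
    public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.environ.get("LANGFUSE_SECRET_KEY"),
    host="https://cloud.langfuse.com",
    flush_at=20,       # send batches of up to 20 events ...
    flush_interval=5,  # ... or every 5 seconds, whichever comes first
)

@observe()
def traced() -> str:
    return "ok"
```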
def auth_check(self) -> bool:
1013    def auth_check(self) -> bool:
1014        """Check if the current Langfuse client is authenticated.
1015
1016        Returns:
1017            bool: True if the client is authenticated, False otherwise
1018        """
1019        try:
1020            langfuse = self._get_langfuse()
1021
1022            return langfuse.auth_check()
1023        except Exception as e:
1024            self._log.error(f"No Langfuse object found in the current context: {e}")
1025
1026            return False

Check if the current Langfuse client is authenticated.

Returns:

bool: True if the client is authenticated, False otherwise