langfuse.decorators

Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the @observe() decorator.

Simple example (decorator + openai integration)

from langfuse.decorators import observe
from langfuse.openai import openai # OpenAI integration

@observe()
def story():
    return openai.chat.completions.create(
        model="gpt-3.5-turbo",
        max_tokens=100,
        messages=[
          {"role": "system", "content": "You are a great storyteller."},
          {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
        ],
    ).choices[0].message.content

@observe()
def main():
    return story()

main()

See docs for more information.
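
Trace attributes such as session_id, user_id, or tags can be set from inside any observed function via langfuse_context. A minimal sketch (the session and user values are placeholders):

from langfuse.decorators import langfuse_context, observe

@observe()
def handle_request(query: str):
    # Attach placeholder session/user context to the enclosing trace
    langfuse_context.update_current_trace(
        session_id="my-session-id",
        user_id="my-user-id",
        tags=["production"],
    )
    return query.upper()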

 1"""Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the `@observe()` decorator.
 2
 3*Simple example (decorator + openai integration)*
 4
 5```python
 6from langfuse.decorators import observe
 7from langfuse.openai import openai # OpenAI integration
 8
 9@observe()
10def story():
11    return openai.chat.completions.create(
12        model="gpt-3.5-turbo",
13        max_tokens=100,
14        messages=[
15          {"role": "system", "content": "You are a great storyteller."},
16          {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
17        ],
18    ).choices[0].message.content
19
20@observe()
21def main():
22    return story()
23
24main()
25```
26
27See [docs](https://langfuse.com/docs/sdk/python/decorators) for more information.
28"""
29
30from .langfuse_decorator import langfuse_context, observe, LangfuseDecorator
31
32__all__ = ["langfuse_context", "observe", "LangfuseDecorator"]
langfuse_context = <LangfuseDecorator object>
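
langfuse_context is the shared LangfuseDecorator instance used to configure the Langfuse client and to interact with the current trace from within observed functions. A minimal sketch of explicit configuration and a final flush (the keys are placeholders; they can also be provided via the LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, and LANGFUSE_HOST environment variables):

from langfuse.decorators import langfuse_context

# If used, configure() must run before the first observed function call
langfuse_context.configure(
    public_key="pk-lf-...",  # placeholder
    secret_key="sk-lf-...",  # placeholder
    host="https://cloud.langfuse.com",
)

# ... run @observe()-decorated functions ...

# Force-send buffered observations, e.g. before a short-lived script exits
langfuse_context.flush()
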
def observe( *, name: Optional[str] = None, as_type: Optional[Literal['generation']] = None, capture_input: bool = True, capture_output: bool = True, transform_to_string: Optional[Callable[[Iterable], str]] = None) -> Callable[[~F], ~F]:
 89    def observe(
 90        self,
 91        *,
 92        name: Optional[str] = None,
 93        as_type: Optional[Literal["generation"]] = None,
 94        capture_input: bool = True,
 95        capture_output: bool = True,
 96        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 97    ) -> Callable[[F], F]:
 98        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
 99
100        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
101        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
102
 103            Arguments:
104            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
105            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
106            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
107            capture_output (bool): If True, captures the return value of the function as output. Default is True.
108            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
109
110        Returns:
111            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
112
113        Example:
114            For general tracing (functions/methods):
115            ```python
116            @observe()
117            def your_function(args):
118                # Your implementation here
119            ```
120            For observing language model generations:
121            ```python
122            @observe(as_type="generation")
123            def your_LLM_function(args):
124                # Your LLM invocation here
125            ```
126
127        Raises:
128            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
129
130        Note:
131        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
132        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
133        """
134
135        def decorator(func: F) -> F:
136            return (
137                self._async_observe(
138                    func,
139                    name=name,
140                    as_type=as_type,
141                    capture_input=capture_input,
142                    capture_output=capture_output,
143                    transform_to_string=transform_to_string,
144                )
145                if asyncio.iscoroutinefunction(func)
146                else self._sync_observe(
147                    func,
148                    name=name,
149                    as_type=as_type,
150                    capture_input=capture_input,
151                    capture_output=capture_output,
152                    transform_to_string=transform_to_string,
153                )
154            )
155
156        return decorator

Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Arguments:
  • name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
  • as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
  • capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
  • capture_output (bool): If True, captures the return value of the function as output. Default is True.
  • transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.

Returns:

Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

@observe()
def your_function(args):
    # Your implementation here

For observing language model generations:

@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here

Raises:
  • Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:

  • Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the langfuse_observation_id keyword when calling the wrapped function.
  • To update observation or trace parameters (e.g., metadata, session_id), use langfuse.update_current_observation and langfuse.update_current_trace methods within the wrapped function.
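
As noted above, a custom observation ID can be supplied at call time via the langfuse_observation_id keyword, which the decorator consumes rather than forwarding to the wrapped function. A minimal sketch (the ID is generated locally as a placeholder):

import uuid

from langfuse.decorators import observe

@observe()
def process(user_input: str):
    return user_input[::-1]

# langfuse_observation_id is popped by the decorator and not passed to process()
process("hello", langfuse_observation_id=str(uuid.uuid4()))
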
class LangfuseDecorator:
  86class LangfuseDecorator:
  87    _log = logging.getLogger("langfuse")
  88
  89    def observe(
  90        self,
  91        *,
  92        name: Optional[str] = None,
  93        as_type: Optional[Literal["generation"]] = None,
  94        capture_input: bool = True,
  95        capture_output: bool = True,
  96        transform_to_string: Optional[Callable[[Iterable], str]] = None,
  97    ) -> Callable[[F], F]:
  98        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
  99
 100        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
 101        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
 102
 103        Arguments:
 104            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
 105            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
 106            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
 107            capture_output (bool): If True, captures the return value of the function as output. Default is True.
 108            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
 109
 110        Returns:
 111            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
 112
 113        Example:
 114            For general tracing (functions/methods):
 115            ```python
 116            @observe()
 117            def your_function(args):
 118                # Your implementation here
 119            ```
 120            For observing language model generations:
 121            ```python
 122            @observe(as_type="generation")
 123            def your_LLM_function(args):
 124                # Your LLM invocation here
 125            ```
 126
 127        Raises:
 128            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
 129
 130        Note:
 131        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
 132        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
 133        """
 134
 135        def decorator(func: F) -> F:
 136            return (
 137                self._async_observe(
 138                    func,
 139                    name=name,
 140                    as_type=as_type,
 141                    capture_input=capture_input,
 142                    capture_output=capture_output,
 143                    transform_to_string=transform_to_string,
 144                )
 145                if asyncio.iscoroutinefunction(func)
 146                else self._sync_observe(
 147                    func,
 148                    name=name,
 149                    as_type=as_type,
 150                    capture_input=capture_input,
 151                    capture_output=capture_output,
 152                    transform_to_string=transform_to_string,
 153                )
 154            )
 155
 156        return decorator
 157
 158    def _async_observe(
 159        self,
 160        func: F,
 161        *,
 162        name: Optional[str],
 163        as_type: Optional[Literal["generation"]],
 164        capture_input: bool,
 165        capture_output: bool,
 166        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 167    ) -> F:
 168        @wraps(func)
 169        async def async_wrapper(*args, **kwargs):
 170            observation = self._prepare_call(
 171                name=name or func.__name__,
 172                as_type=as_type,
 173                capture_input=capture_input,
 174                is_method=self._is_method(func),
 175                func_args=args,
 176                func_kwargs=kwargs,
 177            )
 178            result = None
 179
 180            try:
 181                result = await func(*args, **kwargs)
 182            except Exception as e:
 183                self._handle_exception(observation, e)
 184            finally:
 185                result = self._finalize_call(
 186                    observation, result, capture_output, transform_to_string
 187                )
 188
 189                # Returning from finally block may swallow errors, so only return if result is not None
 190                if result is not None:
 191                    return result
 192
 193        return cast(F, async_wrapper)
 194
 195    def _sync_observe(
 196        self,
 197        func: F,
 198        *,
 199        name: Optional[str],
 200        as_type: Optional[Literal["generation"]],
 201        capture_input: bool,
 202        capture_output: bool,
 203        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 204    ) -> F:
 205        @wraps(func)
 206        def sync_wrapper(*args, **kwargs):
 207            observation = self._prepare_call(
 208                name=name or func.__name__,
 209                as_type=as_type,
 210                capture_input=capture_input,
 211                is_method=self._is_method(func),
 212                func_args=args,
 213                func_kwargs=kwargs,
 214            )
 215            result = None
 216
 217            try:
 218                result = func(*args, **kwargs)
 219            except Exception as e:
 220                self._handle_exception(observation, e)
 221            finally:
 222                result = self._finalize_call(
 223                    observation, result, capture_output, transform_to_string
 224                )
 225
 226                # Returning from finally block may swallow errors, so only return if result is not None
 227                if result is not None:
 228                    return result
 229
 230        return cast(F, sync_wrapper)
 231
 232    @staticmethod
 233    def _is_method(func: Callable) -> bool:
 234    """Check if a callable is likely a class or instance method based on its signature.
 235
 236        This method inspects the given callable's signature for the presence of a 'cls' or 'self' parameter, which is conventionally used for class and instance methods in Python classes. It returns True if 'cls' or 'self' is found among the parameters, suggesting the callable is a method.
 237
 238        Note: This method relies on naming conventions and may not accurately identify instance methods if unconventional parameter names are used or if static or class methods incorrectly include a 'self' or 'cls' parameter. Additionally, during decorator execution, inspect.ismethod does not work as expected because the function has not yet been bound to an instance; it is still a function, not a method. This check attempts to infer method status based on signature, which can be useful in decorator contexts where traditional method identification techniques fail.
 239
 240        Returns:
 241        bool: True if 'cls' or 'self' is in the callable's parameters, False otherwise.
 242        """
 243        return (
 244            "self" in inspect.signature(func).parameters
 245            or "cls" in inspect.signature(func).parameters
 246        )
 247
 248    def _prepare_call(
 249        self,
 250        *,
 251        name: str,
 252        as_type: Optional[Literal["generation"]],
 253        capture_input: bool,
 254        is_method: bool = False,
 255        func_args: Tuple = (),
 256        func_kwargs: Dict = {},
 257    ) -> Optional[
 258        Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
 259    ]:
 260        try:
 261            langfuse = self._get_langfuse()
 262            stack = _observation_stack_context.get().copy()
 263            parent = stack[-1] if stack else None
 264
 265            # Collect default observation data
 266            observation_id = func_kwargs.pop("langfuse_observation_id", None)
 267            id = str(observation_id) if observation_id else None
 268            start_time = _get_timestamp()
 269
 270            input = (
 271                self._get_input_from_func_args(
 272                    is_method=is_method,
 273                    func_args=func_args,
 274                    func_kwargs=func_kwargs,
 275                )
 276                if capture_input
 277                else None
 278            )
 279
 280            params = {
 281                "id": id,
 282                "name": name,
 283                "start_time": start_time,
 284                "input": input,
 285            }
 286
 287            # Create observation
 288            if parent and as_type == "generation":
 289                observation = parent.generation(**params)
 290            elif as_type == "generation":
 291                # Create wrapper trace if generation is top-level
 292                # Do not add wrapper trace to stack, as it does not have a corresponding end that will pop it off again
 293                trace = langfuse.trace(id=id, name=name, start_time=start_time)
 294                observation = langfuse.generation(
 295                    name=name, start_time=start_time, input=input, trace_id=trace.id
 296                )
 297            elif parent:
 298                observation = parent.span(**params)
 299            else:
 300                params["id"] = self._get_context_trace_id() or params["id"]
 301                observation = langfuse.trace(**params)
 302
 303            _observation_stack_context.set(stack + [observation])
 304
 305            return observation
 306        except Exception as e:
 307            self._log.error(f"Failed to prepare observation: {e}")
 308
 309    def _get_input_from_func_args(
 310        self,
 311        *,
 312        is_method: bool = False,
 313        func_args: Tuple = (),
 314        func_kwargs: Dict = {},
 315    ) -> Any:
 316        # Remove implicitly passed "self" or "cls" argument for instance or class methods
 317        logged_args = func_args[1:] if is_method else func_args
 318        raw_input = {
 319            "args": logged_args,
 320            "kwargs": func_kwargs,
 321        }
 322
 323        # Serialize and deserialize to ensure proper JSON serialization.
 324        # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
 325        return json.loads(json.dumps(raw_input, cls=EventSerializer))
 326
 327    def _get_context_trace_id(self):
 328        context_trace_id = _root_trace_id_context.get()
 329
 330        if context_trace_id is not None:
 331            # Clear the context trace ID to avoid leaking it to other traces
 332            _root_trace_id_context.set(None)
 333
 334            return context_trace_id
 335
 336        return None
 337
 338    def _finalize_call(
 339        self,
 340        observation: Optional[
 341            Union[
 342                StatefulSpanClient,
 343                StatefulTraceClient,
 344                StatefulGenerationClient,
 345            ]
 346        ],
 347        result: Any,
 348        capture_output: bool,
 349        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 350    ):
 351        if inspect.isgenerator(result):
 352            return self._wrap_sync_generator_result(
 353                observation, result, capture_output, transform_to_string
 354            )
 355        elif inspect.isasyncgen(result):
 356            return self._wrap_async_generator_result(
 357                observation, result, capture_output, transform_to_string
 358            )
 359
 360        else:
 361            return self._handle_call_result(observation, result, capture_output)
 362
 363    def _handle_call_result(
 364        self,
 365        observation: Optional[
 366            Union[
 367                StatefulSpanClient,
 368                StatefulTraceClient,
 369                StatefulGenerationClient,
 370            ]
 371        ],
 372        result: Any,
 373        capture_output: bool,
 374    ):
 375        try:
 376            if observation is None:
 377                raise ValueError("No observation found in the current context")
 378
 379            # Collect final observation data
 380            observation_params = _observation_params_context.get()[
 381                observation.id
 382            ].copy()
 383            del _observation_params_context.get()[
 384                observation.id
 385            ]  # Remove observation params to avoid leaking
 386
 387            end_time = observation_params["end_time"] or _get_timestamp()
 388            raw_output = observation_params["output"] or (
 389                result if result and capture_output else None
 390            )
 391
 392            # Serialize and deserialize to ensure proper JSON serialization.
 393            # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
 394            output = json.loads(json.dumps(raw_output, cls=EventSerializer))
 395            observation_params.update(end_time=end_time, output=output)
 396
 397            if isinstance(observation, (StatefulSpanClient, StatefulGenerationClient)):
 398                observation.end(**observation_params)
 399            elif isinstance(observation, StatefulTraceClient):
 400                observation.update(**observation_params)
 401
 402            # Remove observation from top of stack
 403            stack = _observation_stack_context.get()
 404            _observation_stack_context.set(stack[:-1])
 405
 406        except Exception as e:
 407            self._log.error(f"Failed to finalize observation: {e}")
 408
 409        finally:
 410            return result
 411
 412    def _handle_exception(
 413        self,
 414        observation: Optional[
 415            Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
 416        ],
 417        e: Exception,
 418    ):
 419        if observation:
 420            _observation_params_context.get()[observation.id].update(
 421                level="ERROR", status_message=str(e)
 422            )
 423        raise e
 424
 425    def _wrap_sync_generator_result(
 426        self,
 427        observation: Optional[
 428            Union[
 429                StatefulSpanClient,
 430                StatefulTraceClient,
 431                StatefulGenerationClient,
 432            ]
 433        ],
 434        generator: Generator,
 435        capture_output: bool,
 436        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 437    ):
 438        items = []
 439
 440        try:
 441            for item in generator:
 442                items.append(item)
 443
 444                yield item
 445
 446        finally:
 447            output = items
 448
 449            if transform_to_string is not None:
 450                output = transform_to_string(items)
 451
 452            elif all(isinstance(item, str) for item in items):
 453                output = "".join(items)
 454
 455            self._handle_call_result(observation, output, capture_output)
 456
 457    async def _wrap_async_generator_result(
 458        self,
 459        observation: Optional[
 460            Union[
 461                StatefulSpanClient,
 462                StatefulTraceClient,
 463                StatefulGenerationClient,
 464            ]
 465        ],
 466        generator: AsyncGenerator,
 467        capture_output: bool,
 468        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 469    ) -> AsyncGenerator:
 470        items = []
 471
 472        try:
 473            async for item in generator:
 474                items.append(item)
 475
 476                yield item
 477
 478        finally:
 479            output = items
 480
 481            if transform_to_string is not None:
 482                output = transform_to_string(items)
 483
 484            elif all(isinstance(item, str) for item in items):
 485                output = "".join(items)
 486
 487            self._handle_call_result(observation, output, capture_output)
 488
 489    def get_current_llama_index_handler(self):
 490        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
 491
 492        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
 493        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
 494
 495        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
 496
 497        Returns:
 498            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
 499
 500        Note:
 501            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
 502            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
 503        """
 504        try:
 505            from langfuse.llama_index import LlamaIndexCallbackHandler
 506        except ImportError:
 507            self._log.error(
 508                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
 509            )
 510
 511            return None
 512
 513        observation = _observation_stack_context.get()[-1]
 514
 515        if observation is None:
 516            self._log.warn("No observation found in the current context")
 517
 518            return None
 519
 520        if isinstance(observation, StatefulGenerationClient):
 521            self._log.warn(
 522                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
 523            )
 524
 525            return None
 526
 527        callback_handler = LlamaIndexCallbackHandler()
 528        callback_handler.set_root(observation)
 529
 530        return callback_handler
 531
 532    def get_current_langchain_handler(self):
 533        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
 534
 535        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
 536        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
 537
 538        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
 539
 540        Returns:
 541            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
 542
 543        Note:
 544            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
 545            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
 546        """
 547        observation = _observation_stack_context.get()[-1]
 548
 549        if observation is None:
 550            self._log.warn("No observation found in the current context")
 551
 552            return None
 553
 554        if isinstance(observation, StatefulGenerationClient):
 555            self._log.warn(
 556                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
 557            )
 558
 559            return None
 560
 561        return observation.get_langchain_handler()
 562
 563    def get_current_trace_id(self):
 564        """Retrieve the ID of the current trace from the observation stack context.
 565
 566        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
 567        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
 568        representing the entry point of the traced execution context.
 569
 570        Returns:
 571            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
 572            possibly due to the method being called outside of any @observe-decorated function execution.
 573
 574        Note:
 575            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
 576            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 577        """
 578        stack = _observation_stack_context.get()
 579        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
 580
 581        if not stack:
 582            if should_log_warning:
 583                self._log.warn("No trace found in the current context")
 584
 585            return None
 586
 587        return stack[0].id
 588
 589    def _get_caller_module_name(self):
 590        try:
 591            caller_module = inspect.getmodule(inspect.stack()[2][0])
 592        except Exception as e:
 593            self._log.warn(f"Failed to get caller module: {e}")
 594
 595            return None
 596
 597        return caller_module.__name__ if caller_module else None
 598
 599    def get_current_trace_url(self) -> Optional[str]:
 600        """Retrieve the URL of the current trace in context.
 601
 602        Returns:
 603            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
 604            possibly due to the method being called outside of any @observe-decorated function execution.
 605
 606        Note:
 607            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
 608            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 609        """
 610        try:
 611            trace_id = self.get_current_trace_id()
 612            langfuse = self._get_langfuse()
 613
 614            if not trace_id:
 615                raise ValueError("No trace found in the current context")
 616
 617            return f"{langfuse.client._client_wrapper._base_url}/trace/{trace_id}"
 618
 619        except Exception as e:
 620            self._log.error(f"Failed to get current trace URL: {e}")
 621
 622            return None
 623
 624    def get_current_observation_id(self):
 625        """Retrieve the ID of the current observation in context.
 626
 627        Returns:
 628            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
 629            possibly due to the method being called outside of any @observe-decorated function execution.
 630
 631        Note:
 632            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
 633            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 634            - If called at the top level of a trace, it will return the trace ID.
 635        """
 636        stack = _observation_stack_context.get()
 637        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
 638
 639        if not stack:
 640            if should_log_warning:
 641                self._log.warn("No observation found in the current context")
 642
 643            return None
 644
 645        return stack[-1].id
 646
 647    def update_current_trace(
 648        self,
 649        name: Optional[str] = None,
 650        user_id: Optional[str] = None,
 651        session_id: Optional[str] = None,
 652        version: Optional[str] = None,
 653        release: Optional[str] = None,
 654        metadata: Optional[Any] = None,
 655        tags: Optional[List[str]] = None,
 656        public: Optional[bool] = None,
 657    ):
 658        """Set parameters for the current trace, updating the trace's metadata and context information.
 659
 660        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
 661        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
 662        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
 663
 664        Arguments:
 665            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
 666            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
 667            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
 668            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
 669            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
 670            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
 671            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
 672
 673        Returns:
 674            None
 675
 676        Note:
 677            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
 678            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
 679            - If called outside of an active trace context, a warning is logged, and a ValueError is raised to indicate the absence of a traceable context.
 680        """
 681        trace_id = self.get_current_trace_id()
 682
 683        if trace_id is None:
 684            self._log.warn("No trace found in the current context")
 685
 686            return
 687
 688        params_to_update = {
 689            k: v
 690            for k, v in {
 691                "name": name,
 692                "user_id": user_id,
 693                "session_id": session_id,
 694                "version": version,
 695                "release": release,
 696                "metadata": metadata,
 697                "tags": tags,
 698                "public": public,
 699            }.items()
 700            if v is not None
 701        }
 702
 703        _observation_params_context.get()[trace_id].update(params_to_update)
 704
 705    def update_current_observation(
 706        self,
 707        *,
 708        input: Optional[Any] = None,
 709        output: Optional[Any] = None,
 710        name: Optional[str] = None,
 711        version: Optional[str] = None,
 712        metadata: Optional[Any] = None,
 713        start_time: Optional[datetime] = None,
 714        end_time: Optional[datetime] = None,
 715        release: Optional[str] = None,
 716        tags: Optional[List[str]] = None,
 717        user_id: Optional[str] = None,
 718        session_id: Optional[str] = None,
 719        level: Optional[SpanLevel] = None,
 720        status_message: Optional[str] = None,
 721        completion_start_time: Optional[datetime] = None,
 722        model: Optional[str] = None,
 723        model_parameters: Optional[Dict[str, MapValue]] = None,
 724        usage: Optional[Union[BaseModel, ModelUsage]] = None,
 725        prompt: Optional[PromptClient] = None,
 726        public: Optional[bool] = None,
 727    ):
 728        """Update parameters for the current observation within an active trace context.
 729
 730        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
 731        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
 732        enhancing the observability and traceability of the execution context.
 733
 734        Note that if a param is not available on a specific observation type, it will be ignored.
 735
 736        Shared params:
 737            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
 738            - `output` (Optional[Any]): The output or result of the trace or observation
 739            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
 740            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
 741            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
 742            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
 743            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
 744
 745        Trace-specific params:
 746            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
 747            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
 748            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
 749            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
 750            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
 751
 752        Span-specific params:
 753            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
 754            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
 755
 756        Generation-specific params:
 757            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
 758            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
 759            - `usage` (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
 760            - `prompt`(Optional[PromptClient]): The prompt object used for the generation.
 761
 762        Returns:
 763            None
 764
 765        Raises:
 766            ValueError: If no current observation is found in the context, indicating that this method was called outside of an observation's execution scope.
 767
 768        Note:
 769            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
 770            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
 771            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
 772        """
 773        stack = _observation_stack_context.get()
 774        observation = stack[-1] if stack else None
 775
 776        if not observation:
 777            self._log.warn("No observation found in the current context")
 778
 779            return
 780
 781        update_params = {
 782            k: v
 783            for k, v in {
 784                "input": input,
 785                "output": output,
 786                "name": name,
 787                "version": version,
 788                "metadata": metadata,
 789                "start_time": start_time,
 790                "end_time": end_time,
 791                "release": release,
 792                "tags": tags,
 793                "user_id": user_id,
 794                "session_id": session_id,
 795                "level": level,
 796                "status_message": status_message,
 797                "completion_start_time": completion_start_time,
 798                "model": model,
 799                "model_parameters": model_parameters,
 800                "usage": usage,
 801                "prompt": prompt,
 802                "public": public,
 803            }.items()
 804            if v is not None
 805        }
 806
 807        _observation_params_context.get()[observation.id].update(update_params)
 808
 809    def score_current_observation(
 810        self,
 811        *,
 812        name: str,
 813        value: float,
 814        comment: Optional[str] = None,
 815        id: Optional[str] = None,
 816    ):
 817        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
 818
 819        Arguments:
 820            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 821            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
 822            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 823            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 824
 825        Returns:
 826            None
 827
 828        Note:
 829            This method is intended to be used within the context of an active trace or observation.
 830        """
 831        try:
 832            langfuse = self._get_langfuse()
 833            trace_id = self.get_current_trace_id()
 834            current_observation_id = self.get_current_observation_id()
 835
 836            observation_id = (
 837                current_observation_id if current_observation_id != trace_id else None
 838            )
 839
 840            if trace_id:
 841                langfuse.score(
 842                    trace_id=trace_id,
 843                    observation_id=observation_id,
 844                    name=name,
 845                    value=value,
 846                    comment=comment,
 847                    id=id,
 848                )
 849            else:
 850                raise ValueError("No trace or observation found in the current context")
 851
 852        except Exception as e:
 853            self._log.error(f"Failed to score observation: {e}")
 854
 855    def score_current_trace(
 856        self,
 857        *,
 858        name: str,
 859        value: float,
 860        comment: Optional[str] = None,
 861        id: Optional[str] = None,
 862    ):
 863        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
 864
 865        Arguments:
 866            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 867            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
 868            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 869            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 870
 871        Returns:
 872            None
 873
 874        Note:
 875            This method is intended to be used within the context of an active trace or observation.
 876        """
 877        try:
 878            langfuse = self._get_langfuse()
 879            trace_id = self.get_current_trace_id()
 880
 881            if trace_id:
 882                langfuse.score(
 883                    trace_id=trace_id,
 884                    name=name,
 885                    value=value,
 886                    comment=comment,
 887                    id=id,
 888                )
 889            else:
 890                raise ValueError("No trace found in the current context")
 891
 892        except Exception as e:
 893            self._log.error(f"Failed to score observation: {e}")
 894
 895    @catch_and_log_errors
 896    def flush(self):
 897        """Force immediate flush of all buffered observations to the Langfuse backend.
 898
 899        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
 900        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
 901
 902        Usage:
 903            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
 904            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
 905
 906        Returns:
 907            None
 908
 909        Raises:
 910            ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues.
 911
 912        Note:
 913            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
 914            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
 915            However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
 916        """
 917        langfuse = self._get_langfuse()
 918        if langfuse:
 919            langfuse.flush()
 920        else:
 921            self._log.warn("No langfuse object found in the current context")
 922
 923    def configure(
 924        self,
 925        *,
 926        public_key: Optional[str] = None,
 927        secret_key: Optional[str] = None,
 928        host: Optional[str] = None,
 929        release: Optional[str] = None,
 930        debug: Optional[bool] = None,
 931        threads: Optional[int] = None,
 932        flush_at: Optional[int] = None,
 933        flush_interval: Optional[int] = None,
 934        max_retries: Optional[int] = None,
 935        timeout: Optional[int] = None,
 936        httpx_client: Optional[httpx.Client] = None,
 937        enabled: Optional[bool] = None,
 938    ):
 939        """Configure the Langfuse client.
 940
 941        If used, this method must be called before any other langfuse_context method or @observe()-decorated function to configure the Langfuse client with the necessary credentials and settings.
 942
 943        Args:
 944            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
 945            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
 946            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
 947            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
 948            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
 949            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
 950            flush_at: Max batch size that's sent to the API.
 951            flush_interval: Max delay until a new batch is sent to the API.
 952            max_retries: Max number of retries in case of API/network errors.
 953            timeout: Timeout of API requests in seconds. Default is 20 seconds.
 954            httpx_client: Pass your own httpx client for more customizability of requests.
 955            enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
 956        """
 957        langfuse_singleton = LangfuseSingleton()
 958        langfuse_singleton.reset()
 959
 960        langfuse_singleton.get(
 961            public_key=public_key,
 962            secret_key=secret_key,
 963            host=host,
 964            release=release,
 965            debug=debug,
 966            threads=threads,
 967            flush_at=flush_at,
 968            flush_interval=flush_interval,
 969            max_retries=max_retries,
 970            timeout=timeout,
 971            httpx_client=httpx_client,
 972            enabled=enabled,
 973        )
 974
 975    def _get_langfuse(self) -> Langfuse:
 976        return LangfuseSingleton().get()
 977
 978    def _set_root_trace_id(self, trace_id: str):
 979        if _observation_stack_context.get():
 980            self._log.warn(
 981                "Root Trace ID cannot be set on an already running trace. Skipping root trace ID assignment."
 982            )
 983            return
 984
 985        _root_trace_id_context.set(trace_id)
 986
 987    def auth_check(self) -> bool:
 988        """Check if the current Langfuse client is authenticated.
 989
 990        Returns:
 991            bool: True if the client is authenticated, False otherwise
 992        """
 993        try:
 994            langfuse = self._get_langfuse()
 995
 996            return langfuse.auth_check()
 997        except Exception as e:
 998            self._log.error(f"No Langfuse object found in the current context: {e}")
 999
1000            return False
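
Context methods such as score_current_trace and update_current_observation are meant to be called from inside a function wrapped with @observe(). For example, a running trace can be scored in place; a minimal sketch (score name, value, and comment are placeholders):

from langfuse.decorators import langfuse_context, observe

@observe()
def answer(question: str) -> str:
    result = "42"
    # Score the enclosing trace; name/value/comment are placeholders
    langfuse_context.score_current_trace(name="relevance", value=1.0, comment="exact match")
    return result
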
def observe( self, *, name: Optional[str] = None, as_type: Optional[Literal['generation']] = None, capture_input: bool = True, capture_output: bool = True, transform_to_string: Optional[Callable[[Iterable], str]] = None) -> Callable[[~F], ~F]:
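
Identical to the module-level observe() documented above. When the decorated function is a generator, yielded values are collected for output capture; string chunks are joined, while transform_to_string controls how non-string chunks are flattened. A minimal sketch (the join-based transform is just one possible choice):

from langfuse.decorators import observe

@observe(transform_to_string=lambda chunks: "".join(str(c) for c in chunks))
def stream_numbers():
    for i in range(3):
        yield i  # non-string chunks, hence the explicit transform

for chunk in stream_numbers():
    print(chunk)
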
 89    def observe(
 90        self,
 91        *,
 92        name: Optional[str] = None,
 93        as_type: Optional[Literal["generation"]] = None,
 94        capture_input: bool = True,
 95        capture_output: bool = True,
 96        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 97    ) -> Callable[[F], F]:
 98        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
 99
100        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
101        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
102
103        Attributes:
104            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
105            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
106            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
107            capture_output (bool): If True, captures the return value of the function as output. Default is True.
108            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
109
110        Returns:
111            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
112
113        Example:
114            For general tracing (functions/methods):
115            ```python
116            @observe()
117            def your_function(args):
118                # Your implementation here
119            ```
120            For observing language model generations:
121            ```python
122            @observe(as_type="generation")
123            def your_LLM_function(args):
124                # Your LLM invocation here
125            ```
126
127        Raises:
128            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
129
130        Note:
131        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
132        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
133        """
134
135        def decorator(func: F) -> F:
136            return (
137                self._async_observe(
138                    func,
139                    name=name,
140                    as_type=as_type,
141                    capture_input=capture_input,
142                    capture_output=capture_output,
143                    transform_to_string=transform_to_string,
144                )
145                if asyncio.iscoroutinefunction(func)
146                else self._sync_observe(
147                    func,
148                    name=name,
149                    as_type=as_type,
150                    capture_input=capture_input,
151                    capture_output=capture_output,
152                    transform_to_string=transform_to_string,
153                )
154            )
155
156        return decorator

Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Attributes:
  • name (Optional[str]): Name of the created trace or span. Overrides the function name, which is otherwise used as the trace or span name by default.
  • as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
  • capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
  • capture_output (bool): If True, captures the return value of the function as output. Default is True.
  • transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.
Returns:

Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

@observe()
def your_function(args):
    # Your implementation here

For observing language model generations:

@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here

Raises:
  • Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:

  • Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the langfuse_observation_id keyword when calling the wrapped function.
  • To update observation or trace parameters (e.g., metadata, session_id), use langfuse.update_current_observation and langfuse.update_current_trace methods within the wrapped function.
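
As the first note mentions, the observation ID can be pinned at call time. A minimal sketch, assuming any unique string is accepted; "my-custom-id" is a hypothetical value:

```python
from langfuse.decorators import observe

@observe()
def process(user_input: str) -> str:
    return user_input.upper()

# langfuse_observation_id pins the ID of the created observation; the keyword
# is consumed by the decorator and is not forwarded to process() itself.
process("hello", langfuse_observation_id="my-custom-id")
```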
def get_current_llama_index_handler(self):
489    def get_current_llama_index_handler(self):
490        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
491
492        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
493        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
494
495        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
496
497        Returns:
498            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
499
500        Note:
501            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
502            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
503        """
504        try:
505            from langfuse.llama_index import LlamaIndexCallbackHandler
506        except ImportError:
507            self._log.error(
508                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
509            )
510
511            return None
512
513        observation = _observation_stack_context.get()[-1]
514
515        if observation is None:
516            self._log.warn("No observation found in the current context")
517
518            return None
519
520        if isinstance(observation, StatefulGenerationClient):
521            self._log.warn(
522                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
523            )
524
525            return None
526
527        callback_handler = LlamaIndexCallbackHandler()
528        callback_handler.set_root(observation)
529
530        return callback_handler

Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.

This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.

See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.

Returns:

LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

Note:
  • This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
  • If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
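
For illustration, a hedged sketch of wiring the returned handler into LlamaIndex. The `Settings`/`CallbackManager` imports follow llama-index >= 0.10 conventions and may differ between versions; the index construction itself is elided:

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def ask_index(question: str):
    handler = langfuse_context.get_current_llama_index_handler()

    if handler is not None:
        # Register the Langfuse handler globally so LlamaIndex emits its
        # callbacks into the current observation context.
        from llama_index.core import Settings
        from llama_index.core.callbacks import CallbackManager

        Settings.callback_manager = CallbackManager([handler])

    # ... build your index and run query_engine.query(question) here ...
```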
def get_current_langchain_handler(self):
532    def get_current_langchain_handler(self):
533        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
534
535        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
536        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
537
538        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
539
540        Returns:
541            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
542
543        Note:
544            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
545            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
546        """
547        observation = _observation_stack_context.get()[-1]
548
549        if observation is None:
550            self._log.warn("No observation found in the current context")
551
552            return None
553
554        if isinstance(observation, StatefulGenerationClient):
555            self._log.warn(
556                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
557            )
558
559            return None
560
561        return observation.get_langchain_handler()

Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.

This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.

See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.

Returns:

LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

Note:
  • This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
  • If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
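
A minimal usage sketch; `chain` stands in for any LangChain runnable built elsewhere (hypothetical), and passing callbacks via the `config` dict is standard LangChain usage:

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def run_chain(chain, topic: str):
    handler = langfuse_context.get_current_langchain_handler()

    # Guard against None so the call still works outside a usable context.
    callbacks = [handler] if handler is not None else []

    # The handler scopes LangChain's callbacks to the current observation,
    # so the chain's internal steps appear nested under this trace.
    return chain.invoke({"topic": topic}, config={"callbacks": callbacks})
```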
def get_current_trace_id(self):
563    def get_current_trace_id(self):
564        """Retrieve the ID of the current trace from the observation stack context.
565
566        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
567        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
568        representing the entry point of the traced execution context.
569
570        Returns:
571            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
572            possibly due to the method being called outside of any @observe-decorated function execution.
573
574        Note:
575            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
576            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
577        """
578        stack = _observation_stack_context.get()
579        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
580
581        if not stack:
582            if should_log_warning:
583                self._log.warn("No trace found in the current context")
584
585            return None
586
587        return stack[0].id

Retrieve the ID of the current trace from the observation stack context.

This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID, such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack, representing the entry point of the traced execution context.

Returns:

str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
  • If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
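
A short sketch of one common use: correlating your own application logs with Langfuse traces. The logging setup is illustrative:

```python
import logging

from langfuse.decorators import langfuse_context, observe

logger = logging.getLogger(__name__)

@observe()
def handle_request(payload: dict):
    trace_id = langfuse_context.get_current_trace_id()

    # Attach the trace ID to application logs for cross-referencing.
    logger.info("processing request under trace %s", trace_id)
```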
def get_current_trace_url(self) -> Optional[str]:
599    def get_current_trace_url(self) -> Optional[str]:
600        """Retrieve the URL of the current trace in context.
601
602        Returns:
603            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
604            possibly due to the method being called outside of any @observe-decorated function execution.
605
606        Note:
607            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is present and its URL can be constructed.
608            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
609        """
610        try:
611            trace_id = self.get_current_trace_id()
612            langfuse = self._get_langfuse()
613
614            if not trace_id:
615                raise ValueError("No trace found in the current context")
616
617            return f"{langfuse.client._client_wrapper._base_url}/trace/{trace_id}"
618
619        except Exception as e:
620            self._log.error(f"Failed to get current trace URL: {e}")
621
622            return None

Retrieve the URL of the current trace in context.

Returns:

str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is present and its URL can be constructed.
  • If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
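
For example, surfacing the trace link while debugging; where the URL is printed or logged is up to you (sketch):

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def debug_run():
    url = langfuse_context.get_current_trace_url()

    if url:
        # Open this link to inspect the trace in the Langfuse UI.
        print(f"Langfuse trace: {url}")
```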
def get_current_observation_id(self):
624    def get_current_observation_id(self):
625        """Retrieve the ID of the current observation in context.
626
627        Returns:
628            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
629            possibly due to the method being called outside of any @observe-decorated function execution.
630
631        Note:
632            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
633            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
634            - If called at the top level of a trace, it will return the trace ID.
635        """
636        stack = _observation_stack_context.get()
637        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
638
639        if not stack:
640            if should_log_warning:
641                self._log.warn("No observation found in the current context")
642
643            return None
644
645        return stack[-1].id

Retrieve the ID of the current observation in context.

Returns:

str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
  • If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
  • If called at the top level of a trace, it will return the trace ID.
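
The last note is easiest to see with a nested call; a minimal sketch:

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def inner():
    # Inside a nested call this returns the span's own ID.
    print("span id:", langfuse_context.get_current_observation_id())

@observe()
def outer():
    # At the top level the observation ID equals the trace ID.
    print("trace id:", langfuse_context.get_current_observation_id())
    inner()
```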
def update_current_trace( self, name: Optional[str] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, version: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, public: Optional[bool] = None):
647    def update_current_trace(
648        self,
649        name: Optional[str] = None,
650        user_id: Optional[str] = None,
651        session_id: Optional[str] = None,
652        version: Optional[str] = None,
653        release: Optional[str] = None,
654        metadata: Optional[Any] = None,
655        tags: Optional[List[str]] = None,
656        public: Optional[bool] = None,
657    ):
658        """Set parameters for the current trace, updating the trace's metadata and context information.
659
660        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
661        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
662        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
663
664        Arguments:
665            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
666            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
667            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
668            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
669            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
670            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
671            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
672
673        Returns:
674            None
675
676        Note:
677            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
678            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
679            - If called outside of an active trace context, a warning is logged and the method returns without updating anything; no exception is raised.
680        """
681        trace_id = self.get_current_trace_id()
682
683        if trace_id is None:
684            self._log.warn("No trace found in the current context")
685
686            return
687
688        params_to_update = {
689            k: v
690            for k, v in {
691                "name": name,
692                "user_id": user_id,
693                "session_id": session_id,
694                "version": version,
695                "release": release,
696                "metadata": metadata,
697                "tags": tags,
698                "public": public,
699            }.items()
700            if v is not None
701        }
702
703        _observation_params_context.get()[trace_id].update(params_to_update)

Set parameters for the current trace, updating the trace's metadata and context information.

This method allows for dynamically updating the trace parameters at any point during the execution of a trace. It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information, and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.

Arguments:
  • name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
  • release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
  • public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
Returns:

None

Note:
  • This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
  • The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
  • If called outside of an active trace context, a warning is logged and the method returns without updating anything; no exception is raised.
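
A sketch of enriching a trace from inside an observed function; the parameter values are placeholders:

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def handle_message(user_id: str, session_id: str, message: str):
    langfuse_context.update_current_trace(
        user_id=user_id,        # enables user-level analytics
        session_id=session_id,  # groups related traces into one session
        tags=["chat"],          # filterable in the UI and GET API
        metadata={"channel": "web"},
    )
    # ... actual message handling ...
```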
def update_current_observation( self, *, input: Optional[Any] = None, output: Optional[Any] = None, name: Optional[str] = None, version: Optional[str] = None, metadata: Optional[Any] = None, start_time: Optional[datetime.datetime] = None, end_time: Optional[datetime.datetime] = None, release: Optional[str] = None, tags: Optional[List[str]] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, level: Optional[Literal['DEBUG', 'DEFAULT', 'WARNING', 'ERROR']] = None, status_message: Optional[str] = None, completion_start_time: Optional[datetime.datetime] = None, model: Optional[str] = None, model_parameters: Optional[Dict[str, Union[str, NoneType, int, bool, List[str]]]] = None, usage: Union[pydantic.main.BaseModel, langfuse.model.ModelUsage, NoneType] = None, prompt: Union[langfuse.model.TextPromptClient, langfuse.model.ChatPromptClient, NoneType] = None, public: Optional[bool] = None):
705    def update_current_observation(
706        self,
707        *,
708        input: Optional[Any] = None,
709        output: Optional[Any] = None,
710        name: Optional[str] = None,
711        version: Optional[str] = None,
712        metadata: Optional[Any] = None,
713        start_time: Optional[datetime] = None,
714        end_time: Optional[datetime] = None,
715        release: Optional[str] = None,
716        tags: Optional[List[str]] = None,
717        user_id: Optional[str] = None,
718        session_id: Optional[str] = None,
719        level: Optional[SpanLevel] = None,
720        status_message: Optional[str] = None,
721        completion_start_time: Optional[datetime] = None,
722        model: Optional[str] = None,
723        model_parameters: Optional[Dict[str, MapValue]] = None,
724        usage: Optional[Union[BaseModel, ModelUsage]] = None,
725        prompt: Optional[PromptClient] = None,
726        public: Optional[bool] = None,
727    ):
728        """Update parameters for the current observation within an active trace context.
729
730        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
731        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
732        enhancing the observability and traceability of the execution context.
733
734        Note that if a param is not available on a specific observation type, it will be ignored.
735
736        Shared params:
737            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
738            - `output` (Optional[Any]): The output or result of the trace or observation
739            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
740            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
741            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
742            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
743            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
744
745        Trace-specific params:
746            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
747            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
748            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
749            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
750            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
751
752        Span-specific params:
753            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
754            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
755
756        Generation-specific params:
757            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
758            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
759            - `usage` (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
760            - `prompt` (Optional[PromptClient]): The prompt object used for the generation.
761
762        Returns:
763            None
764
765        Failure handling:
766            If no current observation is found in the context (e.g., called outside of an observation's execution scope), a warning is logged and the update is skipped; no exception is raised.
767
768        Note:
769            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
770            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
771            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
772        """
773        stack = _observation_stack_context.get()
774        observation = stack[-1] if stack else None
775
776        if not observation:
777            self._log.warn("No observation found in the current context")
778
779            return
780
781        update_params = {
782            k: v
783            for k, v in {
784                "input": input,
785                "output": output,
786                "name": name,
787                "version": version,
788                "metadata": metadata,
789                "start_time": start_time,
790                "end_time": end_time,
791                "release": release,
792                "tags": tags,
793                "user_id": user_id,
794                "session_id": session_id,
795                "level": level,
796                "status_message": status_message,
797                "completion_start_time": completion_start_time,
798                "model": model,
799                "model_parameters": model_parameters,
800                "usage": usage,
801                "prompt": prompt,
802                "public": public,
803            }.items()
804            if v is not None
805        }
806
807        _observation_params_context.get()[observation.id].update(update_params)

Update parameters for the current observation within an active trace context.

This method dynamically adjusts the parameters of the most recent observation on the observation stack. It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more, enhancing the observability and traceability of the execution context.

Note that if a param is not available on a specific observation type, it will be ignored.

Shared params:
  • input (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
  • output (Optional[Any]): The output or result of the trace or observation
  • name (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • start_time (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
  • end_time (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.

Trace-specific params:
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
  • public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

Span-specific params:
  • level (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
  • status_message (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.

Generation-specific params:
  • completion_start_time (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
  • model (Optional[str]): The name of the model used for the generation.
  • model_parameters (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
  • usage (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAI structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
  • prompt (Optional[PromptClient]): The prompt object used for the generation.

Returns:

None

Failure handling:
  • If no current observation is found in the context (e.g., called outside of an observation's execution scope), a warning is logged and the update is skipped; no exception is raised.
Note:
  • This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
  • It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
  • Parameters set to None will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
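
A sketch of enriching a generation-type observation; the completion text and usage numbers are placeholders, and `usage` is passed as a plain dict matching the generic structure described above:

```python
from langfuse.decorators import langfuse_context, observe

@observe(as_type="generation")
def call_model(prompt: str) -> str:
    completion = "..."  # placeholder for a real model call

    langfuse_context.update_current_observation(
        model="gpt-3.5-turbo",
        model_parameters={"temperature": 0.2},
        usage={"input": 12, "output": 48, "unit": "TOKENS"},
        output=completion,
    )
    return completion
```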
def score_current_observation( self, *, name: str, value: float, comment: Optional[str] = None, id: Optional[str] = None):
809    def score_current_observation(
810        self,
811        *,
812        name: str,
813        value: float,
814        comment: Optional[str] = None,
815        id: Optional[str] = None,
816    ):
817        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
818
819        Arguments:
820            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
821            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
822            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
823            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
824
825        Returns:
826            None
827
828        Note:
829            This method is intended to be used within the context of an active trace or observation.
830        """
831        try:
832            langfuse = self._get_langfuse()
833            trace_id = self.get_current_trace_id()
834            current_observation_id = self.get_current_observation_id()
835
836            observation_id = (
837                current_observation_id if current_observation_id != trace_id else None
838            )
839
840            if trace_id:
841                langfuse.score(
842                    trace_id=trace_id,
843                    observation_id=observation_id,
844                    name=name,
845                    value=value,
846                    comment=comment,
847                    id=id,
848                )
849            else:
850                raise ValueError("No trace or observation found in the current context")
851
852        except Exception as e:
853            self._log.error(f"Failed to score observation: {e}")

Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.

Arguments:
  • name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
  • value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
  • comment (Optional[str]): An optional comment or description providing context or additional details about the score.
  • id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
Returns:

None

Note:

This method is intended to be used within the context of an active trace or observation.
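
For instance, attaching a simple heuristic score to the current observation; the metric name and value are illustrative:

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def answer(question: str) -> str:
    result = "42"

    langfuse_context.score_current_observation(
        name="answer-length",  # illustrative metric name
        value=float(len(result)),
        comment="Length of the generated answer",
    )
    return result
```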

def score_current_trace( self, *, name: str, value: float, comment: Optional[str] = None, id: Optional[str] = None):
855    def score_current_trace(
856        self,
857        *,
858        name: str,
859        value: float,
860        comment: Optional[str] = None,
861        id: Optional[str] = None,
862    ):
863        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
864
865        Arguments:
866            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
867            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
868            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
869            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
870
871        Returns:
872            None
873
874        Note:
875            This method is intended to be used within the context of an active trace or observation.
876        """
877        try:
878            langfuse = self._get_langfuse()
879            trace_id = self.get_current_trace_id()
880
881            if trace_id:
882                langfuse.score(
883                    trace_id=trace_id,
884                    name=name,
885                    value=value,
886                    comment=comment,
887                    id=id,
888                )
889            else:
890                raise ValueError("No trace found in the current context")
891
892        except Exception as e:
893            self._log.error(f"Failed to score observation: {e}")

Score the current trace in context. This can be called anywhere in the nested trace to score the trace.

Arguments:
  • name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
  • value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
  • comment (Optional[str]): An optional comment or description providing context or additional details about the score.
  • id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
Returns:

None

Note:

This method is intended to be used within the context of an active trace or observation.
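
Because this scores the trace rather than the innermost observation, it can be called from any nesting depth; a minimal sketch with a hypothetical feedback score:

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def final_step():
    # Scores the enclosing trace, not this nested span.
    langfuse_context.score_current_trace(name="user-feedback", value=1.0)

@observe()
def pipeline():
    final_step()
```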

@catch_and_log_errors
def flush(self):
895    @catch_and_log_errors
896    def flush(self):
897        """Force immediate flush of all buffered observations to the Langfuse backend.
898
899        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
900        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
901
902        Usage:
903            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
904            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
905
906        Returns:
907            None
908
909        Failure handling:
910            If no Langfuse client is found in the current context, a warning is logged; exceptions are caught and logged rather than raised.
911
912        Note:
913            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
914            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
915            However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
916        """
917        langfuse = self._get_langfuse()
918        if langfuse:
919            langfuse.flush()
920        else:
921            self._log.warn("No langfuse object found in the current context")

Force immediate flush of all buffered observations to the Langfuse backend.

This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers. It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.

Usage:
  • This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
  • It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
Returns:

None

Failure handling:
  • If no Langfuse client is found in the current context, a warning is logged; exceptions are caught and logged rather than raised.
Note:
  • The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
  • In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client. However, explicit calls to flush can be beneficial in certain edge cases or for debugging purposes.
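
A common pattern in short-lived scripts or serverless handlers, where the process may exit before the background queue drains (sketch):

```python
from langfuse.decorators import langfuse_context, observe

@observe()
def main():
    ...  # your traced workload

if __name__ == "__main__":
    main()
    # Ensure buffered events are sent before the process exits.
    langfuse_context.flush()
```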
def configure( self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, release: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, enabled: Optional[bool] = None):
923    def configure(
924        self,
925        *,
926        public_key: Optional[str] = None,
927        secret_key: Optional[str] = None,
928        host: Optional[str] = None,
929        release: Optional[str] = None,
930        debug: Optional[bool] = None,
931        threads: Optional[int] = None,
932        flush_at: Optional[int] = None,
933        flush_interval: Optional[int] = None,
934        max_retries: Optional[int] = None,
935        timeout: Optional[int] = None,
936        httpx_client: Optional[httpx.Client] = None,
937        enabled: Optional[bool] = None,
938    ):
939        """Configure the Langfuse client.
940
941        If used, this method must be called before any other langfuse_context method or observe-decorated function so that the Langfuse client is configured with the necessary credentials and settings.
942
943        Args:
944            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
945            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
946            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
947            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
948            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
949            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
950            flush_at: Max batch size that's sent to the API.
951            flush_interval: Max delay until a new batch is sent to the API.
952            max_retries: Max number of retries in case of API/network errors.
953            timeout: Timeout of API requests in seconds. Default is 20 seconds.
954            httpx_client: Pass your own httpx client for more customizability of requests.
955            enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
956        """
957        langfuse_singleton = LangfuseSingleton()
958        langfuse_singleton.reset()
959
960        langfuse_singleton.get(
961            public_key=public_key,
962            secret_key=secret_key,
963            host=host,
964            release=release,
965            debug=debug,
966            threads=threads,
967            flush_at=flush_at,
968            flush_interval=flush_interval,
969            max_retries=max_retries,
970            timeout=timeout,
971            httpx_client=httpx_client,
972            enabled=enabled,
973        )

Configure the Langfuse client.

If used, this method must be called before any other langfuse_context method or @observe-decorated function so that the Langfuse client is configured with the necessary credentials and settings.

Arguments:
  • public_key: Public API key of Langfuse project. Can be set via LANGFUSE_PUBLIC_KEY environment variable.
  • secret_key: Secret API key of Langfuse project. Can be set via LANGFUSE_SECRET_KEY environment variable.
  • host: Host of Langfuse API. Can be set via LANGFUSE_HOST environment variable. Defaults to https://cloud.langfuse.com.
  • release: Release number/hash of the application to provide analytics grouped by release. Can be set via LANGFUSE_RELEASE environment variable.
  • debug: Enables debug mode for more verbose logging. Can be set via LANGFUSE_DEBUG environment variable.
  • threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
  • flush_at: Max batch size that's sent to the API.
  • flush_interval: Max delay until a new batch is sent to the API.
  • max_retries: Max number of retries in case of API/network errors.
  • timeout: Timeout of API requests in seconds. Default is 20 seconds.
  • httpx_client: Pass your own httpx client for more customizability of requests.
  • enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
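
A configuration sketch; the credential values are hypothetical placeholders (the corresponding environment variables work as well), and the custom httpx client is optional:

```python
import httpx

from langfuse.decorators import langfuse_context

# Must run before the first langfuse_context call or @observe-decorated function.
langfuse_context.configure(
    public_key="pk-lf-...",  # hypothetical; or set LANGFUSE_PUBLIC_KEY
    secret_key="sk-lf-...",  # hypothetical; or set LANGFUSE_SECRET_KEY
    host="https://cloud.langfuse.com",
    flush_at=15,             # send batches of up to 15 events
    httpx_client=httpx.Client(timeout=20),
)
```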
def auth_check(self) -> bool:
 987    def auth_check(self) -> bool:
 988        """Check if the current Langfuse client is authenticated.
 989
 990        Returns:
 991            bool: True if the client is authenticated, False otherwise
 992        """
 993        try:
 994            langfuse = self._get_langfuse()
 995
 996            return langfuse.auth_check()
 997        except Exception as e:
 998            self._log.error("No Langfuse object found in the current context", e)
 999
1000            return False

Check if the current Langfuse client is authenticated.

Returns:

bool: True if the client is authenticated, False otherwise
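
For example, failing fast at startup when credentials are misconfigured (sketch):

```python
from langfuse.decorators import langfuse_context

if not langfuse_context.auth_check():
    raise RuntimeError("Langfuse authentication failed - check your API keys and host")
```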