langfuse.decorators

Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the @observe() decorator.

Simple example (decorator + openai integration)

from langfuse.decorators import observe
from langfuse.openai import openai # OpenAI integration

@observe()
def story():
    return openai.chat.completions.create(
        model="gpt-3.5-turbo",
        max_tokens=100,
        messages=[
          {"role": "system", "content": "You are a great storyteller."},
          {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
        ],
    ).choices[0].message.content

@observe()
def main():
    return story()

main()

See docs for more information.

 1"""Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the `@observe()` decorator.
 2
 3*Simple example (decorator + openai integration)*
 4
 5```python
 6from langfuse.decorators import observe
 7from langfuse.openai import openai # OpenAI integration
 8
 9@observe()
10def story():
11    return openai.chat.completions.create(
12        model="gpt-3.5-turbo",
13        max_tokens=100,
14        messages=[
15          {"role": "system", "content": "You are a great storyteller."},
16          {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
17        ],
18    ).choices[0].message.content
19
20@observe()
21def main():
22    return story()
23
24main()
25```
26
27See [docs](https://langfuse.com/docs/sdk/python/decorators) for more information.
28"""
29
30from .langfuse_decorator import langfuse_context, observe, LangfuseDecorator
31
32__all__ = ["langfuse_context", "observe", "LangfuseDecorator"]
langfuse_context = <LangfuseDecorator object>
def observe( func: Optional[Callable[~P, ~R]] = None, *, name: Optional[str] = None, as_type: Optional[Literal['generation']] = None, capture_input: bool = True, capture_output: bool = True, transform_to_string: Optional[Callable[[Iterable], str]] = None) -> Callable[[Callable[~P, ~R]], Callable[~P, ~R]]:
117    def observe(
118        self,
119        func: Optional[Callable[P, R]] = None,
120        *,
121        name: Optional[str] = None,
122        as_type: Optional[Literal["generation"]] = None,
123        capture_input: bool = True,
124        capture_output: bool = True,
125        transform_to_string: Optional[Callable[[Iterable], str]] = None,
126    ) -> Callable[[Callable[P, R]], Callable[P, R]]:
127        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
128
129        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
130        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
131
132        Attributes:
133            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
134            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
135            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
136            capture_output (bool): If True, captures the return value of the function as output. Default is True.
137            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
138
139        Returns:
140            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
141
142        Example:
143            For general tracing (functions/methods):
144            ```python
145            @observe()
146            def your_function(args):
147                # Your implementation here
148            ```
149            For observing language model generations:
150            ```python
151            @observe(as_type="generation")
152            def your_LLM_function(args):
153                # Your LLM invocation here
154            ```
155
156        Raises:
157            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
158
159        Note:
160        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
161        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
162        """
163
164        def decorator(func: Callable[P, R]) -> Callable[P, R]:
165            return (
166                self._async_observe(
167                    func,
168                    name=name,
169                    as_type=as_type,
170                    capture_input=capture_input,
171                    capture_output=capture_output,
172                    transform_to_string=transform_to_string,
173                )
174                if asyncio.iscoroutinefunction(func)
175                else self._sync_observe(
176                    func,
177                    name=name,
178                    as_type=as_type,
179                    capture_input=capture_input,
180                    capture_output=capture_output,
181                    transform_to_string=transform_to_string,
182                )
183            )
184
185        """
186        If the decorator is called without arguments, return the decorator function itself. 
187        This allows the decorator to be used with or without arguments. 
188        Python calls the decorator function with the decorated function as an argument when the decorator is used without arguments.
189        """
190        if func is None:
191            return decorator
192        else:
193            return decorator(func)

Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Attributes:
  • name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
  • as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
  • capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
  • capture_output (bool): If True, captures the return value of the function as output. Default is True.
  • transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
Returns:

Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

@observe()
def your_function(args):
    # Your implementation here

For observing language model generations:

@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here
Raises:
  • Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:

  • Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the langfuse_observation_id keyword when calling the wrapped function.
  • To update observation or trace parameters (e.g., metadata, session_id), use langfuse.update_current_observation and langfuse.update_current_trace methods within the wrapped function.
class LangfuseDecorator:
  96class LangfuseDecorator:
  97    _log = logging.getLogger("langfuse")
  98
  99    # Type overload for observe decorator with no arguments
 100    @overload
 101    def observe(self, func: F) -> F: ...
 102
 103    # Type overload for observe decorator with arguments
 104    @overload
 105    def observe(
 106        self,
 107        func: None = None,
 108        *,
 109        name: Optional[str] = None,
 110        as_type: Optional[Literal["generation"]] = None,
 111        capture_input: bool = True,
 112        capture_output: bool = True,
 113        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 114    ) -> Callable[[Callable[P, R]], Callable[P, R]]: ...
 115
 116    # Implementation of observe decorator
 117    def observe(
 118        self,
 119        func: Optional[Callable[P, R]] = None,
 120        *,
 121        name: Optional[str] = None,
 122        as_type: Optional[Literal["generation"]] = None,
 123        capture_input: bool = True,
 124        capture_output: bool = True,
 125        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 126    ) -> Callable[[Callable[P, R]], Callable[P, R]]:
 127        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
 128
 129        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
 130        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
 131
 132        Attributes:
 133            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
 134            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
 135            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
 136            capture_output (bool): If True, captures the return value of the function as output. Default is True.
 137            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
 138
 139        Returns:
 140            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
 141
 142        Example:
 143            For general tracing (functions/methods):
 144            ```python
 145            @observe()
 146            def your_function(args):
 147                # Your implementation here
 148            ```
 149            For observing language model generations:
 150            ```python
 151            @observe(as_type="generation")
 152            def your_LLM_function(args):
 153                # Your LLM invocation here
 154            ```
 155
 156        Raises:
 157            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
 158
 159        Note:
 160        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
 161        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
 162        """
 163
 164        def decorator(func: Callable[P, R]) -> Callable[P, R]:
 165            return (
 166                self._async_observe(
 167                    func,
 168                    name=name,
 169                    as_type=as_type,
 170                    capture_input=capture_input,
 171                    capture_output=capture_output,
 172                    transform_to_string=transform_to_string,
 173                )
 174                if asyncio.iscoroutinefunction(func)
 175                else self._sync_observe(
 176                    func,
 177                    name=name,
 178                    as_type=as_type,
 179                    capture_input=capture_input,
 180                    capture_output=capture_output,
 181                    transform_to_string=transform_to_string,
 182                )
 183            )
 184
 185        """
 186        If the decorator is called without arguments, return the decorator function itself. 
 187        This allows the decorator to be used with or without arguments. 
 188        Python calls the decorator function with the decorated function as an argument when the decorator is used without arguments.
 189        """
 190        if func is None:
 191            return decorator
 192        else:
 193            return decorator(func)
 194
 195    def _async_observe(
 196        self,
 197        func: F,
 198        *,
 199        name: Optional[str],
 200        as_type: Optional[Literal["generation"]],
 201        capture_input: bool,
 202        capture_output: bool,
 203        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 204    ) -> F:
 205        @wraps(func)
 206        async def async_wrapper(*args, **kwargs):
 207            observation = self._prepare_call(
 208                name=name or func.__name__,
 209                as_type=as_type,
 210                capture_input=capture_input,
 211                is_method=self._is_method(func),
 212                func_args=args,
 213                func_kwargs=kwargs,
 214            )
 215            result = None
 216
 217            try:
 218                result = await func(*args, **kwargs)
 219            except Exception as e:
 220                self._handle_exception(observation, e)
 221            finally:
 222                result = self._finalize_call(
 223                    observation, result, capture_output, transform_to_string
 224                )
 225
 226                # Returning from finally block may swallow errors, so only return if result is not None
 227                if result is not None:
 228                    return result
 229
 230        return cast(F, async_wrapper)
 231
 232    def _sync_observe(
 233        self,
 234        func: F,
 235        *,
 236        name: Optional[str],
 237        as_type: Optional[Literal["generation"]],
 238        capture_input: bool,
 239        capture_output: bool,
 240        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 241    ) -> F:
 242        @wraps(func)
 243        def sync_wrapper(*args, **kwargs):
 244            observation = self._prepare_call(
 245                name=name or func.__name__,
 246                as_type=as_type,
 247                capture_input=capture_input,
 248                is_method=self._is_method(func),
 249                func_args=args,
 250                func_kwargs=kwargs,
 251            )
 252            result = None
 253
 254            try:
 255                result = func(*args, **kwargs)
 256            except Exception as e:
 257                self._handle_exception(observation, e)
 258            finally:
 259                result = self._finalize_call(
 260                    observation, result, capture_output, transform_to_string
 261                )
 262
 263                # Returning from finally block may swallow errors, so only return if result is not None
 264                if result is not None:
 265                    return result
 266
 267        return cast(F, sync_wrapper)
 268
 269    @staticmethod
 270    def _is_method(func: Callable) -> bool:
 271        """Check if a callable is likely an class or instance method based on its signature.
 272
 273        This method inspects the given callable's signature for the presence of a 'cls' or 'self' parameter, which is conventionally used for class and instance methods in Python classes. It returns True if 'class' or 'self' is found among the parameters, suggesting the callable is a method.
 274
 275        Note: This method relies on naming conventions and may not accurately identify instance methods if unconventional parameter names are used or if static or class methods incorrectly include a 'self' or 'cls' parameter. Additionally, during decorator execution, inspect.ismethod does not work as expected because the function has not yet been bound to an instance; it is still a function, not a method. This check attempts to infer method status based on signature, which can be useful in decorator contexts where traditional method identification techniques fail.
 276
 277        Returns:
 278        bool: True if 'cls' or 'self' is in the callable's parameters, False otherwise.
 279        """
 280        return (
 281            "self" in inspect.signature(func).parameters
 282            or "cls" in inspect.signature(func).parameters
 283        )
 284
 285    def _prepare_call(
 286        self,
 287        *,
 288        name: str,
 289        as_type: Optional[Literal["generation"]],
 290        capture_input: bool,
 291        is_method: bool = False,
 292        func_args: Tuple = (),
 293        func_kwargs: Dict = {},
 294    ) -> Optional[
 295        Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
 296    ]:
 297        try:
 298            stack = _observation_stack_context.get().copy()
 299            parent = stack[-1] if stack else None
 300
 301            # Collect default observation data
 302            observation_id = func_kwargs.pop("langfuse_observation_id", None)
 303            provided_parent_trace_id = func_kwargs.pop("langfuse_parent_trace_id", None)
 304            provided_parent_observation_id = func_kwargs.pop(
 305                "langfuse_parent_observation_id", None
 306            )
 307
 308            id = str(observation_id) if observation_id else None
 309            start_time = _get_timestamp()
 310
 311            input = (
 312                self._get_input_from_func_args(
 313                    is_method=is_method,
 314                    func_args=func_args,
 315                    func_kwargs=func_kwargs,
 316                )
 317                if capture_input
 318                else None
 319            )
 320
 321            params = {
 322                "id": id,
 323                "name": name,
 324                "start_time": start_time,
 325                "input": input,
 326            }
 327
 328            # Handle user-providedparent trace ID and observation ID
 329            if parent and (provided_parent_trace_id or provided_parent_observation_id):
 330                self._log.warning(
 331                    "Ignoring langfuse_parent_trace_id and/or langfuse_parent_observation_id as they can be only set in the top-level decorated function."
 332                )
 333
 334            elif provided_parent_observation_id and not provided_parent_trace_id:
 335                self._log.warning(
 336                    "Ignoring langfuse_parent_observation_id as langfuse_parent_trace_id is not set."
 337                )
 338
 339            elif provided_parent_observation_id and (
 340                provided_parent_observation_id != provided_parent_trace_id
 341            ):
 342                parent = StatefulSpanClient(
 343                    id=provided_parent_observation_id,
 344                    trace_id=provided_parent_trace_id,
 345                    task_manager=self.client_instance.task_manager,
 346                    client=self.client_instance.client,
 347                    state_type=StateType.OBSERVATION,
 348                )
 349                self._set_root_trace_id(provided_parent_trace_id)
 350
 351            elif provided_parent_trace_id:
 352                parent = StatefulTraceClient(
 353                    id=provided_parent_trace_id,
 354                    trace_id=provided_parent_trace_id,
 355                    task_manager=self.client_instance.task_manager,
 356                    client=self.client_instance.client,
 357                    state_type=StateType.TRACE,
 358                )
 359                self._set_root_trace_id(provided_parent_trace_id)
 360
 361            # Create observation
 362            if parent and as_type == "generation":
 363                observation = parent.generation(**params)
 364            elif as_type == "generation":
 365                # Create wrapper trace if generation is top-level
 366                # Do not add wrapper trace to stack, as it does not have a corresponding end that will pop it off again
 367                trace = self.client_instance.trace(
 368                    id=_root_trace_id_context.get() or id,
 369                    name=name,
 370                    start_time=start_time,
 371                )
 372                self._set_root_trace_id(trace.id)
 373
 374                observation = self.client_instance.generation(
 375                    name=name, start_time=start_time, input=input, trace_id=trace.id
 376                )
 377            elif parent:
 378                observation = parent.span(**params)
 379            else:
 380                params["id"] = _root_trace_id_context.get() or params["id"]
 381                observation = self.client_instance.trace(**params)
 382
 383            _observation_stack_context.set(stack + [observation])
 384
 385            return observation
 386        except Exception as e:
 387            self._log.error(f"Failed to prepare observation: {e}")
 388
 389    def _get_input_from_func_args(
 390        self,
 391        *,
 392        is_method: bool = False,
 393        func_args: Tuple = (),
 394        func_kwargs: Dict = {},
 395    ) -> Any:
 396        # Remove implicitly passed "self" or "cls" argument for instance or class methods
 397        logged_args = func_args[1:] if is_method else func_args
 398        raw_input = {
 399            "args": logged_args,
 400            "kwargs": func_kwargs,
 401        }
 402
 403        # Serialize and deserialize to ensure proper JSON serialization.
 404        # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
 405        return json.loads(json.dumps(raw_input, cls=EventSerializer))
 406
 407    def _finalize_call(
 408        self,
 409        observation: Optional[
 410            Union[
 411                StatefulSpanClient,
 412                StatefulTraceClient,
 413                StatefulGenerationClient,
 414            ]
 415        ],
 416        result: Any,
 417        capture_output: bool,
 418        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 419    ):
 420        if inspect.isgenerator(result):
 421            return self._wrap_sync_generator_result(
 422                observation, result, capture_output, transform_to_string
 423            )
 424        elif inspect.isasyncgen(result):
 425            return self._wrap_async_generator_result(
 426                observation, result, capture_output, transform_to_string
 427            )
 428
 429        else:
 430            return self._handle_call_result(observation, result, capture_output)
 431
 432    def _handle_call_result(
 433        self,
 434        observation: Optional[
 435            Union[
 436                StatefulSpanClient,
 437                StatefulTraceClient,
 438                StatefulGenerationClient,
 439            ]
 440        ],
 441        result: Any,
 442        capture_output: bool,
 443    ):
 444        try:
 445            if observation is None:
 446                raise ValueError("No observation found in the current context")
 447
 448            # Collect final observation data
 449            observation_params = self._pop_observation_params_from_context(
 450                observation.id
 451            )
 452
 453            end_time = observation_params["end_time"] or _get_timestamp()
 454
 455            output = observation_params["output"] or (
 456                # Serialize and deserialize to ensure proper JSON serialization.
 457                # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
 458                json.loads(
 459                    json.dumps(
 460                        result if result is not None and capture_output else None,
 461                        cls=EventSerializer,
 462                    )
 463                )
 464            )
 465
 466            observation_params.update(end_time=end_time, output=output)
 467
 468            if isinstance(observation, (StatefulSpanClient, StatefulGenerationClient)):
 469                observation.end(**observation_params)
 470            elif isinstance(observation, StatefulTraceClient):
 471                observation.update(**observation_params)
 472
 473            # Remove observation from top of stack
 474            stack = _observation_stack_context.get()
 475            _observation_stack_context.set(stack[:-1])
 476
 477            # Update trace that was provided directly and not part of the observation stack
 478            if not _observation_stack_context.get() and (
 479                provided_trace_id := _root_trace_id_context.get()
 480            ):
 481                observation_params = self._pop_observation_params_from_context(
 482                    provided_trace_id
 483                )
 484
 485                has_updates = any(observation_params.values())
 486
 487                if has_updates:
 488                    trace_client = StatefulTraceClient(
 489                        id=provided_trace_id,
 490                        trace_id=provided_trace_id,
 491                        task_manager=self.client_instance.task_manager,
 492                        client=self.client_instance.client,
 493                        state_type=StateType.TRACE,
 494                    )
 495                    trace_client.update(**observation_params)
 496
 497        except Exception as e:
 498            self._log.error(f"Failed to finalize observation: {e}")
 499
 500        finally:
 501            # Clear the context trace ID to avoid leaking to next execution
 502            if not _observation_stack_context.get():
 503                _root_trace_id_context.set(None)
 504
 505            return result
 506
 507    def _handle_exception(
 508        self,
 509        observation: Optional[
 510            Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
 511        ],
 512        e: Exception,
 513    ):
 514        if observation:
 515            _observation_params_context.get()[observation.id].update(
 516                level="ERROR", status_message=str(e)
 517            )
 518        raise e
 519
 520    def _wrap_sync_generator_result(
 521        self,
 522        observation: Optional[
 523            Union[
 524                StatefulSpanClient,
 525                StatefulTraceClient,
 526                StatefulGenerationClient,
 527            ]
 528        ],
 529        generator: Generator,
 530        capture_output: bool,
 531        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 532    ):
 533        items = []
 534
 535        try:
 536            for item in generator:
 537                items.append(item)
 538
 539                yield item
 540
 541        finally:
 542            output = items
 543
 544            if transform_to_string is not None:
 545                output = transform_to_string(items)
 546
 547            elif all(isinstance(item, str) for item in items):
 548                output = "".join(items)
 549
 550            self._handle_call_result(observation, output, capture_output)
 551
 552    async def _wrap_async_generator_result(
 553        self,
 554        observation: Optional[
 555            Union[
 556                StatefulSpanClient,
 557                StatefulTraceClient,
 558                StatefulGenerationClient,
 559            ]
 560        ],
 561        generator: AsyncGenerator,
 562        capture_output: bool,
 563        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 564    ) -> AsyncGenerator:
 565        items = []
 566
 567        try:
 568            async for item in generator:
 569                items.append(item)
 570
 571                yield item
 572
 573        finally:
 574            output = items
 575
 576            if transform_to_string is not None:
 577                output = transform_to_string(items)
 578
 579            elif all(isinstance(item, str) for item in items):
 580                output = "".join(items)
 581
 582            self._handle_call_result(observation, output, capture_output)
 583
 584    def get_current_llama_index_handler(self):
 585        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
 586
 587        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
 588        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
 589
 590        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
 591
 592        Returns:
 593            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
 594
 595        Note:
 596            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
 597            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
 598        """
 599        try:
 600            from langfuse.llama_index import LlamaIndexCallbackHandler
 601        except ImportError:
 602            self._log.error(
 603                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
 604            )
 605
 606            return None
 607
 608        stack = _observation_stack_context.get()
 609        observation = stack[-1] if stack else None
 610
 611        if observation is None:
 612            self._log.warning("No observation found in the current context")
 613
 614            return None
 615
 616        if isinstance(observation, StatefulGenerationClient):
 617            self._log.warning(
 618                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
 619            )
 620
 621            return None
 622
 623        callback_handler = LlamaIndexCallbackHandler()
 624        callback_handler.set_root(observation)
 625
 626        return callback_handler
 627
 628    def get_current_langchain_handler(self):
 629        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
 630
 631        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
 632        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
 633
 634        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
 635
 636        Returns:
 637            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
 638
 639        Note:
 640            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
 641            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
 642        """
 643        stack = _observation_stack_context.get()
 644        observation = stack[-1] if stack else None
 645
 646        if observation is None:
 647            self._log.warning("No observation found in the current context")
 648
 649            return None
 650
 651        if isinstance(observation, StatefulGenerationClient):
 652            self._log.warning(
 653                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
 654            )
 655
 656            return None
 657
 658        return observation.get_langchain_handler()
 659
 660    def get_current_trace_id(self):
 661        """Retrieve the ID of the current trace from the observation stack context.
 662
 663        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
 664        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
 665        representing the entry point of the traced execution context. If you have provided a langfuse_parent_trace_id directly, it will return that instead.
 666
 667        Returns:
 668            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
 669            possibly due to the method being called outside of any @observe-decorated function execution.
 670
 671        Note:
 672            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
 673            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 674        """
 675        context_trace_id = _root_trace_id_context.get()
 676        if context_trace_id:
 677            return context_trace_id
 678
 679        stack = _observation_stack_context.get()
 680
 681        if not stack:
 682            return None
 683
 684        return stack[0].id
 685
 686    def get_current_trace_url(self) -> Optional[str]:
 687        """Retrieve the URL of the current trace in context.
 688
 689        Returns:
 690            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
 691            possibly due to the method being called outside of any @observe-decorated function execution.
 692
 693        Note:
 694            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
 695            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 696        """
 697        try:
 698            trace_id = self.get_current_trace_id()
 699
 700            if not trace_id:
 701                raise ValueError("No trace found in the current context")
 702
 703            project_id = self.client_instance._get_project_id()
 704
 705            if not project_id:
 706                return f"{self.client_instance.client._client_wrapper._base_url}/trace/{trace_id}"
 707
 708            return f"{self.client_instance.client._client_wrapper._base_url}/project/{project_id}/traces/{trace_id}"
 709
 710        except Exception as e:
 711            self._log.error(f"Failed to get current trace URL: {e}")
 712
 713            return None
 714
 715    def get_current_observation_id(self):
 716        """Retrieve the ID of the current observation in context.
 717
 718        Returns:
 719            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
 720            possibly due to the method being called outside of any @observe-decorated function execution.
 721
 722        Note:
 723            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
 724            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 725            - If called at the top level of a trace, it will return the trace ID.
 726        """
 727        stack = _observation_stack_context.get()
 728
 729        if not stack:
 730            return None
 731
 732        return stack[-1].id
 733
 734    def update_current_trace(
 735        self,
 736        name: Optional[str] = None,
 737        input: Optional[Any] = None,
 738        output: Optional[Any] = None,
 739        user_id: Optional[str] = None,
 740        session_id: Optional[str] = None,
 741        version: Optional[str] = None,
 742        release: Optional[str] = None,
 743        metadata: Optional[Any] = None,
 744        tags: Optional[List[str]] = None,
 745        public: Optional[bool] = None,
 746    ):
 747        """Set parameters for the current trace, updating the trace's metadata and context information.
 748
 749        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
 750        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
 751        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
 752
 753        Arguments:
 754            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI..
 755            input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
 756            output (Optional[Any]): The output or result of the trace
 757            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
 758            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
 759            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
 760            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
 761            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
 762            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
 763
 764        Returns:
 765            None
 766
 767        Note:
 768            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
 769            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
 770            - If called outside of an active trace context, a warning is logged, and a ValueError is raised to indicate the absence of a traceable context.
 771        """
 772        trace_id = self.get_current_trace_id()
 773
 774        if trace_id is None:
 775            self._log.warning("No trace found in the current context")
 776
 777            return
 778
 779        params_to_update = {
 780            k: v
 781            for k, v in {
 782                "name": name,
 783                "input": input,
 784                "output": output,
 785                "user_id": user_id,
 786                "session_id": session_id,
 787                "version": version,
 788                "release": release,
 789                "metadata": metadata,
 790                "tags": tags,
 791                "public": public,
 792            }.items()
 793            if v is not None
 794        }
 795
 796        # metadata and tags are merged server side. Send separate update event to avoid merging them SDK side
 797        server_merged_attributes = ["metadata", "tags"]
 798        if any(attribute in params_to_update for attribute in server_merged_attributes):
 799            self.client_instance.trace(
 800                id=trace_id,
 801                **{
 802                    k: v
 803                    for k, v in params_to_update.items()
 804                    if k in server_merged_attributes
 805                },
 806            )
 807
 808        _observation_params_context.get()[trace_id].update(params_to_update)
 809
 810    def update_current_observation(
 811        self,
 812        *,
 813        input: Optional[Any] = None,
 814        output: Optional[Any] = None,
 815        name: Optional[str] = None,
 816        version: Optional[str] = None,
 817        metadata: Optional[Any] = None,
 818        start_time: Optional[datetime] = None,
 819        end_time: Optional[datetime] = None,
 820        release: Optional[str] = None,
 821        tags: Optional[List[str]] = None,
 822        user_id: Optional[str] = None,
 823        session_id: Optional[str] = None,
 824        level: Optional[SpanLevel] = None,
 825        status_message: Optional[str] = None,
 826        completion_start_time: Optional[datetime] = None,
 827        model: Optional[str] = None,
 828        model_parameters: Optional[Dict[str, MapValue]] = None,
 829        usage: Optional[Union[BaseModel, ModelUsage]] = None,
 830        usage_details: Optional[UsageDetails] = None,
 831        cost_details: Optional[Dict[str, float]] = None,
 832        prompt: Optional[PromptClient] = None,
 833        public: Optional[bool] = None,
 834    ):
 835        """Update parameters for the current observation within an active trace context.
 836
 837        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
 838        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
 839        enhancing the observability and traceability of the execution context.
 840
 841        Note that if a param is not available on a specific observation type, it will be ignored.
 842
 843        Shared params:
 844            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
 845            - `output` (Optional[Any]): The output or result of the trace or observation
 846            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
 847            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
 848            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
 849            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
 850            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
 851
 852        Trace-specific params:
 853            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
 854            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
 855            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
 856            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
 857            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
 858
 859        Span-specific params:
 860            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
 861            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
 862
 863        Generation-specific params:
 864            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
 865            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
 866            - `usage` (Optional[Union[BaseModel, ModelUsage]]): (Deprecated. Use `usage_details` and `cost_details` instead.) The usage object supports the OpenAi structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
 867            - `usage_details` (Optional[Dict[str, int]]): The usage details of the observation. Reflects the number of units consumed per usage type. All keys must sum up to the total key value. The total key holds the total number of units consumed.
 868            - `cost_details` (Optional[Dict[str, float]]): The cost details of the observation. Reflects the USD cost of the observation per cost type. All keys must sum up to the total key value. The total key holds the total cost of the observation.
 869            - `prompt`(Optional[PromptClient]): The prompt object used for the generation.
 870
 871        Returns:
 872            None
 873
 874        Raises:
 875            ValueError: If no current observation is found in the context, indicating that this method was called outside of an observation's execution scope.
 876
 877        Note:
 878            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
 879            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
 880            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
 881        """
 882        stack = _observation_stack_context.get()
 883        observation = stack[-1] if stack else None
 884
 885        if not observation:
 886            self._log.warning("No observation found in the current context")
 887
 888            return
 889
 890        update_params = {
 891            k: v
 892            for k, v in {
 893                "input": input,
 894                "output": output,
 895                "name": name,
 896                "version": version,
 897                "metadata": metadata,
 898                "start_time": start_time,
 899                "end_time": end_time,
 900                "release": release,
 901                "tags": tags,
 902                "user_id": user_id,
 903                "session_id": session_id,
 904                "level": level,
 905                "status_message": status_message,
 906                "completion_start_time": completion_start_time,
 907                "model": model,
 908                "model_parameters": model_parameters,
 909                "usage": usage,
 910                "usage_details": usage_details,
 911                "cost_details": cost_details,
 912                "prompt": prompt,
 913                "public": public,
 914            }.items()
 915            if v is not None
 916        }
 917
 918        _observation_params_context.get()[observation.id].update(update_params)
 919
 920    def score_current_observation(
 921        self,
 922        *,
 923        name: str,
 924        value: Union[float, str],
 925        data_type: Optional[ScoreDataType] = None,
 926        comment: Optional[str] = None,
 927        id: Optional[str] = None,
 928        config_id: Optional[str] = None,
 929    ):
 930        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
 931
 932        Arguments:
 933            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 934            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
 935            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
 936              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
 937            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 938            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 939            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
 940
 941        Returns:
 942            None
 943
 944        Note:
 945            This method is intended to be used within the context of an active trace or observation.
 946        """
 947        try:
 948            trace_id = self.get_current_trace_id()
 949            current_observation_id = self.get_current_observation_id()
 950
 951            observation_id = (
 952                current_observation_id if current_observation_id != trace_id else None
 953            )
 954
 955            if trace_id:
 956                self.client_instance.score(
 957                    trace_id=trace_id,
 958                    observation_id=observation_id,
 959                    name=name,
 960                    value=value,
 961                    data_type=data_type,
 962                    comment=comment,
 963                    id=id,
 964                    config_id=config_id,
 965                )
 966            else:
 967                raise ValueError("No trace or observation found in the current context")
 968
 969        except Exception as e:
 970            self._log.error(f"Failed to score observation: {e}")
 971
 972    def score_current_trace(
 973        self,
 974        *,
 975        name: str,
 976        value: Union[float, str],
 977        data_type: Optional[ScoreDataType] = None,
 978        comment: Optional[str] = None,
 979        id: Optional[str] = None,
 980        config_id: Optional[str] = None,
 981    ):
 982        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
 983
 984        Arguments:
 985            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 986            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
 987            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
 988              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
 989            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 990            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 991            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
 992
 993        Returns:
 994            None
 995
 996        Note:
 997            This method is intended to be used within the context of an active trace or observation.
 998        """
 999        try:
1000            trace_id = self.get_current_trace_id()
1001
1002            if trace_id:
1003                self.client_instance.score(
1004                    trace_id=trace_id,
1005                    name=name,
1006                    value=value,
1007                    data_type=data_type,
1008                    comment=comment,
1009                    id=id,
1010                    config_id=config_id,
1011                )
1012            else:
1013                raise ValueError("No trace found in the current context")
1014
1015        except Exception as e:
1016            self._log.error(f"Failed to score observation: {e}")
1017
1018    @catch_and_log_errors
1019    def flush(self):
1020        """Force immediate flush of all buffered observations to the Langfuse backend.
1021
1022        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
1023        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
1024
1025        Usage:
1026            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
1027            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
1028
1029        Returns:
1030            None
1031
1032        Raises:
1033            ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues.
1034
1035        Note:
1036            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
1037            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
1038            However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
1039        """
1040        if self.client_instance:
1041            self.client_instance.flush()
1042        else:
1043            self._log.warning("No langfuse object found in the current context")
1044
1045    def configure(
1046        self,
1047        *,
1048        public_key: Optional[str] = None,
1049        secret_key: Optional[str] = None,
1050        host: Optional[str] = None,
1051        release: Optional[str] = None,
1052        debug: Optional[bool] = None,
1053        threads: Optional[int] = None,
1054        flush_at: Optional[int] = None,
1055        flush_interval: Optional[int] = None,
1056        max_retries: Optional[int] = None,
1057        timeout: Optional[int] = None,
1058        httpx_client: Optional[httpx.Client] = None,
1059        enabled: Optional[bool] = None,
1060        mask: Optional[Callable] = None,
1061    ):
1062        """Configure the Langfuse client.
1063
1064        If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings.
1065
1066        Args:
1067            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
1068            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
1069            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
1070            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
1071            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
1072            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
1073            flush_at: Max batch size that's sent to the API.
1074            flush_interval: Max delay until a new batch is sent to the API.
1075            max_retries: Max number of retries in case of API/network errors.
1076            timeout: Timeout of API requests in seconds. Default is 20 seconds.
1077            httpx_client: Pass your own httpx client for more customizability of requests.
1078            enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
1079            mask (Callable): Function that masks sensitive information from input and output in log messages.
1080
1081        """
1082        langfuse_singleton = LangfuseSingleton()
1083        langfuse_singleton.reset()
1084
1085        langfuse_singleton.get(
1086            public_key=public_key,
1087            secret_key=secret_key,
1088            host=host,
1089            release=release,
1090            debug=debug,
1091            threads=threads,
1092            flush_at=flush_at,
1093            flush_interval=flush_interval,
1094            max_retries=max_retries,
1095            timeout=timeout,
1096            httpx_client=httpx_client,
1097            enabled=enabled,
1098            mask=mask,
1099        )
1100
1101    @property
1102    def client_instance(self) -> Langfuse:
1103        """Get the Langfuse client instance for the current decorator context."""
1104        return LangfuseSingleton().get()
1105
1106    def _set_root_trace_id(self, trace_id: str):
1107        if _observation_stack_context.get():
1108            self._log.warning(
1109                "Root Trace ID cannot be set on a already running trace. Skipping root trace ID assignment."
1110            )
1111            return
1112
1113        _root_trace_id_context.set(trace_id)
1114
1115    def _pop_observation_params_from_context(
1116        self, observation_id: str
1117    ) -> ObservationParams:
1118        params = _observation_params_context.get()[observation_id].copy()
1119
1120        # Remove observation params to avoid leaking
1121        del _observation_params_context.get()[observation_id]
1122
1123        return params
1124
1125    def auth_check(self) -> bool:
1126        """Check if the current Langfuse client is authenticated.
1127
1128        Returns:
1129            bool: True if the client is authenticated, False otherwise
1130        """
1131        try:
1132            return self.client_instance.auth_check()
1133        except Exception as e:
1134            self._log.error(
1135                "No Langfuse object found in the current context", exc_info=e
1136            )
1137
1138            return False
def observe( self, func: Optional[Callable[~P, ~R]] = None, *, name: Optional[str] = None, as_type: Optional[Literal['generation']] = None, capture_input: bool = True, capture_output: bool = True, transform_to_string: Optional[Callable[[Iterable], str]] = None) -> Callable[[Callable[~P, ~R]], Callable[~P, ~R]]:
117    def observe(
118        self,
119        func: Optional[Callable[P, R]] = None,
120        *,
121        name: Optional[str] = None,
122        as_type: Optional[Literal["generation"]] = None,
123        capture_input: bool = True,
124        capture_output: bool = True,
125        transform_to_string: Optional[Callable[[Iterable], str]] = None,
126    ) -> Callable[[Callable[P, R]], Callable[P, R]]:
127        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
128
129        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
130        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
131
132        Attributes:
133            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
134            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
135            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
136            capture_output (bool): If True, captures the return value of the function as output. Default is True.
137            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
138
139        Returns:
140            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
141
142        Example:
143            For general tracing (functions/methods):
144            ```python
145            @observe()
146            def your_function(args):
147                # Your implementation here
148            ```
149            For observing language model generations:
150            ```python
151            @observe(as_type="generation")
152            def your_LLM_function(args):
153                # Your LLM invocation here
154            ```
155
156        Raises:
157            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
158
159        Note:
160        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
161        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
162        """
163
164        def decorator(func: Callable[P, R]) -> Callable[P, R]:
165            return (
166                self._async_observe(
167                    func,
168                    name=name,
169                    as_type=as_type,
170                    capture_input=capture_input,
171                    capture_output=capture_output,
172                    transform_to_string=transform_to_string,
173                )
174                if asyncio.iscoroutinefunction(func)
175                else self._sync_observe(
176                    func,
177                    name=name,
178                    as_type=as_type,
179                    capture_input=capture_input,
180                    capture_output=capture_output,
181                    transform_to_string=transform_to_string,
182                )
183            )
184
185        """
186        If the decorator is called without arguments, return the decorator function itself. 
187        This allows the decorator to be used with or without arguments. 
188        Python calls the decorator function with the decorated function as an argument when the decorator is used without arguments.
189        """
190        if func is None:
191            return decorator
192        else:
193            return decorator(func)

Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Attributes:
  • name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
  • as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
  • capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
  • capture_output (bool): If True, captures the return value of the function as output. Default is True.
  • transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
Returns:

Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

@observe()
def your_function(args):
    # Your implementation here

For observing language model generations:

@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here
Raises:
  • Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:

  • Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the langfuse_observation_id keyword when calling the wrapped function.
  • To update observation or trace parameters (e.g., metadata, session_id), use langfuse.update_current_observation and langfuse.update_current_trace methods within the wrapped function.
def get_current_llama_index_handler(self):
584    def get_current_llama_index_handler(self):
585        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
586
587        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
588        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
589
590        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
591
592        Returns:
593            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
594
595        Note:
596            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
597            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
598        """
599        try:
600            from langfuse.llama_index import LlamaIndexCallbackHandler
601        except ImportError:
602            self._log.error(
603                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
604            )
605
606            return None
607
608        stack = _observation_stack_context.get()
609        observation = stack[-1] if stack else None
610
611        if observation is None:
612            self._log.warning("No observation found in the current context")
613
614            return None
615
616        if isinstance(observation, StatefulGenerationClient):
617            self._log.warning(
618                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
619            )
620
621            return None
622
623        callback_handler = LlamaIndexCallbackHandler()
624        callback_handler.set_root(observation)
625
626        return callback_handler

Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.

This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.

See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.

Returns:

LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

Note:
  • This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
  • If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
def get_current_langchain_handler(self):
628    def get_current_langchain_handler(self):
629        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
630
631        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
632        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
633
634        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
635
636        Returns:
637            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
638
639        Note:
640            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
641            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
642        """
643        stack = _observation_stack_context.get()
644        observation = stack[-1] if stack else None
645
646        if observation is None:
647            self._log.warning("No observation found in the current context")
648
649            return None
650
651        if isinstance(observation, StatefulGenerationClient):
652            self._log.warning(
653                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
654            )
655
656            return None
657
658        return observation.get_langchain_handler()

Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.

This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.

See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.

Returns:

LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

Note:
  • This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
  • If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
def get_current_trace_id(self):
660    def get_current_trace_id(self):
661        """Retrieve the ID of the current trace from the observation stack context.
662
663        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
664        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
665        representing the entry point of the traced execution context. If you have provided a langfuse_parent_trace_id directly, it will return that instead.
666
667        Returns:
668            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
669            possibly due to the method being called outside of any @observe-decorated function execution.
670
671        Note:
672            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
673            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
674        """
675        context_trace_id = _root_trace_id_context.get()
676        if context_trace_id:
677            return context_trace_id
678
679        stack = _observation_stack_context.get()
680
681        if not stack:
682            return None
683
684        return stack[0].id

Retrieve the ID of the current trace from the observation stack context.

This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID, such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack, representing the entry point of the traced execution context. If you have provided a langfuse_parent_trace_id directly, it will return that instead.

Returns:

str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
  • If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
def get_current_trace_url(self) -> Optional[str]:
686    def get_current_trace_url(self) -> Optional[str]:
687        """Retrieve the URL of the current trace in context.
688
689        Returns:
690            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
691            possibly due to the method being called outside of any @observe-decorated function execution.
692
693        Note:
694            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
695            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
696        """
697        try:
698            trace_id = self.get_current_trace_id()
699
700            if not trace_id:
701                raise ValueError("No trace found in the current context")
702
703            project_id = self.client_instance._get_project_id()
704
705            if not project_id:
706                return f"{self.client_instance.client._client_wrapper._base_url}/trace/{trace_id}"
707
708            return f"{self.client_instance.client._client_wrapper._base_url}/project/{project_id}/traces/{trace_id}"
709
710        except Exception as e:
711            self._log.error(f"Failed to get current trace URL: {e}")
712
713            return None

Retrieve the URL of the current trace in context.

Returns:

str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
  • If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
def get_current_observation_id(self):
715    def get_current_observation_id(self):
716        """Retrieve the ID of the current observation in context.
717
718        Returns:
719            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
720            possibly due to the method being called outside of any @observe-decorated function execution.
721
722        Note:
723            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
724            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
725            - If called at the top level of a trace, it will return the trace ID.
726        """
727        stack = _observation_stack_context.get()
728
729        if not stack:
730            return None
731
732        return stack[-1].id

Retrieve the ID of the current observation in context.

Returns:

str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
  • If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
  • If called at the top level of a trace, it will return the trace ID.
def update_current_trace( self, name: Optional[str] = None, input: Optional[Any] = None, output: Optional[Any] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, version: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, public: Optional[bool] = None):
734    def update_current_trace(
735        self,
736        name: Optional[str] = None,
737        input: Optional[Any] = None,
738        output: Optional[Any] = None,
739        user_id: Optional[str] = None,
740        session_id: Optional[str] = None,
741        version: Optional[str] = None,
742        release: Optional[str] = None,
743        metadata: Optional[Any] = None,
744        tags: Optional[List[str]] = None,
745        public: Optional[bool] = None,
746    ):
747        """Set parameters for the current trace, updating the trace's metadata and context information.
748
749        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
750        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
751        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
752
753        Arguments:
754            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI..
755            input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
756            output (Optional[Any]): The output or result of the trace
757            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
758            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
759            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
760            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
761            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
762            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
763
764        Returns:
765            None
766
767        Note:
768            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
769            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
770            - If called outside of an active trace context, a warning is logged, and a ValueError is raised to indicate the absence of a traceable context.
771        """
772        trace_id = self.get_current_trace_id()
773
774        if trace_id is None:
775            self._log.warning("No trace found in the current context")
776
777            return
778
779        params_to_update = {
780            k: v
781            for k, v in {
782                "name": name,
783                "input": input,
784                "output": output,
785                "user_id": user_id,
786                "session_id": session_id,
787                "version": version,
788                "release": release,
789                "metadata": metadata,
790                "tags": tags,
791                "public": public,
792            }.items()
793            if v is not None
794        }
795
796        # metadata and tags are merged server side. Send separate update event to avoid merging them SDK side
797        server_merged_attributes = ["metadata", "tags"]
798        if any(attribute in params_to_update for attribute in server_merged_attributes):
799            self.client_instance.trace(
800                id=trace_id,
801                **{
802                    k: v
803                    for k, v in params_to_update.items()
804                    if k in server_merged_attributes
805                },
806            )
807
808        _observation_params_context.get()[trace_id].update(params_to_update)

Set parameters for the current trace, updating the trace's metadata and context information.

This method allows for dynamically updating the trace parameters at any point during the execution of a trace. It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information, and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.

Arguments:
  • name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI..
  • input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
  • output (Optional[Any]): The output or result of the trace
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
  • release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
Returns:

None

Note:
  • This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
  • The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
  • If called outside of an active trace context, a warning is logged, and a ValueError is raised to indicate the absence of a traceable context.
def update_current_observation( self, *, input: Optional[Any] = None, output: Optional[Any] = None, name: Optional[str] = None, version: Optional[str] = None, metadata: Optional[Any] = None, start_time: Optional[datetime.datetime] = None, end_time: Optional[datetime.datetime] = None, release: Optional[str] = None, tags: Optional[List[str]] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, level: Optional[Literal['DEBUG', 'DEFAULT', 'WARNING', 'ERROR']] = None, status_message: Optional[str] = None, completion_start_time: Optional[datetime.datetime] = None, model: Optional[str] = None, model_parameters: Optional[Dict[str, Union[str, NoneType, int, bool, List[str]]]] = None, usage: Union[pydantic.main.BaseModel, langfuse.model.ModelUsage, NoneType] = None, usage_details: Union[Dict[str, int], langfuse.api.OpenAiUsageSchema, NoneType] = None, cost_details: Optional[Dict[str, float]] = None, prompt: Union[langfuse.model.TextPromptClient, langfuse.model.ChatPromptClient, NoneType] = None, public: Optional[bool] = None):
810    def update_current_observation(
811        self,
812        *,
813        input: Optional[Any] = None,
814        output: Optional[Any] = None,
815        name: Optional[str] = None,
816        version: Optional[str] = None,
817        metadata: Optional[Any] = None,
818        start_time: Optional[datetime] = None,
819        end_time: Optional[datetime] = None,
820        release: Optional[str] = None,
821        tags: Optional[List[str]] = None,
822        user_id: Optional[str] = None,
823        session_id: Optional[str] = None,
824        level: Optional[SpanLevel] = None,
825        status_message: Optional[str] = None,
826        completion_start_time: Optional[datetime] = None,
827        model: Optional[str] = None,
828        model_parameters: Optional[Dict[str, MapValue]] = None,
829        usage: Optional[Union[BaseModel, ModelUsage]] = None,
830        usage_details: Optional[UsageDetails] = None,
831        cost_details: Optional[Dict[str, float]] = None,
832        prompt: Optional[PromptClient] = None,
833        public: Optional[bool] = None,
834    ):
835        """Update parameters for the current observation within an active trace context.
836
837        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
838        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
839        enhancing the observability and traceability of the execution context.
840
841        Note that if a param is not available on a specific observation type, it will be ignored.
842
843        Shared params:
844            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
845            - `output` (Optional[Any]): The output or result of the trace or observation
846            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
847            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
848            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
849            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
850            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
851
852        Trace-specific params:
853            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
854            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
855            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
856            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
857            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
858
859        Span-specific params:
860            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
861            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
862
863        Generation-specific params:
864            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
865            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
866            - `usage` (Optional[Union[BaseModel, ModelUsage]]): (Deprecated. Use `usage_details` and `cost_details` instead.) The usage object supports the OpenAi structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
867            - `usage_details` (Optional[Dict[str, int]]): The usage details of the observation. Reflects the number of units consumed per usage type. All keys must sum up to the total key value. The total key holds the total number of units consumed.
868            - `cost_details` (Optional[Dict[str, float]]): The cost details of the observation. Reflects the USD cost of the observation per cost type. All keys must sum up to the total key value. The total key holds the total cost of the observation.
869            - `prompt`(Optional[PromptClient]): The prompt object used for the generation.
870
871        Returns:
872            None
873
874        Raises:
875            ValueError: If no current observation is found in the context, indicating that this method was called outside of an observation's execution scope.
876
877        Note:
878            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
879            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
880            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
881        """
882        stack = _observation_stack_context.get()
883        observation = stack[-1] if stack else None
884
885        if not observation:
886            self._log.warning("No observation found in the current context")
887
888            return
889
890        update_params = {
891            k: v
892            for k, v in {
893                "input": input,
894                "output": output,
895                "name": name,
896                "version": version,
897                "metadata": metadata,
898                "start_time": start_time,
899                "end_time": end_time,
900                "release": release,
901                "tags": tags,
902                "user_id": user_id,
903                "session_id": session_id,
904                "level": level,
905                "status_message": status_message,
906                "completion_start_time": completion_start_time,
907                "model": model,
908                "model_parameters": model_parameters,
909                "usage": usage,
910                "usage_details": usage_details,
911                "cost_details": cost_details,
912                "prompt": prompt,
913                "public": public,
914            }.items()
915            if v is not None
916        }
917
918        _observation_params_context.get()[observation.id].update(update_params)

Update parameters for the current observation within an active trace context.

This method dynamically adjusts the parameters of the most recent observation on the observation stack. It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more, enhancing the observability and traceability of the execution context.

Note that if a param is not available on a specific observation type, it will be ignored.

Shared params:
  • input (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
  • output (Optional[Any]): The output or result of the trace or observation
  • name (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • start_time (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
  • end_time (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.

Trace-specific params: - user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics. - session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier. - release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging. - tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API. - public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

Span-specific params: - level (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR". - status_message (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.

Generation-specific params: - completion_start_time (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration. - model_parameters (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs. - usage (Optional[Union[BaseModel, ModelUsage]]): (Deprecated. Use usage_details and cost_details instead.) The usage object supports the OpenAi structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse. - usage_details (Optional[Dict[str, int]]): The usage details of the observation. Reflects the number of units consumed per usage type. All keys must sum up to the total key value. The total key holds the total number of units consumed. - cost_details (Optional[Dict[str, float]]): The cost details of the observation. Reflects the USD cost of the observation per cost type. All keys must sum up to the total key value. The total key holds the total cost of the observation. - prompt(Optional[PromptClient]): The prompt object used for the generation.

Returns:

None

Raises:
  • ValueError: If no current observation is found in the context, indicating that this method was called outside of an observation's execution scope.
Note:
  • This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
  • It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
  • Parameters set to None will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
def score_current_observation( self, *, name: str, value: Union[float, str], data_type: Optional[Literal['NUMERIC', 'CATEGORICAL', 'BOOLEAN']] = None, comment: Optional[str] = None, id: Optional[str] = None, config_id: Optional[str] = None):
920    def score_current_observation(
921        self,
922        *,
923        name: str,
924        value: Union[float, str],
925        data_type: Optional[ScoreDataType] = None,
926        comment: Optional[str] = None,
927        id: Optional[str] = None,
928        config_id: Optional[str] = None,
929    ):
930        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
931
932        Arguments:
933            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
934            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
935            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
936              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
937            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
938            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
939            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
940
941        Returns:
942            None
943
944        Note:
945            This method is intended to be used within the context of an active trace or observation.
946        """
947        try:
948            trace_id = self.get_current_trace_id()
949            current_observation_id = self.get_current_observation_id()
950
951            observation_id = (
952                current_observation_id if current_observation_id != trace_id else None
953            )
954
955            if trace_id:
956                self.client_instance.score(
957                    trace_id=trace_id,
958                    observation_id=observation_id,
959                    name=name,
960                    value=value,
961                    data_type=data_type,
962                    comment=comment,
963                    id=id,
964                    config_id=config_id,
965                )
966            else:
967                raise ValueError("No trace or observation found in the current context")
968
969        except Exception as e:
970            self._log.error(f"Failed to score observation: {e}")

Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.

Arguments:
  • name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
  • value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
  • data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
  • comment (Optional[str]): An optional comment or description providing context or additional details about the score.
  • id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
  • config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
Returns:

None

Note:

This method is intended to be used within the context of an active trace or observation.

def score_current_trace( self, *, name: str, value: Union[float, str], data_type: Optional[Literal['NUMERIC', 'CATEGORICAL', 'BOOLEAN']] = None, comment: Optional[str] = None, id: Optional[str] = None, config_id: Optional[str] = None):
 972    def score_current_trace(
 973        self,
 974        *,
 975        name: str,
 976        value: Union[float, str],
 977        data_type: Optional[ScoreDataType] = None,
 978        comment: Optional[str] = None,
 979        id: Optional[str] = None,
 980        config_id: Optional[str] = None,
 981    ):
 982        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
 983
 984        Arguments:
 985            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 986            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
 987            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
 988              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
 989            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 990            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 991            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
 992
 993        Returns:
 994            None
 995
 996        Note:
 997            This method is intended to be used within the context of an active trace or observation.
 998        """
 999        try:
1000            trace_id = self.get_current_trace_id()
1001
1002            if trace_id:
1003                self.client_instance.score(
1004                    trace_id=trace_id,
1005                    name=name,
1006                    value=value,
1007                    data_type=data_type,
1008                    comment=comment,
1009                    id=id,
1010                    config_id=config_id,
1011                )
1012            else:
1013                raise ValueError("No trace found in the current context")
1014
1015        except Exception as e:
1016            self._log.error(f"Failed to score observation: {e}")

Score the current trace in context. This can be called anywhere in the nested trace to score the trace.

Arguments:
  • name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
  • value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
  • data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
  • comment (Optional[str]): An optional comment or description providing context or additional details about the score.
  • id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
  • config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
Returns:

None

Note:

This method is intended to be used within the context of an active trace or observation.

@catch_and_log_errors
def flush(self):
1018    @catch_and_log_errors
1019    def flush(self):
1020        """Force immediate flush of all buffered observations to the Langfuse backend.
1021
1022        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
1023        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
1024
1025        Usage:
1026            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
1027            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
1028
1029        Returns:
1030            None
1031
1032        Raises:
1033            ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues.
1034
1035        Note:
1036            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
1037            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
1038            However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
1039        """
1040        if self.client_instance:
1041            self.client_instance.flush()
1042        else:
1043            self._log.warning("No langfuse object found in the current context")

Force immediate flush of all buffered observations to the Langfuse backend.

This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers. It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.

Usage:
  • This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
  • It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
Returns:

None

Raises:
  • ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues.
Note:
  • The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
  • In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client. However, explicit calls to flush can be beneficial in certain edge cases or for debugging purposes.
def configure( self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, release: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, enabled: Optional[bool] = None, mask: Optional[Callable] = None):
1045    def configure(
1046        self,
1047        *,
1048        public_key: Optional[str] = None,
1049        secret_key: Optional[str] = None,
1050        host: Optional[str] = None,
1051        release: Optional[str] = None,
1052        debug: Optional[bool] = None,
1053        threads: Optional[int] = None,
1054        flush_at: Optional[int] = None,
1055        flush_interval: Optional[int] = None,
1056        max_retries: Optional[int] = None,
1057        timeout: Optional[int] = None,
1058        httpx_client: Optional[httpx.Client] = None,
1059        enabled: Optional[bool] = None,
1060        mask: Optional[Callable] = None,
1061    ):
1062        """Configure the Langfuse client.
1063
1064        If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings.
1065
1066        Args:
1067            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
1068            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
1069            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
1070            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
1071            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
1072            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
1073            flush_at: Max batch size that's sent to the API.
1074            flush_interval: Max delay until a new batch is sent to the API.
1075            max_retries: Max number of retries in case of API/network errors.
1076            timeout: Timeout of API requests in seconds. Default is 20 seconds.
1077            httpx_client: Pass your own httpx client for more customizability of requests.
1078            enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
1079            mask (Callable): Function that masks sensitive information from input and output in log messages.
1080
1081        """
1082        langfuse_singleton = LangfuseSingleton()
1083        langfuse_singleton.reset()
1084
1085        langfuse_singleton.get(
1086            public_key=public_key,
1087            secret_key=secret_key,
1088            host=host,
1089            release=release,
1090            debug=debug,
1091            threads=threads,
1092            flush_at=flush_at,
1093            flush_interval=flush_interval,
1094            max_retries=max_retries,
1095            timeout=timeout,
1096            httpx_client=httpx_client,
1097            enabled=enabled,
1098            mask=mask,
1099        )

Configure the Langfuse client.

If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings.

Arguments:
  • public_key: Public API key of Langfuse project. Can be set via LANGFUSE_PUBLIC_KEY environment variable.
  • secret_key: Secret API key of Langfuse project. Can be set via LANGFUSE_SECRET_KEY environment variable.
  • host: Host of Langfuse API. Can be set via LANGFUSE_HOST environment variable. Defaults to https://cloud.langfuse.com.
  • release: Release number/hash of the application to provide analytics grouped by release. Can be set via LANGFUSE_RELEASE environment variable.
  • debug: Enables debug mode for more verbose logging. Can be set via LANGFUSE_DEBUG environment variable.
  • threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
  • flush_at: Max batch size that's sent to the API.
  • flush_interval: Max delay until a new batch is sent to the API.
  • max_retries: Max number of retries in case of API/network errors.
  • timeout: Timeout of API requests in seconds. Default is 20 seconds.
  • httpx_client: Pass your own httpx client for more customizability of requests.
  • enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
  • mask (Callable): Function that masks sensitive information from input and output in log messages.
client_instance: langfuse.client.Langfuse
1101    @property
1102    def client_instance(self) -> Langfuse:
1103        """Get the Langfuse client instance for the current decorator context."""
1104        return LangfuseSingleton().get()

Get the Langfuse client instance for the current decorator context.

def auth_check(self) -> bool:
1125    def auth_check(self) -> bool:
1126        """Check if the current Langfuse client is authenticated.
1127
1128        Returns:
1129            bool: True if the client is authenticated, False otherwise
1130        """
1131        try:
1132            return self.client_instance.auth_check()
1133        except Exception as e:
1134            self._log.error(
1135                "No Langfuse object found in the current context", exc_info=e
1136            )
1137
1138            return False

Check if the current Langfuse client is authenticated.

Returns:

bool: True if the client is authenticated, False otherwise