langfuse.decorators

Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the @observe() decorator.

Simple example (decorator + openai integration)

from langfuse.decorators import observe
from langfuse.openai import openai # OpenAI integration

@observe()
def story():
    return openai.chat.completions.create(
        model="gpt-3.5-turbo",
        max_tokens=100,
        messages=[
          {"role": "system", "content": "You are a great storyteller."},
          {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
        ],
    ).choices[0].message.content

@observe()
def main():
    return story()

main()

See docs for more information.

 1"""Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the `@observe()` decorator.
 2
 3*Simple example (decorator + openai integration)*
 4
 5```python
 6from langfuse.decorators import observe
 7from langfuse.openai import openai # OpenAI integration
 8
 9@observe()
10def story():
11    return openai.chat.completions.create(
12        model="gpt-3.5-turbo",
13        max_tokens=100,
14        messages=[
15          {"role": "system", "content": "You are a great storyteller."},
16          {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
17        ],
18    ).choices[0].message.content
19
20@observe()
21def main():
22    return story()
23
24main()
25```
26
27See [docs](https://langfuse.com/docs/sdk/python/decorators) for more information.
28"""
29
30from .langfuse_decorator import langfuse_context, observe, LangfuseDecorator
31
32__all__ = ["langfuse_context", "observe", "LangfuseDecorator"]
langfuse_context = <LangfuseDecorator object>
def observe( func: Optional[Callable[~P, ~R]] = None, *, name: Optional[str] = None, as_type: Optional[Literal['generation']] = None, capture_input: bool = True, capture_output: bool = True, transform_to_string: Optional[Callable[[Iterable], str]] = None) -> Callable[[Callable[~P, ~R]], Callable[~P, ~R]]:
117    def observe(
118        self,
119        func: Optional[Callable[P, R]] = None,
120        *,
121        name: Optional[str] = None,
122        as_type: Optional[Literal["generation"]] = None,
123        capture_input: bool = True,
124        capture_output: bool = True,
125        transform_to_string: Optional[Callable[[Iterable], str]] = None,
126    ) -> Callable[[Callable[P, R]], Callable[P, R]]:
127        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
128
129        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
130        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
131
132        Attributes:
133            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
134            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
135            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
136            capture_output (bool): If True, captures the return value of the function as output. Default is True.
137            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
138
139        Returns:
140            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
141
142        Example:
143            For general tracing (functions/methods):
144            ```python
145            @observe()
146            def your_function(args):
147                # Your implementation here
148            ```
149            For observing language model generations:
150            ```python
151            @observe(as_type="generation")
152            def your_LLM_function(args):
153                # Your LLM invocation here
154            ```
155
156        Raises:
157            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
158
159        Note:
160        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
161        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
162        """
163
164        def decorator(func: Callable[P, R]) -> Callable[P, R]:
165            return (
166                self._async_observe(
167                    func,
168                    name=name,
169                    as_type=as_type,
170                    capture_input=capture_input,
171                    capture_output=capture_output,
172                    transform_to_string=transform_to_string,
173                )
174                if asyncio.iscoroutinefunction(func)
175                else self._sync_observe(
176                    func,
177                    name=name,
178                    as_type=as_type,
179                    capture_input=capture_input,
180                    capture_output=capture_output,
181                    transform_to_string=transform_to_string,
182                )
183            )
184
185        """
186        If the decorator is called without arguments, return the decorator function itself. 
187        This allows the decorator to be used with or without arguments. 
188        Python calls the decorator function with the decorated function as an argument when the decorator is used without arguments.
189        """
190        if func is None:
191            return decorator
192        else:
193            return decorator(func)

Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Attributes:
  • name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
  • as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
  • capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
  • capture_output (bool): If True, captures the return value of the function as output. Default is True.
  • transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture.
Returns:

Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

@observe()
def your_function(args):
    # Your implementation here

For observing language model generations:

@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here
Raises:
  • Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:

  • Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the langfuse_observation_id keyword when calling the wrapped function.
  • To update observation or trace parameters (e.g., metadata, session_id), use langfuse.update_current_observation and langfuse.update_current_trace methods within the wrapped function.
class LangfuseDecorator:
  96class LangfuseDecorator:
  97    _log = logging.getLogger("langfuse")
  98
  99    # Type overload for observe decorator with no arguments
 100    @overload
 101    def observe(self, func: F) -> F: ...
 102
 103    # Type overload for observe decorator with arguments
 104    @overload
 105    def observe(
 106        self,
 107        func: None = None,
 108        *,
 109        name: Optional[str] = None,
 110        as_type: Optional[Literal["generation"]] = None,
 111        capture_input: bool = True,
 112        capture_output: bool = True,
 113        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 114    ) -> Callable[[Callable[P, R]], Callable[P, R]]: ...
 115
 116    # Implementation of observe decorator
 117    def observe(
 118        self,
 119        func: Optional[Callable[P, R]] = None,
 120        *,
 121        name: Optional[str] = None,
 122        as_type: Optional[Literal["generation"]] = None,
 123        capture_input: bool = True,
 124        capture_output: bool = True,
 125        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 126    ) -> Callable[[Callable[P, R]], Callable[P, R]]:
 127        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
 128
 129        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
 130        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
 131
 132        Attributes:
 133            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
 134            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
 135            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
 136            capture_output (bool): If True, captures the return value of the function as output. Default is True.
 137            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
 138
 139        Returns:
 140            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
 141
 142        Example:
 143            For general tracing (functions/methods):
 144            ```python
 145            @observe()
 146            def your_function(args):
 147                # Your implementation here
 148            ```
 149            For observing language model generations:
 150            ```python
 151            @observe(as_type="generation")
 152            def your_LLM_function(args):
 153                # Your LLM invocation here
 154            ```
 155
 156        Raises:
 157            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
 158
 159        Note:
 160        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
 161        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
 162        """
 163
 164        def decorator(func: Callable[P, R]) -> Callable[P, R]:
 165            return (
 166                self._async_observe(
 167                    func,
 168                    name=name,
 169                    as_type=as_type,
 170                    capture_input=capture_input,
 171                    capture_output=capture_output,
 172                    transform_to_string=transform_to_string,
 173                )
 174                if asyncio.iscoroutinefunction(func)
 175                else self._sync_observe(
 176                    func,
 177                    name=name,
 178                    as_type=as_type,
 179                    capture_input=capture_input,
 180                    capture_output=capture_output,
 181                    transform_to_string=transform_to_string,
 182                )
 183            )
 184
 185        """
 186        If the decorator is called without arguments, return the decorator function itself. 
 187        This allows the decorator to be used with or without arguments. 
 188        Python calls the decorator function with the decorated function as an argument when the decorator is used without arguments.
 189        """
 190        if func is None:
 191            return decorator
 192        else:
 193            return decorator(func)
 194
 195    def _async_observe(
 196        self,
 197        func: F,
 198        *,
 199        name: Optional[str],
 200        as_type: Optional[Literal["generation"]],
 201        capture_input: bool,
 202        capture_output: bool,
 203        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 204    ) -> F:
 205        @wraps(func)
 206        async def async_wrapper(*args, **kwargs):
 207            observation = self._prepare_call(
 208                name=name or func.__name__,
 209                as_type=as_type,
 210                capture_input=capture_input,
 211                is_method=self._is_method(func),
 212                func_args=args,
 213                func_kwargs=kwargs,
 214            )
 215            result = None
 216
 217            try:
 218                result = await func(*args, **kwargs)
 219            except Exception as e:
 220                self._handle_exception(observation, e)
 221            finally:
 222                result = self._finalize_call(
 223                    observation, result, capture_output, transform_to_string
 224                )
 225
 226                # Returning from finally block may swallow errors, so only return if result is not None
 227                if result is not None:
 228                    return result
 229
 230        return cast(F, async_wrapper)
 231
 232    def _sync_observe(
 233        self,
 234        func: F,
 235        *,
 236        name: Optional[str],
 237        as_type: Optional[Literal["generation"]],
 238        capture_input: bool,
 239        capture_output: bool,
 240        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 241    ) -> F:
 242        @wraps(func)
 243        def sync_wrapper(*args, **kwargs):
 244            observation = self._prepare_call(
 245                name=name or func.__name__,
 246                as_type=as_type,
 247                capture_input=capture_input,
 248                is_method=self._is_method(func),
 249                func_args=args,
 250                func_kwargs=kwargs,
 251            )
 252            result = None
 253
 254            try:
 255                result = func(*args, **kwargs)
 256            except Exception as e:
 257                self._handle_exception(observation, e)
 258            finally:
 259                result = self._finalize_call(
 260                    observation, result, capture_output, transform_to_string
 261                )
 262
 263                # Returning from finally block may swallow errors, so only return if result is not None
 264                if result is not None:
 265                    return result
 266
 267        return cast(F, sync_wrapper)
 268
 269    @staticmethod
 270    def _is_method(func: Callable) -> bool:
 271        """Check if a callable is likely an class or instance method based on its signature.
 272
 273        This method inspects the given callable's signature for the presence of a 'cls' or 'self' parameter, which is conventionally used for class and instance methods in Python classes. It returns True if 'class' or 'self' is found among the parameters, suggesting the callable is a method.
 274
 275        Note: This method relies on naming conventions and may not accurately identify instance methods if unconventional parameter names are used or if static or class methods incorrectly include a 'self' or 'cls' parameter. Additionally, during decorator execution, inspect.ismethod does not work as expected because the function has not yet been bound to an instance; it is still a function, not a method. This check attempts to infer method status based on signature, which can be useful in decorator contexts where traditional method identification techniques fail.
 276
 277        Returns:
 278        bool: True if 'cls' or 'self' is in the callable's parameters, False otherwise.
 279        """
 280        return (
 281            "self" in inspect.signature(func).parameters
 282            or "cls" in inspect.signature(func).parameters
 283        )
 284
 285    def _prepare_call(
 286        self,
 287        *,
 288        name: str,
 289        as_type: Optional[Literal["generation"]],
 290        capture_input: bool,
 291        is_method: bool = False,
 292        func_args: Tuple = (),
 293        func_kwargs: Dict = {},
 294    ) -> Optional[
 295        Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
 296    ]:
 297        try:
 298            stack = _observation_stack_context.get().copy()
 299            parent = stack[-1] if stack else None
 300
 301            # Collect default observation data
 302            observation_id = func_kwargs.pop("langfuse_observation_id", None)
 303            provided_parent_trace_id = func_kwargs.pop("langfuse_parent_trace_id", None)
 304            provided_parent_observation_id = func_kwargs.pop(
 305                "langfuse_parent_observation_id", None
 306            )
 307
 308            id = str(observation_id) if observation_id else None
 309            start_time = _get_timestamp()
 310
 311            input = (
 312                self._get_input_from_func_args(
 313                    is_method=is_method,
 314                    func_args=func_args,
 315                    func_kwargs=func_kwargs,
 316                )
 317                if capture_input
 318                else None
 319            )
 320
 321            params = {
 322                "id": id,
 323                "name": name,
 324                "start_time": start_time,
 325                "input": input,
 326            }
 327
 328            # Handle user-providedparent trace ID and observation ID
 329            if parent and (provided_parent_trace_id or provided_parent_observation_id):
 330                self._log.warning(
 331                    "Ignoring langfuse_parent_trace_id and/or langfuse_parent_observation_id as they can be only set in the top-level decorated function."
 332                )
 333
 334            elif provided_parent_observation_id and not provided_parent_trace_id:
 335                self._log.warning(
 336                    "Ignoring langfuse_parent_observation_id as langfuse_parent_trace_id is not set."
 337                )
 338
 339            elif provided_parent_observation_id and (
 340                provided_parent_observation_id != provided_parent_trace_id
 341            ):
 342                parent = StatefulSpanClient(
 343                    id=provided_parent_observation_id,
 344                    trace_id=provided_parent_trace_id,
 345                    task_manager=self.client_instance.task_manager,
 346                    client=self.client_instance.client,
 347                    state_type=StateType.OBSERVATION,
 348                    environment=self.client_instance.environment,
 349                )
 350                self._set_root_trace_id(provided_parent_trace_id)
 351
 352            elif provided_parent_trace_id:
 353                parent = StatefulTraceClient(
 354                    id=provided_parent_trace_id,
 355                    trace_id=provided_parent_trace_id,
 356                    task_manager=self.client_instance.task_manager,
 357                    client=self.client_instance.client,
 358                    state_type=StateType.TRACE,
 359                    environment=self.client_instance.environment,
 360                )
 361                self._set_root_trace_id(provided_parent_trace_id)
 362
 363            # Create observation
 364            if parent and as_type == "generation":
 365                observation = parent.generation(**params)
 366            elif as_type == "generation":
 367                # Create wrapper trace if generation is top-level
 368                # Do not add wrapper trace to stack, as it does not have a corresponding end that will pop it off again
 369                trace = self.client_instance.trace(
 370                    id=_root_trace_id_context.get() or id,
 371                    name=name,
 372                    start_time=start_time,
 373                )
 374                self._set_root_trace_id(trace.id)
 375
 376                observation = self.client_instance.generation(
 377                    name=name, start_time=start_time, input=input, trace_id=trace.id
 378                )
 379            elif parent:
 380                observation = parent.span(**params)
 381            else:
 382                params["id"] = _root_trace_id_context.get() or params["id"]
 383                observation = self.client_instance.trace(**params)
 384
 385            _observation_stack_context.set(stack + [observation])
 386
 387            return observation
 388        except Exception as e:
 389            self._log.error(f"Failed to prepare observation: {e}")
 390
 391    def _get_input_from_func_args(
 392        self,
 393        *,
 394        is_method: bool = False,
 395        func_args: Tuple = (),
 396        func_kwargs: Dict = {},
 397    ) -> Any:
 398        # Remove implicitly passed "self" or "cls" argument for instance or class methods
 399        logged_args = func_args[1:] if is_method else func_args
 400        raw_input = {
 401            "args": logged_args,
 402            "kwargs": func_kwargs,
 403        }
 404
 405        # Serialize and deserialize to ensure proper JSON serialization.
 406        # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
 407        return json.loads(json.dumps(raw_input, cls=EventSerializer))
 408
 409    def _finalize_call(
 410        self,
 411        observation: Optional[
 412            Union[
 413                StatefulSpanClient,
 414                StatefulTraceClient,
 415                StatefulGenerationClient,
 416            ]
 417        ],
 418        result: Any,
 419        capture_output: bool,
 420        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 421    ):
 422        if inspect.isgenerator(result):
 423            return self._wrap_sync_generator_result(
 424                observation, result, capture_output, transform_to_string
 425            )
 426        elif inspect.isasyncgen(result):
 427            return self._wrap_async_generator_result(
 428                observation, result, capture_output, transform_to_string
 429            )
 430
 431        else:
 432            return self._handle_call_result(observation, result, capture_output)
 433
 434    def _handle_call_result(
 435        self,
 436        observation: Optional[
 437            Union[
 438                StatefulSpanClient,
 439                StatefulTraceClient,
 440                StatefulGenerationClient,
 441            ]
 442        ],
 443        result: Any,
 444        capture_output: bool,
 445    ):
 446        try:
 447            if observation is None:
 448                raise ValueError("No observation found in the current context")
 449
 450            # Collect final observation data
 451            observation_params = self._pop_observation_params_from_context(
 452                observation.id
 453            )
 454
 455            end_time = observation_params["end_time"] or _get_timestamp()
 456
 457            output = observation_params["output"] or (
 458                # Serialize and deserialize to ensure proper JSON serialization.
 459                # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
 460                json.loads(
 461                    json.dumps(
 462                        result if result is not None and capture_output else None,
 463                        cls=EventSerializer,
 464                    )
 465                )
 466            )
 467
 468            observation_params.update(end_time=end_time, output=output)
 469
 470            if isinstance(observation, (StatefulSpanClient, StatefulGenerationClient)):
 471                observation.end(**observation_params)
 472            elif isinstance(observation, StatefulTraceClient):
 473                observation.update(**observation_params)
 474
 475            # Remove observation from top of stack
 476            stack = _observation_stack_context.get()
 477            _observation_stack_context.set(stack[:-1])
 478
 479            # Update trace that was provided directly and not part of the observation stack
 480            if not _observation_stack_context.get() and (
 481                provided_trace_id := _root_trace_id_context.get()
 482            ):
 483                observation_params = self._pop_observation_params_from_context(
 484                    provided_trace_id
 485                )
 486
 487                has_updates = any(observation_params.values())
 488
 489                if has_updates:
 490                    trace_client = StatefulTraceClient(
 491                        id=provided_trace_id,
 492                        trace_id=provided_trace_id,
 493                        task_manager=self.client_instance.task_manager,
 494                        client=self.client_instance.client,
 495                        state_type=StateType.TRACE,
 496                        environment=self.client_instance.environment,
 497                    )
 498                    trace_client.update(**observation_params)
 499
 500        except Exception as e:
 501            self._log.error(f"Failed to finalize observation: {e}")
 502
 503        finally:
 504            # Clear the context trace ID to avoid leaking to next execution
 505            if not _observation_stack_context.get():
 506                _root_trace_id_context.set(None)
 507
 508            return result
 509
 510    def _handle_exception(
 511        self,
 512        observation: Optional[
 513            Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
 514        ],
 515        e: Exception,
 516    ):
 517        if observation:
 518            _observation_params_context.get()[observation.id].update(
 519                level="ERROR", status_message=str(e)
 520            )
 521        raise e
 522
 523    def _wrap_sync_generator_result(
 524        self,
 525        observation: Optional[
 526            Union[
 527                StatefulSpanClient,
 528                StatefulTraceClient,
 529                StatefulGenerationClient,
 530            ]
 531        ],
 532        generator: Generator,
 533        capture_output: bool,
 534        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 535    ):
 536        items = []
 537
 538        try:
 539            for item in generator:
 540                items.append(item)
 541
 542                yield item
 543
 544        finally:
 545            output = items
 546
 547            if transform_to_string is not None:
 548                output = transform_to_string(items)
 549
 550            elif all(isinstance(item, str) for item in items):
 551                output = "".join(items)
 552
 553            self._handle_call_result(observation, output, capture_output)
 554
 555    async def _wrap_async_generator_result(
 556        self,
 557        observation: Optional[
 558            Union[
 559                StatefulSpanClient,
 560                StatefulTraceClient,
 561                StatefulGenerationClient,
 562            ]
 563        ],
 564        generator: AsyncGenerator,
 565        capture_output: bool,
 566        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 567    ) -> AsyncGenerator:
 568        items = []
 569
 570        try:
 571            async for item in generator:
 572                items.append(item)
 573
 574                yield item
 575
 576        finally:
 577            output = items
 578
 579            if transform_to_string is not None:
 580                output = transform_to_string(items)
 581
 582            elif all(isinstance(item, str) for item in items):
 583                output = "".join(items)
 584
 585            self._handle_call_result(observation, output, capture_output)
 586
 587    def get_current_llama_index_handler(self):
 588        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
 589
 590        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
 591        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
 592
 593        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
 594
 595        Returns:
 596            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
 597
 598        Note:
 599            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
 600            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
 601        """
 602        try:
 603            from langfuse.llama_index import LlamaIndexCallbackHandler
 604        except ImportError:
 605            self._log.error(
 606                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
 607            )
 608
 609            return None
 610
 611        stack = _observation_stack_context.get()
 612        observation = stack[-1] if stack else None
 613
 614        if observation is None:
 615            self._log.warning("No observation found in the current context")
 616
 617            return None
 618
 619        if isinstance(observation, StatefulGenerationClient):
 620            self._log.warning(
 621                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
 622            )
 623
 624            return None
 625
 626        callback_handler = LlamaIndexCallbackHandler()
 627        callback_handler.set_root(observation)
 628
 629        return callback_handler
 630
 631    def get_current_langchain_handler(self):
 632        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
 633
 634        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
 635        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
 636
 637        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
 638
 639        Returns:
 640            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
 641
 642        Note:
 643            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
 644            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
 645        """
 646        stack = _observation_stack_context.get()
 647        observation = stack[-1] if stack else None
 648
 649        if observation is None:
 650            self._log.warning("No observation found in the current context")
 651
 652            return None
 653
 654        if isinstance(observation, StatefulGenerationClient):
 655            self._log.warning(
 656                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
 657            )
 658
 659            return None
 660
 661        return observation.get_langchain_handler()
 662
 663    def get_current_trace_id(self):
 664        """Retrieve the ID of the current trace from the observation stack context.
 665
 666        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
 667        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
 668        representing the entry point of the traced execution context. If you have provided a langfuse_parent_trace_id directly, it will return that instead.
 669
 670        Returns:
 671            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
 672            possibly due to the method being called outside of any @observe-decorated function execution.
 673
 674        Note:
 675            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
 676            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 677        """
 678        context_trace_id = _root_trace_id_context.get()
 679        if context_trace_id:
 680            return context_trace_id
 681
 682        stack = _observation_stack_context.get()
 683
 684        if not stack:
 685            return None
 686
 687        return stack[0].id
 688
 689    def get_current_trace_url(self) -> Optional[str]:
 690        """Retrieve the URL of the current trace in context.
 691
 692        Returns:
 693            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
 694            possibly due to the method being called outside of any @observe-decorated function execution.
 695
 696        Note:
 697            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
 698            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 699        """
 700        try:
 701            trace_id = self.get_current_trace_id()
 702
 703            if not trace_id:
 704                raise ValueError("No trace found in the current context")
 705
 706            project_id = self.client_instance._get_project_id()
 707
 708            if not project_id:
 709                return f"{self.client_instance.client._client_wrapper._base_url}/trace/{trace_id}"
 710
 711            return f"{self.client_instance.client._client_wrapper._base_url}/project/{project_id}/traces/{trace_id}"
 712
 713        except Exception as e:
 714            self._log.error(f"Failed to get current trace URL: {e}")
 715
 716            return None
 717
 718    def get_current_observation_id(self):
 719        """Retrieve the ID of the current observation in context.
 720
 721        Returns:
 722            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
 723            possibly due to the method being called outside of any @observe-decorated function execution.
 724
 725        Note:
 726            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
 727            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 728            - If called at the top level of a trace, it will return the trace ID.
 729        """
 730        stack = _observation_stack_context.get()
 731
 732        if not stack:
 733            return None
 734
 735        return stack[-1].id
 736
 737    def update_current_trace(
 738        self,
 739        name: Optional[str] = None,
 740        input: Optional[Any] = None,
 741        output: Optional[Any] = None,
 742        user_id: Optional[str] = None,
 743        session_id: Optional[str] = None,
 744        version: Optional[str] = None,
 745        release: Optional[str] = None,
 746        metadata: Optional[Any] = None,
 747        tags: Optional[List[str]] = None,
 748        public: Optional[bool] = None,
 749    ):
 750        """Set parameters for the current trace, updating the trace's metadata and context information.
 751
 752        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
 753        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
 754        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
 755
 756        Arguments:
 757            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI..
 758            input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
 759            output (Optional[Any]): The output or result of the trace
 760            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
 761            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
 762            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
 763            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
 764            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
 765            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
 766
 767        Returns:
 768            None
 769
 770        Note:
 771            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
 772            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
 773            - If called outside of an active trace context, a warning is logged, and a ValueError is raised to indicate the absence of a traceable context.
 774        """
 775        trace_id = self.get_current_trace_id()
 776
 777        if trace_id is None:
 778            self._log.warning("No trace found in the current context")
 779
 780            return
 781
 782        params_to_update = {
 783            k: v
 784            for k, v in {
 785                "name": name,
 786                "input": input,
 787                "output": output,
 788                "user_id": user_id,
 789                "session_id": session_id,
 790                "version": version,
 791                "release": release,
 792                "metadata": metadata,
 793                "tags": tags,
 794                "public": public,
 795            }.items()
 796            if v is not None
 797        }
 798
 799        # metadata and tags are merged server side. Send separate update event to avoid merging them SDK side
 800        server_merged_attributes = ["metadata", "tags"]
 801        if any(attribute in params_to_update for attribute in server_merged_attributes):
 802            self.client_instance.trace(
 803                id=trace_id,
 804                **{
 805                    k: v
 806                    for k, v in params_to_update.items()
 807                    if k in server_merged_attributes
 808                },
 809            )
 810
 811        _observation_params_context.get()[trace_id].update(params_to_update)
 812
 813    def update_current_observation(
 814        self,
 815        *,
 816        input: Optional[Any] = None,
 817        output: Optional[Any] = None,
 818        name: Optional[str] = None,
 819        version: Optional[str] = None,
 820        metadata: Optional[Any] = None,
 821        start_time: Optional[datetime] = None,
 822        end_time: Optional[datetime] = None,
 823        release: Optional[str] = None,
 824        tags: Optional[List[str]] = None,
 825        user_id: Optional[str] = None,
 826        session_id: Optional[str] = None,
 827        level: Optional[SpanLevel] = None,
 828        status_message: Optional[str] = None,
 829        completion_start_time: Optional[datetime] = None,
 830        model: Optional[str] = None,
 831        model_parameters: Optional[Dict[str, MapValue]] = None,
 832        usage: Optional[Union[BaseModel, ModelUsage]] = None,
 833        usage_details: Optional[UsageDetails] = None,
 834        cost_details: Optional[Dict[str, float]] = None,
 835        prompt: Optional[PromptClient] = None,
 836        public: Optional[bool] = None,
 837    ):
 838        """Update parameters for the current observation within an active trace context.
 839
 840        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
 841        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
 842        enhancing the observability and traceability of the execution context.
 843
 844        Note that if a param is not available on a specific observation type, it will be ignored.
 845
 846        Shared params:
 847            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
 848            - `output` (Optional[Any]): The output or result of the trace or observation
 849            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
 850            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
 851            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
 852            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
 853            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
 854
 855        Trace-specific params:
 856            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
 857            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
 858            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
 859            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
 860            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
 861
 862        Span-specific params:
 863            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
 864            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
 865
 866        Generation-specific params:
 867            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
 868            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
 869            - `usage` (Optional[Union[BaseModel, ModelUsage]]): (Deprecated. Use `usage_details` and `cost_details` instead.) The usage object supports the OpenAi structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
 870            - `usage_details` (Optional[Dict[str, int]]): The usage details of the observation. Reflects the number of units consumed per usage type. All keys must sum up to the total key value. The total key holds the total number of units consumed.
 871            - `cost_details` (Optional[Dict[str, float]]): The cost details of the observation. Reflects the USD cost of the observation per cost type. All keys must sum up to the total key value. The total key holds the total cost of the observation.
 872            - `prompt`(Optional[PromptClient]): The prompt object used for the generation.
 873
 874        Returns:
 875            None
 876
 877        Raises:
 878            ValueError: If no current observation is found in the context, indicating that this method was called outside of an observation's execution scope.
 879
 880        Note:
 881            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
 882            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
 883            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
 884        """
 885        stack = _observation_stack_context.get()
 886        observation = stack[-1] if stack else None
 887
 888        if not observation:
 889            self._log.warning("No observation found in the current context")
 890
 891            return
 892
 893        update_params = {
 894            k: v
 895            for k, v in {
 896                "input": input,
 897                "output": output,
 898                "name": name,
 899                "version": version,
 900                "metadata": metadata,
 901                "start_time": start_time,
 902                "end_time": end_time,
 903                "release": release,
 904                "tags": tags,
 905                "user_id": user_id,
 906                "session_id": session_id,
 907                "level": level,
 908                "status_message": status_message,
 909                "completion_start_time": completion_start_time,
 910                "model": model,
 911                "model_parameters": model_parameters,
 912                "usage": usage,
 913                "usage_details": usage_details,
 914                "cost_details": cost_details,
 915                "prompt": prompt,
 916                "public": public,
 917            }.items()
 918            if v is not None
 919        }
 920
 921        _observation_params_context.get()[observation.id].update(update_params)
 922
 923    def score_current_observation(
 924        self,
 925        *,
 926        name: str,
 927        value: Union[float, str],
 928        data_type: Optional[ScoreDataType] = None,
 929        comment: Optional[str] = None,
 930        id: Optional[str] = None,
 931        config_id: Optional[str] = None,
 932    ):
 933        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
 934
 935        Arguments:
 936            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 937            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
 938            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
 939              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
 940            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 941            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 942            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
 943
 944        Returns:
 945            None
 946
 947        Note:
 948            This method is intended to be used within the context of an active trace or observation.
 949        """
 950        try:
 951            trace_id = self.get_current_trace_id()
 952            current_observation_id = self.get_current_observation_id()
 953
 954            observation_id = (
 955                current_observation_id if current_observation_id != trace_id else None
 956            )
 957
 958            if trace_id:
 959                self.client_instance.score(
 960                    trace_id=trace_id,
 961                    observation_id=observation_id,
 962                    name=name,
 963                    value=value,
 964                    data_type=data_type,
 965                    comment=comment,
 966                    id=id,
 967                    config_id=config_id,
 968                )
 969            else:
 970                raise ValueError("No trace or observation found in the current context")
 971
 972        except Exception as e:
 973            self._log.error(f"Failed to score observation: {e}")
 974
 975    def score_current_trace(
 976        self,
 977        *,
 978        name: str,
 979        value: Union[float, str],
 980        data_type: Optional[ScoreDataType] = None,
 981        comment: Optional[str] = None,
 982        id: Optional[str] = None,
 983        config_id: Optional[str] = None,
 984    ):
 985        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
 986
 987        Arguments:
 988            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 989            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
 990            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
 991              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
 992            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 993            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 994            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
 995
 996        Returns:
 997            None
 998
 999        Note:
1000            This method is intended to be used within the context of an active trace or observation.
1001        """
1002        try:
1003            trace_id = self.get_current_trace_id()
1004
1005            if trace_id:
1006                self.client_instance.score(
1007                    trace_id=trace_id,
1008                    name=name,
1009                    value=value,
1010                    data_type=data_type,
1011                    comment=comment,
1012                    id=id,
1013                    config_id=config_id,
1014                )
1015            else:
1016                raise ValueError("No trace found in the current context")
1017
1018        except Exception as e:
1019            self._log.error(f"Failed to score observation: {e}")
1020
1021    @catch_and_log_errors
1022    def flush(self):
1023        """Force immediate flush of all buffered observations to the Langfuse backend.
1024
1025        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
1026        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
1027
1028        Usage:
1029            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
1030            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
1031
1032        Returns:
1033            None
1034
1035        Raises:
1036            ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues.
1037
1038        Note:
1039            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
1040            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
1041            However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
1042        """
1043        if self.client_instance:
1044            self.client_instance.flush()
1045        else:
1046            self._log.warning("No langfuse object found in the current context")
1047
1048    def configure(
1049        self,
1050        *,
1051        public_key: Optional[str] = None,
1052        secret_key: Optional[str] = None,
1053        host: Optional[str] = None,
1054        release: Optional[str] = None,
1055        debug: Optional[bool] = None,
1056        threads: Optional[int] = None,
1057        flush_at: Optional[int] = None,
1058        flush_interval: Optional[int] = None,
1059        max_retries: Optional[int] = None,
1060        timeout: Optional[int] = None,
1061        httpx_client: Optional[httpx.Client] = None,
1062        enabled: Optional[bool] = None,
1063        mask: Optional[Callable] = None,
1064        environment: Optional[str] = None,
1065    ):
1066        """Configure the Langfuse client.
1067
1068        If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings.
1069
1070        Args:
1071            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
1072            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
1073            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
1074            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
1075            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
1076            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
1077            flush_at: Max batch size that's sent to the API.
1078            flush_interval: Max delay until a new batch is sent to the API.
1079            max_retries: Max number of retries in case of API/network errors.
1080            timeout: Timeout of API requests in seconds. Default is 20 seconds.
1081            httpx_client: Pass your own httpx client for more customizability of requests.
1082            enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
1083            mask (Callable): Function that masks sensitive information from input and output in log messages.
1084            environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can bet set via `LANGFUSE_TRACING_ENVIRONMENT` environment variable.
1085        """
1086        langfuse_singleton = LangfuseSingleton()
1087        langfuse_singleton.reset()
1088
1089        langfuse_singleton.get(
1090            public_key=public_key,
1091            secret_key=secret_key,
1092            host=host,
1093            release=release,
1094            debug=debug,
1095            threads=threads,
1096            flush_at=flush_at,
1097            flush_interval=flush_interval,
1098            max_retries=max_retries,
1099            timeout=timeout,
1100            httpx_client=httpx_client,
1101            enabled=enabled,
1102            mask=mask,
1103            environment=environment,
1104        )
1105
1106    @property
1107    def client_instance(self) -> Langfuse:
1108        """Get the Langfuse client instance for the current decorator context."""
1109        return LangfuseSingleton().get()
1110
1111    def _set_root_trace_id(self, trace_id: str):
1112        if _observation_stack_context.get():
1113            self._log.warning(
1114                "Root Trace ID cannot be set on a already running trace. Skipping root trace ID assignment."
1115            )
1116            return
1117
1118        _root_trace_id_context.set(trace_id)
1119
1120    def _pop_observation_params_from_context(
1121        self, observation_id: str
1122    ) -> ObservationParams:
1123        params = _observation_params_context.get()[observation_id].copy()
1124
1125        # Remove observation params to avoid leaking
1126        del _observation_params_context.get()[observation_id]
1127
1128        return params
1129
1130    def auth_check(self) -> bool:
1131        """Check if the current Langfuse client is authenticated.
1132
1133        Returns:
1134            bool: True if the client is authenticated, False otherwise
1135        """
1136        try:
1137            return self.client_instance.auth_check()
1138        except Exception as e:
1139            self._log.error(
1140                "No Langfuse object found in the current context", exc_info=e
1141            )
1142
1143            return False
def observe( self, func: Optional[Callable[~P, ~R]] = None, *, name: Optional[str] = None, as_type: Optional[Literal['generation']] = None, capture_input: bool = True, capture_output: bool = True, transform_to_string: Optional[Callable[[Iterable], str]] = None) -> Callable[[Callable[~P, ~R]], Callable[~P, ~R]]:
117    def observe(
118        self,
119        func: Optional[Callable[P, R]] = None,
120        *,
121        name: Optional[str] = None,
122        as_type: Optional[Literal["generation"]] = None,
123        capture_input: bool = True,
124        capture_output: bool = True,
125        transform_to_string: Optional[Callable[[Iterable], str]] = None,
126    ) -> Callable[[Callable[P, R]], Callable[P, R]]:
127        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
128
129        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
130        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
131
132        Attributes:
133            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
134            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
135            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
136            capture_output (bool): If True, captures the return value of the function as output. Default is True.
137            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
138
139        Returns:
140            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
141
142        Example:
143            For general tracing (functions/methods):
144            ```python
145            @observe()
146            def your_function(args):
147                # Your implementation here
148            ```
149            For observing language model generations:
150            ```python
151            @observe(as_type="generation")
152            def your_LLM_function(args):
153                # Your LLM invocation here
154            ```
155
156        Raises:
157            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
158
159        Note:
160        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
161        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
162        """
163
164        def decorator(func: Callable[P, R]) -> Callable[P, R]:
165            return (
166                self._async_observe(
167                    func,
168                    name=name,
169                    as_type=as_type,
170                    capture_input=capture_input,
171                    capture_output=capture_output,
172                    transform_to_string=transform_to_string,
173                )
174                if asyncio.iscoroutinefunction(func)
175                else self._sync_observe(
176                    func,
177                    name=name,
178                    as_type=as_type,
179                    capture_input=capture_input,
180                    capture_output=capture_output,
181                    transform_to_string=transform_to_string,
182                )
183            )
184
185        """
186        If the decorator is called without arguments, return the decorator function itself. 
187        This allows the decorator to be used with or without arguments. 
188        Python calls the decorator function with the decorated function as an argument when the decorator is used without arguments.
189        """
190        if func is None:
191            return decorator
192        else:
193            return decorator(func)

Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Arguments:
  • name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
  • as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
  • capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
  • capture_output (bool): If True, captures the return value of the function as output. Default is True.
  • transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
Returns:

Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

@observe()
def your_function(args):
    # Your implementation here

For observing language model generations:

@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here
Raises:
  • Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:

  • Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the langfuse_observation_id keyword when calling the wrapped function.
  • To update observation or trace parameters (e.g., metadata, session_id), use the langfuse_context.update_current_observation and langfuse_context.update_current_trace methods within the wrapped function.
def get_current_llama_index_handler(self):
587    def get_current_llama_index_handler(self):
588        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
589
590        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
591        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
592
593        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
594
595        Returns:
596            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
597
598        Note:
599            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
600            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
601        """
602        try:
603            from langfuse.llama_index import LlamaIndexCallbackHandler
604        except ImportError:
605            self._log.error(
606                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
607            )
608
609            return None
610
611        stack = _observation_stack_context.get()
612        observation = stack[-1] if stack else None
613
614        if observation is None:
615            self._log.warning("No observation found in the current context")
616
617            return None
618
619        if isinstance(observation, StatefulGenerationClient):
620            self._log.warning(
621                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
622            )
623
624            return None
625
626        callback_handler = LlamaIndexCallbackHandler()
627        callback_handler.set_root(observation)
628
629        return callback_handler

Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.

This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.

See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.

Returns:

LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

Note:
  • This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
  • If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
def get_current_langchain_handler(self):
631    def get_current_langchain_handler(self):
632        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
633
634        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
635        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
636
637        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
638
639        Returns:
640            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
641
642        Note:
643            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
644            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
645        """
646        stack = _observation_stack_context.get()
647        observation = stack[-1] if stack else None
648
649        if observation is None:
650            self._log.warning("No observation found in the current context")
651
652            return None
653
654        if isinstance(observation, StatefulGenerationClient):
655            self._log.warning(
656                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
657            )
658
659            return None
660
661        return observation.get_langchain_handler()

Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.

This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.

See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.

Returns:

LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.

Note:
  • This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
  • If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
def get_current_trace_id(self):
663    def get_current_trace_id(self):
664        """Retrieve the ID of the current trace from the observation stack context.
665
666        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
667        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
668        representing the entry point of the traced execution context. If you have provided a langfuse_parent_trace_id directly, it will return that instead.
669
670        Returns:
671            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
672            possibly due to the method being called outside of any @observe-decorated function execution.
673
674        Note:
675            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
676            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
677        """
678        context_trace_id = _root_trace_id_context.get()
679        if context_trace_id:
680            return context_trace_id
681
682        stack = _observation_stack_context.get()
683
684        if not stack:
685            return None
686
687        return stack[0].id

Retrieve the ID of the current trace from the observation stack context.

This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID, such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack, representing the entry point of the traced execution context. If you have provided a langfuse_parent_trace_id directly, it will return that instead.

Returns:

str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
  • If called outside of a trace context, i.e. when no root trace ID is set and the observation stack is empty, this method returns None without logging a warning, indicating the absence of a traceable context.
def get_current_trace_url(self) -> Optional[str]:
689    def get_current_trace_url(self) -> Optional[str]:
690        """Retrieve the URL of the current trace in context.
691
692        Returns:
693            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
694            possibly due to the method being called outside of any @observe-decorated function execution.
695
696        Note:
697            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
698            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
699        """
700        try:
701            trace_id = self.get_current_trace_id()
702
703            if not trace_id:
704                raise ValueError("No trace found in the current context")
705
706            project_id = self.client_instance._get_project_id()
707
708            if not project_id:
709                return f"{self.client_instance.client._client_wrapper._base_url}/trace/{trace_id}"
710
711            return f"{self.client_instance.client._client_wrapper._base_url}/project/{project_id}/traces/{trace_id}"
712
713        except Exception as e:
714            self._log.error(f"Failed to get current trace URL: {e}")
715
716            return None

Retrieve the URL of the current trace in context.

Returns:

str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
  • If called outside of a trace context, or if building the URL fails for any other reason, this method logs an error and returns None, indicating the absence of a traceable context.
def get_current_observation_id(self):
718    def get_current_observation_id(self):
719        """Retrieve the ID of the current observation in context.
720
721        Returns:
722            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
723            possibly due to the method being called outside of any @observe-decorated function execution.
724
725        Note:
726            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
727            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
728            - If called at the top level of a trace, it will return the trace ID.
729        """
730        stack = _observation_stack_context.get()
731
732        if not stack:
733            return None
734
735        return stack[-1].id

Retrieve the ID of the current observation in context.

Returns:

str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
  • If called outside of a trace or observation context, i.e. when the observation stack is empty, this method returns None without logging a warning, indicating the absence of a traceable context.
  • If called at the top level of a trace, it will return the trace ID.
def update_current_trace( self, name: Optional[str] = None, input: Optional[Any] = None, output: Optional[Any] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, version: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, public: Optional[bool] = None):
737    def update_current_trace(
738        self,
739        name: Optional[str] = None,
740        input: Optional[Any] = None,
741        output: Optional[Any] = None,
742        user_id: Optional[str] = None,
743        session_id: Optional[str] = None,
744        version: Optional[str] = None,
745        release: Optional[str] = None,
746        metadata: Optional[Any] = None,
747        tags: Optional[List[str]] = None,
748        public: Optional[bool] = None,
749    ):
750        """Set parameters for the current trace, updating the trace's metadata and context information.
751
752        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
753        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
754        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
755
756        Arguments:
757            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI..
758            input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
759            output (Optional[Any]): The output or result of the trace
760            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
761            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
762            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
763            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
764            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
765            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
766
767        Returns:
768            None
769
770        Note:
771            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
772            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
773            - If called outside of an active trace context, a warning is logged, and a ValueError is raised to indicate the absence of a traceable context.
774        """
775        trace_id = self.get_current_trace_id()
776
777        if trace_id is None:
778            self._log.warning("No trace found in the current context")
779
780            return
781
782        params_to_update = {
783            k: v
784            for k, v in {
785                "name": name,
786                "input": input,
787                "output": output,
788                "user_id": user_id,
789                "session_id": session_id,
790                "version": version,
791                "release": release,
792                "metadata": metadata,
793                "tags": tags,
794                "public": public,
795            }.items()
796            if v is not None
797        }
798
799        # metadata and tags are merged server side. Send separate update event to avoid merging them SDK side
800        server_merged_attributes = ["metadata", "tags"]
801        if any(attribute in params_to_update for attribute in server_merged_attributes):
802            self.client_instance.trace(
803                id=trace_id,
804                **{
805                    k: v
806                    for k, v in params_to_update.items()
807                    if k in server_merged_attributes
808                },
809            )
810
811        _observation_params_context.get()[trace_id].update(params_to_update)

Set parameters for the current trace, updating the trace's metadata and context information.

This method allows for dynamically updating the trace parameters at any point during the execution of a trace. It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information, and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.

Arguments:
  • name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
  • input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
  • output (Optional[Any]): The output or result of the trace
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
  • release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
Returns:

None

Note:
  • This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
  • The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
  • If called outside of an active trace context, a warning is logged and the method returns without updating anything; no exception is raised.
def update_current_observation( self, *, input: Optional[Any] = None, output: Optional[Any] = None, name: Optional[str] = None, version: Optional[str] = None, metadata: Optional[Any] = None, start_time: Optional[datetime.datetime] = None, end_time: Optional[datetime.datetime] = None, release: Optional[str] = None, tags: Optional[List[str]] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, level: Optional[Literal['DEBUG', 'DEFAULT', 'WARNING', 'ERROR']] = None, status_message: Optional[str] = None, completion_start_time: Optional[datetime.datetime] = None, model: Optional[str] = None, model_parameters: Optional[Dict[str, Union[str, NoneType, int, bool, List[str]]]] = None, usage: Union[pydantic.main.BaseModel, langfuse.model.ModelUsage, NoneType] = None, usage_details: Union[Dict[str, int], langfuse.api.OpenAiCompletionUsageSchema, langfuse.api.OpenAiResponseUsageSchema, NoneType] = None, cost_details: Optional[Dict[str, float]] = None, prompt: Union[langfuse.model.TextPromptClient, langfuse.model.ChatPromptClient, NoneType] = None, public: Optional[bool] = None):
813    def update_current_observation(
814        self,
815        *,
816        input: Optional[Any] = None,
817        output: Optional[Any] = None,
818        name: Optional[str] = None,
819        version: Optional[str] = None,
820        metadata: Optional[Any] = None,
821        start_time: Optional[datetime] = None,
822        end_time: Optional[datetime] = None,
823        release: Optional[str] = None,
824        tags: Optional[List[str]] = None,
825        user_id: Optional[str] = None,
826        session_id: Optional[str] = None,
827        level: Optional[SpanLevel] = None,
828        status_message: Optional[str] = None,
829        completion_start_time: Optional[datetime] = None,
830        model: Optional[str] = None,
831        model_parameters: Optional[Dict[str, MapValue]] = None,
832        usage: Optional[Union[BaseModel, ModelUsage]] = None,
833        usage_details: Optional[UsageDetails] = None,
834        cost_details: Optional[Dict[str, float]] = None,
835        prompt: Optional[PromptClient] = None,
836        public: Optional[bool] = None,
837    ):
838        """Update parameters for the current observation within an active trace context.
839
840        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
841        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
842        enhancing the observability and traceability of the execution context.
843
844        Note that if a param is not available on a specific observation type, it will be ignored.
845
846        Shared params:
847            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
848            - `output` (Optional[Any]): The output or result of the trace or observation
849            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
850            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
851            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
852            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
853            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
854
855        Trace-specific params:
856            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
857            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
858            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
859            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
860            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
861
862        Span-specific params:
863            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
864            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
865
866        Generation-specific params:
867            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
868            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
869            - `usage` (Optional[Union[BaseModel, ModelUsage]]): (Deprecated. Use `usage_details` and `cost_details` instead.) The usage object supports the OpenAi structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
870            - `usage_details` (Optional[Dict[str, int]]): The usage details of the observation. Reflects the number of units consumed per usage type. All keys must sum up to the total key value. The total key holds the total number of units consumed.
871            - `cost_details` (Optional[Dict[str, float]]): The cost details of the observation. Reflects the USD cost of the observation per cost type. All keys must sum up to the total key value. The total key holds the total cost of the observation.
872            - `prompt`(Optional[PromptClient]): The prompt object used for the generation.
873
874        Returns:
875            None
876
877        Raises:
878            ValueError: If no current observation is found in the context, indicating that this method was called outside of an observation's execution scope.
879
880        Note:
881            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
882            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
883            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
884        """
885        stack = _observation_stack_context.get()
886        observation = stack[-1] if stack else None
887
888        if not observation:
889            self._log.warning("No observation found in the current context")
890
891            return
892
893        update_params = {
894            k: v
895            for k, v in {
896                "input": input,
897                "output": output,
898                "name": name,
899                "version": version,
900                "metadata": metadata,
901                "start_time": start_time,
902                "end_time": end_time,
903                "release": release,
904                "tags": tags,
905                "user_id": user_id,
906                "session_id": session_id,
907                "level": level,
908                "status_message": status_message,
909                "completion_start_time": completion_start_time,
910                "model": model,
911                "model_parameters": model_parameters,
912                "usage": usage,
913                "usage_details": usage_details,
914                "cost_details": cost_details,
915                "prompt": prompt,
916                "public": public,
917            }.items()
918            if v is not None
919        }
920
921        _observation_params_context.get()[observation.id].update(update_params)

Update parameters for the current observation within an active trace context.

This method dynamically adjusts the parameters of the most recent observation on the observation stack. It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more, enhancing the observability and traceability of the execution context.

Note that if a param is not available on a specific observation type, it will be ignored.

Shared params:
  • input (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
  • output (Optional[Any]): The output or result of the trace or observation
  • name (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • start_time (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
  • end_time (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.

Trace-specific params:
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
  • public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

Span-specific params:
  • level (Optional[SpanLevel]): The severity or importance level of the observation: "DEBUG", "DEFAULT", "WARNING", or "ERROR".
  • status_message (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.

Generation-specific params:
  • completion_start_time (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
  • model_parameters (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
  • usage (Optional[Union[BaseModel, ModelUsage]]): (Deprecated. Use usage_details and cost_details instead.) The usage object supports the OpenAi structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
  • usage_details (Optional[Dict[str, int]]): The usage details of the observation. Reflects the number of units consumed per usage type. All keys must sum up to the total key value. The total key holds the total number of units consumed.
  • cost_details (Optional[Dict[str, float]]): The cost details of the observation. Reflects the USD cost of the observation per cost type. All keys must sum up to the total key value. The total key holds the total cost of the observation.
  • prompt (Optional[PromptClient]): The prompt object used for the generation.

Returns:

None

Raises:
  • No exception is raised when no current observation is found in the context: a warning is logged and the call returns without updating anything.
Note:
  • This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
  • It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
  • Parameters set to None will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
def score_current_observation( self, *, name: str, value: Union[float, str], data_type: Optional[Literal['NUMERIC', 'CATEGORICAL', 'BOOLEAN']] = None, comment: Optional[str] = None, id: Optional[str] = None, config_id: Optional[str] = None):
923    def score_current_observation(
924        self,
925        *,
926        name: str,
927        value: Union[float, str],
928        data_type: Optional[ScoreDataType] = None,
929        comment: Optional[str] = None,
930        id: Optional[str] = None,
931        config_id: Optional[str] = None,
932    ):
933        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
934
935        Arguments:
936            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
937            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
938            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
939              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
940            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
941            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
942            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
943
944        Returns:
945            None
946
947        Note:
948            This method is intended to be used within the context of an active trace or observation.
949        """
950        try:
951            trace_id = self.get_current_trace_id()
952            current_observation_id = self.get_current_observation_id()
953
954            observation_id = (
955                current_observation_id if current_observation_id != trace_id else None
956            )
957
958            if trace_id:
959                self.client_instance.score(
960                    trace_id=trace_id,
961                    observation_id=observation_id,
962                    name=name,
963                    value=value,
964                    data_type=data_type,
965                    comment=comment,
966                    id=id,
967                    config_id=config_id,
968                )
969            else:
970                raise ValueError("No trace or observation found in the current context")
971
972        except Exception as e:
973            self._log.error(f"Failed to score observation: {e}")

Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.

Arguments:
  • name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
  • value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
  • data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
  • comment (Optional[str]): An optional comment or description providing context or additional details about the score.
  • id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
  • config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
Returns:

None

Note:

This method is intended to be used within the context of an active trace or observation.

def score_current_trace( self, *, name: str, value: Union[float, str], data_type: Optional[Literal['NUMERIC', 'CATEGORICAL', 'BOOLEAN']] = None, comment: Optional[str] = None, id: Optional[str] = None, config_id: Optional[str] = None):
 975    def score_current_trace(
 976        self,
 977        *,
 978        name: str,
 979        value: Union[float, str],
 980        data_type: Optional[ScoreDataType] = None,
 981        comment: Optional[str] = None,
 982        id: Optional[str] = None,
 983        config_id: Optional[str] = None,
 984    ):
 985        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
 986
 987        Arguments:
 988            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 989            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
 990            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
 991              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
 992            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 993            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 994            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
 995
 996        Returns:
 997            None
 998
 999        Note:
1000            This method is intended to be used within the context of an active trace or observation.
1001        """
1002        try:
1003            trace_id = self.get_current_trace_id()
1004
1005            if trace_id:
1006                self.client_instance.score(
1007                    trace_id=trace_id,
1008                    name=name,
1009                    value=value,
1010                    data_type=data_type,
1011                    comment=comment,
1012                    id=id,
1013                    config_id=config_id,
1014                )
1015            else:
1016                raise ValueError("No trace found in the current context")
1017
1018        except Exception as e:
1019            self._log.error(f"Failed to score observation: {e}")

Score the current trace in context. This can be called anywhere in the nested trace to score the trace.

Arguments:
  • name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
  • value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
  • data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
  • comment (Optional[str]): An optional comment or description providing context or additional details about the score.
  • id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
  • config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
Returns:

None

Note:

This method is intended to be used within the context of an active trace or observation.

@catch_and_log_errors
def flush(self):
1021    @catch_and_log_errors
1022    def flush(self):
1023        """Force immediate flush of all buffered observations to the Langfuse backend.
1024
1025        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
1026        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
1027
1028        Usage:
1029            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
1030            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
1031
1032        Returns:
1033            None
1034
1035        Raises:
1036            ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues.
1037
1038        Note:
1039            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
1040            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
1041            However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
1042        """
1043        if self.client_instance:
1044            self.client_instance.flush()
1045        else:
1046            self._log.warning("No langfuse object found in the current context")

Force immediate flush of all buffered observations to the Langfuse backend.

This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers. It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.

Usage:
  • This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
  • It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
Returns:

None

Raises:
  • No exception is raised when no Langfuse client object is found in the current context: a warning is logged and nothing is flushed.
Note:
  • The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
  • In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client. However, explicit calls to flush can be beneficial in certain edge cases or for debugging purposes.
def configure( self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, release: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, enabled: Optional[bool] = None, mask: Optional[Callable] = None, environment: Optional[str] = None):
1048    def configure(
1049        self,
1050        *,
1051        public_key: Optional[str] = None,
1052        secret_key: Optional[str] = None,
1053        host: Optional[str] = None,
1054        release: Optional[str] = None,
1055        debug: Optional[bool] = None,
1056        threads: Optional[int] = None,
1057        flush_at: Optional[int] = None,
1058        flush_interval: Optional[int] = None,
1059        max_retries: Optional[int] = None,
1060        timeout: Optional[int] = None,
1061        httpx_client: Optional[httpx.Client] = None,
1062        enabled: Optional[bool] = None,
1063        mask: Optional[Callable] = None,
1064        environment: Optional[str] = None,
1065    ):
1066        """Configure the Langfuse client.
1067
1068        If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings.
1069
1070        Args:
1071            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
1072            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
1073            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
1074            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
1075            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
1076            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
1077            flush_at: Max batch size that's sent to the API.
1078            flush_interval: Max delay until a new batch is sent to the API.
1079            max_retries: Max number of retries in case of API/network errors.
1080            timeout: Timeout of API requests in seconds. Default is 20 seconds.
1081            httpx_client: Pass your own httpx client for more customizability of requests.
1082            enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
1083            mask (Callable): Function that masks sensitive information from input and output in log messages.
1084            environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can bet set via `LANGFUSE_TRACING_ENVIRONMENT` environment variable.
1085        """
1086        langfuse_singleton = LangfuseSingleton()
1087        langfuse_singleton.reset()
1088
1089        langfuse_singleton.get(
1090            public_key=public_key,
1091            secret_key=secret_key,
1092            host=host,
1093            release=release,
1094            debug=debug,
1095            threads=threads,
1096            flush_at=flush_at,
1097            flush_interval=flush_interval,
1098            max_retries=max_retries,
1099            timeout=timeout,
1100            httpx_client=httpx_client,
1101            enabled=enabled,
1102            mask=mask,
1103            environment=environment,
1104        )

Configure the Langfuse client.

If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings.

Arguments:
  • public_key: Public API key of Langfuse project. Can be set via LANGFUSE_PUBLIC_KEY environment variable.
  • secret_key: Secret API key of Langfuse project. Can be set via LANGFUSE_SECRET_KEY environment variable.
  • host: Host of Langfuse API. Can be set via LANGFUSE_HOST environment variable. Defaults to https://cloud.langfuse.com.
  • release: Release number/hash of the application to provide analytics grouped by release. Can be set via LANGFUSE_RELEASE environment variable.
  • debug: Enables debug mode for more verbose logging. Can be set via LANGFUSE_DEBUG environment variable.
  • threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
  • flush_at: Max batch size that's sent to the API.
  • flush_interval: Max delay until a new batch is sent to the API.
  • max_retries: Max number of retries in case of API/network errors.
  • timeout: Timeout of API requests in seconds. Default is 20 seconds.
  • httpx_client: Pass your own httpx client for more customizability of requests.
  • enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
  • mask (Callable): Function that masks sensitive information from input and output in log messages.
  • environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can be set via LANGFUSE_TRACING_ENVIRONMENT environment variable.
client_instance: langfuse.client.Langfuse
1106    @property
1107    def client_instance(self) -> Langfuse:
1108        """Get the Langfuse client instance for the current decorator context."""
1109        return LangfuseSingleton().get()

Get the Langfuse client instance for the current decorator context.

def auth_check(self) -> bool:
1130    def auth_check(self) -> bool:
1131        """Check if the current Langfuse client is authenticated.
1132
1133        Returns:
1134            bool: True if the client is authenticated, False otherwise
1135        """
1136        try:
1137            return self.client_instance.auth_check()
1138        except Exception as e:
1139            self._log.error(
1140                "No Langfuse object found in the current context", exc_info=e
1141            )
1142
1143            return False

Check if the current Langfuse client is authenticated.

Returns:

bool: True if the client is authenticated, False otherwise