langfuse.decorators

Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the @observe() decorator.

Simple example (decorator + openai integration)

from langfuse.decorators import observe
from langfuse.openai import openai # OpenAI integration

@observe()
def story():
    return openai.chat.completions.create(
        model="gpt-3.5-turbo",
        max_tokens=100,
        messages=[
          {"role": "system", "content": "You are a great storyteller."},
          {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
        ],
    ).choices[0].message.content

@observe()
def main():
    return story()

main()

See docs for more information.

 1"""Integrate Langfuse Tracing into your LLM applications with the Langfuse Python SDK using the `@observe()` decorator.
 2
 3*Simple example (decorator + openai integration)*
 4
 5```python
 6from langfuse.decorators import observe
 7from langfuse.openai import openai # OpenAI integration
 8
 9@observe()
10def story():
11    return openai.chat.completions.create(
12        model="gpt-3.5-turbo",
13        max_tokens=100,
14        messages=[
15          {"role": "system", "content": "You are a great storyteller."},
16          {"role": "user", "content": "Once upon a time in a galaxy far, far away..."}
17        ],
18    ).choices[0].message.content
19
20@observe()
21def main():
22    return story()
23
24main()
25```
26
27See [docs](https://langfuse.com/docs/sdk/python/decorators) for more information.
28"""
29
30from .langfuse_decorator import langfuse_context, observe, LangfuseDecorator
31
32__all__ = ["langfuse_context", "observe", "LangfuseDecorator"]
langfuse_context = <LangfuseDecorator object>
def observe( func: Optional[Callable[~P, ~R]] = None, *, name: Optional[str] = None, as_type: Optional[Literal['generation']] = None, capture_input: bool = True, capture_output: bool = True, transform_to_string: Optional[Callable[[Iterable], str]] = None) -> Callable[[Callable[~P, ~R]], Callable[~P, ~R]]:
115    def observe(
116        self,
117        func: Optional[Callable[P, R]] = None,
118        *,
119        name: Optional[str] = None,
120        as_type: Optional[Literal["generation"]] = None,
121        capture_input: bool = True,
122        capture_output: bool = True,
123        transform_to_string: Optional[Callable[[Iterable], str]] = None,
124    ) -> Callable[[Callable[P, R]], Callable[P, R]]:
125        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
126
127        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
128        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
129
130        Attributes:
131            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
132            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
133            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
134            capture_output (bool): If True, captures the return value of the function as output. Default is True.
135            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
136
137        Returns:
138            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
139
140        Example:
141            For general tracing (functions/methods):
142            ```python
143            @observe()
144            def your_function(args):
145                # Your implementation here
146            ```
147            For observing language model generations:
148            ```python
149            @observe(as_type="generation")
150            def your_LLM_function(args):
151                # Your LLM invocation here
152            ```
153
154        Raises:
155            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
156
157        Note:
158        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
159        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
160        """
161
162        def decorator(func: Callable[P, R]) -> Callable[P, R]:
163            return (
164                self._async_observe(
165                    func,
166                    name=name,
167                    as_type=as_type,
168                    capture_input=capture_input,
169                    capture_output=capture_output,
170                    transform_to_string=transform_to_string,
171                )
172                if asyncio.iscoroutinefunction(func)
173                else self._sync_observe(
174                    func,
175                    name=name,
176                    as_type=as_type,
177                    capture_input=capture_input,
178                    capture_output=capture_output,
179                    transform_to_string=transform_to_string,
180                )
181            )
182
183        """
184        If the decorator is called without arguments, return the decorator function itself. 
185        This allows the decorator to be used with or without arguments. 
186        Python calls the decorator function with the decorated function as an argument when the decorator is used without arguments.
187        """
188        if func is None:
189            return decorator
190        else:
191            return decorator(func)

Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Attributes:
  • name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
  • as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
  • capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
  • capture_output (bool): If True, captures the return value of the function as output. Default is True.
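  • transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms the collected yielded values into a single string for output capture. If it is omitted, the yielded values are joined when they are all strings and otherwise recorded as a list.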
Returns:

Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

@observe()
def your_function(args):
    # Your implementation here

For observing language model generations:

@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here
Raises:
  • Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:

  • Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the langfuse_observation_id keyword when calling the wrapped function.
  • To update observation or trace parameters (e.g., metadata, session_id), use langfuse.update_current_observation and langfuse.update_current_trace methods within the wrapped function.
class LangfuseDecorator:
  94class LangfuseDecorator:
  95    _log = logging.getLogger("langfuse")
  96
  97    # Type overload for observe decorator with no arguments
  98    @overload
  99    def observe(self, func: F) -> F: ...
 100
 101    # Type overload for observe decorator with arguments
 102    @overload
 103    def observe(
 104        self,
 105        func: None = None,
 106        *,
 107        name: Optional[str] = None,
 108        as_type: Optional[Literal["generation"]] = None,
 109        capture_input: bool = True,
 110        capture_output: bool = True,
 111        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 112    ) -> Callable[[Callable[P, R]], Callable[P, R]]: ...
 113
 114    # Implementation of observe decorator
 115    def observe(
 116        self,
 117        func: Optional[Callable[P, R]] = None,
 118        *,
 119        name: Optional[str] = None,
 120        as_type: Optional[Literal["generation"]] = None,
 121        capture_input: bool = True,
 122        capture_output: bool = True,
 123        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 124    ) -> Callable[[Callable[P, R]], Callable[P, R]]:
 125        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
 126
 127        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
 128        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
 129
 130        Attributes:
 131            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
 132            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
 133            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
 134            capture_output (bool): If True, captures the return value of the function as output. Default is True.
 135            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
 136
 137        Returns:
 138            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
 139
 140        Example:
 141            For general tracing (functions/methods):
 142            ```python
 143            @observe()
 144            def your_function(args):
 145                # Your implementation here
 146            ```
 147            For observing language model generations:
 148            ```python
 149            @observe(as_type="generation")
 150            def your_LLM_function(args):
 151                # Your LLM invocation here
 152            ```
 153
 154        Raises:
 155            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
 156
 157        Note:
 158        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
 159        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
 160        """
 161
 162        def decorator(func: Callable[P, R]) -> Callable[P, R]:
 163            return (
 164                self._async_observe(
 165                    func,
 166                    name=name,
 167                    as_type=as_type,
 168                    capture_input=capture_input,
 169                    capture_output=capture_output,
 170                    transform_to_string=transform_to_string,
 171                )
 172                if asyncio.iscoroutinefunction(func)
 173                else self._sync_observe(
 174                    func,
 175                    name=name,
 176                    as_type=as_type,
 177                    capture_input=capture_input,
 178                    capture_output=capture_output,
 179                    transform_to_string=transform_to_string,
 180                )
 181            )
 182
 183        """
 184        If the decorator is called without arguments, return the decorator function itself. 
 185        This allows the decorator to be used with or without arguments. 
 186        Python calls the decorator function with the decorated function as an argument when the decorator is used without arguments.
 187        """
 188        if func is None:
 189            return decorator
 190        else:
 191            return decorator(func)
 192
 193    def _async_observe(
 194        self,
 195        func: F,
 196        *,
 197        name: Optional[str],
 198        as_type: Optional[Literal["generation"]],
 199        capture_input: bool,
 200        capture_output: bool,
 201        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 202    ) -> F:
 203        @wraps(func)
 204        async def async_wrapper(*args, **kwargs):
 205            observation = self._prepare_call(
 206                name=name or func.__name__,
 207                as_type=as_type,
 208                capture_input=capture_input,
 209                is_method=self._is_method(func),
 210                func_args=args,
 211                func_kwargs=kwargs,
 212            )
 213            result = None
 214
 215            try:
 216                result = await func(*args, **kwargs)
 217            except Exception as e:
 218                self._handle_exception(observation, e)
 219            finally:
 220                result = self._finalize_call(
 221                    observation, result, capture_output, transform_to_string
 222                )
 223
 224                # Returning from finally block may swallow errors, so only return if result is not None
 225                if result is not None:
 226                    return result
 227
 228        return cast(F, async_wrapper)
 229
 230    def _sync_observe(
 231        self,
 232        func: F,
 233        *,
 234        name: Optional[str],
 235        as_type: Optional[Literal["generation"]],
 236        capture_input: bool,
 237        capture_output: bool,
 238        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 239    ) -> F:
 240        @wraps(func)
 241        def sync_wrapper(*args, **kwargs):
 242            observation = self._prepare_call(
 243                name=name or func.__name__,
 244                as_type=as_type,
 245                capture_input=capture_input,
 246                is_method=self._is_method(func),
 247                func_args=args,
 248                func_kwargs=kwargs,
 249            )
 250            result = None
 251
 252            try:
 253                result = func(*args, **kwargs)
 254            except Exception as e:
 255                self._handle_exception(observation, e)
 256            finally:
 257                result = self._finalize_call(
 258                    observation, result, capture_output, transform_to_string
 259                )
 260
 261                # Returning from finally block may swallow errors, so only return if result is not None
 262                if result is not None:
 263                    return result
 264
 265        return cast(F, sync_wrapper)
 266
 267    @staticmethod
 268    def _is_method(func: Callable) -> bool:
 269        """Check if a callable is likely an class or instance method based on its signature.
 270
 271        This method inspects the given callable's signature for the presence of a 'cls' or 'self' parameter, which is conventionally used for class and instance methods in Python classes. It returns True if 'class' or 'self' is found among the parameters, suggesting the callable is a method.
 272
 273        Note: This method relies on naming conventions and may not accurately identify instance methods if unconventional parameter names are used or if static or class methods incorrectly include a 'self' or 'cls' parameter. Additionally, during decorator execution, inspect.ismethod does not work as expected because the function has not yet been bound to an instance; it is still a function, not a method. This check attempts to infer method status based on signature, which can be useful in decorator contexts where traditional method identification techniques fail.
 274
 275        Returns:
 276        bool: True if 'cls' or 'self' is in the callable's parameters, False otherwise.
 277        """
 278        return (
 279            "self" in inspect.signature(func).parameters
 280            or "cls" in inspect.signature(func).parameters
 281        )
 282
 283    def _prepare_call(
 284        self,
 285        *,
 286        name: str,
 287        as_type: Optional[Literal["generation"]],
 288        capture_input: bool,
 289        is_method: bool = False,
 290        func_args: Tuple = (),
 291        func_kwargs: Dict = {},
 292    ) -> Optional[
 293        Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
 294    ]:
 295        try:
 296            stack = _observation_stack_context.get().copy()
 297            parent = stack[-1] if stack else None
 298
 299            # Collect default observation data
 300            observation_id = func_kwargs.pop("langfuse_observation_id", None)
 301            provided_parent_trace_id = func_kwargs.pop("langfuse_parent_trace_id", None)
 302            provided_parent_observation_id = func_kwargs.pop(
 303                "langfuse_parent_observation_id", None
 304            )
 305
 306            id = str(observation_id) if observation_id else None
 307            start_time = _get_timestamp()
 308
 309            input = (
 310                self._get_input_from_func_args(
 311                    is_method=is_method,
 312                    func_args=func_args,
 313                    func_kwargs=func_kwargs,
 314                )
 315                if capture_input
 316                else None
 317            )
 318
 319            params = {
 320                "id": id,
 321                "name": name,
 322                "start_time": start_time,
 323                "input": input,
 324            }
 325
 326            # Handle user-providedparent trace ID and observation ID
 327            if parent and (provided_parent_trace_id or provided_parent_observation_id):
 328                self._log.warning(
 329                    "Ignoring langfuse_parent_trace_id and/or langfuse_parent_observation_id as they can be only set in the top-level decorated function."
 330                )
 331
 332            elif provided_parent_observation_id and not provided_parent_trace_id:
 333                self._log.warning(
 334                    "Ignoring langfuse_parent_observation_id as langfuse_parent_trace_id is not set."
 335                )
 336
 337            elif provided_parent_observation_id and (
 338                provided_parent_observation_id != provided_parent_trace_id
 339            ):
 340                parent = StatefulSpanClient(
 341                    id=provided_parent_observation_id,
 342                    trace_id=provided_parent_trace_id,
 343                    task_manager=self.client_instance.task_manager,
 344                    client=self.client_instance.client,
 345                    state_type=StateType.OBSERVATION,
 346                )
 347                self._set_root_trace_id(provided_parent_trace_id)
 348
 349            elif provided_parent_trace_id:
 350                parent = StatefulTraceClient(
 351                    id=provided_parent_trace_id,
 352                    trace_id=provided_parent_trace_id,
 353                    task_manager=self.client_instance.task_manager,
 354                    client=self.client_instance.client,
 355                    state_type=StateType.TRACE,
 356                )
 357                self._set_root_trace_id(provided_parent_trace_id)
 358
 359            # Create observation
 360            if parent and as_type == "generation":
 361                observation = parent.generation(**params)
 362            elif as_type == "generation":
 363                # Create wrapper trace if generation is top-level
 364                # Do not add wrapper trace to stack, as it does not have a corresponding end that will pop it off again
 365                trace = self.client_instance.trace(
 366                    id=id, name=name, start_time=start_time
 367                )
 368                self._set_root_trace_id(trace.id)
 369
 370                observation = self.client_instance.generation(
 371                    name=name, start_time=start_time, input=input, trace_id=trace.id
 372                )
 373            elif parent:
 374                observation = parent.span(**params)
 375            else:
 376                params["id"] = _root_trace_id_context.get() or params["id"]
 377                observation = self.client_instance.trace(**params)
 378
 379            _observation_stack_context.set(stack + [observation])
 380
 381            return observation
 382        except Exception as e:
 383            self._log.error(f"Failed to prepare observation: {e}")
 384
 385    def _get_input_from_func_args(
 386        self,
 387        *,
 388        is_method: bool = False,
 389        func_args: Tuple = (),
 390        func_kwargs: Dict = {},
 391    ) -> Any:
 392        # Remove implicitly passed "self" or "cls" argument for instance or class methods
 393        logged_args = func_args[1:] if is_method else func_args
 394        raw_input = {
 395            "args": logged_args,
 396            "kwargs": func_kwargs,
 397        }
 398
 399        # Serialize and deserialize to ensure proper JSON serialization.
 400        # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
 401        return json.loads(json.dumps(raw_input, cls=EventSerializer))
 402
 403    def _finalize_call(
 404        self,
 405        observation: Optional[
 406            Union[
 407                StatefulSpanClient,
 408                StatefulTraceClient,
 409                StatefulGenerationClient,
 410            ]
 411        ],
 412        result: Any,
 413        capture_output: bool,
 414        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 415    ):
 416        if inspect.isgenerator(result):
 417            return self._wrap_sync_generator_result(
 418                observation, result, capture_output, transform_to_string
 419            )
 420        elif inspect.isasyncgen(result):
 421            return self._wrap_async_generator_result(
 422                observation, result, capture_output, transform_to_string
 423            )
 424
 425        else:
 426            return self._handle_call_result(observation, result, capture_output)
 427
 428    def _handle_call_result(
 429        self,
 430        observation: Optional[
 431            Union[
 432                StatefulSpanClient,
 433                StatefulTraceClient,
 434                StatefulGenerationClient,
 435            ]
 436        ],
 437        result: Any,
 438        capture_output: bool,
 439    ):
 440        try:
 441            if observation is None:
 442                raise ValueError("No observation found in the current context")
 443
 444            # Collect final observation data
 445            observation_params = self._pop_observation_params_from_context(
 446                observation.id
 447            )
 448
 449            end_time = observation_params["end_time"] or _get_timestamp()
 450
 451            output = observation_params["output"] or (
 452                # Serialize and deserialize to ensure proper JSON serialization.
 453                # Objects are later serialized again so deserialization is necessary here to avoid unnecessary escaping of quotes.
 454                json.loads(
 455                    json.dumps(
 456                        result if result and capture_output else None,
 457                        cls=EventSerializer,
 458                    )
 459                )
 460            )
 461
 462            observation_params.update(end_time=end_time, output=output)
 463
 464            if isinstance(observation, (StatefulSpanClient, StatefulGenerationClient)):
 465                observation.end(**observation_params)
 466            elif isinstance(observation, StatefulTraceClient):
 467                observation.update(**observation_params)
 468
 469            # Remove observation from top of stack
 470            stack = _observation_stack_context.get()
 471            _observation_stack_context.set(stack[:-1])
 472
 473            # Update trace that was provided directly and not part of the observation stack
 474            if not _observation_stack_context.get() and (
 475                provided_trace_id := _root_trace_id_context.get()
 476            ):
 477                observation_params = self._pop_observation_params_from_context(
 478                    provided_trace_id
 479                )
 480
 481                has_updates = any(observation_params.values())
 482
 483                if has_updates:
 484                    trace_client = StatefulTraceClient(
 485                        id=provided_trace_id,
 486                        trace_id=provided_trace_id,
 487                        task_manager=self.client_instance.task_manager,
 488                        client=self.client_instance.client,
 489                        state_type=StateType.TRACE,
 490                    )
 491                    trace_client.update(**observation_params)
 492
 493        except Exception as e:
 494            self._log.error(f"Failed to finalize observation: {e}")
 495
 496        finally:
 497            # Clear the context trace ID to avoid leaking to next execution
 498            if not _observation_stack_context.get():
 499                _root_trace_id_context.set(None)
 500
 501            return result
 502
 503    def _handle_exception(
 504        self,
 505        observation: Optional[
 506            Union[StatefulSpanClient, StatefulTraceClient, StatefulGenerationClient]
 507        ],
 508        e: Exception,
 509    ):
 510        if observation:
 511            _observation_params_context.get()[observation.id].update(
 512                level="ERROR", status_message=str(e)
 513            )
 514        raise e
 515
 516    def _wrap_sync_generator_result(
 517        self,
 518        observation: Optional[
 519            Union[
 520                StatefulSpanClient,
 521                StatefulTraceClient,
 522                StatefulGenerationClient,
 523            ]
 524        ],
 525        generator: Generator,
 526        capture_output: bool,
 527        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 528    ):
 529        items = []
 530
 531        try:
 532            for item in generator:
 533                items.append(item)
 534
 535                yield item
 536
 537        finally:
 538            output = items
 539
 540            if transform_to_string is not None:
 541                output = transform_to_string(items)
 542
 543            elif all(isinstance(item, str) for item in items):
 544                output = "".join(items)
 545
 546            self._handle_call_result(observation, output, capture_output)
 547
 548    async def _wrap_async_generator_result(
 549        self,
 550        observation: Optional[
 551            Union[
 552                StatefulSpanClient,
 553                StatefulTraceClient,
 554                StatefulGenerationClient,
 555            ]
 556        ],
 557        generator: AsyncGenerator,
 558        capture_output: bool,
 559        transform_to_string: Optional[Callable[[Iterable], str]] = None,
 560    ) -> AsyncGenerator:
 561        items = []
 562
 563        try:
 564            async for item in generator:
 565                items.append(item)
 566
 567                yield item
 568
 569        finally:
 570            output = items
 571
 572            if transform_to_string is not None:
 573                output = transform_to_string(items)
 574
 575            elif all(isinstance(item, str) for item in items):
 576                output = "".join(items)
 577
 578            self._handle_call_result(observation, output, capture_output)
 579
 580    def get_current_llama_index_handler(self):
 581        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
 582
 583        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
 584        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
 585
 586        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
 587
 588        Returns:
 589            LlamaIndexCallbackHandler or None: Returns a LlamaIndexCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
 590
 591        Note:
 592            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
 593            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
 594        """
 595        try:
 596            from langfuse.llama_index import LlamaIndexCallbackHandler
 597        except ImportError:
 598            self._log.error(
 599                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
 600            )
 601
 602            return None
 603
 604        stack = _observation_stack_context.get()
 605        observation = stack[-1] if stack else None
 606
 607        if observation is None:
 608            self._log.warning("No observation found in the current context")
 609
 610            return None
 611
 612        if isinstance(observation, StatefulGenerationClient):
 613            self._log.warning(
 614                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
 615            )
 616
 617            return None
 618
 619        callback_handler = LlamaIndexCallbackHandler()
 620        callback_handler.set_root(observation)
 621
 622        return callback_handler
 623
 624    def get_current_langchain_handler(self):
 625        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
 626
 627        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
 628        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
 629
 630        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
 631
 632        Returns:
 633            LangchainCallbackHandler or None: Returns a LangchainCallbackHandler instance if there is an active observation in the current context; otherwise, returns None if no observation is found.
 634
 635        Note:
 636            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
 637            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), the method logs a warning and returns None.
 638        """
 639        stack = _observation_stack_context.get()
 640        observation = stack[-1] if stack else None
 641
 642        if observation is None:
 643            self._log.warning("No observation found in the current context")
 644
 645            return None
 646
 647        if isinstance(observation, StatefulGenerationClient):
 648            self._log.warning(
 649                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
 650            )
 651
 652            return None
 653
 654        return observation.get_langchain_handler()
 655
 656    def get_current_trace_id(self):
 657        """Retrieve the ID of the current trace from the observation stack context.
 658
 659        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
 660        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
 661        representing the entry point of the traced execution context. If you have provided a langfuse_parent_trace_id directly, it will return that instead.
 662
 663        Returns:
 664            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
 665            possibly due to the method being called outside of any @observe-decorated function execution.
 666
 667        Note:
 668            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
 669            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 670        """
 671        context_trace_id = _root_trace_id_context.get()
 672        if context_trace_id:
 673            return context_trace_id
 674
 675        stack = _observation_stack_context.get()
 676        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
 677
 678        if not stack:
 679            if should_log_warning:
 680                self._log.warning("No trace found in the current context")
 681
 682            return None
 683
 684        return stack[0].id
 685
 686    def _get_caller_module_name(self):
 687        try:
 688            caller_module = inspect.getmodule(inspect.stack()[2][0])
 689        except Exception as e:
 690            self._log.warning(f"Failed to get caller module: {e}")
 691
 692            return None
 693
 694        return caller_module.__name__ if caller_module else None
 695
 696    def get_current_trace_url(self) -> Optional[str]:
 697        """Retrieve the URL of the current trace in context.
 698
 699        Returns:
 700            str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
 701            possibly due to the method being called outside of any @observe-decorated function execution.
 702
 703        Note:
 704            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
 705            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 706        """
 707        try:
 708            trace_id = self.get_current_trace_id()
 709
 710            if not trace_id:
 711                raise ValueError("No trace found in the current context")
 712
 713            return f"{self.client_instance.client._client_wrapper._base_url}/trace/{trace_id}"
 714
 715        except Exception as e:
 716            self._log.error(f"Failed to get current trace URL: {e}")
 717
 718            return None
 719
 720    def get_current_observation_id(self):
 721        """Retrieve the ID of the current observation in context.
 722
 723        Returns:
 724            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
 725            possibly due to the method being called outside of any @observe-decorated function execution.
 726
 727        Note:
 728            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
 729            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will log a warning and return None, indicating the absence of a traceable context.
 730            - If called at the top level of a trace, it will return the trace ID.
 731        """
 732        stack = _observation_stack_context.get()
 733        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
 734
 735        if not stack:
 736            if should_log_warning:
 737                self._log.warning("No observation found in the current context")
 738
 739            return None
 740
 741        return stack[-1].id
 742
 743    def update_current_trace(
 744        self,
 745        name: Optional[str] = None,
 746        input: Optional[Any] = None,
 747        output: Optional[Any] = None,
 748        user_id: Optional[str] = None,
 749        session_id: Optional[str] = None,
 750        version: Optional[str] = None,
 751        release: Optional[str] = None,
 752        metadata: Optional[Any] = None,
 753        tags: Optional[List[str]] = None,
 754        public: Optional[bool] = None,
 755    ):
 756        """Set parameters for the current trace, updating the trace's metadata and context information.
 757
 758        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
 759        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
 760        and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
 761
 762        Arguments:
 763            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI..
 764            input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
 765            output (Optional[Any]): The output or result of the trace
 766            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
 767            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
 768            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
 769            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
 770            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
 771            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
 772
 773        Returns:
 774            None
 775
 776        Note:
 777            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
 778            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
 779            - If called outside of an active trace context, a warning is logged, and a ValueError is raised to indicate the absence of a traceable context.
 780        """
 781        trace_id = self.get_current_trace_id()
 782
 783        if trace_id is None:
 784            self._log.warning("No trace found in the current context")
 785
 786            return
 787
 788        params_to_update = {
 789            k: v
 790            for k, v in {
 791                "name": name,
 792                "input": input,
 793                "output": output,
 794                "user_id": user_id,
 795                "session_id": session_id,
 796                "version": version,
 797                "release": release,
 798                "metadata": metadata,
 799                "tags": tags,
 800                "public": public,
 801            }.items()
 802            if v is not None
 803        }
 804
 805        _observation_params_context.get()[trace_id].update(params_to_update)
 806
 807    def update_current_observation(
 808        self,
 809        *,
 810        input: Optional[Any] = None,
 811        output: Optional[Any] = None,
 812        name: Optional[str] = None,
 813        version: Optional[str] = None,
 814        metadata: Optional[Any] = None,
 815        start_time: Optional[datetime] = None,
 816        end_time: Optional[datetime] = None,
 817        release: Optional[str] = None,
 818        tags: Optional[List[str]] = None,
 819        user_id: Optional[str] = None,
 820        session_id: Optional[str] = None,
 821        level: Optional[SpanLevel] = None,
 822        status_message: Optional[str] = None,
 823        completion_start_time: Optional[datetime] = None,
 824        model: Optional[str] = None,
 825        model_parameters: Optional[Dict[str, MapValue]] = None,
 826        usage: Optional[Union[BaseModel, ModelUsage]] = None,
 827        prompt: Optional[PromptClient] = None,
 828        public: Optional[bool] = None,
 829    ):
 830        """Update parameters for the current observation within an active trace context.
 831
 832        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
 833        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
 834        enhancing the observability and traceability of the execution context.
 835
 836        Note that if a param is not available on a specific observation type, it will be ignored.
 837
 838        Shared params:
 839            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
 840            - `output` (Optional[Any]): The output or result of the trace or observation
 841            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
 842            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
 843            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
 844            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
 845            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
 846
 847        Trace-specific params:
 848            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
 849            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
 850            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
 851            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
 852            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
 853
 854        Span-specific params:
 855            - `level` (Optional[SpanLevel]): The severity or importance level of the observation, such as "INFO", "WARNING", or "ERROR".
 856            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
 857
 858        Generation-specific params:
 859            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
 860            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
 861            - `usage` (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAi structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
 862            - `prompt`(Optional[PromptClient]): The prompt object used for the generation.
 863
 864        Returns:
 865            None
 866
 867        Raises:
 868            ValueError: If no current observation is found in the context, indicating that this method was called outside of an observation's execution scope.
 869
 870        Note:
 871            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
 872            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
 873            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
 874        """
 875        stack = _observation_stack_context.get()
 876        observation = stack[-1] if stack else None
 877
 878        if not observation:
 879            self._log.warning("No observation found in the current context")
 880
 881            return
 882
 883        update_params = {
 884            k: v
 885            for k, v in {
 886                "input": input,
 887                "output": output,
 888                "name": name,
 889                "version": version,
 890                "metadata": metadata,
 891                "start_time": start_time,
 892                "end_time": end_time,
 893                "release": release,
 894                "tags": tags,
 895                "user_id": user_id,
 896                "session_id": session_id,
 897                "level": level,
 898                "status_message": status_message,
 899                "completion_start_time": completion_start_time,
 900                "model": model,
 901                "model_parameters": model_parameters,
 902                "usage": usage,
 903                "prompt": prompt,
 904                "public": public,
 905            }.items()
 906            if v is not None
 907        }
 908
 909        _observation_params_context.get()[observation.id].update(update_params)
 910
 911    def score_current_observation(
 912        self,
 913        *,
 914        name: str,
 915        value: Union[float, str],
 916        data_type: Optional[ScoreDataType] = None,
 917        comment: Optional[str] = None,
 918        id: Optional[str] = None,
 919        config_id: Optional[str] = None,
 920    ):
 921        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
 922
 923        Arguments:
 924            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 925            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
 926            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
 927              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
 928            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 929            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 930            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
 931
 932        Returns:
 933            None
 934
 935        Note:
 936            This method is intended to be used within the context of an active trace or observation.
 937        """
 938        try:
 939            trace_id = self.get_current_trace_id()
 940            current_observation_id = self.get_current_observation_id()
 941
 942            observation_id = (
 943                current_observation_id if current_observation_id != trace_id else None
 944            )
 945
 946            if trace_id:
 947                self.client_instance.score(
 948                    trace_id=trace_id,
 949                    observation_id=observation_id,
 950                    name=name,
 951                    value=value,
 952                    data_type=data_type,
 953                    comment=comment,
 954                    id=id,
 955                    config_id=config_id,
 956                )
 957            else:
 958                raise ValueError("No trace or observation found in the current context")
 959
 960        except Exception as e:
 961            self._log.error(f"Failed to score observation: {e}")
 962
 963    def score_current_trace(
 964        self,
 965        *,
 966        name: str,
 967        value: Union[float, str],
 968        data_type: Optional[ScoreDataType] = None,
 969        comment: Optional[str] = None,
 970        id: Optional[str] = None,
 971        config_id: Optional[str] = None,
 972    ):
 973        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
 974
 975        Arguments:
 976            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 977            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
 978            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
 979              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
 980            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 981            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 982            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
 983
 984        Returns:
 985            None
 986
 987        Note:
 988            This method is intended to be used within the context of an active trace or observation.
 989        """
 990        try:
 991            trace_id = self.get_current_trace_id()
 992
 993            if trace_id:
 994                self.client_instance.score(
 995                    trace_id=trace_id,
 996                    name=name,
 997                    value=value,
 998                    data_type=data_type,
 999                    comment=comment,
1000                    id=id,
1001                    config_id=config_id,
1002                )
1003            else:
1004                raise ValueError("No trace found in the current context")
1005
1006        except Exception as e:
1007            self._log.error(f"Failed to score observation: {e}")
1008
1009    @catch_and_log_errors
1010    def flush(self):
1011        """Force immediate flush of all buffered observations to the Langfuse backend.
1012
1013        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
1014        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
1015
1016        Usage:
1017            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
1018            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
1019
1020        Returns:
1021            None
1022
1023        Raises:
1024            ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues.
1025
1026        Note:
1027            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
1028            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
1029            However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
1030        """
1031        if self.client_instance:
1032            self.client_instance.flush()
1033        else:
1034            self._log.warning("No langfuse object found in the current context")
1035
1036    def configure(
1037        self,
1038        *,
1039        public_key: Optional[str] = None,
1040        secret_key: Optional[str] = None,
1041        host: Optional[str] = None,
1042        release: Optional[str] = None,
1043        debug: Optional[bool] = None,
1044        threads: Optional[int] = None,
1045        flush_at: Optional[int] = None,
1046        flush_interval: Optional[int] = None,
1047        max_retries: Optional[int] = None,
1048        timeout: Optional[int] = None,
1049        httpx_client: Optional[httpx.Client] = None,
1050        enabled: Optional[bool] = None,
1051        mask: Optional[Callable] = None,
1052    ):
1053        """Configure the Langfuse client.
1054
1055        If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings.
1056
1057        Args:
1058            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
1059            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
1060            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
1061            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
1062            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
1063            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
1064            flush_at: Max batch size that's sent to the API.
1065            flush_interval: Max delay until a new batch is sent to the API.
1066            max_retries: Max number of retries in case of API/network errors.
1067            timeout: Timeout of API requests in seconds. Default is 20 seconds.
1068            httpx_client: Pass your own httpx client for more customizability of requests.
1069            enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
1070            mask (Callable): Function that masks sensitive information from input and output in log messages.
1071
1072        """
1073        langfuse_singleton = LangfuseSingleton()
1074        langfuse_singleton.reset()
1075
1076        langfuse_singleton.get(
1077            public_key=public_key,
1078            secret_key=secret_key,
1079            host=host,
1080            release=release,
1081            debug=debug,
1082            threads=threads,
1083            flush_at=flush_at,
1084            flush_interval=flush_interval,
1085            max_retries=max_retries,
1086            timeout=timeout,
1087            httpx_client=httpx_client,
1088            enabled=enabled,
1089            mask=mask,
1090        )
1091
1092    @property
1093    def client_instance(self) -> Langfuse:
1094        """Get the Langfuse client instance for the current decorator context."""
1095        return LangfuseSingleton().get()
1096
1097    def _set_root_trace_id(self, trace_id: str):
1098        if _observation_stack_context.get():
1099            self._log.warning(
1100                "Root Trace ID cannot be set on a already running trace. Skipping root trace ID assignment."
1101            )
1102            return
1103
1104        _root_trace_id_context.set(trace_id)
1105
1106    def _pop_observation_params_from_context(
1107        self, observation_id: str
1108    ) -> ObservationParams:
1109        params = _observation_params_context.get()[observation_id].copy()
1110
1111        # Remove observation params to avoid leaking
1112        del _observation_params_context.get()[observation_id]
1113
1114        return params
1115
1116    def auth_check(self) -> bool:
1117        """Check if the current Langfuse client is authenticated.
1118
1119        Returns:
1120            bool: True if the client is authenticated, False otherwise
1121        """
1122        try:
1123            return self.client_instance.auth_check()
1124        except Exception as e:
1125            self._log.error(
1126                "No Langfuse object found in the current context", exc_info=e
1127            )
1128
1129            return False
def observe( self, func: Optional[Callable[~P, ~R]] = None, *, name: Optional[str] = None, as_type: Optional[Literal['generation']] = None, capture_input: bool = True, capture_output: bool = True, transform_to_string: Optional[Callable[[Iterable], str]] = None) -> Callable[[Callable[~P, ~R]], Callable[~P, ~R]]:
115    def observe(
116        self,
117        func: Optional[Callable[P, R]] = None,
118        *,
119        name: Optional[str] = None,
120        as_type: Optional[Literal["generation"]] = None,
121        capture_input: bool = True,
122        capture_output: bool = True,
123        transform_to_string: Optional[Callable[[Iterable], str]] = None,
124    ) -> Callable[[Callable[P, R]], Callable[P, R]]:
125        """Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.
126
127        It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context.
128        In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.
129
130        Attributes:
131            name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
132            as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
133            capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
134            capture_output (bool): If True, captures the return value of the function as output. Default is True.
135            transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
136
137        Returns:
138            Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.
139
140        Example:
141            For general tracing (functions/methods):
142            ```python
143            @observe()
144            def your_function(args):
145                # Your implementation here
146            ```
147            For observing language model generations:
148            ```python
149            @observe(as_type="generation")
150            def your_LLM_function(args):
151                # Your LLM invocation here
152            ```
153
154        Raises:
155            Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.
156
157        Note:
158        - Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the `langfuse_observation_id` keyword when calling the wrapped function.
159        - To update observation or trace parameters (e.g., metadata, session_id), use `langfuse.update_current_observation` and `langfuse.update_current_trace` methods within the wrapped function.
160        """
161
162        def decorator(func: Callable[P, R]) -> Callable[P, R]:
163            return (
164                self._async_observe(
165                    func,
166                    name=name,
167                    as_type=as_type,
168                    capture_input=capture_input,
169                    capture_output=capture_output,
170                    transform_to_string=transform_to_string,
171                )
172                if asyncio.iscoroutinefunction(func)
173                else self._sync_observe(
174                    func,
175                    name=name,
176                    as_type=as_type,
177                    capture_input=capture_input,
178                    capture_output=capture_output,
179                    transform_to_string=transform_to_string,
180                )
181            )
182
183        """
184        If the decorator is called without arguments, return the decorator function itself. 
185        This allows the decorator to be used with or without arguments. 
186        Python calls the decorator function with the decorated function as an argument when the decorator is used without arguments.
187        """
188        if func is None:
189            return decorator
190        else:
191            return decorator(func)

Wrap a function to create and manage Langfuse tracing around its execution, supporting both synchronous and asynchronous functions.

It captures the function's execution context, including start/end times, input/output data, and automatically handles trace/span generation within the Langfuse observation context. In case of an exception, the observation is updated with error details. The top-most decorated function is treated as a trace, with nested calls captured as spans or generations.

Attributes:
  • name (Optional[str]): Name of the created trace or span. Overwrites the function name as the default used for the trace or span name.
  • as_type (Optional[Literal["generation"]]): Specify "generation" to treat the observation as a generation type, suitable for language model invocations.
  • capture_input (bool): If True, captures the args and kwargs of the function as input. Default is True.
  • capture_output (bool): If True, captures the return value of the function as output. Default is True.
  • transform_to_string (Optional[Callable[[Iterable], str]]): When the decorated function returns a generator, this function transforms yielded values into a string representation for output capture
Returns:

Callable: A wrapped version of the original function that, upon execution, is automatically observed and managed by Langfuse.

Example:

For general tracing (functions/methods):

@observe()
def your_function(args):
    # Your implementation here

For observing language model generations:

@observe(as_type="generation")
def your_LLM_function(args):
    # Your LLM invocation here
Raises:
  • Exception: Propagates exceptions from the wrapped function after logging and updating the observation with error details.

Note:

  • Automatic observation ID and context management is provided. Optionally, an observation ID can be specified using the langfuse_observation_id keyword when calling the wrapped function.
  • To update observation or trace parameters (e.g., metadata, session_id), use the langfuse_context.update_current_observation and langfuse_context.update_current_trace methods within the wrapped function.
def get_current_llama_index_handler(self):
580    def get_current_llama_index_handler(self):
581        """Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.
582
583        This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation.
584        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.
585
586        See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.
587
588        Returns:
589            LlamaIndexCallbackHandler or None: A LlamaIndexCallbackHandler whose root is set to the current observation. Returns None if `langfuse.llama_index` cannot be imported, if no observation is active, or if the current observation is a generation.
590
591        Note:
592            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
593            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), or if the current observation is a generation, the method logs a warning and returns None. A failed import is logged as an error instead.
594        """
595        try:
596            from langfuse.llama_index import LlamaIndexCallbackHandler
597        except ImportError:
598            self._log.error(
599                "LlamaIndexCallbackHandler is not available, most likely because llama-index is not installed. pip install llama-index"
600            )
601
602            return None
603
604        stack = _observation_stack_context.get()
605        observation = stack[-1] if stack else None
606
607        if observation is None:
608            self._log.warning("No observation found in the current context")
609
610            return None
611
612        if isinstance(observation, StatefulGenerationClient):
613            self._log.warning(
614                "Current observation is of type GENERATION, LlamaIndex handler is not supported for this type of observation"
615            )
616
617            return None
618
619        callback_handler = LlamaIndexCallbackHandler()
620        callback_handler.set_root(observation)
621
622        return callback_handler

Retrieve the current LlamaIndexCallbackHandler associated with the most recent observation in the observation stack.

This method fetches the current observation from the observation stack and returns a LlamaIndexCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with the LlamaIndex API based on the current observation context.

See the Langfuse documentation for more information on integrating the LlamaIndexCallbackHandler.

Returns:

LlamaIndexCallbackHandler or None: A LlamaIndexCallbackHandler whose root is set to the current observation. Returns None if langfuse.llama_index cannot be imported, if no observation is active, or if the current observation is a generation.

Note:
  • This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
  • If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), or if the current observation is a generation, the method logs a warning and returns None. A failed import is logged as an error instead.
def get_current_langchain_handler(self):
624    def get_current_langchain_handler(self):
625        """Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.
626
627        This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation.
628        It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.
629
630        See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.
631
632        Returns:
633            LangchainCallbackHandler or None: The handler obtained from the current observation via `get_langchain_handler()`. Returns None if no observation is active or if the current observation is a generation.
634
635        Note:
636            - This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
637            - If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), or if the current observation is a generation, the method logs a warning and returns None.
638        """
639        stack = _observation_stack_context.get()
640        observation = stack[-1] if stack else None
641
642        if observation is None:
643            self._log.warning("No observation found in the current context")
644
645            return None
646
647        if isinstance(observation, StatefulGenerationClient):
648            self._log.warning(
649                "Current observation is of type GENERATION, Langchain handler is not supported for this type of observation"
650            )
651
652            return None
653
654        return observation.get_langchain_handler()

Retrieve the current LangchainCallbackHandler associated with the most recent observation in the observation stack.

This method fetches the current observation from the observation stack and returns a LangchainCallbackHandler initialized with this observation. It is intended to be used within the context of a trace, allowing access to a callback handler for operations that require interaction with Langchain based on the current observation context.

See the Langfuse documentation for more information on integrating the LangchainCallbackHandler.

Returns:

LangchainCallbackHandler or None: The handler obtained from the current observation via get_langchain_handler(). Returns None if no observation is active or if the current observation is a generation.

Note:
  • This method should be called within the context of a trace (i.e., within a function wrapped by @observe) to ensure that an observation context exists.
  • If no observation is found in the current context (e.g., if called outside of a trace or if the observation stack is empty), or if the current observation is a generation, the method logs a warning and returns None.
def get_current_trace_id(self):
656    def get_current_trace_id(self):
657        """Retrieve the ID of the current trace from the observation stack context.
658
659        This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID,
660        such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack,
661        representing the entry point of the traced execution context. If you have provided a langfuse_parent_trace_id directly, it will return that instead.
662
663        Returns:
664            str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
665            possibly due to the method being called outside of any @observe-decorated function execution.
666
667        Note:
668            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
669            - If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will return None, indicating the absence of a traceable context. A warning is logged in that case unless `_get_caller_module_name()` reports "langfuse.openai".
670        """
671        context_trace_id = _root_trace_id_context.get()
672        if context_trace_id:  # a root trace ID set in the context takes precedence over the stack
673            return context_trace_id
674
675        stack = _observation_stack_context.get()
676        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
677
678        if not stack:
679            if should_log_warning:
680                self._log.warning("No trace found in the current context")
681
682            return None
683
684        return stack[0].id

Retrieve the ID of the current trace from the observation stack context.

This method examines the observation stack to find the root trace and returns its ID. It is useful for operations that require the trace ID, such as setting trace parameters or querying trace information. The trace ID is typically the ID of the first observation in the stack, representing the entry point of the traced execution context. If you have provided a langfuse_parent_trace_id directly, it will return that instead.

Returns:

str or None: The ID of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
  • If called outside of a trace context, or if the observation stack has somehow been corrupted or improperly managed, this method will return None, indicating the absence of a traceable context. A warning is logged in that case unless the caller module is "langfuse.openai".
def get_current_trace_url(self) -> Optional[str]:
696    def get_current_trace_url(self) -> Optional[str]:
697        """Retrieve the URL of the current trace in context.
698
699        Returns:
700            str or None: The URL of the current trace, built as `<client base URL>/trace/<trace_id>`, if available; otherwise, None. A return value of None indicates that there is no active trace in the current context,
701            possibly due to the method being called outside of any @observe-decorated function execution.
702
703        Note:
704            - This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
705            - If called outside of a trace context, or if building the URL fails for any other reason, the exception is caught, an error is logged, and None is returned; this method does not raise.
706        """
707        try:
708            trace_id = self.get_current_trace_id()
709
710            if not trace_id:
711                raise ValueError("No trace found in the current context")  # caught below and turned into an error log + None
712
713            return f"{self.client_instance.client._client_wrapper._base_url}/trace/{trace_id}"
714
715        except Exception as e:
716            self._log.error(f"Failed to get current trace URL: {e}")
717
718            return None

Retrieve the URL of the current trace in context.

Returns:

str or None: The URL of the current trace if available; otherwise, None. A return value of None indicates that there is no active trace in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace (i.e., inside a function wrapped with the @observe decorator) to ensure that a current trace is indeed present and its ID can be retrieved.
  • If called outside of a trace context, or if building the URL fails for any other reason, the exception is caught, an error is logged, and None is returned; this method does not raise.
def get_current_observation_id(self):
720    def get_current_observation_id(self):
721        """Retrieve the ID of the current observation in context.
722
723        Returns:
724            str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context,
725            possibly due to the method being called outside of any @observe-decorated function execution.
726
727        Note:
728            - This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
729            - If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will return None, indicating the absence of a traceable context. A warning is logged in that case unless `_get_caller_module_name()` reports "langfuse.openai".
730            - If called at the top level of a trace, it will return the trace ID.
731        """
732        stack = _observation_stack_context.get()
733        should_log_warning = self._get_caller_module_name() != "langfuse.openai"
734
735        if not stack:
736            if should_log_warning:
737                self._log.warning("No observation found in the current context")
738
739            return None
740
741        return stack[-1].id  # the last stack entry is the most recent observation

Retrieve the ID of the current observation in context.

Returns:

str or None: The ID of the current observation if available; otherwise, None. A return value of None indicates that there is no active trace or observation in the current context, possibly due to the method being called outside of any @observe-decorated function execution.

Note:
  • This method should be called within the context of a trace or observation (i.e., inside a function wrapped with the @observe decorator) to ensure that a current observation is indeed present and its ID can be retrieved.
  • If called outside of a trace or observation context, or if the observation stack has somehow been corrupted or improperly managed, this method will return None, indicating the absence of a traceable context. A warning is logged in that case unless the caller module is "langfuse.openai".
  • If called at the top level of a trace, it will return the trace ID.
def update_current_trace( self, name: Optional[str] = None, input: Optional[Any] = None, output: Optional[Any] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, version: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, public: Optional[bool] = None):
743    def update_current_trace(
744        self,
745        name: Optional[str] = None,
746        input: Optional[Any] = None,
747        output: Optional[Any] = None,
748        user_id: Optional[str] = None,
749        session_id: Optional[str] = None,
750        version: Optional[str] = None,
751        release: Optional[str] = None,
752        metadata: Optional[Any] = None,
753        tags: Optional[List[str]] = None,
754        public: Optional[bool] = None,
755    ):
756        """Set parameters for the current trace, updating the trace's metadata and context information.
757
758        This method allows for dynamically updating the trace parameters at any point during the execution of a trace.
759        It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information,
760        the `public` flag (Optional[bool], for sharing a trace via a public link), and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.
761
762        Arguments:
763            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
764            input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
765            output (Optional[Any]): The output or result of the trace
766            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
767            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
768            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
769            release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
770            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
771            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
772
773        Returns:
774            None
775
776        Note:
777            - This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
778            - The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context. Arguments left as None are skipped, so they never overwrite previously set values.
779            - If called outside of an active trace context, a warning is logged and the method returns without updating anything; no exception is raised.
780        """
781        trace_id = self.get_current_trace_id()
782
783        if trace_id is None:
784            self._log.warning("No trace found in the current context")
785
786            return
787
788        params_to_update = {
789            k: v
790            for k, v in {
791                "name": name,
792                "input": input,
793                "output": output,
794                "user_id": user_id,
795                "session_id": session_id,
796                "version": version,
797                "release": release,
798                "metadata": metadata,
799                "tags": tags,
800                "public": public,
801            }.items()
802            if v is not None
803        }
804
805        _observation_params_context.get()[trace_id].update(params_to_update)

Set parameters for the current trace, updating the trace's metadata and context information.

This method allows for dynamically updating the trace parameters at any point during the execution of a trace. It updates the parameters of the current trace based on the provided arguments. These parameters include metadata, session information, and other trace attributes that can be useful for categorization, filtering, and analysis in the Langfuse UI.

Arguments:
  • name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
  • input (Optional[Any]): The input parameters of the trace, providing context about the observed operation or function call.
  • output (Optional[Any]): The output or result of the trace
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
  • release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
Returns:

None

Note:
  • This method should be used within the context of an active trace, typically within a function that is being traced using the @observe decorator.
  • The method updates the trace parameters for the currently executing trace. In nested trace scenarios, it affects the most recent trace context.
  • If called outside of an active trace context, a warning is logged and the method returns without updating anything; no exception is raised.
def update_current_observation( self, *, input: Optional[Any] = None, output: Optional[Any] = None, name: Optional[str] = None, version: Optional[str] = None, metadata: Optional[Any] = None, start_time: Optional[datetime.datetime] = None, end_time: Optional[datetime.datetime] = None, release: Optional[str] = None, tags: Optional[List[str]] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, level: Optional[Literal['DEBUG', 'DEFAULT', 'WARNING', 'ERROR']] = None, status_message: Optional[str] = None, completion_start_time: Optional[datetime.datetime] = None, model: Optional[str] = None, model_parameters: Optional[Dict[str, Union[str, NoneType, int, bool, List[str]]]] = None, usage: Union[pydantic.main.BaseModel, langfuse.model.ModelUsage, NoneType] = None, prompt: Union[langfuse.model.TextPromptClient, langfuse.model.ChatPromptClient, NoneType] = None, public: Optional[bool] = None):
807    def update_current_observation(
808        self,
809        *,
810        input: Optional[Any] = None,
811        output: Optional[Any] = None,
812        name: Optional[str] = None,
813        version: Optional[str] = None,
814        metadata: Optional[Any] = None,
815        start_time: Optional[datetime] = None,
816        end_time: Optional[datetime] = None,
817        release: Optional[str] = None,
818        tags: Optional[List[str]] = None,
819        user_id: Optional[str] = None,
820        session_id: Optional[str] = None,
821        level: Optional[SpanLevel] = None,
822        status_message: Optional[str] = None,
823        completion_start_time: Optional[datetime] = None,
824        model: Optional[str] = None,
825        model_parameters: Optional[Dict[str, MapValue]] = None,
826        usage: Optional[Union[BaseModel, ModelUsage]] = None,
827        prompt: Optional[PromptClient] = None,
828        public: Optional[bool] = None,
829    ):
830        """Update parameters for the current observation within an active trace context.
831
832        This method dynamically adjusts the parameters of the most recent observation on the observation stack.
833        It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more,
834        enhancing the observability and traceability of the execution context.
835
836        Note that if a param is not available on a specific observation type, it will be ignored.
837
838        Shared params:
839            - `input` (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
840            - `output` (Optional[Any]): The output or result of the trace or observation
841            - `name` (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
842            - `metadata` (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
843            - `start_time` (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
844            - `end_time` (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
845            - `version` (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
846
847        Trace-specific params:
848            - `user_id` (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
849            - `session_id` (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
850            - `release` (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
851            - `tags` (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
852            - `public` (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
853
854        Span-specific params:
855            - `level` (Optional[SpanLevel]): The severity or importance level of the observation: one of "DEBUG", "DEFAULT", "WARNING", or "ERROR".
856            - `status_message` (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.
857
858        Generation-specific params:
859            - `completion_start_time` (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
860            - `model_parameters` (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
861            - `usage` (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAi structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
862            - `prompt` (Optional[PromptClient]): The prompt object used for the generation.
863
864        Returns:
865            None
866
867        Missing observation:
868            If no current observation is found in the context (i.e., the method is called outside of an observation's execution scope), a warning is logged and the call returns without updating anything. No exception is raised.
869
870        Note:
871            - This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
872            - It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
873            - Parameters set to `None` will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
874        """
875        stack = _observation_stack_context.get()
876        observation = stack[-1] if stack else None
877
878        if not observation:
879            self._log.warning("No observation found in the current context")
880
881            return
882
883        update_params = {
884            k: v
885            for k, v in {
886                "input": input,
887                "output": output,
888                "name": name,
889                "version": version,
890                "metadata": metadata,
891                "start_time": start_time,
892                "end_time": end_time,
893                "release": release,
894                "tags": tags,
895                "user_id": user_id,
896                "session_id": session_id,
897                "level": level,
898                "status_message": status_message,
899                "completion_start_time": completion_start_time,
900                "model": model,
901                "model_parameters": model_parameters,
902                "usage": usage,
903                "prompt": prompt,
904                "public": public,
905            }.items()
906            if v is not None
907        }
908
909        _observation_params_context.get()[observation.id].update(update_params)

Update parameters for the current observation within an active trace context.

This method dynamically adjusts the parameters of the most recent observation on the observation stack. It allows for the enrichment of observation data with additional details such as input parameters, output results, metadata, and more, enhancing the observability and traceability of the execution context.

Note that if a param is not available on a specific observation type, it will be ignored.

Shared params:
  • input (Optional[Any]): The input parameters of the trace or observation, providing context about the observed operation or function call.
  • output (Optional[Any]): The output or result of the trace or observation
  • name (Optional[str]): Identifier of the trace or observation. Useful for sorting/filtering in the UI.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • start_time (Optional[datetime]): The start time of the observation, allowing for custom time range specification.
  • end_time (Optional[datetime]): The end time of the observation, enabling precise control over the observation duration.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.

Trace-specific params:
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • release (Optional[str]): The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
  • public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

Span-specific params:
  • level (Optional[SpanLevel]): The severity or importance level of the observation: one of "DEBUG", "DEFAULT", "WARNING", or "ERROR".
  • status_message (Optional[str]): A message or description associated with the observation's status, particularly useful for error reporting.

Generation-specific params:
  • completion_start_time (Optional[datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
  • model_parameters (Optional[Dict[str, MapValue]]): The parameters of the model used for the generation; can be any key-value pairs.
  • usage (Optional[Union[BaseModel, ModelUsage]]): The usage object supports the OpenAi structure with {promptTokens, completionTokens, totalTokens} and a more generic version {input, output, total, unit, inputCost, outputCost, totalCost} where unit can be of value "TOKENS", "CHARACTERS", "MILLISECONDS", "SECONDS", or "IMAGES". Refer to the docs on how to automatically infer token usage and costs in Langfuse.
  • prompt (Optional[PromptClient]): The prompt object used for the generation.

Returns:

None

Missing observation:
  • If no current observation is found in the context (i.e., the method is called outside of an observation's execution scope), a warning is logged and the call returns without updating anything. No exception is raised.
Note:
  • This method is intended to be used within the context of an active observation, typically within a function wrapped by the @observe decorator.
  • It updates the parameters of the most recently created observation on the observation stack. Care should be taken in nested observation contexts to ensure the updates are applied as intended.
  • Parameters set to None will not overwrite existing values for those parameters. This behavior allows for selective updates without clearing previously set information.
def score_current_observation( self, *, name: str, value: Union[float, str], data_type: Optional[Literal['NUMERIC', 'CATEGORICAL', 'BOOLEAN']] = None, comment: Optional[str] = None, id: Optional[str] = None, config_id: Optional[str] = None):
911    def score_current_observation(
912        self,
913        *,
914        name: str,
915        value: Union[float, str],
916        data_type: Optional[ScoreDataType] = None,
917        comment: Optional[str] = None,
918        id: Optional[str] = None,
919        config_id: Optional[str] = None,
920    ):
921        """Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.
922
923        Arguments:
924            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
925            value (float): The numerical value of the score. This could represent performance metrics, error rates, or any other quantifiable measure.
926            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
927              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
928            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
929            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
930            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
931
932        Returns:
933            None
934
935        Note:
936            This method is intended to be used within the context of an active trace or observation.
937        """
938        try:
939            trace_id = self.get_current_trace_id()
940            current_observation_id = self.get_current_observation_id()
941
942            observation_id = (
943                current_observation_id if current_observation_id != trace_id else None
944            )
945
946            if trace_id:
947                self.client_instance.score(
948                    trace_id=trace_id,
949                    observation_id=observation_id,
950                    name=name,
951                    value=value,
952                    data_type=data_type,
953                    comment=comment,
954                    id=id,
955                    config_id=config_id,
956                )
957            else:
958                raise ValueError("No trace or observation found in the current context")
959
960        except Exception as e:
961            self._log.error(f"Failed to score observation: {e}")

Score the current observation within an active trace. If called on the top level of a trace, it will score the trace.

Arguments:
  • name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
  • value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
  • data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
  • comment (Optional[str]): An optional comment or description providing context or additional details about the score.
  • id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
  • config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
Returns:

None

Note:

This method is intended to be used within the context of an active trace or observation.

def score_current_trace( self, *, name: str, value: Union[float, str], data_type: Optional[Literal['NUMERIC', 'CATEGORICAL', 'BOOLEAN']] = None, comment: Optional[str] = None, id: Optional[str] = None, config_id: Optional[str] = None):
 963    def score_current_trace(
 964        self,
 965        *,
 966        name: str,
 967        value: Union[float, str],
 968        data_type: Optional[ScoreDataType] = None,
 969        comment: Optional[str] = None,
 970        id: Optional[str] = None,
 971        config_id: Optional[str] = None,
 972    ):
 973        """Score the current trace in context. This can be called anywhere in the nested trace to score the trace.
 974
 975        Arguments:
 976            name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
 977            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
 978            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
 979              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
 980            comment (Optional[str]): An optional comment or description providing context or additional details about the score.
 981            id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
 982            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
 983
 984        Returns:
 985            None
 986
 987        Note:
 988            This method is intended to be used within the context of an active trace or observation.
 989        """
 990        try:
 991            trace_id = self.get_current_trace_id()
 992
 993            if trace_id:
 994                self.client_instance.score(
 995                    trace_id=trace_id,
 996                    name=name,
 997                    value=value,
 998                    data_type=data_type,
 999                    comment=comment,
1000                    id=id,
1001                    config_id=config_id,
1002                )
1003            else:
1004                raise ValueError("No trace found in the current context")
1005
1006        except Exception as e:
1007            self._log.error(f"Failed to score observation: {e}")

Score the current trace in context. This can be called anywhere in the nested trace to score the trace.

Arguments:
  • name (str): The name of the score metric. This should be a clear and concise identifier for the metric being recorded.
  • value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. This could represent performance metrics, error rates, or any other quantifiable measure.
  • data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
  • comment (Optional[str]): An optional comment or description providing context or additional details about the score.
  • id (Optional[str]): An optional custom ID for the scoring event. Useful for linking scores with external systems or for detailed tracking.
  • config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
Returns:

None

Note:

This method is intended to be used within the context of an active trace or observation.

@catch_and_log_errors
def flush(self):
1009    @catch_and_log_errors
1010    def flush(self):
1011        """Force immediate flush of all buffered observations to the Langfuse backend.
1012
1013        This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers.
1014        It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.
1015
1016        Usage:
1017            - This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
1018            - It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
1019
1020        Returns:
1021            None
1022
1023        Raises:
1024            ValueError: If it fails to find a Langfuse client object in the current context, indicating potential misconfiguration or initialization issues.
1025
1026        Note:
1027            - The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
1028            - In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client.
1029            However, explicit calls to `flush` can be beneficial in certain edge cases or for debugging purposes.
1030        """
1031        if self.client_instance:
1032            self.client_instance.flush()
1033        else:
1034            self._log.warning("No langfuse object found in the current context")

Force immediate flush of all buffered observations to the Langfuse backend.

This method triggers the explicit sending of all accumulated trace and observation data that has not yet been sent to Langfuse servers. It is typically used to ensure that data is promptly available for analysis, especially at the end of an execution context or before the application exits.

Usage:
  • This method can be called at strategic points in the application where it's crucial to ensure that all telemetry data captured up to that point is made persistent and visible on the Langfuse platform.
  • It's particularly useful in scenarios where the application might terminate abruptly or in batch processing tasks that require periodic flushing of trace data.
Returns:

None

Raises:
  • Nothing directly. If no Langfuse client object is found in the current context, a warning is logged instead of raising an error.
Note:
  • The flush operation may involve network I/O to send data to the Langfuse backend, which could impact performance if called too frequently in performance-sensitive contexts.
  • In long-running applications, it's often sufficient to rely on the automatic flushing mechanism provided by the Langfuse client. However, explicit calls to flush can be beneficial in certain edge cases or for debugging purposes.
def configure( self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, release: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, enabled: Optional[bool] = None, mask: Optional[Callable] = None):
1036    def configure(
1037        self,
1038        *,
1039        public_key: Optional[str] = None,
1040        secret_key: Optional[str] = None,
1041        host: Optional[str] = None,
1042        release: Optional[str] = None,
1043        debug: Optional[bool] = None,
1044        threads: Optional[int] = None,
1045        flush_at: Optional[int] = None,
1046        flush_interval: Optional[int] = None,
1047        max_retries: Optional[int] = None,
1048        timeout: Optional[int] = None,
1049        httpx_client: Optional[httpx.Client] = None,
1050        enabled: Optional[bool] = None,
1051        mask: Optional[Callable] = None,
1052    ):
1053        """Configure the Langfuse client.
1054
1055        If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings.
1056
1057        Args:
1058            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
1059            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
1060            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
1061            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
1062            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
1063            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
1064            flush_at: Max batch size that's sent to the API.
1065            flush_interval: Max delay until a new batch is sent to the API.
1066            max_retries: Max number of retries in case of API/network errors.
1067            timeout: Timeout of API requests in seconds. Default is 20 seconds.
1068            httpx_client: Pass your own httpx client for more customizability of requests.
1069            enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
1070            mask (Callable): Function that masks sensitive information from input and output in log messages.
1071
1072        """
1073        langfuse_singleton = LangfuseSingleton()
1074        langfuse_singleton.reset()
1075
1076        langfuse_singleton.get(
1077            public_key=public_key,
1078            secret_key=secret_key,
1079            host=host,
1080            release=release,
1081            debug=debug,
1082            threads=threads,
1083            flush_at=flush_at,
1084            flush_interval=flush_interval,
1085            max_retries=max_retries,
1086            timeout=timeout,
1087            httpx_client=httpx_client,
1088            enabled=enabled,
1089            mask=mask,
1090        )

Configure the Langfuse client.

If called, this method must be called before any other langfuse_context or observe decorated function to configure the Langfuse client with the necessary credentials and settings.

Arguments:
  • public_key: Public API key of Langfuse project. Can be set via LANGFUSE_PUBLIC_KEY environment variable.
  • secret_key: Secret API key of Langfuse project. Can be set via LANGFUSE_SECRET_KEY environment variable.
  • host: Host of Langfuse API. Can be set via LANGFUSE_HOST environment variable. Defaults to https://cloud.langfuse.com.
  • release: Release number/hash of the application to provide analytics grouped by release. Can be set via LANGFUSE_RELEASE environment variable.
  • debug: Enables debug mode for more verbose logging. Can be set via LANGFUSE_DEBUG environment variable.
  • threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
  • flush_at: Max batch size that's sent to the API.
  • flush_interval: Max delay until a new batch is sent to the API.
  • max_retries: Max number of retries in case of API/network errors.
  • timeout: Timeout of API requests in seconds. Default is 20 seconds.
  • httpx_client: Pass your own httpx client for more customizability of requests.
  • enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
  • mask (Callable): Function that masks sensitive information from input and output in log messages.
client_instance: langfuse.client.Langfuse
1092    @property
1093    def client_instance(self) -> Langfuse:
1094        """Get the Langfuse client instance for the current decorator context."""
1095        return LangfuseSingleton().get()

Get the Langfuse client instance for the current decorator context.

def auth_check(self) -> bool:
1116    def auth_check(self) -> bool:
1117        """Check if the current Langfuse client is authenticated.
1118
1119        Returns:
1120            bool: True if the client is authenticated, False otherwise
1121        """
1122        try:
1123            return self.client_instance.auth_check()
1124        except Exception as e:
1125            self._log.error(
1126                "No Langfuse object found in the current context", exc_info=e
1127            )
1128
1129            return False

Check if the current Langfuse client is authenticated.

Returns:

bool: True if the client is authenticated, False otherwise