langfuse.client

   1from contextlib import contextmanager
   2import datetime as dt
   3import logging
   4import os
   5import typing
   6import uuid
   7import httpx
   8from enum import Enum
   9import time
  10import tracemalloc
  11from typing import Any, Dict, Optional, Literal, Union, List, overload
  12import urllib.parse
  13
  14
  15from langfuse.api.resources.ingestion.types.create_event_body import CreateEventBody
  16from langfuse.api.resources.ingestion.types.create_generation_body import (
  17    CreateGenerationBody,
  18)
  19from langfuse.api.resources.ingestion.types.create_span_body import CreateSpanBody
  20from langfuse.api.resources.ingestion.types.score_body import ScoreBody
  21from langfuse.api.resources.ingestion.types.trace_body import TraceBody
  22from langfuse.api.resources.ingestion.types.sdk_log_body import SdkLogBody
  23from langfuse.api.resources.ingestion.types.update_generation_body import (
  24    UpdateGenerationBody,
  25)
  26from langfuse.api.resources.ingestion.types.update_span_body import UpdateSpanBody
  27from langfuse.api.resources.observations.types.observations_views import (
  28    ObservationsViews,
  29)
  30from langfuse.api.resources.prompts.types import (
  31    CreatePromptRequest_Chat,
  32    CreatePromptRequest_Text,
  33)
  34from langfuse.model import (
  35    CreateDatasetItemRequest,
  36    CreateDatasetRequest,
  37    CreateDatasetRunItemRequest,
  38    ChatMessageDict,
  39    DatasetItem,
  40    DatasetRun,
  41    DatasetStatus,
  42    ModelUsage,
  43    PromptClient,
  44    ChatPromptClient,
  45    TextPromptClient,
  46)
  47from langfuse.prompt_cache import PromptCache
  48
  49try:
  50    import pydantic.v1 as pydantic  # type: ignore
  51except ImportError:
  52    import pydantic  # type: ignore
  53
  54from langfuse.api.client import FernLangfuse
  55from langfuse.environment import get_common_release_envs
  56from langfuse.logging import clean_logger
  57from langfuse.model import Dataset, MapValue, Observation, TraceWithFullDetails
  58from langfuse.request import LangfuseClient
  59from langfuse.task_manager import TaskManager
  60from langfuse.types import SpanLevel
  61from langfuse.utils import _convert_usage_input, _create_prompt_context, _get_timestamp
  62
  63from .version import __version__ as version
  64
  65
  66class Langfuse(object):
  67    """Langfuse Python client.
  68
  69    Attributes:
  70        log (logging.Logger): Logger for the Langfuse client.
  71        base_url (str): Base URL of the Langfuse API, serving as the root address for API endpoint construction.
  72        httpx_client (httpx.Client): HTTPX client utilized for executing requests to the Langfuse API.
  73        client (FernLangfuse): Core interface for Langfuse API interaction.
  74        task_manager (TaskManager): Task Manager dedicated to handling asynchronous tasks.
  75        release (str): Identifies the release number or hash of the application.
  76        prompt_cache (PromptCache): A cache for efficiently storing and retrieving PromptClient instances.
  77
  78    Example:
  79        Initiating the Langfuse client should always be first step to use Langfuse.
  80        ```python
  81        import os
  82        from langfuse import Langfuse
  83
  84        # Set the public and secret keys as environment variables
  85        os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
  86        os.environ['LANGFUSE_SECRET_KEY'] = secret_key
  87
  88        # Initialize the Langfuse client using the credentials
  89        langfuse = Langfuse()
  90        ```
  91    """
  92
  93    log = logging.getLogger("langfuse")
  94    """Logger for the Langfuse client."""
  95
  96    host: str
  97    """Host of Langfuse API."""
  98
    def __init__(
        self,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        release: Optional[str] = None,
        debug: bool = False,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[float] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,  # seconds
        sdk_integration: Optional[str] = "default",
        httpx_client: Optional[httpx.Client] = None,
        enabled: Optional[bool] = True,
    ):
        """Initialize the Langfuse client.

        Args:
            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
            flush_at: Max batch size that's sent to the API.
            flush_interval: Max delay until a new batch is sent to the API.
            max_retries: Max number of retries in case of API/network errors.
            timeout: Timeout of API requests in seconds.
            httpx_client: Pass your own httpx client for more customizability of requests.
            sdk_integration: Used by integrations that wrap the Langfuse SDK to add context for debugging and support. Not to be used directly.
            enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops.

        Note:
            Missing credentials do not raise an exception. The client is
            switched into a disabled (no-op) state and a warning is logged
            instead.

        Example:
            Initiating the Langfuse client should always be the first step to use Langfuse.
            ```python
            import os
            from langfuse import Langfuse

            # Set the public and secret keys as environment variables
            os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
            os.environ['LANGFUSE_SECRET_KEY'] = secret_key

            # Initialize the Langfuse client using the credentials
            langfuse = Langfuse()
            ```
        """
        self.enabled = enabled
        # Credentials: explicit arguments take precedence over env variables.
        public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
        secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")

        # Batching/retry knobs: argument > environment variable > default.
        # NOTE: because of `or`, falsy explicit values (e.g. threads=0) also
        # fall through to the environment/default value.
        threads = threads or int(os.environ.get("LANGFUSE_THREADS", 1))
        flush_at = flush_at or int(os.environ.get("LANGFUSE_FLUSH_AT", 15))
        flush_interval = flush_interval or float(
            os.environ.get("LANGFUSE_FLUSH_INTERVAL", 0.5)
        )

        max_retries = max_retries or int(os.environ.get("LANGFUSE_MAX_RETRIES", 3))
        timeout = timeout or int(os.environ.get("LANGFUSE_TIMEOUT", 20))

        # Missing keys disable the client (no-op mode) rather than raising, so
        # instrumented applications keep running without Langfuse credentials.
        if not self.enabled:
            self.log.warning(
                "Langfuse client is disabled. No observability data will be sent."
            )

        elif not public_key:
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since no public_key was provided as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
            )

        elif not secret_key:
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since no secret_key was provided as a parameter or environment variable 'LANGFUSE_SECRET_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
            )

        # Debug mode: either the `debug` argument or LANGFUSE_DEBUG=True.
        set_debug = debug if debug else (os.getenv("LANGFUSE_DEBUG", "False") == "True")

        if set_debug is True:
            # Ensures that debug level messages are logged when debug mode is on.
            # Otherwise, defaults to WARNING level.
            # See https://docs.python.org/3/howto/logging.html#what-happens-if-no-configuration-is-provided
            logging.basicConfig()
            self.log.setLevel(logging.DEBUG)

            clean_logger()
        else:
            self.log.setLevel(logging.WARNING)
            clean_logger()

        self.base_url = (
            host
            if host
            else os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
        )

        # A caller-provided httpx client wins; otherwise build one with the
        # configured timeout. The same client is shared by both API wrappers.
        self.httpx_client = httpx_client or httpx.Client(timeout=timeout)

        # Fern-generated, typed API client used by the read/query methods.
        self.client = FernLangfuse(
            base_url=self.base_url,
            username=public_key,
            password=secret_key,
            x_langfuse_sdk_name="python",
            x_langfuse_sdk_version=version,
            x_langfuse_public_key=public_key,
            httpx_client=self.httpx_client,
        )

        # Lightweight request client handed to the TaskManager for ingestion.
        langfuse_client = LangfuseClient(
            public_key=public_key,
            secret_key=secret_key,
            base_url=self.base_url,
            version=version,
            timeout=timeout,
            session=self.httpx_client,
        )

        args = {
            "threads": threads,
            "flush_at": flush_at,
            "flush_interval": flush_interval,
            "max_retries": max_retries,
            "client": langfuse_client,
            "public_key": public_key,
            "sdk_name": "python",
            "sdk_version": version,
            "sdk_integration": sdk_integration,
            "enabled": self.enabled,
        }

        # Background task manager that batches and ships events asynchronously.
        self.task_manager = TaskManager(**args)

        # Id of the most recent trace; stays None until a trace is created.
        self.trace_id = None

        self.release = self._get_release_value(release)

        # In-memory cache backing get_prompt().
        self.prompt_cache = PromptCache()
 240
 241    def _get_release_value(self, release: Optional[str] = None) -> Optional[str]:
 242        if release:
 243            return release
 244        elif "LANGFUSE_RELEASE" in os.environ:
 245            return os.environ["LANGFUSE_RELEASE"]
 246        else:
 247            return get_common_release_envs()
 248
    def get_trace_id(self) -> Optional[str]:
        """Get the current trace id.

        Returns the value stored on this client (`self.trace_id`), which is
        initialized to None in `__init__` — so this is None until a trace has
        been created via this client.
        """
        return self.trace_id
 252
 253    def get_trace_url(self) -> str:
 254        """Get the URL of the current trace to view it in the Langfuse UI."""
 255        return f"{self.base_url}/trace/{self.trace_id}"
 256
 257    def get_dataset(self, name: str) -> "DatasetClient":
 258        """Fetch a dataset by its name.
 259
 260        Args:
 261            name (str): The name of the dataset to fetch.
 262
 263        Returns:
 264            DatasetClient: The dataset with the given name.
 265        """
 266        try:
 267            self.log.debug(f"Getting datasets {name}")
 268            dataset = self.client.datasets.get(dataset_name=name)
 269
 270            items = [DatasetItemClient(i, langfuse=self) for i in dataset.items]
 271
 272            return DatasetClient(dataset, items=items)
 273        except Exception as e:
 274            self.log.exception(e)
 275            raise e
 276
 277    def get_dataset_item(self, id: str) -> "DatasetItemClient":
 278        """Get the dataset item with the given id."""
 279        try:
 280            self.log.debug(f"Getting dataset item {id}")
 281            dataset_item = self.client.dataset_items.get(id=id)
 282            return DatasetItemClient(dataset_item, langfuse=self)
 283        except Exception as e:
 284            self.log.exception(e)
 285            raise e
 286
 287    def auth_check(self) -> bool:
 288        """Check if the provided credentials (public and secret key) are valid.
 289
 290        Raises:
 291            Exception: If no projects were found for the provided credentials.
 292
 293        Note:
 294            This method is blocking. It is discouraged to use it in production code.
 295        """
 296        try:
 297            projects = self.client.projects.get()
 298            self.log.debug(
 299                f"Auth check successful, found {len(projects.data)} projects"
 300            )
 301            if len(projects.data) == 0:
 302                raise Exception(
 303                    "Auth check failed, no project found for the keys provided."
 304                )
 305            return True
 306
 307        except Exception as e:
 308            self.log.exception(e)
 309            raise e
 310
 311    def get_dataset_run(
 312        self,
 313        dataset_name: str,
 314        dataset_run_name: str,
 315    ) -> DatasetRun:
 316        """Get a dataset run.
 317
 318        Args:
 319            dataset_name: Name of the dataset.
 320            dataset_run_name: Name of the dataset run.
 321
 322        Returns:
 323            DatasetRun: The dataset run.
 324        """
 325        try:
 326            self.log.debug(
 327                f"Getting dataset runs for dataset {dataset_name} and run {dataset_run_name}"
 328            )
 329            return self.client.datasets.get_runs(
 330                dataset_name=dataset_name, run_name=dataset_run_name
 331            )
 332        except Exception as e:
 333            self.log.exception(e)
 334            raise e
 335
 336    def create_dataset(
 337        self,
 338        name: str,
 339        description: Optional[str] = None,
 340        metadata: Optional[Any] = None,
 341    ) -> Dataset:
 342        """Create a dataset with the given name on Langfuse.
 343
 344        Args:
 345            name: Name of the dataset to create.
 346            description: Description of the dataset. Defaults to None.
 347            metadata: Additional metadata. Defaults to None.
 348
 349        Returns:
 350            Dataset: The created dataset as returned by the Langfuse API.
 351        """
 352        try:
 353            body = CreateDatasetRequest(
 354                name=name, description=description, metadata=metadata
 355            )
 356            self.log.debug(f"Creating datasets {body}")
 357            return self.client.datasets.create(request=body)
 358        except Exception as e:
 359            self.log.exception(e)
 360            raise e
 361
 362    def create_dataset_item(
 363        self,
 364        dataset_name: str,
 365        input: Optional[Any] = None,
 366        expected_output: Optional[Any] = None,
 367        metadata: Optional[Any] = None,
 368        source_trace_id: Optional[str] = None,
 369        source_observation_id: Optional[str] = None,
 370        status: Optional[DatasetStatus] = None,
 371        id: Optional[str] = None,
 372    ) -> DatasetItem:
 373        """Create a dataset item.
 374
 375        Upserts if an item with id already exists.
 376
 377        Args:
 378            dataset_name: Name of the dataset in which the dataset item should be created.
 379            input: Input data. Defaults to None. Can contain any dict, list or scalar.
 380            expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar.
 381            metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar.
 382            source_trace_id: Id of the source trace. Defaults to None.
 383            source_observation_id: Id of the source observation. Defaults to None.
 384            status: Status of the dataset item. Defaults to ACTIVE for newly created items.
 385            id: Id of the dataset item. Defaults to None.
 386
 387        Returns:
 388            DatasetItem: The created dataset item as returned by the Langfuse API.
 389
 390        Example:
 391            ```python
 392            from langfuse import Langfuse
 393
 394            langfuse = Langfuse()
 395
 396            # Uploading items to the Langfuse dataset named "capital_cities"
 397            langfuse.create_dataset_item(
 398                dataset_name="capital_cities",
 399                input={"input": {"country": "Italy"}},
 400                expected_output={"expected_output": "Rome"},
 401                metadata={"foo": "bar"}
 402            )
 403            ```
 404        """
 405        try:
 406            body = CreateDatasetItemRequest(
 407                datasetName=dataset_name,
 408                input=input,
 409                expectedOutput=expected_output,
 410                metadata=metadata,
 411                sourceTraceId=source_trace_id,
 412                sourceObservationId=source_observation_id,
 413                status=status,
 414                id=id,
 415            )
 416            self.log.debug(f"Creating dataset item {body}")
 417            return self.client.dataset_items.create(request=body)
 418        except Exception as e:
 419            self.log.exception(e)
 420            raise e
 421
 422    def get_trace(
 423        self,
 424        id: str,
 425    ) -> TraceWithFullDetails:
 426        """Get a trace via the Langfuse API by its id.
 427
 428        Args:
 429            id: The id of the trace to fetch.
 430
 431        Returns:
 432            TraceWithFullDetails: The trace with full details as returned by the Langfuse API.
 433
 434        Raises:
 435            Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
 436        """
 437        try:
 438            self.log.debug(f"Getting trace {id}")
 439            return self.client.trace.get(id)
 440        except Exception as e:
 441            self.log.exception(e)
 442            raise e
 443
 444    def get_observations(
 445        self,
 446        *,
 447        page: typing.Optional[int] = None,
 448        limit: typing.Optional[int] = None,
 449        name: typing.Optional[str] = None,
 450        user_id: typing.Optional[str] = None,
 451        trace_id: typing.Optional[str] = None,
 452        parent_observation_id: typing.Optional[str] = None,
 453        type: typing.Optional[str] = None,
 454    ) -> ObservationsViews:
 455        """Get a list of observations in the current project matching the given parameters.
 456
 457        Args:
 458            page (Optional[int]): Page number of the observations to return. Defaults to None.
 459            limit (Optional[int]): Maximum number of observations to return. Defaults to None.
 460            name (Optional[str]): Name of the observations to return. Defaults to None.
 461            user_id (Optional[str]): User identifier. Defaults to None.
 462            trace_id (Optional[str]): Trace identifier. Defaults to None.
 463            parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
 464            type (Optional[str]): Type of the observation. Defaults to None.
 465
 466        Returns:
 467            List of ObservationsViews: List of observations in the project matching the given parameters.
 468
 469        Raises:
 470            Exception: If an error occurred during the request.
 471        """
 472        try:
 473            self.log.debug(
 474                f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {type}"
 475            )
 476            return self.client.observations.get_many(
 477                page=page,
 478                limit=limit,
 479                name=name,
 480                user_id=user_id,
 481                trace_id=trace_id,
 482                parent_observation_id=parent_observation_id,
 483                type=type,
 484            )
 485        except Exception as e:
 486            self.log.exception(e)
 487            raise e
 488
 489    def get_generations(
 490        self,
 491        *,
 492        page: typing.Optional[int] = None,
 493        limit: typing.Optional[int] = None,
 494        name: typing.Optional[str] = None,
 495        user_id: typing.Optional[str] = None,
 496        trace_id: typing.Optional[str] = None,
 497        parent_observation_id: typing.Optional[str] = None,
 498    ) -> ObservationsViews:
 499        """Get a list of generations in the current project matching the given parameters.
 500
 501        Args:
 502            page (Optional[int]): Page number of the generations to return. Defaults to None.
 503            limit (Optional[int]): Maximum number of generations to return. Defaults to None.
 504            name (Optional[str]): Name of the generations to return. Defaults to None.
 505            user_id (Optional[str]): User identifier of the generations to return. Defaults to None.
 506            trace_id (Optional[str]): Trace identifier of the generations to return. Defaults to None.
 507            parent_observation_id (Optional[str]): Parent observation identifier of the generations to return. Defaults to None.
 508
 509        Returns:
 510            List of ObservationsViews: List of generations in the project matching the given parameters.
 511
 512        Raises:
 513            Exception: If an error occurred during the request.
 514        """
 515        return self.get_observations(
 516            page=page,
 517            limit=limit,
 518            name=name,
 519            user_id=user_id,
 520            trace_id=trace_id,
 521            parent_observation_id=parent_observation_id,
 522            type="GENERATION",
 523        )
 524
 525    def get_observation(
 526        self,
 527        id: str,
 528    ) -> Observation:
 529        """Get an observation in the current project with the given identifier.
 530
 531        Args:
 532            id: The identifier of the observation to fetch.
 533
 534        Raises:
 535            Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request.
 536        """
 537        try:
 538            self.log.debug(f"Getting observation {id}")
 539            return self.client.observations.get(id)
 540        except Exception as e:
 541            self.log.exception(e)
 542            raise e
 543
 544    @overload
 545    def get_prompt(
 546        self,
 547        name: str,
 548        version: Optional[int] = None,
 549        *,
 550        label: Optional[str] = None,
 551        type: Literal["chat"],
 552        cache_ttl_seconds: Optional[int] = None,
 553    ) -> ChatPromptClient: ...
 554
 555    @overload
 556    def get_prompt(
 557        self,
 558        name: str,
 559        version: Optional[int] = None,
 560        *,
 561        label: Optional[str] = None,
 562        type: Literal["text"] = "text",
 563        cache_ttl_seconds: Optional[int] = None,
 564    ) -> TextPromptClient: ...
 565
 566    def get_prompt(
 567        self,
 568        name: str,
 569        version: Optional[int] = None,
 570        *,
 571        label: Optional[str] = None,
 572        type: Literal["chat", "text"] = "text",
 573        cache_ttl_seconds: Optional[int] = None,
 574    ) -> PromptClient:
 575        """Get a prompt.
 576
 577        This method attempts to fetch the requested prompt from the local cache. If the prompt is not found
 578        in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again
 579        and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will
 580        return the expired prompt as a fallback.
 581
 582        Args:
 583            name (str): The name of the prompt to retrieve.
 584
 585        Keyword Args:
 586            version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
 587            label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
 588            cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a
 589            keyword argument. If not set, defaults to 60 seconds.
 590            type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text".
 591
 592        Returns:
 593            The prompt object retrieved from the cache or directly fetched if not cached or expired of type
 594            - TextPromptClient, if type argument is 'text'.
 595            - ChatPromptClient, if type argument is 'chat'.
 596
 597        Raises:
 598            Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an
 599            expired prompt in the cache, in which case it logs a warning and returns the expired prompt.
 600        """
 601        if version is not None and label is not None:
 602            raise ValueError("Cannot specify both version and label at the same time.")
 603
 604        if not name:
 605            raise ValueError("Prompt name cannot be empty.")
 606
 607        cache_key = PromptCache.generate_cache_key(name, version=version, label=label)
 608
 609        self.log.debug(f"Getting prompt '{cache_key}'")
 610        cached_prompt = self.prompt_cache.get(cache_key)
 611
 612        if cached_prompt is None:
 613            return self._fetch_prompt_and_update_cache(
 614                name, version=version, label=label, ttl_seconds=cache_ttl_seconds
 615            )
 616
 617        if cached_prompt.is_expired():
 618            try:
 619                return self._fetch_prompt_and_update_cache(
 620                    name,
 621                    version=version,
 622                    label=label,
 623                    ttl_seconds=cache_ttl_seconds,
 624                )
 625
 626            except Exception as e:
 627                self.log.warn(
 628                    f"Returning expired prompt cache for '{cache_key}' due to fetch error: {e}"
 629                )
 630
 631                return cached_prompt.value
 632
 633        return cached_prompt.value
 634
 635    def _fetch_prompt_and_update_cache(
 636        self,
 637        name: str,
 638        *,
 639        version: Optional[int] = None,
 640        label: Optional[str] = None,
 641        ttl_seconds: Optional[int] = None,
 642    ) -> PromptClient:
 643        try:
 644            cache_key = PromptCache.generate_cache_key(
 645                name, version=version, label=label
 646            )
 647
 648            self.log.debug(f"Fetching prompt '{cache_key}' from server...")
 649            promptResponse = self.client.prompts.get(
 650                self._url_encode(name), version=version, label=label
 651            )
 652
 653            if promptResponse.type == "chat":
 654                prompt = ChatPromptClient(promptResponse)
 655            else:
 656                prompt = TextPromptClient(promptResponse)
 657
 658            self.prompt_cache.set(cache_key, prompt, ttl_seconds)
 659
 660            return prompt
 661
 662        except Exception as e:
 663            self.log.exception(f"Error while fetching prompt '{cache_key}': {e}")
 664            raise e
 665
 666    @overload
 667    def create_prompt(
 668        self,
 669        *,
 670        name: str,
 671        prompt: List[ChatMessageDict],
 672        is_active: Optional[bool] = None,  # deprecated
 673        labels: List[str] = [],
 674        tags: Optional[List[str]] = None,
 675        type: Optional[Literal["chat"]],
 676        config: Optional[Any] = None,
 677    ) -> ChatPromptClient: ...
 678
 679    @overload
 680    def create_prompt(
 681        self,
 682        *,
 683        name: str,
 684        prompt: str,
 685        is_active: Optional[bool] = None,  # deprecated
 686        labels: List[str] = [],
 687        tags: Optional[List[str]] = None,
 688        type: Optional[Literal["text"]] = "text",
 689        config: Optional[Any] = None,
 690    ) -> TextPromptClient: ...
 691
 692    def create_prompt(
 693        self,
 694        *,
 695        name: str,
 696        prompt: Union[str, List[ChatMessageDict]],
 697        is_active: Optional[bool] = None,  # deprecated
 698        labels: List[str] = [],
 699        tags: Optional[List[str]] = None,
 700        type: Optional[Literal["chat", "text"]] = "text",
 701        config: Optional[Any] = None,
 702    ) -> PromptClient:
 703        """Create a new prompt in Langfuse.
 704
 705        Keyword Args:
 706            name : The name of the prompt to be created.
 707            prompt : The content of the prompt to be created.
 708            is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead.
 709            labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label.
 710            tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt.
 711            config: Additional structured data to be saved with the prompt. Defaults to None.
 712            type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text".
 713
 714        Returns:
 715            TextPromptClient: The prompt if type argument is 'text'.
 716            ChatPromptClient: The prompt if type argument is 'chat'.
 717        """
 718        try:
 719            self.log.debug(f"Creating prompt {name=}, {version=}, {labels=}")
 720
 721            # Handle deprecated is_active flag
 722            if is_active:
 723                self.log.warning(
 724                    "The 'is_active' flag is deprecated and will be removed in a future release. Please use the 'production' label instead."
 725                )
 726
 727                labels = labels if "production" in labels else labels + ["production"]
 728
 729            if type == "chat":
 730                if not isinstance(prompt, list):
 731                    raise ValueError(
 732                        "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes."
 733                    )
 734                request = CreatePromptRequest_Chat(
 735                    name=name,
 736                    prompt=prompt,
 737                    labels=labels,
 738                    tags=tags,
 739                    config=config or {},
 740                    type="chat",
 741                )
 742                server_prompt = self.client.prompts.create(request=request)
 743
 744                return ChatPromptClient(prompt=server_prompt)
 745
 746            if not isinstance(prompt, str):
 747                raise ValueError("For 'text' type, 'prompt' must be a string.")
 748
 749            request = CreatePromptRequest_Text(
 750                name=name,
 751                prompt=prompt,
 752                labels=labels,
 753                tags=tags,
 754                config=config or {},
 755                type="text",
 756            )
 757
 758            server_prompt = self.client.prompts.create(request=request)
 759            return TextPromptClient(prompt=server_prompt)
 760
 761        except Exception as e:
 762            self.log.exception(e)
 763            raise e
 764
 765    def _url_encode(self, url: str) -> str:
 766        return urllib.parse.quote(url)
 767
 768    def trace(
 769        self,
 770        *,
 771        id: typing.Optional[str] = None,
 772        name: typing.Optional[str] = None,
 773        user_id: typing.Optional[str] = None,
 774        session_id: typing.Optional[str] = None,
 775        version: typing.Optional[str] = None,
 776        input: typing.Optional[typing.Any] = None,
 777        output: typing.Optional[typing.Any] = None,
 778        metadata: typing.Optional[typing.Any] = None,
 779        tags: typing.Optional[typing.List[str]] = None,
 780        timestamp: typing.Optional[dt.datetime] = None,
 781        public: typing.Optional[bool] = None,
 782        **kwargs,
 783    ) -> "StatefulTraceClient":
 784        """Create a trace.
 785
 786        Args:
 787            id: The id of the trace can be set, defaults to a random id. Set it to link traces to external systems or when creating a distributed trace. Traces are upserted on id.
 788            name: Identifier of the trace. Useful for sorting/filtering in the UI.
 789            input: The input of the trace. Can be any JSON object.
 790            output: The output of the trace. Can be any JSON object.
 791            metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
 792            user_id: The id of the user that triggered the execution. Used to provide user-level analytics.
 793            session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
 794            version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
 795            release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
 796            tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.
 797            timestamp: The timestamp of the trace. Defaults to the current time if not provided.
 798            public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
 799            **kwargs: Additional keyword arguments that can be included in the trace.
 800
 801        Returns:
 802            StatefulTraceClient: The created trace.
 803
 804        Example:
 805            ```python
 806            from langfuse import Langfuse
 807
 808            langfuse = Langfuse()
 809
 810            trace = langfuse.trace(
 811                name="example-application",
 812                user_id="user-1234")
 813            )
 814            ```
 815        """
 816        new_id = id or str(uuid.uuid4())
 817        self.trace_id = new_id
 818        try:
 819            new_dict = {
 820                "id": new_id,
 821                "name": name,
 822                "userId": user_id,
 823                "sessionId": session_id
 824                or kwargs.get("sessionId", None),  # backward compatibility
 825                "release": self.release,
 826                "version": version,
 827                "metadata": metadata,
 828                "input": input,
 829                "output": output,
 830                "tags": tags,
 831                "timestamp": timestamp or _get_timestamp(),
 832                "public": public,
 833            }
 834            if kwargs is not None:
 835                new_dict.update(kwargs)
 836
 837            new_body = TraceBody(**new_dict)
 838
 839            self.log.debug(f"Creating trace {new_body}")
 840            event = {
 841                "id": str(uuid.uuid4()),
 842                "type": "trace-create",
 843                "body": new_body.dict(exclude_none=True),
 844            }
 845
 846            self.task_manager.add_task(
 847                event,
 848            )
 849
 850        except Exception as e:
 851            self.log.exception(e)
 852        finally:
 853            self._log_memory_usage()
 854
 855            return StatefulTraceClient(
 856                self.client, new_id, StateType.TRACE, new_id, self.task_manager
 857            )
 858
 859    def _log_memory_usage(self):
 860        try:
 861            is_malloc_tracing_enabled = bool(int(os.getenv("PYTHONTRACEMALLOC", 0)))
 862            report_interval = int(os.getenv("LANGFUSE_DEBUG_MEMORY_REPORT_INTERVAL", 0))
 863            top_k_items = int(os.getenv("LANGFUSE_DEBUG_MEMORY_TOP_K", 10))
 864
 865            if (
 866                not is_malloc_tracing_enabled
 867                or report_interval <= 0
 868                or round(time.monotonic()) % report_interval != 0
 869            ):
 870                return
 871
 872            snapshot = tracemalloc.take_snapshot().statistics("lineno")
 873
 874            total_memory_usage = sum([stat.size for stat in snapshot]) / 1024 / 1024
 875            memory_usage_total_items = [f"{stat}" for stat in snapshot]
 876            memory_usage_langfuse_items = [
 877                stat for stat in memory_usage_total_items if "/langfuse/" in stat
 878            ]
 879
 880            logged_memory_usage = {
 881                "all_files": [f"{stat}" for stat in memory_usage_total_items][
 882                    :top_k_items
 883                ],
 884                "langfuse_files": [f"{stat}" for stat in memory_usage_langfuse_items][
 885                    :top_k_items
 886                ],
 887                "total_usage": f"{total_memory_usage:.2f} MB",
 888                "langfuse_queue_length": self.task_manager._queue.qsize(),
 889            }
 890
 891            self.log.debug("Memory usage: ", logged_memory_usage)
 892
 893            event = SdkLogBody(log=logged_memory_usage)
 894            self.task_manager.add_task(
 895                {
 896                    "id": str(uuid.uuid4()),
 897                    "type": "sdk-log",
 898                    "timestamp": _get_timestamp(),
 899                    "body": event.dict(),
 900                }
 901            )
 902
 903        except Exception as e:
 904            self.log.exception(e)
 905
 906    def score(
 907        self,
 908        *,
 909        name: str,
 910        value: float,
 911        trace_id: typing.Optional[str] = None,
 912        id: typing.Optional[str] = None,
 913        comment: typing.Optional[str] = None,
 914        observation_id: typing.Optional[str] = None,
 915        **kwargs,
 916    ) -> "StatefulClient":
 917        """Create a score attached to a trace (and optionally an observation).
 918
 919        Args:
 920            name (str): Identifier of the score.
 921            value (float): The value of the score. Can be any number, often standardized to 0..1
 922            trace_id (str): The id of the trace to which the score should be attached.
 923            comment (Optional[str]): Additional context/explanation of the score.
 924            observation_id (Optional[str]): The id of the observation to which the score should be attached.
 925            id (Optional[str]): The id of the score. If not provided, a new UUID is generated.
 926            **kwargs: Additional keyword arguments to include in the score.
 927
 928        Returns:
 929            StatefulClient: Either the associated observation (if observation_id is provided) or the trace (if observation_id is not provided).
 930
 931        Example:
 932            ```python
 933            from langfuse import Langfuse
 934
 935            langfuse = Langfuse()
 936
 937            # Create a trace
 938            trace = langfuse.trace(name="example-application")
 939
 940            # Get id of created trace
 941            trace_id = trace.id
 942
 943            # Add score to the trace
 944            trace = langfuse.score(
 945                trace_id=trace_id,
 946                name="user-explicit-feedback",
 947                value=1,
 948                comment="I like how personalized the response is"
 949            )
 950            ```
 951        """
 952        trace_id = trace_id or self.trace_id or str(uuid.uuid4())
 953        new_id = id or str(uuid.uuid4())
 954        try:
 955            new_dict = {
 956                "id": new_id,
 957                "trace_id": trace_id,
 958                "observation_id": observation_id,
 959                "name": name,
 960                "value": value,
 961                "comment": comment,
 962                **kwargs,
 963            }
 964
 965            self.log.debug(f"Creating score {new_dict}...")
 966            new_body = ScoreBody(**new_dict)
 967
 968            event = {
 969                "id": str(uuid.uuid4()),
 970                "type": "score-create",
 971                "body": new_body.dict(exclude_none=True),
 972            }
 973            self.task_manager.add_task(event)
 974
 975        except Exception as e:
 976            self.log.exception(e)
 977        finally:
 978            if observation_id is not None:
 979                return StatefulClient(
 980                    self.client,
 981                    observation_id,
 982                    StateType.OBSERVATION,
 983                    trace_id,
 984                    self.task_manager,
 985                )
 986            else:
 987                return StatefulClient(
 988                    self.client, new_id, StateType.TRACE, new_id, self.task_manager
 989                )
 990
 991    def span(
 992        self,
 993        *,
 994        id: typing.Optional[str] = None,
 995        trace_id: typing.Optional[str] = None,
 996        parent_observation_id: typing.Optional[str] = None,
 997        name: typing.Optional[str] = None,
 998        start_time: typing.Optional[dt.datetime] = None,
 999        end_time: typing.Optional[dt.datetime] = None,
1000        metadata: typing.Optional[typing.Any] = None,
1001        level: typing.Optional[SpanLevel] = None,
1002        status_message: typing.Optional[str] = None,
1003        input: typing.Optional[typing.Any] = None,
1004        output: typing.Optional[typing.Any] = None,
1005        version: typing.Optional[str] = None,
1006        **kwargs,
1007    ) -> "StatefulSpanClient":
1008        """Create a span.
1009
1010        A span represents durations of units of work in a trace.
1011        Usually, you want to add a span nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.
1012
1013        If no trace_id is provided, a new trace is created just for this span.
1014
1015        Args:
1016            id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id.
1017            trace_id (Optional[str]): The trace ID associated with this span. If not provided, a new UUID is generated.
1018            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
1019            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
1020            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
1021            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
1022            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
1023            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
1024            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
1025            input (Optional[dict]): The input to the span. Can be any JSON object.
1026            output (Optional[dict]): The output to the span. Can be any JSON object.
1027            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
1028            **kwargs: Additional keyword arguments to include in the span.
1029
1030        Returns:
1031            StatefulSpanClient: The created span.
1032
1033        Example:
1034            ```python
1035            from langfuse import Langfuse
1036
1037            langfuse = Langfuse()
1038
1039            trace = langfuse.trace(name = "llm-feature")
1040
1041            # Create a span
1042            retrieval = langfuse.span(name = "retrieval", trace_id = trace.id)
1043
1044            # Create a nested span
1045            nested_span = langfuse.span(name = "retrieval", trace_id = trace.id, parent_observation_id = retrieval.id)
1046            ```
1047        """
1048        new_span_id = id or str(uuid.uuid4())
1049        new_trace_id = trace_id or str(uuid.uuid4())
1050        self.trace_id = new_trace_id
1051        try:
1052            span_body = {
1053                "id": new_span_id,
1054                "trace_id": new_trace_id,
1055                "name": name,
1056                "start_time": start_time or _get_timestamp(),
1057                "metadata": metadata,
1058                "input": input,
1059                "output": output,
1060                "level": level,
1061                "status_message": status_message,
1062                "parent_observation_id": parent_observation_id,
1063                "version": version,
1064                "end_time": end_time,
1065                "trace": {"release": self.release},
1066                **kwargs,
1067            }
1068
1069            if trace_id is None:
1070                self._generate_trace(new_trace_id, name or new_trace_id)
1071
1072            self.log.debug(f"Creating span {span_body}...")
1073
1074            span_body = CreateSpanBody(**span_body)
1075
1076            event = {
1077                "id": str(uuid.uuid4()),
1078                "type": "span-create",
1079                "body": span_body.dict(exclude_none=True),
1080            }
1081
1082            self.log.debug(f"Creating span {event}...")
1083            self.task_manager.add_task(event)
1084
1085        except Exception as e:
1086            self.log.exception(e)
1087        finally:
1088            self._log_memory_usage()
1089
1090            return StatefulSpanClient(
1091                self.client,
1092                new_span_id,
1093                StateType.OBSERVATION,
1094                new_trace_id,
1095                self.task_manager,
1096            )
1097
1098    def event(
1099        self,
1100        *,
1101        id: typing.Optional[str] = None,
1102        trace_id: typing.Optional[str] = None,
1103        parent_observation_id: typing.Optional[str] = None,
1104        name: typing.Optional[str] = None,
1105        start_time: typing.Optional[dt.datetime] = None,
1106        metadata: typing.Optional[typing.Any] = None,
1107        input: typing.Optional[typing.Any] = None,
1108        output: typing.Optional[typing.Any] = None,
1109        level: typing.Optional[SpanLevel] = None,
1110        status_message: typing.Optional[str] = None,
1111        version: typing.Optional[str] = None,
1112        **kwargs,
1113    ) -> "StatefulSpanClient":
1114        """Create an event.
1115
1116        An event represents a discrete event in a trace.
1117        Usually, you want to add a event nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.
1118
1119        If no trace_id is provided, a new trace is created just for this event.
1120
1121        Args:
1122            id (Optional[str]): The id of the event can be set, otherwise a random id is generated.
1123            trace_id (Optional[str]): The trace ID associated with this event. If not provided, a new trace is created just for this event.
1124            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
1125            name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI.
1126            start_time (Optional[datetime]): The time at which the event started, defaults to the current time.
1127            metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API.
1128            input (Optional[Any]): The input to the event. Can be any JSON object.
1129            output (Optional[Any]): The output to the event. Can be any JSON object.
1130            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
1131            status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event.
1132            version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. Useful in debugging.
1133            **kwargs: Additional keyword arguments to include in the event.
1134
1135        Returns:
1136            StatefulSpanClient: The created event.
1137
1138        Example:
1139            ```python
1140            from langfuse import Langfuse
1141
1142            langfuse = Langfuse()
1143
1144            trace = langfuse.trace(name = "llm-feature")
1145
1146            # Create an event
1147            retrieval = langfuse.event(name = "retrieval", trace_id = trace.id)
1148            ```
1149        """
1150        event_id = id or str(uuid.uuid4())
1151        new_trace_id = trace_id or str(uuid.uuid4())
1152        self.trace_id = new_trace_id
1153        try:
1154            event_body = {
1155                "id": event_id,
1156                "trace_id": new_trace_id,
1157                "name": name,
1158                "start_time": start_time or _get_timestamp(),
1159                "metadata": metadata,
1160                "input": input,
1161                "output": output,
1162                "level": level,
1163                "status_message": status_message,
1164                "parent_observation_id": parent_observation_id,
1165                "version": version,
1166                "trace": {"release": self.release},
1167                **kwargs,
1168            }
1169
1170            if trace_id is None:
1171                self._generate_trace(new_trace_id, name or new_trace_id)
1172
1173            request = CreateEventBody(**event_body)
1174
1175            event = {
1176                "id": str(uuid.uuid4()),
1177                "type": "event-create",
1178                "body": request.dict(exclude_none=True),
1179            }
1180
1181            self.log.debug(f"Creating event {event}...")
1182            self.task_manager.add_task(event)
1183
1184        except Exception as e:
1185            self.log.exception(e)
1186        finally:
1187            return StatefulSpanClient(
1188                self.client,
1189                event_id,
1190                StateType.OBSERVATION,
1191                new_trace_id,
1192                self.task_manager,
1193            )
1194
1195    def generation(
1196        self,
1197        *,
1198        id: typing.Optional[str] = None,
1199        trace_id: typing.Optional[str] = None,
1200        parent_observation_id: typing.Optional[str] = None,
1201        name: typing.Optional[str] = None,
1202        start_time: typing.Optional[dt.datetime] = None,
1203        end_time: typing.Optional[dt.datetime] = None,
1204        completion_start_time: typing.Optional[dt.datetime] = None,
1205        metadata: typing.Optional[typing.Any] = None,
1206        level: typing.Optional[SpanLevel] = None,
1207        status_message: typing.Optional[str] = None,
1208        version: typing.Optional[str] = None,
1209        model: typing.Optional[str] = None,
1210        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
1211        input: typing.Optional[typing.Any] = None,
1212        output: typing.Optional[typing.Any] = None,
1213        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
1214        prompt: typing.Optional[PromptClient] = None,
1215        **kwargs,
1216    ) -> "StatefulGenerationClient":
1217        """Create a generation.
1218
1219        A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI.
1220
1221        Usually, you want to add a generation nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.
1222
1223        If no trace_id is provided, a new trace is created just for this generation.
1224
1225        Args:
1226            id (Optional[str]): The id of the generation can be set, defaults to random id.
1227            trace_id (Optional[str]): The trace ID associated with this generation. If not provided, a new trace is created
1228            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
1229            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
1230            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
1231            end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`.
1232            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
1233            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
1234            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
1235            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
1236            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
1237            model (Optional[str]): The name of the model used for the generation.
1238            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
1239            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
1240            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
1241            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
1242            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
1243            **kwargs: Additional keyword arguments to include in the generation.
1244
1245        Returns:
1246            StatefulGenerationClient: The created generation.
1247
1248        Example:
1249            ```python
1250            from langfuse import Langfuse
1251
1252            langfuse = Langfuse()
1253
1254            # Create a generation in Langfuse
1255            generation = langfuse.generation(
1256                name="summary-generation",
1257                model="gpt-3.5-turbo",
1258                model_parameters={"maxTokens": "1000", "temperature": "0.9"},
1259                input=[{"role": "system", "content": "You are a helpful assistant."},
1260                       {"role": "user", "content": "Please generate a summary of the following documents ..."}],
1261                metadata={"interface": "whatsapp"}
1262            )
1263            ```
1264        """
1265        new_trace_id = trace_id or str(uuid.uuid4())
1266        new_generation_id = id or str(uuid.uuid4())
1267        self.trace_id = new_trace_id
1268        try:
1269            generation_body = {
1270                "id": new_generation_id,
1271                "trace_id": new_trace_id,
1272                "release": self.release,
1273                "name": name,
1274                "start_time": start_time or _get_timestamp(),
1275                "metadata": metadata,
1276                "input": input,
1277                "output": output,
1278                "level": level,
1279                "status_message": status_message,
1280                "parent_observation_id": parent_observation_id,
1281                "version": version,
1282                "end_time": end_time,
1283                "completion_start_time": completion_start_time,
1284                "model": model,
1285                "model_parameters": model_parameters,
1286                "usage": _convert_usage_input(usage) if usage is not None else None,
1287                "trace": {"release": self.release},
1288                **_create_prompt_context(prompt),
1289                **kwargs,
1290            }
1291
1292            if trace_id is None:
1293                trace = {
1294                    "id": new_trace_id,
1295                    "release": self.release,
1296                    "name": name,
1297                }
1298                request = TraceBody(**trace)
1299
1300                event = {
1301                    "id": str(uuid.uuid4()),
1302                    "type": "trace-create",
1303                    "body": request.dict(exclude_none=True),
1304                }
1305
1306                self.log.debug(f"Creating trace {event}...")
1307
1308                self.task_manager.add_task(event)
1309
1310            self.log.debug(f"Creating generation max {generation_body} {usage}...")
1311            request = CreateGenerationBody(**generation_body)
1312
1313            event = {
1314                "id": str(uuid.uuid4()),
1315                "type": "generation-create",
1316                "body": request.dict(exclude_none=True),
1317            }
1318
1319            self.log.debug(f"Creating top-level generation {event} ...")
1320            self.task_manager.add_task(event)
1321
1322        except Exception as e:
1323            self.log.exception(e)
1324        finally:
1325            return StatefulGenerationClient(
1326                self.client,
1327                new_generation_id,
1328                StateType.OBSERVATION,
1329                new_trace_id,
1330                self.task_manager,
1331            )
1332
1333    def _generate_trace(self, trace_id: str, name: str):
1334        trace_dict = {
1335            "id": trace_id,
1336            "release": self.release,
1337            "name": name,
1338        }
1339
1340        trace_body = TraceBody(**trace_dict)
1341
1342        event = {
1343            "id": str(uuid.uuid4()),
1344            "type": "trace-create",
1345            "body": trace_body.dict(exclude_none=True),
1346        }
1347
1348        self.log.debug(f"Creating trace {event}...")
1349        self.task_manager.add_task(event)
1350
1351    def join(self):
1352        """Blocks until all consumer Threads are terminated. The SKD calls this upon termination of the Python Interpreter.
1353
1354        If called before flushing, consumers might terminate before sending all events to Langfuse API. This method is called at exit of the SKD, right before the Python interpreter closes.
1355        To guarantee all messages have been delivered, you still need to call flush().
1356        """
1357        try:
1358            return self.task_manager.join()
1359        except Exception as e:
1360            self.log.exception(e)
1361
1362    def flush(self):
1363        """Flush the internal event queue to the Langfuse API. It blocks until the queue is empty. It should be called when the application shuts down.
1364
1365        Example:
1366            ```python
1367            from langfuse import Langfuse
1368
1369            langfuse = Langfuse()
1370
1371            # Some operations with Langfuse
1372
1373            # Flushing all events to end Langfuse cleanly
1374            langfuse.flush()
1375            ```
1376        """
1377        try:
1378            return self.task_manager.flush()
1379        except Exception as e:
1380            self.log.exception(e)
1381
1382    def shutdown(self):
1383        """Initiate a graceful shutdown of the Langfuse SDK, ensuring all events are sent to Langfuse API and all consumer Threads are terminated.
1384
1385        This function calls flush() and join() consecutively resulting in a complete shutdown of the SDK. On success of this function, no more events will be sent to Langfuse API.
1386        As the SDK calls join() already on shutdown, refer to flush() to ensure all events arive at the Langfuse API.
1387        """
1388        try:
1389            return self.task_manager.shutdown()
1390        except Exception as e:
1391            self.log.exception(e)
1392
1393
class StateType(Enum):
    """Discriminates whether a stateful client wraps an observation or a trace.

    Attributes:
        OBSERVATION (int): The client refers to an observation.
        TRACE (int): The client refers to a trace.
    """

    OBSERVATION = 1
    TRACE = 0
1404
1405
class StatefulClient:
    """Base class for handling stateful operations in the Langfuse system.

    This client is capable of creating different nested Langfuse objects like spans, generations, scores, and events,
    associating them with either an observation or a trace based on the specified state type.

    Attributes:
        client (FernLangfuse): Core interface for Langfuse API interactions.
        id (str): Unique identifier of the stateful client (either observation or trace).
        state_type (StateType): Enum indicating whether the client is an observation or a trace.
        trace_id (str): Id of the trace associated with the stateful client.
        task_manager (TaskManager): Manager handling asynchronous tasks for the client.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulClient.

        Args:
            client (FernLangfuse): Core interface for Langfuse API interactions.
            id (str): Unique identifier of the stateful client (either observation or trace).
            state_type (StateType): Enum indicating whether the client is an observation or a trace.
            trace_id (str): Id of the trace associated with the stateful client.
            task_manager (TaskManager): Manager handling asynchronous tasks for the client.
        """
        self.client = client
        self.trace_id = trace_id
        self.id = id
        self.state_type = state_type
        self.task_manager = task_manager

    def _add_state_to_event(self, body: dict) -> dict:
        """Attach this client's trace/observation identifiers to an ingestion event body.

        Observation-level clients set both the parent observation id and the trace id;
        trace-level clients set only the trace id (their own id).
        """
        if self.state_type == StateType.OBSERVATION:
            body["parent_observation_id"] = self.id
            body["trace_id"] = self.trace_id
        else:
            body["trace_id"] = self.id
        return body

    def _add_default_values(self, body: dict) -> dict:
        """Default `start_time` to the current timestamp if the caller did not set one."""
        if body.get("start_time") is None:
            body["start_time"] = _get_timestamp()
        return body

    def generation(
        self,
        *,
        id: typing.Optional[str] = None,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        completion_start_time: typing.Optional[dt.datetime] = None,
        model: typing.Optional[str] = None,
        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
        prompt: typing.Optional[PromptClient] = None,
        **kwargs,
    ) -> "StatefulGenerationClient":
        """Create a generation nested within the current observation or trace.

        A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI.

        Args:
            id (Optional[str]): The id of the generation can be set, defaults to random id.
            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
            end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`.
            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            model (Optional[str]): The name of the model used for the generation.
            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
            **kwargs: Additional keyword arguments to include in the generation.

        Returns:
            StatefulGenerationClient: The created generation. Use this client to update the generation or create additional nested observations.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested generation in Langfuse
            generation = trace.generation(
                name="summary-generation",
                model="gpt-3.5-turbo",
                model_parameters={"maxTokens": "1000", "temperature": "0.9"},
                input=[{"role": "system", "content": "You are a helpful assistant."},
                       {"role": "user", "content": "Please generate a summary of the following documents ..."}],
                metadata={"interface": "whatsapp"}
            )
            ```
        """
        # Resolve the id up front so the returned client is valid even if
        # building/enqueueing the event below fails.
        generation_id = id or str(uuid.uuid4())
        try:
            generation_body = {
                "id": generation_id,
                "name": name,
                "start_time": start_time or _get_timestamp(),
                "metadata": metadata,
                "level": level,
                "status_message": status_message,
                "version": version,
                "end_time": end_time,
                "completion_start_time": completion_start_time,
                "model": model,
                "model_parameters": model_parameters,
                "input": input,
                "output": output,
                "usage": _convert_usage_input(usage) if usage is not None else None,
                **_create_prompt_context(prompt),
                **kwargs,
            }

            generation_body = self._add_state_to_event(generation_body)
            new_body = self._add_default_values(generation_body)

            new_body = CreateGenerationBody(**new_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "generation-create",
                "body": new_body.dict(exclude_none=True, exclude_unset=False),
            }

            self.log.debug(f"Creating generation {new_body}...")
            self.task_manager.add_task(event)

        except Exception as e:
            self.log.exception(e)

        # Returned after try/except rather than from a `finally` block: a
        # `return` inside `finally` would also discard in-flight
        # BaseExceptions such as KeyboardInterrupt.
        return StatefulGenerationClient(
            self.client,
            generation_id,
            StateType.OBSERVATION,
            self.trace_id,
            task_manager=self.task_manager,
        )

    def span(
        self,
        *,
        id: typing.Optional[str] = None,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulSpanClient":
        """Create a span nested within the current observation or trace.

        A span represents durations of units of work in a trace.

        Args:
            id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id.
            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
            input (Optional[dict]): The input to the span. Can be any JSON object.
            output (Optional[dict]): The output to the span. Can be any JSON object.
            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            **kwargs: Additional keyword arguments to include in the span.

        Returns:
            StatefulSpanClient: The created span. Use this client to update the span or create additional nested observations.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a span
            retrieval = langfuse.span(name = "retrieval")
            ```
        """
        span_id = id or str(uuid.uuid4())
        try:
            span_body = {
                "id": span_id,
                "name": name,
                "start_time": start_time or _get_timestamp(),
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "version": version,
                "end_time": end_time,
                **kwargs,
            }

            self.log.debug(f"Creating span {span_body}...")

            new_dict = self._add_state_to_event(span_body)
            new_body = self._add_default_values(new_dict)

            # Validate through the typed body before enqueueing, consistent
            # with `score()` and `event()`.
            request = CreateSpanBody(**new_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "span-create",
                "body": request.dict(exclude_none=True),
            }

            self.task_manager.add_task(event)
        except Exception as e:
            self.log.exception(e)

        # Returned after try/except rather than from a `finally` block: a
        # `return` inside `finally` would also discard in-flight
        # BaseExceptions such as KeyboardInterrupt.
        return StatefulSpanClient(
            self.client,
            span_id,
            StateType.OBSERVATION,
            self.trace_id,
            task_manager=self.task_manager,
        )

    def score(
        self,
        *,
        id: typing.Optional[str] = None,
        name: str,
        value: float,
        comment: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient":
        """Create a score attached for the current observation or trace.

        Args:
            name (str): Identifier of the score.
            value (float): The value of the score. Can be any number, often standardized to 0..1
            comment (Optional[str]): Additional context/explanation of the score.
            id (Optional[str]): The id of the score. If not provided, a new UUID is generated.
            **kwargs: Additional keyword arguments to include in the score.

        Returns:
            StatefulClient: The current observation or trace for which the score was created. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name="example-application")

            # Add score to the trace
            trace = trace.score(
                name="user-explicit-feedback",
                value=1,
                comment="I like how personalized the response is"
            )
            ```
        """
        score_id = id or str(uuid.uuid4())
        try:
            new_score = {
                "id": score_id,
                "trace_id": self.trace_id,
                "name": name,
                "value": value,
                "comment": comment,
                **kwargs,
            }

            self.log.debug(f"Creating score {new_score}...")

            new_dict = self._add_state_to_event(new_score)

            if self.state_type == StateType.OBSERVATION:
                new_dict["observationId"] = self.id

            request = ScoreBody(**new_dict)

            event = {
                "id": str(uuid.uuid4()),
                "type": "score-create",
                "body": request.dict(exclude_none=True),
            }

            self.task_manager.add_task(event)

        except Exception as e:
            self.log.exception(e)

        # Scores are a passthrough: the returned client still represents the
        # *current* entity (`self.id`), so its state type must be preserved.
        # Hard-coding StateType.OBSERVATION here would demote a trace-level
        # client and mis-parent any observations nested afterwards.
        return StatefulClient(
            self.client,
            self.id,
            self.state_type,
            self.trace_id,
            task_manager=self.task_manager,
        )

    def event(
        self,
        *,
        id: typing.Optional[str] = None,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient":
        """Create an event nested within the current observation or trace.

        An event represents a discrete event in a trace.

        Args:
            id (Optional[str]): The id of the event can be set, otherwise a random id is generated.
            name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime]): The time at which the event started, defaults to the current time.
            metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API.
            input (Optional[Any]): The input to the event. Can be any JSON object.
            output (Optional[Any]): The output to the event. Can be any JSON object.
            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. Useful in debugging.
            **kwargs: Additional keyword arguments to include in the event.

        Returns:
            StatefulSpanClient: The created event. Use this client to update the event or create additional nested observations.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create an event
            retrieval = trace.event(name = "retrieval")
            ```
        """
        event_id = id or str(uuid.uuid4())
        try:
            event_body = {
                "id": event_id,
                "name": name,
                "start_time": start_time or _get_timestamp(),
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "version": version,
                **kwargs,
            }

            new_dict = self._add_state_to_event(event_body)
            new_body = self._add_default_values(new_dict)

            request = CreateEventBody(**new_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "event-create",
                "body": request.dict(exclude_none=True),
            }

            self.log.debug(f"Creating event {event}...")
            self.task_manager.add_task(event)

        except Exception as e:
            self.log.exception(e)

        # The returned client represents the newly created event, which is an
        # observation — mirror `generation()` and `span()` by using
        # StateType.OBSERVATION (passing `self.state_type` would treat the
        # event id as a trace id when `self` is a trace).
        return StatefulClient(
            self.client,
            event_id,
            StateType.OBSERVATION,
            self.trace_id,
            task_manager=self.task_manager,
        )

    def get_trace_url(self) -> str:
        """Get the URL to see the current trace in the Langfuse UI."""
        return f"{self.client._client_wrapper._base_url}/trace/{self.trace_id}"
1820
1821
class StatefulGenerationClient(StatefulClient):
    """Class for handling stateful operations of generations in the Langfuse system. Inherits from StatefulClient.

    This client extends the capabilities of the StatefulClient to specifically handle generation,
    allowing for the creation, update, and termination of generation processes in Langfuse.

    Attributes:
        client (FernLangfuse): Core interface for Langfuse API interaction.
        id (str): Unique identifier of the generation.
        state_type (StateType): Type of the stateful entity (observation or trace).
        trace_id (str): Id of trace associated with the generation.
        task_manager (TaskManager): Manager for handling asynchronous tasks.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulGenerationClient."""
        super().__init__(client, id, state_type, trace_id, task_manager)

    # WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY
    def update(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        completion_start_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        model: typing.Optional[str] = None,
        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
        prompt: typing.Optional[PromptClient] = None,
        **kwargs,
    ) -> "StatefulGenerationClient":
        """Update the generation.

        Args:
            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
            end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`.
            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            model (Optional[str]): The name of the model used for the generation.
            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
            **kwargs: Additional keyword arguments to include in the generation.

        Returns:
            StatefulGenerationClient: The updated generation. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested generation in Langfuse
            generation = trace.generation(name="summary-generation")

            # Update the generation
            generation = generation.update(metadata={"interface": "whatsapp"})
            ```
        """
        try:
            generation_body = {
                "id": self.id,
                "trace_id": self.trace_id,  # Included to avoid relying on the order of events sent to the API
                "name": name,
                "start_time": start_time,
                "metadata": metadata,
                "level": level,
                "status_message": status_message,
                "version": version,
                "end_time": end_time,
                "completion_start_time": completion_start_time,
                "model": model,
                "model_parameters": model_parameters,
                "input": input,
                "output": output,
                "usage": _convert_usage_input(usage) if usage is not None else None,
                **_create_prompt_context(prompt),
                **kwargs,
            }

            self.log.debug(f"Update generation {generation_body}...")

            request = UpdateGenerationBody(**generation_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "generation-update",
                "body": request.dict(exclude_none=True, exclude_unset=False),
            }

            self.log.debug(f"Update generation {event}...")
            self.task_manager.add_task(event)

        except Exception as e:
            self.log.exception(e)

        # Returned after try/except rather than from a `finally` block: a
        # `return` inside `finally` would also discard in-flight
        # BaseExceptions such as KeyboardInterrupt.
        return StatefulGenerationClient(
            self.client,
            self.id,
            StateType.OBSERVATION,
            self.trace_id,
            task_manager=self.task_manager,
        )

    def end(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        completion_start_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        model: typing.Optional[str] = None,
        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
        prompt: typing.Optional[PromptClient] = None,
        **kwargs,
    ) -> "StatefulGenerationClient":
        """End the generation, optionally updating its properties.

        Args:
            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
            end_time (Optional[datetime.datetime]): Automatically set to the current time. Can be overridden to set a custom end time.
            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            model (Optional[str]): The name of the model used for the generation.
            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
            **kwargs: Additional keyword arguments to include in the generation.

        Returns:
            StatefulGenerationClient: The ended generation. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested generation in Langfuse
            generation = trace.generation(name="summary-generation")

            # End the generation and update its properties
            generation = generation.end(metadata={"interface": "whatsapp"})
            ```
        """
        # Delegate to update(); `end_time` defaults to "now" when not provided.
        return self.update(
            name=name,
            start_time=start_time,
            end_time=end_time or _get_timestamp(),
            metadata=metadata,
            level=level,
            status_message=status_message,
            version=version,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            input=input,
            output=output,
            usage=usage,
            prompt=prompt,
            **kwargs,
        )
2026
2027
class StatefulSpanClient(StatefulClient):
    """Class for handling stateful operations of spans in the Langfuse system. Inherits from StatefulClient.

    Attributes:
        client (FernLangfuse): Core interface for Langfuse API interaction.
        id (str): Unique identifier of the span.
        state_type (StateType): Type of the stateful entity (observation or trace).
        trace_id (str): Id of trace associated with the span.
        task_manager (TaskManager): Manager for handling asynchronous tasks.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulSpanClient."""
        super().__init__(client, id, state_type, trace_id, task_manager)

    # WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY
    def update(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulSpanClient":
        """Update the span.

        Args:
            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
            input (Optional[dict]): The input to the span. Can be any JSON object.
            output (Optional[dict]): The output to the span. Can be any JSON object.
            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            **kwargs: Additional keyword arguments to include in the span.

        Returns:
            StatefulSpanClient: The updated span. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested span in Langfuse
            span = trace.span(name="retrieval")

            # Update the span
            span = span.update(metadata={"interface": "whatsapp"})
            ```
        """
        try:
            span_body = {
                "id": self.id,
                "trace_id": self.trace_id,  # Included to avoid relying on the order of events sent to the API
                "name": name,
                "start_time": start_time,
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "version": version,
                "end_time": end_time,
                **kwargs,
            }
            self.log.debug(f"Update span {span_body}...")

            request = UpdateSpanBody(**span_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "span-update",
                "body": request.dict(exclude_none=True),
            }

            self.task_manager.add_task(event)
        except Exception as e:
            self.log.exception(e)
        finally:
            # Always hand back a fresh client so user code can keep chaining,
            # even if enqueueing the event failed. The `return` in this
            # `finally` deliberately suppresses any in-flight exception so the
            # SDK never raises into user code.
            return StatefulSpanClient(
                self.client,
                self.id,
                StateType.OBSERVATION,
                self.trace_id,
                task_manager=self.task_manager,
            )

    def end(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulSpanClient":
        """End the span, optionally updating its properties.

        Args:
            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
            input (Optional[dict]): The input to the span. Can be any JSON object.
            output (Optional[dict]): The output to the span. Can be any JSON object.
            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            **kwargs: Additional keyword arguments to include in the span.

        Returns:
            StatefulSpanClient: The updated span. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested span in Langfuse
            span = trace.span(name="retrieval")

            # End the span and update its properties
            span = span.end(metadata={"interface": "whatsapp"})
            ```
        """
        try:
            span_body = {
                "name": name,
                "start_time": start_time,
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "version": version,
                # The only difference to update(): the end time defaults to now.
                "end_time": end_time or _get_timestamp(),
                **kwargs,
            }
            # Enqueue the update. Its return value is intentionally discarded:
            # the `finally` below is the single exit point of this method.
            self.update(**span_body)

        except Exception as e:
            self.log.exception(e)
        finally:
            # See update(): always return a usable client and never raise.
            return StatefulSpanClient(
                self.client,
                self.id,
                StateType.OBSERVATION,
                self.trace_id,
                task_manager=self.task_manager,
            )

    def get_langchain_handler(self, update_parent: bool = False):
        """Get langchain callback handler associated with the current span.

        Args:
            update_parent (bool): If set to True, the parent observation will be updated with the outcome of the Langchain run.

        Returns:
            CallbackHandler: An instance of CallbackHandler linked to this StatefulSpanClient.
        """
        # Imported lazily to avoid requiring langchain for users who don't use it.
        from langfuse.callback import CallbackHandler

        return CallbackHandler(
            stateful_client=self, update_stateful_client=update_parent
        )
2224
2225
class StatefulTraceClient(StatefulClient):
    """Class for handling stateful operations of traces in the Langfuse system. Inherits from StatefulClient.

    Attributes:
        client (FernLangfuse): Core interface for Langfuse API interaction.
        id (str): Unique identifier of the trace.
        state_type (StateType): Type of the stateful entity (observation or trace).
        trace_id (str): The trace ID associated with this client.
        task_manager (TaskManager): Manager for handling asynchronous tasks.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulTraceClient."""
        # The base class stores client, id, state_type, trace_id and
        # task_manager; no additional state is needed here.
        super().__init__(client, id, state_type, trace_id, task_manager)

    def update(
        self,
        *,
        name: typing.Optional[str] = None,
        user_id: typing.Optional[str] = None,
        session_id: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        release: typing.Optional[str] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        metadata: typing.Optional[typing.Any] = None,
        tags: typing.Optional[typing.List[str]] = None,
        public: typing.Optional[bool] = None,
        **kwargs,
    ) -> "StatefulTraceClient":
        """Update the trace.

        Args:
            name: Identifier of the trace. Useful for sorting/filtering in the UI.
            input: The input of the trace. Can be any JSON object.
            output: The output of the trace. Can be any JSON object.
            metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
            user_id: The id of the user that triggered the execution. Used to provide user-level analytics.
            session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
            version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
            release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
            tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.
            public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
            **kwargs: Additional keyword arguments that can be included in the trace.

        Returns:
            StatefulTraceClient: The updated trace. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(
                name="example-application",
                user_id="user-1234"
            )

            # Update the trace
            trace = trace.update(
                output={"result": "success"},
                metadata={"interface": "whatsapp"}
            )
            ```
        """
        try:
            trace_body = {
                "id": self.id,
                "name": name,
                "userId": user_id,
                "sessionId": session_id
                or kwargs.get("sessionId", None),  # backward compatibility
                "version": version,
                "release": release,
                "input": input,
                "output": output,
                "metadata": metadata,
                "public": public,
                "tags": tags,
                **kwargs,
            }
            self.log.debug(f"Update trace {trace_body}...")

            request = TraceBody(**trace_body)

            # Updates are sent as "trace-create" events; the backend upserts
            # traces by id.
            event = {
                "id": str(uuid.uuid4()),
                "type": "trace-create",
                "body": request.dict(exclude_none=True),
            }

            self.task_manager.add_task(event)

        except Exception as e:
            self.log.exception(e)
        finally:
            # Always hand back a fresh client so user code can keep chaining,
            # even if enqueueing the event failed. The `return` in this
            # `finally` deliberately suppresses any in-flight exception so the
            # SDK never raises into user code.
            return StatefulTraceClient(
                self.client,
                self.id,
                StateType.TRACE,
                self.trace_id,
                task_manager=self.task_manager,
            )

    def get_langchain_handler(self, update_parent: bool = False):
        """Get langchain callback handler associated with the current trace.

        This method creates and returns a CallbackHandler instance, linking it with the current
        trace. Use this if you want to group multiple Langchain runs within a single trace.

        Args:
            update_parent (bool): If set to True, the parent trace will be updated with the outcome of the Langchain run.

        Raises:
            ImportError: If the 'langchain' module is not installed, indicating missing functionality.

        Returns:
            CallbackHandler: Langchain callback handler linked to the current trace.
                Returns None if handler creation fails (the error is logged).

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Get a langchain callback handler
            handler = trace.get_langchain_handler()
            ```
        """
        try:
            # Imported lazily to avoid requiring langchain for users who don't use it.
            from langfuse.callback import CallbackHandler

            self.log.debug(f"Creating new handler for trace {self.id}")

            return CallbackHandler(
                stateful_client=self,
                debug=self.log.level == logging.DEBUG,
                update_stateful_client=update_parent,
            )
        except Exception as e:
            # Swallow and log: callers receive None instead of an exception.
            self.log.exception(e)

    def getNewHandler(self):
        """Alias for the `get_langchain_handler` method. Retrieves a callback handler for the trace. Deprecated."""
        return self.get_langchain_handler()
2386
2387
class DatasetItemClient:
    """Class for managing dataset items in Langfuse.

    Args:
        id (str): Unique identifier of the dataset item.
        status (DatasetStatus): The status of the dataset item. Can be either 'ACTIVE' or 'ARCHIVED'.
        input (Any): Input data of the dataset item.
        expected_output (Optional[Any]): Expected output of the dataset item.
        metadata (Optional[Any]): Additional metadata of the dataset item.
        source_trace_id (Optional[str]): Identifier of the source trace.
        source_observation_id (Optional[str]): Identifier of the source observation.
        dataset_id (str): Identifier of the dataset to which this item belongs.
        dataset_name (str): Name of the dataset to which this item belongs.
        created_at (datetime): Timestamp of dataset item creation.
        updated_at (datetime): Timestamp of the last update to the dataset item.
        langfuse (Langfuse): Instance of Langfuse client for API interactions.

    Example:
        ```python
        from langfuse import Langfuse

        langfuse = Langfuse()

        dataset = langfuse.get_dataset("<dataset_name>")

        for item in dataset.items:
            # Generate a completion using the input of every item
            completion, generation = llm_app.run(item.input)

            # Evaluate the completion
            generation.score(
                name="example-score",
                value=1
            )
        ```
    """

    log = logging.getLogger("langfuse")

    id: str
    status: DatasetStatus
    input: typing.Any
    expected_output: typing.Optional[typing.Any]
    metadata: Optional[Any]
    source_trace_id: typing.Optional[str]
    source_observation_id: typing.Optional[str]
    dataset_id: str
    dataset_name: str
    created_at: dt.datetime
    updated_at: dt.datetime

    langfuse: Langfuse

    def __init__(self, dataset_item: DatasetItem, langfuse: Langfuse):
        """Initialize the DatasetItemClient."""
        self.id = dataset_item.id
        self.status = dataset_item.status
        self.input = dataset_item.input
        self.expected_output = dataset_item.expected_output
        self.metadata = dataset_item.metadata
        self.source_trace_id = dataset_item.source_trace_id
        self.source_observation_id = dataset_item.source_observation_id
        self.dataset_id = dataset_item.dataset_id
        self.dataset_name = dataset_item.dataset_name
        self.created_at = dataset_item.created_at
        self.updated_at = dataset_item.updated_at

        self.langfuse = langfuse

    def flush(self, observation: StatefulClient, run_name: str):
        """Flushes an observations task manager's queue.

        Used before creating a dataset run item to ensure all events are persistent.

        Args:
            observation (StatefulClient): The observation or trace client associated with the dataset item.
            run_name (str): The name of the dataset run. Currently unused; kept for interface compatibility.
        """
        observation.task_manager.flush()

    def link(
        self,
        trace_or_observation: typing.Union[StatefulClient, str, None],
        run_name: str,
        run_metadata: Optional[Any] = None,
        run_description: Optional[str] = None,
        trace_id: Optional[str] = None,
        observation_id: Optional[str] = None,
    ):
        """Link the dataset item to observation within a specific dataset run. Creates a dataset run item.

        Args:
            trace_or_observation (Union[StatefulClient, str, None]): The trace or observation object to link. Deprecated: can also be an observation ID.
            run_name (str): The name of the dataset run.
            run_metadata (Optional[Any]): Additional metadata to include in dataset run.
            run_description (Optional[str]): Description of the dataset run.
            trace_id (Optional[str]): The trace ID to link to the dataset item. Set trace_or_observation to None if trace_id is provided.
            observation_id (Optional[str]): The observation ID to link to the dataset item (optional). Set trace_or_observation to None if trace_id is provided.

        Raises:
            ValueError: If neither a client object, an observation ID string, nor a trace_id is provided.
        """
        parsed_trace_id: Optional[str] = None
        parsed_observation_id: Optional[str] = None

        if isinstance(trace_or_observation, StatefulClient):
            # NOTE: queued events for this client are not flushed here; call
            # `flush()` first if the run item must reference persisted events.
            if trace_or_observation.state_type == StateType.TRACE:
                parsed_trace_id = trace_or_observation.trace_id
            elif trace_or_observation.state_type == StateType.OBSERVATION:
                parsed_observation_id = trace_or_observation.id
                parsed_trace_id = trace_or_observation.trace_id
        # legacy support for observation_id
        elif isinstance(trace_or_observation, str):
            parsed_observation_id = trace_or_observation
        elif trace_or_observation is None:
            if trace_id is not None:
                parsed_trace_id = trace_id
                if observation_id is not None:
                    parsed_observation_id = observation_id
            else:
                raise ValueError(
                    "trace_id must be provided if trace_or_observation is None"
                )
        else:
            raise ValueError(
                "trace_or_observation (arg) or trace_id (kwarg) must be provided to link the dataset item"
            )

        self.log.debug(
            f"Creating dataset run item: {run_name} {self.id} {parsed_trace_id} {parsed_observation_id}"
        )
        self.langfuse.client.dataset_run_items.create(
            request=CreateDatasetRunItemRequest(
                runName=run_name,
                datasetItemId=self.id,
                traceId=parsed_trace_id,
                observationId=parsed_observation_id,
                metadata=run_metadata,
                runDescription=run_description,
            )
        )

    def get_langchain_handler(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
    ):
        """Create and get a langchain callback handler linked to this dataset item.

        Creates a fresh trace named "dataset-run", links it to this item within the
        given run, and returns a handler bound to that trace.

        Args:
            run_name (str): The name of the dataset run to be used in the callback handler.
            run_description (Optional[str]): Description of the dataset run.
            run_metadata (Optional[Any]): Additional metadata to include in dataset run.

        Returns:
            CallbackHandler: An instance of CallbackHandler linked to the dataset item.
        """
        metadata = {
            "dataset_item_id": self.id,
            "run_name": run_name,
            "dataset_id": self.dataset_id,
        }
        trace = self.langfuse.trace(name="dataset-run", metadata=metadata)

        self.link(
            trace, run_name, run_metadata=run_metadata, run_description=run_description
        )

        return trace.get_langchain_handler(update_parent=True)

    @contextmanager
    def observe(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
        trace_id: Optional[str] = None,
    ):
        """Observes a dataset run within the Langfuse client.

        Args:
            run_name (str): The name of the dataset run.
            run_description (Optional[str]): The description of the dataset run.
            run_metadata (Optional[Any]): Additional metadata for the dataset run.
            trace_id (Optional[str]): The trace ID to use as the root trace for the dataset run. If not provided, a new trace ID is generated.

        Yields:
            str: The root trace ID associated with the dataset run.
        """
        # Imported here to avoid a circular import at module load time.
        from langfuse.decorators import langfuse_context

        root_trace_id = trace_id or str(uuid.uuid4())

        langfuse_context._set_root_trace_id(root_trace_id)

        try:
            yield root_trace_id

        finally:
            # Link even if the observed block raised, so the (partial) run is
            # still recorded against this dataset item.
            self.link(
                run_name=run_name,
                run_metadata=run_metadata,
                run_description=run_description,
                trace_or_observation=None,
                trace_id=root_trace_id,
            )

    @contextmanager
    def observe_llama_index(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
        llama_index_integration_constructor_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """Context manager for observing LlamaIndex operations linked to this dataset item.

        This method sets up a LlamaIndex callback handler that integrates with Langfuse, allowing detailed logging
        and tracing of LlamaIndex operations within the context of a specific dataset run. It ensures that all
        operations performed within the context are linked to the appropriate dataset item and run in Langfuse.

        Args:
            run_name (str): The name of the dataset run.
            run_description (Optional[str]): Description of the dataset run. Defaults to None.
            run_metadata (Optional[Any]): Additional metadata for the dataset run. Defaults to None.
            llama_index_integration_constructor_kwargs (Optional[Dict[str, Any]]): Keyword arguments to pass
                to the LlamaIndex integration constructor. Defaults to None (treated as an empty dictionary).

        Yields:
            LlamaIndexCallbackHandler: The callback handler for LlamaIndex operations.
                Yields None if the integration setup failed (the error is logged).

        Example:
            ```python
            dataset_item = dataset.items[0]

            with dataset_item.observe_llama_index(run_name="example-run", run_description="Example LlamaIndex run") as handler:
                # Perform LlamaIndex operations here
                some_llama_index_operation()
            ```

        Raises:
            ImportError: If required modules for LlamaIndex integration are not available.
        """
        metadata = {
            "dataset_item_id": self.id,
            "run_name": run_name,
            "dataset_id": self.dataset_id,
        }
        trace = self.langfuse.trace(name="dataset-run", metadata=metadata)
        self.link(
            trace, run_name, run_metadata=run_metadata, run_description=run_description
        )

        # Initialized up front so the cleanup in `finally` below can run safely
        # even if the setup block fails partway through.
        callback_handler = None
        prev_global_handler = None
        prev_langfuse_handler = None

        try:
            import llama_index.core
            from llama_index.core import Settings
            from llama_index.core.callbacks import CallbackManager

            from langfuse.llama_index import LlamaIndexCallbackHandler

            callback_handler = LlamaIndexCallbackHandler(
                **(llama_index_integration_constructor_kwargs or {}),
            )
            callback_handler.set_root(trace, update_root=True)

            # Temporarily clear the global handler if it is a LlamaIndexCallbackHandler:
            # LlamaIndex does not support two handlers of the same type, so an
            # existing one must be removed before ours is registered, and is
            # restored afterwards.
            prev_global_handler = llama_index.core.global_handler

            if isinstance(prev_global_handler, LlamaIndexCallbackHandler):
                llama_index.core.global_handler = None

            if Settings.callback_manager is None:
                Settings.callback_manager = CallbackManager([callback_handler])
            else:
                for handler in Settings.callback_manager.handlers:
                    if isinstance(handler, LlamaIndexCallbackHandler):
                        prev_langfuse_handler = handler
                        Settings.callback_manager.remove_handler(handler)

                Settings.callback_manager.add_handler(callback_handler)

        except Exception as e:
            self.log.exception(e)

        try:
            yield callback_handler
        finally:
            # Reset the handlers. Skipped entirely if setup failed above, in
            # which case nothing was registered (and the llama_index imports
            # may not even be bound).
            if callback_handler is not None:
                Settings.callback_manager.remove_handler(callback_handler)
                if prev_langfuse_handler is not None:
                    Settings.callback_manager.add_handler(prev_langfuse_handler)

                llama_index.core.global_handler = prev_global_handler

    def get_llama_index_handler(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
        llama_index_integration_constructor_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """Create and get a llama-index callback handler linked to this dataset item.

        Args:
            run_name (str): The name of the dataset run to be used in the callback handler.
            run_description (Optional[str]): Description of the dataset run.
            run_metadata (Optional[Any]): Additional metadata to include in dataset run.
            llama_index_integration_constructor_kwargs (Optional[Dict[str, Any]]): Additional keyword arguments to pass to the LlamaIndex integration constructor. Defaults to None (treated as an empty dictionary).

        Returns:
            LlamaIndexCallbackHandler: An instance of LlamaIndexCallbackHandler linked to the dataset item.
                Returns None if handler creation fails (the error is logged).
        """
        metadata = {
            "dataset_item_id": self.id,
            "run_name": run_name,
            "dataset_id": self.dataset_id,
        }
        trace = self.langfuse.trace(name="dataset-run", metadata=metadata)

        self.link(
            trace, run_name, run_metadata=run_metadata, run_description=run_description
        )

        try:
            from langfuse.llama_index.llama_index import LlamaIndexCallbackHandler

            callback_handler = LlamaIndexCallbackHandler(
                **(llama_index_integration_constructor_kwargs or {}),
            )
            callback_handler.set_root(trace, update_root=True)

            return callback_handler
        except Exception as e:
            # Swallow and log: callers receive None instead of an exception.
            self.log.exception(e)
2728
2729
class DatasetClient:
    """Client-side view of a Langfuse dataset together with its wrapped items.

    Attributes:
        id (str): Unique identifier of the dataset.
        name (str): Name of the dataset.
        description (Optional[str]): Description of the dataset.
        metadata (Optional[typing.Any]): Additional metadata of the dataset.
        project_id (str): Identifier of the project to which the dataset belongs.
        dataset_name (str): Name of the dataset (deprecated alias of `name`).
        created_at (datetime): Timestamp of dataset creation.
        updated_at (datetime): Timestamp of the last update to the dataset.
        items (List[DatasetItemClient]): List of dataset items associated with the dataset.
        runs (List[str]): List of dataset runs associated with the dataset.

    Example:
        Print the input of each dataset item in a dataset.
        ```python
        from langfuse import Langfuse

        langfuse = Langfuse()

        dataset = langfuse.get_dataset("<dataset_name>")

        for item in dataset.items:
            print(item.input)
        ```
    """

    id: str
    name: str
    description: Optional[str]
    project_id: str
    dataset_name: str  # for backward compatibility, to be deprecated
    metadata: Optional[Any]
    created_at: dt.datetime
    updated_at: dt.datetime
    items: typing.List[DatasetItemClient]
    runs: typing.List[str]

    def __init__(self, dataset: Dataset, items: typing.List[DatasetItemClient]):
        """Initialize the DatasetClient from an API `Dataset` object and its wrapped items."""
        # Copy the scalar fields off the API response object.
        self.id = dataset.id
        self.name = dataset.name
        self.dataset_name = dataset.name  # for backward compatibility, to be deprecated
        self.description = dataset.description
        self.project_id = dataset.project_id
        self.metadata = dataset.metadata
        self.created_at = dataset.created_at
        self.updated_at = dataset.updated_at
        self.runs = dataset.runs
        # Items arrive pre-wrapped as DatasetItemClient instances.
        self.items = items
class Langfuse:
  67class Langfuse(object):
  68    """Langfuse Python client.
  69
  70    Attributes:
  71        log (logging.Logger): Logger for the Langfuse client.
  72        base_url (str): Base URL of the Langfuse API, serving as the root address for API endpoint construction.
  73        httpx_client (httpx.Client): HTTPX client utilized for executing requests to the Langfuse API.
  74        client (FernLangfuse): Core interface for Langfuse API interaction.
  75        task_manager (TaskManager): Task Manager dedicated to handling asynchronous tasks.
  76        release (str): Identifies the release number or hash of the application.
  77        prompt_cache (PromptCache): A cache for efficiently storing and retrieving PromptClient instances.
  78
  79    Example:
  80        Initiating the Langfuse client should always be first step to use Langfuse.
  81        ```python
  82        import os
  83        from langfuse import Langfuse
  84
  85        # Set the public and secret keys as environment variables
  86        os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
  87        os.environ['LANGFUSE_SECRET_KEY'] = secret_key
  88
  89        # Initialize the Langfuse client using the credentials
  90        langfuse = Langfuse()
  91        ```
  92    """
  93
  94    log = logging.getLogger("langfuse")
  95    """Logger for the Langfuse client."""
  96
  97    host: str
  98    """Host of Langfuse API."""
  99
    def __init__(
        self,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        release: Optional[str] = None,
        debug: bool = False,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[float] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,  # seconds
        sdk_integration: Optional[str] = "default",
        httpx_client: Optional[httpx.Client] = None,
        enabled: Optional[bool] = True,
    ):
        """Initialize the Langfuse client.

        Explicit arguments always take precedence over environment variables.

        Args:
            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
            flush_at: Max batch size that's sent to the API.
            flush_interval: Max delay until a new batch is sent to the API.
            max_retries: Max number of retries in case of API/network errors.
            timeout: Timeout of API requests in seconds.
            httpx_client: Pass your own httpx client for more customizability of requests.
            sdk_integration: Used by integrations that wrap the Langfuse SDK to add context for debugging and support. Not to be used directly.
            enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops.

        Note:
            Missing credentials do not raise: the client logs a warning and
            disables itself (all backend calls become no-ops).

        Example:
            Initiating the Langfuse client should always be first step to use Langfuse.
            ```python
            import os
            from langfuse import Langfuse

            # Set the public and secret keys as environment variables
            os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
            os.environ['LANGFUSE_SECRET_KEY'] = secret_key

            # Initialize the Langfuse client using the credentials
            langfuse = Langfuse()
            ```
        """
        self.enabled = enabled
        # Explicit keyword arguments win over environment variables.
        public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
        secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")

        # Batching/retry knobs, each overridable per-environment.
        threads = threads or int(os.environ.get("LANGFUSE_THREADS", 1))
        flush_at = flush_at or int(os.environ.get("LANGFUSE_FLUSH_AT", 15))
        flush_interval = flush_interval or float(
            os.environ.get("LANGFUSE_FLUSH_INTERVAL", 0.5)
        )

        max_retries = max_retries or int(os.environ.get("LANGFUSE_MAX_RETRIES", 3))
        timeout = timeout or int(os.environ.get("LANGFUSE_TIMEOUT", 20))

        # Missing credentials disable the client (warn, don't raise) so an
        # instrumented application keeps running without observability.
        if not self.enabled:
            self.log.warning(
                "Langfuse client is disabled. No observability data will be sent."
            )

        elif not public_key:
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since no public_key was provided as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
            )

        elif not secret_key:
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since no secret_key was provided as a parameter or environment variable 'LANGFUSE_SECRET_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
            )

        # Debug can come from the argument or the LANGFUSE_DEBUG env var.
        set_debug = debug if debug else (os.getenv("LANGFUSE_DEBUG", "False") == "True")

        if set_debug is True:
            # Ensures that debug level messages are logged when debug mode is on.
            # Otherwise, defaults to WARNING level.
            # See https://docs.python.org/3/howto/logging.html#what-happens-if-no-configuration-is-provided
            logging.basicConfig()
            self.log.setLevel(logging.DEBUG)

            clean_logger()
        else:
            self.log.setLevel(logging.WARNING)
            clean_logger()

        self.base_url = (
            host
            if host
            else os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
        )

        # A caller-supplied httpx client allows custom proxies/auth/transport.
        self.httpx_client = httpx_client or httpx.Client(timeout=timeout)

        # Fern-generated API client used for the synchronous (read) endpoints.
        self.client = FernLangfuse(
            base_url=self.base_url,
            username=public_key,
            password=secret_key,
            x_langfuse_sdk_name="python",
            x_langfuse_sdk_version=version,
            x_langfuse_public_key=public_key,
            httpx_client=self.httpx_client,
        )

        # Lightweight ingestion client handed to the background TaskManager.
        langfuse_client = LangfuseClient(
            public_key=public_key,
            secret_key=secret_key,
            base_url=self.base_url,
            version=version,
            timeout=timeout,
            session=self.httpx_client,
        )

        args = {
            "threads": threads,
            "flush_at": flush_at,
            "flush_interval": flush_interval,
            "max_retries": max_retries,
            "client": langfuse_client,
            "public_key": public_key,
            "sdk_name": "python",
            "sdk_version": version,
            "sdk_integration": sdk_integration,
            # TaskManager no-ops when the client is disabled.
            "enabled": self.enabled,
        }

        self.task_manager = TaskManager(**args)

        # Id of the most recently created trace; None until trace() is called.
        self.trace_id = None

        self.release = self._get_release_value(release)

        self.prompt_cache = PromptCache()
 242    def _get_release_value(self, release: Optional[str] = None) -> Optional[str]:
 243        if release:
 244            return release
 245        elif "LANGFUSE_RELEASE" in os.environ:
 246            return os.environ["LANGFUSE_RELEASE"]
 247        else:
 248            return get_common_release_envs()
 249
 250    def get_trace_id(self) -> str:
 251        """Get the current trace id."""
 252        return self.trace_id
 253
 254    def get_trace_url(self) -> str:
 255        """Get the URL of the current trace to view it in the Langfuse UI."""
 256        return f"{self.base_url}/trace/{self.trace_id}"
 257
 258    def get_dataset(self, name: str) -> "DatasetClient":
 259        """Fetch a dataset by its name.
 260
 261        Args:
 262            name (str): The name of the dataset to fetch.
 263
 264        Returns:
 265            DatasetClient: The dataset with the given name.
 266        """
 267        try:
 268            self.log.debug(f"Getting datasets {name}")
 269            dataset = self.client.datasets.get(dataset_name=name)
 270
 271            items = [DatasetItemClient(i, langfuse=self) for i in dataset.items]
 272
 273            return DatasetClient(dataset, items=items)
 274        except Exception as e:
 275            self.log.exception(e)
 276            raise e
 277
 278    def get_dataset_item(self, id: str) -> "DatasetItemClient":
 279        """Get the dataset item with the given id."""
 280        try:
 281            self.log.debug(f"Getting dataset item {id}")
 282            dataset_item = self.client.dataset_items.get(id=id)
 283            return DatasetItemClient(dataset_item, langfuse=self)
 284        except Exception as e:
 285            self.log.exception(e)
 286            raise e
 287
 288    def auth_check(self) -> bool:
 289        """Check if the provided credentials (public and secret key) are valid.
 290
 291        Raises:
 292            Exception: If no projects were found for the provided credentials.
 293
 294        Note:
 295            This method is blocking. It is discouraged to use it in production code.
 296        """
 297        try:
 298            projects = self.client.projects.get()
 299            self.log.debug(
 300                f"Auth check successful, found {len(projects.data)} projects"
 301            )
 302            if len(projects.data) == 0:
 303                raise Exception(
 304                    "Auth check failed, no project found for the keys provided."
 305                )
 306            return True
 307
 308        except Exception as e:
 309            self.log.exception(e)
 310            raise e
 311
 312    def get_dataset_run(
 313        self,
 314        dataset_name: str,
 315        dataset_run_name: str,
 316    ) -> DatasetRun:
 317        """Get a dataset run.
 318
 319        Args:
 320            dataset_name: Name of the dataset.
 321            dataset_run_name: Name of the dataset run.
 322
 323        Returns:
 324            DatasetRun: The dataset run.
 325        """
 326        try:
 327            self.log.debug(
 328                f"Getting dataset runs for dataset {dataset_name} and run {dataset_run_name}"
 329            )
 330            return self.client.datasets.get_runs(
 331                dataset_name=dataset_name, run_name=dataset_run_name
 332            )
 333        except Exception as e:
 334            self.log.exception(e)
 335            raise e
 336
 337    def create_dataset(
 338        self,
 339        name: str,
 340        description: Optional[str] = None,
 341        metadata: Optional[Any] = None,
 342    ) -> Dataset:
 343        """Create a dataset with the given name on Langfuse.
 344
 345        Args:
 346            name: Name of the dataset to create.
 347            description: Description of the dataset. Defaults to None.
 348            metadata: Additional metadata. Defaults to None.
 349
 350        Returns:
 351            Dataset: The created dataset as returned by the Langfuse API.
 352        """
 353        try:
 354            body = CreateDatasetRequest(
 355                name=name, description=description, metadata=metadata
 356            )
 357            self.log.debug(f"Creating datasets {body}")
 358            return self.client.datasets.create(request=body)
 359        except Exception as e:
 360            self.log.exception(e)
 361            raise e
 362
 363    def create_dataset_item(
 364        self,
 365        dataset_name: str,
 366        input: Optional[Any] = None,
 367        expected_output: Optional[Any] = None,
 368        metadata: Optional[Any] = None,
 369        source_trace_id: Optional[str] = None,
 370        source_observation_id: Optional[str] = None,
 371        status: Optional[DatasetStatus] = None,
 372        id: Optional[str] = None,
 373    ) -> DatasetItem:
 374        """Create a dataset item.
 375
 376        Upserts if an item with id already exists.
 377
 378        Args:
 379            dataset_name: Name of the dataset in which the dataset item should be created.
 380            input: Input data. Defaults to None. Can contain any dict, list or scalar.
 381            expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar.
 382            metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar.
 383            source_trace_id: Id of the source trace. Defaults to None.
 384            source_observation_id: Id of the source observation. Defaults to None.
 385            status: Status of the dataset item. Defaults to ACTIVE for newly created items.
 386            id: Id of the dataset item. Defaults to None.
 387
 388        Returns:
 389            DatasetItem: The created dataset item as returned by the Langfuse API.
 390
 391        Example:
 392            ```python
 393            from langfuse import Langfuse
 394
 395            langfuse = Langfuse()
 396
 397            # Uploading items to the Langfuse dataset named "capital_cities"
 398            langfuse.create_dataset_item(
 399                dataset_name="capital_cities",
 400                input={"input": {"country": "Italy"}},
 401                expected_output={"expected_output": "Rome"},
 402                metadata={"foo": "bar"}
 403            )
 404            ```
 405        """
 406        try:
 407            body = CreateDatasetItemRequest(
 408                datasetName=dataset_name,
 409                input=input,
 410                expectedOutput=expected_output,
 411                metadata=metadata,
 412                sourceTraceId=source_trace_id,
 413                sourceObservationId=source_observation_id,
 414                status=status,
 415                id=id,
 416            )
 417            self.log.debug(f"Creating dataset item {body}")
 418            return self.client.dataset_items.create(request=body)
 419        except Exception as e:
 420            self.log.exception(e)
 421            raise e
 422
 423    def get_trace(
 424        self,
 425        id: str,
 426    ) -> TraceWithFullDetails:
 427        """Get a trace via the Langfuse API by its id.
 428
 429        Args:
 430            id: The id of the trace to fetch.
 431
 432        Returns:
 433            TraceWithFullDetails: The trace with full details as returned by the Langfuse API.
 434
 435        Raises:
 436            Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
 437        """
 438        try:
 439            self.log.debug(f"Getting trace {id}")
 440            return self.client.trace.get(id)
 441        except Exception as e:
 442            self.log.exception(e)
 443            raise e
 444
 445    def get_observations(
 446        self,
 447        *,
 448        page: typing.Optional[int] = None,
 449        limit: typing.Optional[int] = None,
 450        name: typing.Optional[str] = None,
 451        user_id: typing.Optional[str] = None,
 452        trace_id: typing.Optional[str] = None,
 453        parent_observation_id: typing.Optional[str] = None,
 454        type: typing.Optional[str] = None,
 455    ) -> ObservationsViews:
 456        """Get a list of observations in the current project matching the given parameters.
 457
 458        Args:
 459            page (Optional[int]): Page number of the observations to return. Defaults to None.
 460            limit (Optional[int]): Maximum number of observations to return. Defaults to None.
 461            name (Optional[str]): Name of the observations to return. Defaults to None.
 462            user_id (Optional[str]): User identifier. Defaults to None.
 463            trace_id (Optional[str]): Trace identifier. Defaults to None.
 464            parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
 465            type (Optional[str]): Type of the observation. Defaults to None.
 466
 467        Returns:
 468            List of ObservationsViews: List of observations in the project matching the given parameters.
 469
 470        Raises:
 471            Exception: If an error occurred during the request.
 472        """
 473        try:
 474            self.log.debug(
 475                f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {type}"
 476            )
 477            return self.client.observations.get_many(
 478                page=page,
 479                limit=limit,
 480                name=name,
 481                user_id=user_id,
 482                trace_id=trace_id,
 483                parent_observation_id=parent_observation_id,
 484                type=type,
 485            )
 486        except Exception as e:
 487            self.log.exception(e)
 488            raise e
 489
 490    def get_generations(
 491        self,
 492        *,
 493        page: typing.Optional[int] = None,
 494        limit: typing.Optional[int] = None,
 495        name: typing.Optional[str] = None,
 496        user_id: typing.Optional[str] = None,
 497        trace_id: typing.Optional[str] = None,
 498        parent_observation_id: typing.Optional[str] = None,
 499    ) -> ObservationsViews:
 500        """Get a list of generations in the current project matching the given parameters.
 501
 502        Args:
 503            page (Optional[int]): Page number of the generations to return. Defaults to None.
 504            limit (Optional[int]): Maximum number of generations to return. Defaults to None.
 505            name (Optional[str]): Name of the generations to return. Defaults to None.
 506            user_id (Optional[str]): User identifier of the generations to return. Defaults to None.
 507            trace_id (Optional[str]): Trace identifier of the generations to return. Defaults to None.
 508            parent_observation_id (Optional[str]): Parent observation identifier of the generations to return. Defaults to None.
 509
 510        Returns:
 511            List of ObservationsViews: List of generations in the project matching the given parameters.
 512
 513        Raises:
 514            Exception: If an error occurred during the request.
 515        """
 516        return self.get_observations(
 517            page=page,
 518            limit=limit,
 519            name=name,
 520            user_id=user_id,
 521            trace_id=trace_id,
 522            parent_observation_id=parent_observation_id,
 523            type="GENERATION",
 524        )
 525
 526    def get_observation(
 527        self,
 528        id: str,
 529    ) -> Observation:
 530        """Get an observation in the current project with the given identifier.
 531
 532        Args:
 533            id: The identifier of the observation to fetch.
 534
 535        Raises:
 536            Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request.
 537        """
 538        try:
 539            self.log.debug(f"Getting observation {id}")
 540            return self.client.observations.get(id)
 541        except Exception as e:
 542            self.log.exception(e)
 543            raise e
 544
    # Typing-only overloads: `type="chat"` narrows the return type to
    # ChatPromptClient, while the default `type="text"` narrows it to
    # TextPromptClient. The shared runtime implementation follows below.
    @overload
    def get_prompt(
        self,
        name: str,
        version: Optional[int] = None,
        *,
        label: Optional[str] = None,
        type: Literal["chat"],
        cache_ttl_seconds: Optional[int] = None,
    ) -> ChatPromptClient: ...

    @overload
    def get_prompt(
        self,
        name: str,
        version: Optional[int] = None,
        *,
        label: Optional[str] = None,
        type: Literal["text"] = "text",
        cache_ttl_seconds: Optional[int] = None,
    ) -> TextPromptClient: ...
 566
 567    def get_prompt(
 568        self,
 569        name: str,
 570        version: Optional[int] = None,
 571        *,
 572        label: Optional[str] = None,
 573        type: Literal["chat", "text"] = "text",
 574        cache_ttl_seconds: Optional[int] = None,
 575    ) -> PromptClient:
 576        """Get a prompt.
 577
 578        This method attempts to fetch the requested prompt from the local cache. If the prompt is not found
 579        in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again
 580        and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will
 581        return the expired prompt as a fallback.
 582
 583        Args:
 584            name (str): The name of the prompt to retrieve.
 585
 586        Keyword Args:
 587            version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
 588            label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
 589            cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a
 590            keyword argument. If not set, defaults to 60 seconds.
 591            type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text".
 592
 593        Returns:
 594            The prompt object retrieved from the cache or directly fetched if not cached or expired of type
 595            - TextPromptClient, if type argument is 'text'.
 596            - ChatPromptClient, if type argument is 'chat'.
 597
 598        Raises:
 599            Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an
 600            expired prompt in the cache, in which case it logs a warning and returns the expired prompt.
 601        """
 602        if version is not None and label is not None:
 603            raise ValueError("Cannot specify both version and label at the same time.")
 604
 605        if not name:
 606            raise ValueError("Prompt name cannot be empty.")
 607
 608        cache_key = PromptCache.generate_cache_key(name, version=version, label=label)
 609
 610        self.log.debug(f"Getting prompt '{cache_key}'")
 611        cached_prompt = self.prompt_cache.get(cache_key)
 612
 613        if cached_prompt is None:
 614            return self._fetch_prompt_and_update_cache(
 615                name, version=version, label=label, ttl_seconds=cache_ttl_seconds
 616            )
 617
 618        if cached_prompt.is_expired():
 619            try:
 620                return self._fetch_prompt_and_update_cache(
 621                    name,
 622                    version=version,
 623                    label=label,
 624                    ttl_seconds=cache_ttl_seconds,
 625                )
 626
 627            except Exception as e:
 628                self.log.warn(
 629                    f"Returning expired prompt cache for '{cache_key}' due to fetch error: {e}"
 630                )
 631
 632                return cached_prompt.value
 633
 634        return cached_prompt.value
 635
 636    def _fetch_prompt_and_update_cache(
 637        self,
 638        name: str,
 639        *,
 640        version: Optional[int] = None,
 641        label: Optional[str] = None,
 642        ttl_seconds: Optional[int] = None,
 643    ) -> PromptClient:
 644        try:
 645            cache_key = PromptCache.generate_cache_key(
 646                name, version=version, label=label
 647            )
 648
 649            self.log.debug(f"Fetching prompt '{cache_key}' from server...")
 650            promptResponse = self.client.prompts.get(
 651                self._url_encode(name), version=version, label=label
 652            )
 653
 654            if promptResponse.type == "chat":
 655                prompt = ChatPromptClient(promptResponse)
 656            else:
 657                prompt = TextPromptClient(promptResponse)
 658
 659            self.prompt_cache.set(cache_key, prompt, ttl_seconds)
 660
 661            return prompt
 662
 663        except Exception as e:
 664            self.log.exception(f"Error while fetching prompt '{cache_key}': {e}")
 665            raise e
 666
    # Typing-only overloads: a list of chat messages with `type="chat"` yields
    # a ChatPromptClient, a plain string with the default `type="text"` yields
    # a TextPromptClient. NOTE(review): the mutable default `labels=[]` is
    # shared across calls but is never mutated by the implementation, so it is
    # harmless here.
    @overload
    def create_prompt(
        self,
        *,
        name: str,
        prompt: List[ChatMessageDict],
        is_active: Optional[bool] = None,  # deprecated
        labels: List[str] = [],
        tags: Optional[List[str]] = None,
        type: Optional[Literal["chat"]],
        config: Optional[Any] = None,
    ) -> ChatPromptClient: ...

    @overload
    def create_prompt(
        self,
        *,
        name: str,
        prompt: str,
        is_active: Optional[bool] = None,  # deprecated
        labels: List[str] = [],
        tags: Optional[List[str]] = None,
        type: Optional[Literal["text"]] = "text",
        config: Optional[Any] = None,
    ) -> TextPromptClient: ...
 692
 693    def create_prompt(
 694        self,
 695        *,
 696        name: str,
 697        prompt: Union[str, List[ChatMessageDict]],
 698        is_active: Optional[bool] = None,  # deprecated
 699        labels: List[str] = [],
 700        tags: Optional[List[str]] = None,
 701        type: Optional[Literal["chat", "text"]] = "text",
 702        config: Optional[Any] = None,
 703    ) -> PromptClient:
 704        """Create a new prompt in Langfuse.
 705
 706        Keyword Args:
 707            name : The name of the prompt to be created.
 708            prompt : The content of the prompt to be created.
 709            is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead.
 710            labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label.
 711            tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt.
 712            config: Additional structured data to be saved with the prompt. Defaults to None.
 713            type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text".
 714
 715        Returns:
 716            TextPromptClient: The prompt if type argument is 'text'.
 717            ChatPromptClient: The prompt if type argument is 'chat'.
 718        """
 719        try:
 720            self.log.debug(f"Creating prompt {name=}, {version=}, {labels=}")
 721
 722            # Handle deprecated is_active flag
 723            if is_active:
 724                self.log.warning(
 725                    "The 'is_active' flag is deprecated and will be removed in a future release. Please use the 'production' label instead."
 726                )
 727
 728                labels = labels if "production" in labels else labels + ["production"]
 729
 730            if type == "chat":
 731                if not isinstance(prompt, list):
 732                    raise ValueError(
 733                        "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes."
 734                    )
 735                request = CreatePromptRequest_Chat(
 736                    name=name,
 737                    prompt=prompt,
 738                    labels=labels,
 739                    tags=tags,
 740                    config=config or {},
 741                    type="chat",
 742                )
 743                server_prompt = self.client.prompts.create(request=request)
 744
 745                return ChatPromptClient(prompt=server_prompt)
 746
 747            if not isinstance(prompt, str):
 748                raise ValueError("For 'text' type, 'prompt' must be a string.")
 749
 750            request = CreatePromptRequest_Text(
 751                name=name,
 752                prompt=prompt,
 753                labels=labels,
 754                tags=tags,
 755                config=config or {},
 756                type="text",
 757            )
 758
 759            server_prompt = self.client.prompts.create(request=request)
 760            return TextPromptClient(prompt=server_prompt)
 761
 762        except Exception as e:
 763            self.log.exception(e)
 764            raise e
 765
 766    def _url_encode(self, url: str) -> str:
 767        return urllib.parse.quote(url)
 768
 769    def trace(
 770        self,
 771        *,
 772        id: typing.Optional[str] = None,
 773        name: typing.Optional[str] = None,
 774        user_id: typing.Optional[str] = None,
 775        session_id: typing.Optional[str] = None,
 776        version: typing.Optional[str] = None,
 777        input: typing.Optional[typing.Any] = None,
 778        output: typing.Optional[typing.Any] = None,
 779        metadata: typing.Optional[typing.Any] = None,
 780        tags: typing.Optional[typing.List[str]] = None,
 781        timestamp: typing.Optional[dt.datetime] = None,
 782        public: typing.Optional[bool] = None,
 783        **kwargs,
 784    ) -> "StatefulTraceClient":
 785        """Create a trace.
 786
 787        Args:
 788            id: The id of the trace can be set, defaults to a random id. Set it to link traces to external systems or when creating a distributed trace. Traces are upserted on id.
 789            name: Identifier of the trace. Useful for sorting/filtering in the UI.
 790            input: The input of the trace. Can be any JSON object.
 791            output: The output of the trace. Can be any JSON object.
 792            metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
 793            user_id: The id of the user that triggered the execution. Used to provide user-level analytics.
 794            session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
 795            version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
 796            release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
 797            tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.
 798            timestamp: The timestamp of the trace. Defaults to the current time if not provided.
 799            public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
 800            **kwargs: Additional keyword arguments that can be included in the trace.
 801
 802        Returns:
 803            StatefulTraceClient: The created trace.
 804
 805        Example:
 806            ```python
 807            from langfuse import Langfuse
 808
 809            langfuse = Langfuse()
 810
            trace = langfuse.trace(
                name="example-application",
                user_id="user-1234"
            )
 815            ```
 816        """
 817        new_id = id or str(uuid.uuid4())
 818        self.trace_id = new_id
 819        try:
 820            new_dict = {
 821                "id": new_id,
 822                "name": name,
 823                "userId": user_id,
 824                "sessionId": session_id
 825                or kwargs.get("sessionId", None),  # backward compatibility
 826                "release": self.release,
 827                "version": version,
 828                "metadata": metadata,
 829                "input": input,
 830                "output": output,
 831                "tags": tags,
 832                "timestamp": timestamp or _get_timestamp(),
 833                "public": public,
 834            }
 835            if kwargs is not None:
 836                new_dict.update(kwargs)
 837
 838            new_body = TraceBody(**new_dict)
 839
 840            self.log.debug(f"Creating trace {new_body}")
 841            event = {
 842                "id": str(uuid.uuid4()),
 843                "type": "trace-create",
 844                "body": new_body.dict(exclude_none=True),
 845            }
 846
 847            self.task_manager.add_task(
 848                event,
 849            )
 850
 851        except Exception as e:
 852            self.log.exception(e)
 853        finally:
 854            self._log_memory_usage()
 855
 856            return StatefulTraceClient(
 857                self.client, new_id, StateType.TRACE, new_id, self.task_manager
 858            )
 859
 860    def _log_memory_usage(self):
 861        try:
 862            is_malloc_tracing_enabled = bool(int(os.getenv("PYTHONTRACEMALLOC", 0)))
 863            report_interval = int(os.getenv("LANGFUSE_DEBUG_MEMORY_REPORT_INTERVAL", 0))
 864            top_k_items = int(os.getenv("LANGFUSE_DEBUG_MEMORY_TOP_K", 10))
 865
 866            if (
 867                not is_malloc_tracing_enabled
 868                or report_interval <= 0
 869                or round(time.monotonic()) % report_interval != 0
 870            ):
 871                return
 872
 873            snapshot = tracemalloc.take_snapshot().statistics("lineno")
 874
 875            total_memory_usage = sum([stat.size for stat in snapshot]) / 1024 / 1024
 876            memory_usage_total_items = [f"{stat}" for stat in snapshot]
 877            memory_usage_langfuse_items = [
 878                stat for stat in memory_usage_total_items if "/langfuse/" in stat
 879            ]
 880
 881            logged_memory_usage = {
 882                "all_files": [f"{stat}" for stat in memory_usage_total_items][
 883                    :top_k_items
 884                ],
 885                "langfuse_files": [f"{stat}" for stat in memory_usage_langfuse_items][
 886                    :top_k_items
 887                ],
 888                "total_usage": f"{total_memory_usage:.2f} MB",
 889                "langfuse_queue_length": self.task_manager._queue.qsize(),
 890            }
 891
 892            self.log.debug("Memory usage: ", logged_memory_usage)
 893
 894            event = SdkLogBody(log=logged_memory_usage)
 895            self.task_manager.add_task(
 896                {
 897                    "id": str(uuid.uuid4()),
 898                    "type": "sdk-log",
 899                    "timestamp": _get_timestamp(),
 900                    "body": event.dict(),
 901                }
 902            )
 903
 904        except Exception as e:
 905            self.log.exception(e)
 906
 907    def score(
 908        self,
 909        *,
 910        name: str,
 911        value: float,
 912        trace_id: typing.Optional[str] = None,
 913        id: typing.Optional[str] = None,
 914        comment: typing.Optional[str] = None,
 915        observation_id: typing.Optional[str] = None,
 916        **kwargs,
 917    ) -> "StatefulClient":
 918        """Create a score attached to a trace (and optionally an observation).
 919
 920        Args:
 921            name (str): Identifier of the score.
 922            value (float): The value of the score. Can be any number, often standardized to 0..1
 923            trace_id (str): The id of the trace to which the score should be attached.
 924            comment (Optional[str]): Additional context/explanation of the score.
 925            observation_id (Optional[str]): The id of the observation to which the score should be attached.
 926            id (Optional[str]): The id of the score. If not provided, a new UUID is generated.
 927            **kwargs: Additional keyword arguments to include in the score.
 928
 929        Returns:
 930            StatefulClient: Either the associated observation (if observation_id is provided) or the trace (if observation_id is not provided).
 931
 932        Example:
 933            ```python
 934            from langfuse import Langfuse
 935
 936            langfuse = Langfuse()
 937
 938            # Create a trace
 939            trace = langfuse.trace(name="example-application")
 940
 941            # Get id of created trace
 942            trace_id = trace.id
 943
 944            # Add score to the trace
 945            trace = langfuse.score(
 946                trace_id=trace_id,
 947                name="user-explicit-feedback",
 948                value=1,
 949                comment="I like how personalized the response is"
 950            )
 951            ```
 952        """
 953        trace_id = trace_id or self.trace_id or str(uuid.uuid4())
 954        new_id = id or str(uuid.uuid4())
 955        try:
 956            new_dict = {
 957                "id": new_id,
 958                "trace_id": trace_id,
 959                "observation_id": observation_id,
 960                "name": name,
 961                "value": value,
 962                "comment": comment,
 963                **kwargs,
 964            }
 965
 966            self.log.debug(f"Creating score {new_dict}...")
 967            new_body = ScoreBody(**new_dict)
 968
 969            event = {
 970                "id": str(uuid.uuid4()),
 971                "type": "score-create",
 972                "body": new_body.dict(exclude_none=True),
 973            }
 974            self.task_manager.add_task(event)
 975
 976        except Exception as e:
 977            self.log.exception(e)
 978        finally:
 979            if observation_id is not None:
 980                return StatefulClient(
 981                    self.client,
 982                    observation_id,
 983                    StateType.OBSERVATION,
 984                    trace_id,
 985                    self.task_manager,
 986                )
 987            else:
 988                return StatefulClient(
 989                    self.client, new_id, StateType.TRACE, new_id, self.task_manager
 990                )
 991
 992    def span(
 993        self,
 994        *,
 995        id: typing.Optional[str] = None,
 996        trace_id: typing.Optional[str] = None,
 997        parent_observation_id: typing.Optional[str] = None,
 998        name: typing.Optional[str] = None,
 999        start_time: typing.Optional[dt.datetime] = None,
1000        end_time: typing.Optional[dt.datetime] = None,
1001        metadata: typing.Optional[typing.Any] = None,
1002        level: typing.Optional[SpanLevel] = None,
1003        status_message: typing.Optional[str] = None,
1004        input: typing.Optional[typing.Any] = None,
1005        output: typing.Optional[typing.Any] = None,
1006        version: typing.Optional[str] = None,
1007        **kwargs,
1008    ) -> "StatefulSpanClient":
1009        """Create a span.
1010
1011        A span represents durations of units of work in a trace.
1012        Usually, you want to add a span nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.
1013
1014        If no trace_id is provided, a new trace is created just for this span.
1015
1016        Args:
1017            id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id.
1018            trace_id (Optional[str]): The trace ID associated with this span. If not provided, a new UUID is generated.
1019            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
1020            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
1021            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
1022            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
1023            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
1024            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
1025            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
1026            input (Optional[dict]): The input to the span. Can be any JSON object.
1027            output (Optional[dict]): The output to the span. Can be any JSON object.
1028            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
1029            **kwargs: Additional keyword arguments to include in the span.
1030
1031        Returns:
1032            StatefulSpanClient: The created span.
1033
1034        Example:
1035            ```python
1036            from langfuse import Langfuse
1037
1038            langfuse = Langfuse()
1039
1040            trace = langfuse.trace(name = "llm-feature")
1041
1042            # Create a span
1043            retrieval = langfuse.span(name = "retrieval", trace_id = trace.id)
1044
1045            # Create a nested span
1046            nested_span = langfuse.span(name = "retrieval", trace_id = trace.id, parent_observation_id = retrieval.id)
1047            ```
1048        """
1049        new_span_id = id or str(uuid.uuid4())
1050        new_trace_id = trace_id or str(uuid.uuid4())
1051        self.trace_id = new_trace_id
1052        try:
1053            span_body = {
1054                "id": new_span_id,
1055                "trace_id": new_trace_id,
1056                "name": name,
1057                "start_time": start_time or _get_timestamp(),
1058                "metadata": metadata,
1059                "input": input,
1060                "output": output,
1061                "level": level,
1062                "status_message": status_message,
1063                "parent_observation_id": parent_observation_id,
1064                "version": version,
1065                "end_time": end_time,
1066                "trace": {"release": self.release},
1067                **kwargs,
1068            }
1069
1070            if trace_id is None:
1071                self._generate_trace(new_trace_id, name or new_trace_id)
1072
1073            self.log.debug(f"Creating span {span_body}...")
1074
1075            span_body = CreateSpanBody(**span_body)
1076
1077            event = {
1078                "id": str(uuid.uuid4()),
1079                "type": "span-create",
1080                "body": span_body.dict(exclude_none=True),
1081            }
1082
1083            self.log.debug(f"Creating span {event}...")
1084            self.task_manager.add_task(event)
1085
1086        except Exception as e:
1087            self.log.exception(e)
1088        finally:
1089            self._log_memory_usage()
1090
1091            return StatefulSpanClient(
1092                self.client,
1093                new_span_id,
1094                StateType.OBSERVATION,
1095                new_trace_id,
1096                self.task_manager,
1097            )
1098
1099    def event(
1100        self,
1101        *,
1102        id: typing.Optional[str] = None,
1103        trace_id: typing.Optional[str] = None,
1104        parent_observation_id: typing.Optional[str] = None,
1105        name: typing.Optional[str] = None,
1106        start_time: typing.Optional[dt.datetime] = None,
1107        metadata: typing.Optional[typing.Any] = None,
1108        input: typing.Optional[typing.Any] = None,
1109        output: typing.Optional[typing.Any] = None,
1110        level: typing.Optional[SpanLevel] = None,
1111        status_message: typing.Optional[str] = None,
1112        version: typing.Optional[str] = None,
1113        **kwargs,
1114    ) -> "StatefulSpanClient":
1115        """Create an event.
1116
1117        An event represents a discrete event in a trace.
1118        Usually, you want to add a event nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.
1119
1120        If no trace_id is provided, a new trace is created just for this event.
1121
1122        Args:
1123            id (Optional[str]): The id of the event can be set, otherwise a random id is generated.
1124            trace_id (Optional[str]): The trace ID associated with this event. If not provided, a new trace is created just for this event.
1125            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
1126            name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI.
1127            start_time (Optional[datetime]): The time at which the event started, defaults to the current time.
1128            metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API.
1129            input (Optional[Any]): The input to the event. Can be any JSON object.
1130            output (Optional[Any]): The output to the event. Can be any JSON object.
1131            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
1132            status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event.
1133            version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. Useful in debugging.
1134            **kwargs: Additional keyword arguments to include in the event.
1135
1136        Returns:
1137            StatefulSpanClient: The created event.
1138
1139        Example:
1140            ```python
1141            from langfuse import Langfuse
1142
1143            langfuse = Langfuse()
1144
1145            trace = langfuse.trace(name = "llm-feature")
1146
1147            # Create an event
1148            retrieval = langfuse.event(name = "retrieval", trace_id = trace.id)
1149            ```
1150        """
1151        event_id = id or str(uuid.uuid4())
1152        new_trace_id = trace_id or str(uuid.uuid4())
1153        self.trace_id = new_trace_id
1154        try:
1155            event_body = {
1156                "id": event_id,
1157                "trace_id": new_trace_id,
1158                "name": name,
1159                "start_time": start_time or _get_timestamp(),
1160                "metadata": metadata,
1161                "input": input,
1162                "output": output,
1163                "level": level,
1164                "status_message": status_message,
1165                "parent_observation_id": parent_observation_id,
1166                "version": version,
1167                "trace": {"release": self.release},
1168                **kwargs,
1169            }
1170
1171            if trace_id is None:
1172                self._generate_trace(new_trace_id, name or new_trace_id)
1173
1174            request = CreateEventBody(**event_body)
1175
1176            event = {
1177                "id": str(uuid.uuid4()),
1178                "type": "event-create",
1179                "body": request.dict(exclude_none=True),
1180            }
1181
1182            self.log.debug(f"Creating event {event}...")
1183            self.task_manager.add_task(event)
1184
1185        except Exception as e:
1186            self.log.exception(e)
1187        finally:
1188            return StatefulSpanClient(
1189                self.client,
1190                event_id,
1191                StateType.OBSERVATION,
1192                new_trace_id,
1193                self.task_manager,
1194            )
1195
1196    def generation(
1197        self,
1198        *,
1199        id: typing.Optional[str] = None,
1200        trace_id: typing.Optional[str] = None,
1201        parent_observation_id: typing.Optional[str] = None,
1202        name: typing.Optional[str] = None,
1203        start_time: typing.Optional[dt.datetime] = None,
1204        end_time: typing.Optional[dt.datetime] = None,
1205        completion_start_time: typing.Optional[dt.datetime] = None,
1206        metadata: typing.Optional[typing.Any] = None,
1207        level: typing.Optional[SpanLevel] = None,
1208        status_message: typing.Optional[str] = None,
1209        version: typing.Optional[str] = None,
1210        model: typing.Optional[str] = None,
1211        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
1212        input: typing.Optional[typing.Any] = None,
1213        output: typing.Optional[typing.Any] = None,
1214        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
1215        prompt: typing.Optional[PromptClient] = None,
1216        **kwargs,
1217    ) -> "StatefulGenerationClient":
1218        """Create a generation.
1219
1220        A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI.
1221
1222        Usually, you want to add a generation nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.
1223
1224        If no trace_id is provided, a new trace is created just for this generation.
1225
1226        Args:
1227            id (Optional[str]): The id of the generation can be set, defaults to random id.
1228            trace_id (Optional[str]): The trace ID associated with this generation. If not provided, a new trace is created
1229            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
1230            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
1231            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
1232            end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`.
1233            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
1234            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
1235            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
1236            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
1237            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
1238            model (Optional[str]): The name of the model used for the generation.
1239            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
1240            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
1241            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
1242            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
1243            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
1244            **kwargs: Additional keyword arguments to include in the generation.
1245
1246        Returns:
1247            StatefulGenerationClient: The created generation.
1248
1249        Example:
1250            ```python
1251            from langfuse import Langfuse
1252
1253            langfuse = Langfuse()
1254
1255            # Create a generation in Langfuse
1256            generation = langfuse.generation(
1257                name="summary-generation",
1258                model="gpt-3.5-turbo",
1259                model_parameters={"maxTokens": "1000", "temperature": "0.9"},
1260                input=[{"role": "system", "content": "You are a helpful assistant."},
1261                       {"role": "user", "content": "Please generate a summary of the following documents ..."}],
1262                metadata={"interface": "whatsapp"}
1263            )
1264            ```
1265        """
1266        new_trace_id = trace_id or str(uuid.uuid4())
1267        new_generation_id = id or str(uuid.uuid4())
1268        self.trace_id = new_trace_id
1269        try:
1270            generation_body = {
1271                "id": new_generation_id,
1272                "trace_id": new_trace_id,
1273                "release": self.release,
1274                "name": name,
1275                "start_time": start_time or _get_timestamp(),
1276                "metadata": metadata,
1277                "input": input,
1278                "output": output,
1279                "level": level,
1280                "status_message": status_message,
1281                "parent_observation_id": parent_observation_id,
1282                "version": version,
1283                "end_time": end_time,
1284                "completion_start_time": completion_start_time,
1285                "model": model,
1286                "model_parameters": model_parameters,
1287                "usage": _convert_usage_input(usage) if usage is not None else None,
1288                "trace": {"release": self.release},
1289                **_create_prompt_context(prompt),
1290                **kwargs,
1291            }
1292
1293            if trace_id is None:
1294                trace = {
1295                    "id": new_trace_id,
1296                    "release": self.release,
1297                    "name": name,
1298                }
1299                request = TraceBody(**trace)
1300
1301                event = {
1302                    "id": str(uuid.uuid4()),
1303                    "type": "trace-create",
1304                    "body": request.dict(exclude_none=True),
1305                }
1306
1307                self.log.debug(f"Creating trace {event}...")
1308
1309                self.task_manager.add_task(event)
1310
1311            self.log.debug(f"Creating generation max {generation_body} {usage}...")
1312            request = CreateGenerationBody(**generation_body)
1313
1314            event = {
1315                "id": str(uuid.uuid4()),
1316                "type": "generation-create",
1317                "body": request.dict(exclude_none=True),
1318            }
1319
1320            self.log.debug(f"Creating top-level generation {event} ...")
1321            self.task_manager.add_task(event)
1322
1323        except Exception as e:
1324            self.log.exception(e)
1325        finally:
1326            return StatefulGenerationClient(
1327                self.client,
1328                new_generation_id,
1329                StateType.OBSERVATION,
1330                new_trace_id,
1331                self.task_manager,
1332            )
1333
1334    def _generate_trace(self, trace_id: str, name: str):
1335        trace_dict = {
1336            "id": trace_id,
1337            "release": self.release,
1338            "name": name,
1339        }
1340
1341        trace_body = TraceBody(**trace_dict)
1342
1343        event = {
1344            "id": str(uuid.uuid4()),
1345            "type": "trace-create",
1346            "body": trace_body.dict(exclude_none=True),
1347        }
1348
1349        self.log.debug(f"Creating trace {event}...")
1350        self.task_manager.add_task(event)
1351
1352    def join(self):
1353        """Blocks until all consumer Threads are terminated. The SKD calls this upon termination of the Python Interpreter.
1354
1355        If called before flushing, consumers might terminate before sending all events to Langfuse API. This method is called at exit of the SKD, right before the Python interpreter closes.
1356        To guarantee all messages have been delivered, you still need to call flush().
1357        """
1358        try:
1359            return self.task_manager.join()
1360        except Exception as e:
1361            self.log.exception(e)
1362
1363    def flush(self):
1364        """Flush the internal event queue to the Langfuse API. It blocks until the queue is empty. It should be called when the application shuts down.
1365
1366        Example:
1367            ```python
1368            from langfuse import Langfuse
1369
1370            langfuse = Langfuse()
1371
1372            # Some operations with Langfuse
1373
1374            # Flushing all events to end Langfuse cleanly
1375            langfuse.flush()
1376            ```
1377        """
1378        try:
1379            return self.task_manager.flush()
1380        except Exception as e:
1381            self.log.exception(e)
1382
1383    def shutdown(self):
1384        """Initiate a graceful shutdown of the Langfuse SDK, ensuring all events are sent to Langfuse API and all consumer Threads are terminated.
1385
1386        This function calls flush() and join() consecutively resulting in a complete shutdown of the SDK. On success of this function, no more events will be sent to Langfuse API.
1387        As the SDK calls join() already on shutdown, refer to flush() to ensure all events arive at the Langfuse API.
1388        """
1389        try:
1390            return self.task_manager.shutdown()
1391        except Exception as e:
1392            self.log.exception(e)

Langfuse Python client.

Attributes:
  • log (logging.Logger): Logger for the Langfuse client.
  • base_url (str): Base URL of the Langfuse API, serving as the root address for API endpoint construction.
  • httpx_client (httpx.Client): HTTPX client utilized for executing requests to the Langfuse API.
  • client (FernLangfuse): Core interface for Langfuse API interaction.
  • task_manager (TaskManager): Task Manager dedicated to handling asynchronous tasks.
  • release (str): Identifies the release number or hash of the application.
  • prompt_cache (PromptCache): A cache for efficiently storing and retrieving PromptClient instances.
Example:

Initializing the Langfuse client should always be the first step when using Langfuse.

import os
from langfuse import Langfuse

# Set the public and secret keys as environment variables
os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
os.environ['LANGFUSE_SECRET_KEY'] = secret_key

# Initialize the Langfuse client using the credentials
langfuse = Langfuse()
Langfuse( public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, release: Optional[str] = None, debug: bool = False, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[float] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, sdk_integration: Optional[str] = 'default', httpx_client: Optional[httpx.Client] = None, enabled: Optional[bool] = True)
    def __init__(
        self,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        release: Optional[str] = None,
        debug: bool = False,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[float] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,  # seconds
        sdk_integration: Optional[str] = "default",
        httpx_client: Optional[httpx.Client] = None,
        enabled: Optional[bool] = True,
    ):
        """Initialize the Langfuse client.

        Args:
            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
            flush_at: Max batch size that's sent to the API.
            flush_interval: Max delay until a new batch is sent to the API.
            max_retries: Max number of retries in case of API/network errors.
            timeout: Timeout of API requests in seconds.
            httpx_client: Pass your own httpx client for more customizability of requests.
            sdk_integration: Used by integrations that wrap the Langfuse SDK to add context for debugging and support. Not to be used directly.
            enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops.

        Raises:
            ValueError: If a numeric environment-variable override (e.g. `LANGFUSE_THREADS`, `LANGFUSE_FLUSH_AT`) cannot be parsed as a number.

        Note:
            If public_key or secret_key are missing (as parameter and environment
            variable), the client does NOT raise; it disables itself and logs a warning.

        Example:
            Initiating the Langfuse client should always be the first step to use Langfuse.
            ```python
            import os
            from langfuse import Langfuse

            # Set the public and secret keys as environment variables
            os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
            os.environ['LANGFUSE_SECRET_KEY'] = secret_key

            # Initialize the Langfuse client using the credentials
            langfuse = Langfuse()
            ```
        """
        self.enabled = enabled
        # Explicit arguments take precedence over environment variables.
        public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
        secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")

        # Batching/retry knobs: argument > env var > hard-coded default.
        # Note: int()/float() raise ValueError on malformed env values.
        threads = threads or int(os.environ.get("LANGFUSE_THREADS", 1))
        flush_at = flush_at or int(os.environ.get("LANGFUSE_FLUSH_AT", 15))
        flush_interval = flush_interval or float(
            os.environ.get("LANGFUSE_FLUSH_INTERVAL", 0.5)
        )

        max_retries = max_retries or int(os.environ.get("LANGFUSE_MAX_RETRIES", 3))
        timeout = timeout or int(os.environ.get("LANGFUSE_TIMEOUT", 20))

        # Missing credentials disable the client (warning only, no exception),
        # so all downstream calls become no-ops instead of failing.
        if not self.enabled:
            self.log.warning(
                "Langfuse client is disabled. No observability data will be sent."
            )

        elif not public_key:
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since no public_key was provided as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
            )

        elif not secret_key:
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since no secret_key was provided as a parameter or environment variable 'LANGFUSE_SECRET_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
            )

        # Debug flag can come from the argument or the LANGFUSE_DEBUG env var.
        set_debug = debug if debug else (os.getenv("LANGFUSE_DEBUG", "False") == "True")

        if set_debug is True:
            # Ensures that debug level messages are logged when debug mode is on.
            # Otherwise, defaults to WARNING level.
            # See https://docs.python.org/3/howto/logging.html#what-happens-if-no-configuration-is-provided
            logging.basicConfig()
            self.log.setLevel(logging.DEBUG)

            clean_logger()
        else:
            self.log.setLevel(logging.WARNING)
            clean_logger()

        self.base_url = (
            host
            if host
            else os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
        )

        # Caller-supplied httpx client allows custom proxies/headers; otherwise
        # create one with the configured timeout.
        self.httpx_client = httpx_client or httpx.Client(timeout=timeout)

        # Generated Fern client used for all synchronous API reads.
        self.client = FernLangfuse(
            base_url=self.base_url,
            username=public_key,
            password=secret_key,
            x_langfuse_sdk_name="python",
            x_langfuse_sdk_version=version,
            x_langfuse_public_key=public_key,
            httpx_client=self.httpx_client,
        )

        # Lightweight client handed to the task manager for ingestion batches.
        langfuse_client = LangfuseClient(
            public_key=public_key,
            secret_key=secret_key,
            base_url=self.base_url,
            version=version,
            timeout=timeout,
            session=self.httpx_client,
        )

        args = {
            "threads": threads,
            "flush_at": flush_at,
            "flush_interval": flush_interval,
            "max_retries": max_retries,
            "client": langfuse_client,
            "public_key": public_key,
            "sdk_name": "python",
            "sdk_version": version,
            "sdk_integration": sdk_integration,
            "enabled": self.enabled,
        }

        # Background consumer threads that batch and ship events.
        self.task_manager = TaskManager(**args)

        # No trace is active until one is created/observed.
        self.trace_id = None

        self.release = self._get_release_value(release)

        # In-memory cache for PromptClient instances fetched from the API.
        self.prompt_cache = PromptCache()

Initialize the Langfuse client.

Arguments:
  • public_key: Public API key of Langfuse project. Can be set via LANGFUSE_PUBLIC_KEY environment variable.
  • secret_key: Secret API key of Langfuse project. Can be set via LANGFUSE_SECRET_KEY environment variable.
  • host: Host of Langfuse API. Can be set via LANGFUSE_HOST environment variable. Defaults to https://cloud.langfuse.com.
  • release: Release number/hash of the application to provide analytics grouped by release. Can be set via LANGFUSE_RELEASE environment variable.
  • debug: Enables debug mode for more verbose logging. Can be set via LANGFUSE_DEBUG environment variable.
  • threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
  • flush_at: Max batch size that's sent to the API.
  • flush_interval: Max delay until a new batch is sent to the API.
  • max_retries: Max number of retries in case of API/network errors.
  • timeout: Timeout of API requests in seconds.
  • httpx_client: Pass your own httpx client for more customizability of requests.
  • sdk_integration: Used by integrations that wrap the Langfuse SDK to add context for debugging and support. Not to be used directly.
  • enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops.
Raises:
  • ValueError: If public_key or secret_key are not set and not found in environment variables.
Example:

Initializing the Langfuse client should always be the first step when using Langfuse.

import os
from langfuse import Langfuse

# Set the public and secret keys as environment variables
os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
os.environ['LANGFUSE_SECRET_KEY'] = secret_key

# Initialize the Langfuse client using the credentials
langfuse = Langfuse()
log = <Logger langfuse (WARNING)>

Logger for the Langfuse client.

host: str

Host of Langfuse API.

enabled
base_url
httpx_client
client
task_manager
trace_id
release
prompt_cache
def get_trace_id(self) -> str:
250    def get_trace_id(self) -> str:
251        """Get the current trace id."""
252        return self.trace_id

Get the current trace id.

def get_trace_url(self) -> str:
254    def get_trace_url(self) -> str:
255        """Get the URL of the current trace to view it in the Langfuse UI."""
256        return f"{self.base_url}/trace/{self.trace_id}"

Get the URL of the current trace to view it in the Langfuse UI.

def get_dataset(self, name: str) -> DatasetClient:
258    def get_dataset(self, name: str) -> "DatasetClient":
259        """Fetch a dataset by its name.
260
261        Args:
262            name (str): The name of the dataset to fetch.
263
264        Returns:
265            DatasetClient: The dataset with the given name.
266        """
267        try:
268            self.log.debug(f"Getting datasets {name}")
269            dataset = self.client.datasets.get(dataset_name=name)
270
271            items = [DatasetItemClient(i, langfuse=self) for i in dataset.items]
272
273            return DatasetClient(dataset, items=items)
274        except Exception as e:
275            self.log.exception(e)
276            raise e

Fetch a dataset by its name.

Arguments:
  • name (str): The name of the dataset to fetch.
Returns:

DatasetClient: The dataset with the given name.

def get_dataset_item(self, id: str) -> DatasetItemClient:
278    def get_dataset_item(self, id: str) -> "DatasetItemClient":
279        """Get the dataset item with the given id."""
280        try:
281            self.log.debug(f"Getting dataset item {id}")
282            dataset_item = self.client.dataset_items.get(id=id)
283            return DatasetItemClient(dataset_item, langfuse=self)
284        except Exception as e:
285            self.log.exception(e)
286            raise e

Get the dataset item with the given id.

def auth_check(self) -> bool:
288    def auth_check(self) -> bool:
289        """Check if the provided credentials (public and secret key) are valid.
290
291        Raises:
292            Exception: If no projects were found for the provided credentials.
293
294        Note:
295            This method is blocking. It is discouraged to use it in production code.
296        """
297        try:
298            projects = self.client.projects.get()
299            self.log.debug(
300                f"Auth check successful, found {len(projects.data)} projects"
301            )
302            if len(projects.data) == 0:
303                raise Exception(
304                    "Auth check failed, no project found for the keys provided."
305                )
306            return True
307
308        except Exception as e:
309            self.log.exception(e)
310            raise e

Check if the provided credentials (public and secret key) are valid.

Raises:
  • Exception: If no projects were found for the provided credentials.
Note:

This method is blocking. It is discouraged to use it in production code.

def get_dataset_run( self, dataset_name: str, dataset_run_name: str) -> langfuse.api.resources.commons.types.dataset_run.DatasetRun:
312    def get_dataset_run(
313        self,
314        dataset_name: str,
315        dataset_run_name: str,
316    ) -> DatasetRun:
317        """Get a dataset run.
318
319        Args:
320            dataset_name: Name of the dataset.
321            dataset_run_name: Name of the dataset run.
322
323        Returns:
324            DatasetRun: The dataset run.
325        """
326        try:
327            self.log.debug(
328                f"Getting dataset runs for dataset {dataset_name} and run {dataset_run_name}"
329            )
330            return self.client.datasets.get_runs(
331                dataset_name=dataset_name, run_name=dataset_run_name
332            )
333        except Exception as e:
334            self.log.exception(e)
335            raise e

Get a dataset run.

Arguments:
  • dataset_name: Name of the dataset.
  • dataset_run_name: Name of the dataset run.
Returns:

DatasetRun: The dataset run.

def create_dataset( self, name: str, description: Optional[str] = None, metadata: Optional[Any] = None) -> langfuse.api.resources.commons.types.dataset.Dataset:
337    def create_dataset(
338        self,
339        name: str,
340        description: Optional[str] = None,
341        metadata: Optional[Any] = None,
342    ) -> Dataset:
343        """Create a dataset with the given name on Langfuse.
344
345        Args:
346            name: Name of the dataset to create.
347            description: Description of the dataset. Defaults to None.
348            metadata: Additional metadata. Defaults to None.
349
350        Returns:
351            Dataset: The created dataset as returned by the Langfuse API.
352        """
353        try:
354            body = CreateDatasetRequest(
355                name=name, description=description, metadata=metadata
356            )
357            self.log.debug(f"Creating datasets {body}")
358            return self.client.datasets.create(request=body)
359        except Exception as e:
360            self.log.exception(e)
361            raise e

Create a dataset with the given name on Langfuse.

Arguments:
  • name: Name of the dataset to create.
  • description: Description of the dataset. Defaults to None.
  • metadata: Additional metadata. Defaults to None.
Returns:

Dataset: The created dataset as returned by the Langfuse API.

def create_dataset_item( self, dataset_name: str, input: Optional[Any] = None, expected_output: Optional[Any] = None, metadata: Optional[Any] = None, source_trace_id: Optional[str] = None, source_observation_id: Optional[str] = None, status: Optional[langfuse.api.resources.commons.types.dataset_status.DatasetStatus] = None, id: Optional[str] = None) -> langfuse.api.resources.commons.types.dataset_item.DatasetItem:
363    def create_dataset_item(
364        self,
365        dataset_name: str,
366        input: Optional[Any] = None,
367        expected_output: Optional[Any] = None,
368        metadata: Optional[Any] = None,
369        source_trace_id: Optional[str] = None,
370        source_observation_id: Optional[str] = None,
371        status: Optional[DatasetStatus] = None,
372        id: Optional[str] = None,
373    ) -> DatasetItem:
374        """Create a dataset item.
375
376        Upserts if an item with id already exists.
377
378        Args:
379            dataset_name: Name of the dataset in which the dataset item should be created.
380            input: Input data. Defaults to None. Can contain any dict, list or scalar.
381            expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar.
382            metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar.
383            source_trace_id: Id of the source trace. Defaults to None.
384            source_observation_id: Id of the source observation. Defaults to None.
385            status: Status of the dataset item. Defaults to ACTIVE for newly created items.
386            id: Id of the dataset item. Defaults to None.
387
388        Returns:
389            DatasetItem: The created dataset item as returned by the Langfuse API.
390
391        Example:
392            ```python
393            from langfuse import Langfuse
394
395            langfuse = Langfuse()
396
397            # Uploading items to the Langfuse dataset named "capital_cities"
398            langfuse.create_dataset_item(
399                dataset_name="capital_cities",
400                input={"input": {"country": "Italy"}},
401                expected_output={"expected_output": "Rome"},
402                metadata={"foo": "bar"}
403            )
404            ```
405        """
406        try:
407            body = CreateDatasetItemRequest(
408                datasetName=dataset_name,
409                input=input,
410                expectedOutput=expected_output,
411                metadata=metadata,
412                sourceTraceId=source_trace_id,
413                sourceObservationId=source_observation_id,
414                status=status,
415                id=id,
416            )
417            self.log.debug(f"Creating dataset item {body}")
418            return self.client.dataset_items.create(request=body)
419        except Exception as e:
420            self.log.exception(e)
421            raise e

Create a dataset item.

Upserts if an item with id already exists.

Arguments:
  • dataset_name: Name of the dataset in which the dataset item should be created.
  • input: Input data. Defaults to None. Can contain any dict, list or scalar.
  • expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar.
  • metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar.
  • source_trace_id: Id of the source trace. Defaults to None.
  • source_observation_id: Id of the source observation. Defaults to None.
  • status: Status of the dataset item. Defaults to ACTIVE for newly created items.
  • id: Id of the dataset item. Defaults to None.
Returns:

DatasetItem: The created dataset item as returned by the Langfuse API.

Example:
from langfuse import Langfuse

langfuse = Langfuse()

# Uploading items to the Langfuse dataset named "capital_cities"
langfuse.create_dataset_item(
    dataset_name="capital_cities",
    input={"input": {"country": "Italy"}},
    expected_output={"expected_output": "Rome"},
    metadata={"foo": "bar"}
)
def get_trace( self, id: str) -> langfuse.api.resources.commons.types.trace_with_full_details.TraceWithFullDetails:
423    def get_trace(
424        self,
425        id: str,
426    ) -> TraceWithFullDetails:
427        """Get a trace via the Langfuse API by its id.
428
429        Args:
430            id: The id of the trace to fetch.
431
432        Returns:
433            TraceWithFullDetails: The trace with full details as returned by the Langfuse API.
434
435        Raises:
436            Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
437        """
438        try:
439            self.log.debug(f"Getting trace {id}")
440            return self.client.trace.get(id)
441        except Exception as e:
442            self.log.exception(e)
443            raise e

Get a trace via the Langfuse API by its id.

Arguments:
  • id: The id of the trace to fetch.
Returns:

TraceWithFullDetails: The trace with full details as returned by the Langfuse API.

Raises:
  • Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
def get_observations( self, *, page: Optional[int] = None, limit: Optional[int] = None, name: Optional[str] = None, user_id: Optional[str] = None, trace_id: Optional[str] = None, parent_observation_id: Optional[str] = None, type: Optional[str] = None) -> langfuse.api.resources.observations.types.observations_views.ObservationsViews:
445    def get_observations(
446        self,
447        *,
448        page: typing.Optional[int] = None,
449        limit: typing.Optional[int] = None,
450        name: typing.Optional[str] = None,
451        user_id: typing.Optional[str] = None,
452        trace_id: typing.Optional[str] = None,
453        parent_observation_id: typing.Optional[str] = None,
454        type: typing.Optional[str] = None,
455    ) -> ObservationsViews:
456        """Get a list of observations in the current project matching the given parameters.
457
458        Args:
459            page (Optional[int]): Page number of the observations to return. Defaults to None.
460            limit (Optional[int]): Maximum number of observations to return. Defaults to None.
461            name (Optional[str]): Name of the observations to return. Defaults to None.
462            user_id (Optional[str]): User identifier. Defaults to None.
463            trace_id (Optional[str]): Trace identifier. Defaults to None.
464            parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
465            type (Optional[str]): Type of the observation. Defaults to None.
466
467        Returns:
468            List of ObservationsViews: List of observations in the project matching the given parameters.
469
470        Raises:
471            Exception: If an error occurred during the request.
472        """
473        try:
474            self.log.debug(
475                f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {type}"
476            )
477            return self.client.observations.get_many(
478                page=page,
479                limit=limit,
480                name=name,
481                user_id=user_id,
482                trace_id=trace_id,
483                parent_observation_id=parent_observation_id,
484                type=type,
485            )
486        except Exception as e:
487            self.log.exception(e)
488            raise e

Get a list of observations in the current project matching the given parameters.

Arguments:
  • page (Optional[int]): Page number of the observations to return. Defaults to None.
  • limit (Optional[int]): Maximum number of observations to return. Defaults to None.
  • name (Optional[str]): Name of the observations to return. Defaults to None.
  • user_id (Optional[str]): User identifier. Defaults to None.
  • trace_id (Optional[str]): Trace identifier. Defaults to None.
  • parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
  • type (Optional[str]): Type of the observation. Defaults to None.
Returns:

List of ObservationsViews: List of observations in the project matching the given parameters.

Raises:
  • Exception: If an error occurred during the request.
def get_generations( self, *, page: Optional[int] = None, limit: Optional[int] = None, name: Optional[str] = None, user_id: Optional[str] = None, trace_id: Optional[str] = None, parent_observation_id: Optional[str] = None) -> langfuse.api.resources.observations.types.observations_views.ObservationsViews:
490    def get_generations(
491        self,
492        *,
493        page: typing.Optional[int] =