langfuse.client

   1import datetime as dt
   2import logging
   3import os
   4import time
   5import tracemalloc
   6import typing
   7import urllib.parse
   8import uuid
   9import warnings
  10from contextlib import contextmanager
  11from dataclasses import dataclass
  12from enum import Enum
  13from typing import Any, Dict, List, Literal, Optional, Sequence, Union, overload
  14
  15import backoff
  16import httpx
  17
  18from langfuse.api.resources.commons.types.dataset_run_with_items import (
  19    DatasetRunWithItems,
  20)
  21from langfuse.api.resources.commons.types.observations_view import ObservationsView
  22from langfuse.api.resources.commons.types.session import Session
  23from langfuse.api.resources.commons.types.trace_with_details import TraceWithDetails
  24from langfuse.api.resources.datasets.types.paginated_dataset_runs import (
  25    PaginatedDatasetRuns,
  26)
  27from langfuse.api.resources.ingestion.types.create_event_body import CreateEventBody
  28from langfuse.api.resources.ingestion.types.create_generation_body import (
  29    CreateGenerationBody,
  30)
  31from langfuse.api.resources.ingestion.types.create_span_body import CreateSpanBody
  32from langfuse.api.resources.ingestion.types.score_body import ScoreBody
  33from langfuse.api.resources.ingestion.types.sdk_log_body import SdkLogBody
  34from langfuse.api.resources.ingestion.types.trace_body import TraceBody
  35from langfuse.api.resources.ingestion.types.update_generation_body import (
  36    UpdateGenerationBody,
  37)
  38from langfuse.api.resources.ingestion.types.update_span_body import UpdateSpanBody
  39from langfuse.api.resources.observations.types.observations_views import (
  40    ObservationsViews,
  41)
  42from langfuse.api.resources.prompts.types import (
  43    CreatePromptRequest_Chat,
  44    CreatePromptRequest_Text,
  45    Prompt_Chat,
  46    Prompt_Text,
  47)
  48from langfuse.api.resources.trace.types.traces import Traces
  49from langfuse.api.resources.utils.resources.pagination.types.meta_response import (
  50    MetaResponse,
  51)
  52from langfuse.model import (
  53    ChatMessageDict,
  54    ChatPromptClient,
  55    CreateDatasetItemRequest,
  56    CreateDatasetRequest,
  57    CreateDatasetRunItemRequest,
  58    DatasetItem,
  59    DatasetStatus,
  60    ModelUsage,
  61    PromptClient,
  62    TextPromptClient,
  63)
  64from langfuse.parse_error import handle_fern_exception
  65from langfuse.prompt_cache import PromptCache
  66
  67try:
  68    import pydantic.v1 as pydantic  # type: ignore
  69except ImportError:
  70    import pydantic  # type: ignore
  71
  72from langfuse._task_manager.task_manager import TaskManager
  73from langfuse.api.client import FernLangfuse
  74from langfuse.environment import get_common_release_envs
  75from langfuse.logging import clean_logger
  76from langfuse.model import Dataset, MapValue, Observation, TraceWithFullDetails
  77from langfuse.request import LangfuseClient
  78from langfuse.types import MaskFunction, ScoreDataType, SpanLevel
  79from langfuse.utils import _convert_usage_input, _create_prompt_context, _get_timestamp
  80
  81from .version import __version__ as version
  82
  83
  84@dataclass
  85class FetchTracesResponse:
  86    """Response object for fetch_traces method."""
  87
  88    data: typing.List[TraceWithDetails]
  89    meta: MetaResponse
  90
  91
  92@dataclass
  93class FetchTraceResponse:
  94    """Response object for fetch_trace method."""
  95
  96    data: TraceWithFullDetails
  97
  98
  99@dataclass
 100class FetchObservationsResponse:
 101    """Response object for fetch_observations method."""
 102
 103    data: typing.List[ObservationsView]
 104    meta: MetaResponse
 105
 106
 107@dataclass
 108class FetchObservationResponse:
 109    """Response object for fetch_observation method."""
 110
 111    data: Observation
 112
 113
 114@dataclass
 115class FetchSessionsResponse:
 116    """Response object for fetch_sessions method."""
 117
 118    data: typing.List[Session]
 119    meta: MetaResponse
 120
 121
 122class Langfuse(object):
 123    """Langfuse Python client.
 124
 125    Attributes:
 126        log (logging.Logger): Logger for the Langfuse client.
 127        base_url (str): Base URL of the Langfuse API, serving as the root address for API endpoint construction.
 128        httpx_client (httpx.Client): HTTPX client utilized for executing requests to the Langfuse API.
 129        client (FernLangfuse): Core interface for Langfuse API interaction.
 130        task_manager (TaskManager): Task Manager dedicated to handling asynchronous tasks.
 131        release (str): Identifies the release number or hash of the application.
 132        prompt_cache (PromptCache): A cache for efficiently storing and retrieving PromptClient instances.
 133
 134    Example:
 135        Initiating the Langfuse client should always be first step to use Langfuse.
 136        ```python
 137        import os
 138        from langfuse import Langfuse
 139
 140        # Set the public and secret keys as environment variables
 141        os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
 142        os.environ['LANGFUSE_SECRET_KEY'] = secret_key
 143
 144        # Initialize the Langfuse client using the credentials
 145        langfuse = Langfuse()
 146        ```
 147    """
 148
 149    log = logging.getLogger("langfuse")
 150    """Logger for the Langfuse client."""
 151
 152    host: str
 153    """Host of Langfuse API."""
 154
    def __init__(
        self,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        release: Optional[str] = None,
        debug: bool = False,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[float] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,  # seconds
        sdk_integration: Optional[str] = "default",
        httpx_client: Optional[httpx.Client] = None,
        enabled: Optional[bool] = True,
        sample_rate: Optional[float] = None,
        mask: Optional[MaskFunction] = None,
    ):
        """Initialize the Langfuse client.

        Args:
            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
            flush_at: Max batch size that's sent to the API.
            flush_interval: Max delay until a new batch is sent to the API.
            max_retries: Max number of retries in case of API/network errors.
            timeout: Timeout of API requests in seconds. Defaults to 20 seconds.
            httpx_client: Pass your own httpx client for more customizability of requests.
            sdk_integration: Used by integrations that wrap the Langfuse SDK to add context for debugging and support. Not to be used directly.
            enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops.
            sample_rate: Sampling rate for tracing. If set to 0.2, only 20% of the data will be sent to the backend. Can be set via `LANGFUSE_SAMPLE_RATE` environment variable.
            mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data.

        Raises:
            ValueError: If public_key or secret_key are not set and not found in environment variables.

        Example:
            Initiating the Langfuse client should always be the first step to use Langfuse.
            ```python
            import os
            from langfuse import Langfuse

            # Set the public and secret keys as environment variables
            os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
            os.environ['LANGFUSE_SECRET_KEY'] = secret_key

            # Initialize the Langfuse client using the credentials
            langfuse = Langfuse()
            ```
        """
        self.enabled = enabled
        # Explicit arguments win; fall back to environment variables otherwise.
        public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
        secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
        sample_rate = (
            sample_rate
            if sample_rate
            is not None  # needs explicit None check, as 0 is a valid value
            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
        )

        # An out-of-range sample rate disables the client entirely rather
        # than silently clamping it.
        if sample_rate is not None and (
            sample_rate > 1 or sample_rate < 0
        ):  # default value 1 will be set in the taskmanager
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since the sample rate provided is not between 0 and 1."
            )

        # NOTE(review): `x or int(...)` treats an explicit 0 (or 0.0 for
        # flush_interval) as "unset" and falls back to the env var/default —
        # confirm that 0 is never a meaningful value for these settings.
        threads = threads or int(os.environ.get("LANGFUSE_THREADS", 1))
        flush_at = flush_at or int(os.environ.get("LANGFUSE_FLUSH_AT", 15))
        flush_interval = flush_interval or float(
            os.environ.get("LANGFUSE_FLUSH_INTERVAL", 0.5)
        )

        max_retries = max_retries or int(os.environ.get("LANGFUSE_MAX_RETRIES", 3))
        timeout = timeout or int(os.environ.get("LANGFUSE_TIMEOUT", 20))

        # Missing credentials disable the client (no-op mode) instead of
        # raising, so host applications keep working without observability.
        if not self.enabled:
            self.log.warning(
                "Langfuse client is disabled. No observability data will be sent."
            )

        elif not public_key:
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since no public_key was provided as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
            )

        elif not secret_key:
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since no secret_key was provided as a parameter or environment variable 'LANGFUSE_SECRET_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
            )

        # Debug flag from argument takes precedence over LANGFUSE_DEBUG env var.
        set_debug = debug if debug else (os.getenv("LANGFUSE_DEBUG", "False") == "True")

        if set_debug is True:
            # Ensures that debug level messages are logged when debug mode is on.
            # Otherwise, defaults to WARNING level.
            # See https://docs.python.org/3/howto/logging.html#what-happens-if-no-configuration-is-provided
            logging.basicConfig()
            # Set level for all loggers under langfuse package
            logging.getLogger("langfuse").setLevel(logging.DEBUG)

            clean_logger()
        else:
            logging.getLogger("langfuse").setLevel(logging.WARNING)
            clean_logger()

        self.base_url = (
            host
            if host
            else os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
        )

        # Reuse a caller-supplied httpx client if given (e.g. for proxies or
        # custom TLS); otherwise create one with the configured timeout.
        self.httpx_client = httpx_client or httpx.Client(timeout=timeout)

        # Generated (Fern) API client used for typed endpoint access.
        self.client = FernLangfuse(
            base_url=self.base_url,
            username=public_key,
            password=secret_key,
            x_langfuse_sdk_name="python",
            x_langfuse_sdk_version=version,
            x_langfuse_public_key=public_key,
            httpx_client=self.httpx_client,
        )

        # Lightweight client used by the task manager for batched ingestion.
        langfuse_client = LangfuseClient(
            public_key=public_key,
            secret_key=secret_key,
            base_url=self.base_url,
            version=version,
            timeout=timeout,
            session=self.httpx_client,
        )

        args = {
            "threads": threads,
            "flush_at": flush_at,
            "flush_interval": flush_interval,
            "max_retries": max_retries,
            "client": langfuse_client,
            "api_client": self.client,
            "public_key": public_key,
            "sdk_name": "python",
            "sdk_version": version,
            "sdk_integration": sdk_integration,
            "enabled": self.enabled,
            "sample_rate": sample_rate,
            "mask": mask,
        }

        # Background task manager that batches and ships events asynchronously.
        self.task_manager = TaskManager(**args)

        # Id of the most recently created trace; None until a trace is created.
        self.trace_id = None

        self.release = self._get_release_value(release)

        self.prompt_cache = PromptCache()
 318
 319    def _get_release_value(self, release: Optional[str] = None) -> Optional[str]:
 320        if release:
 321            return release
 322        elif "LANGFUSE_RELEASE" in os.environ:
 323            return os.environ["LANGFUSE_RELEASE"]
 324        else:
 325            return get_common_release_envs()
 326
 327    def get_trace_id(self) -> str:
 328        """Get the current trace id."""
 329        return self.trace_id
 330
 331    def get_trace_url(self) -> str:
 332        """Get the URL of the current trace to view it in the Langfuse UI."""
 333        return f"{self.base_url}/trace/{self.trace_id}"
 334
 335    def get_dataset(
 336        self, name: str, *, fetch_items_page_size: Optional[int] = 50
 337    ) -> "DatasetClient":
 338        """Fetch a dataset by its name.
 339
 340        Args:
 341            name (str): The name of the dataset to fetch.
 342            fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.
 343
 344        Returns:
 345            DatasetClient: The dataset with the given name.
 346        """
 347        try:
 348            self.log.debug(f"Getting datasets {name}")
 349            dataset = self.client.datasets.get(dataset_name=name)
 350
 351            dataset_items = []
 352            page = 1
 353            while True:
 354                new_items = self.client.dataset_items.list(
 355                    dataset_name=self._url_encode(name),
 356                    page=page,
 357                    limit=fetch_items_page_size,
 358                )
 359                dataset_items.extend(new_items.data)
 360                if new_items.meta.total_pages <= page:
 361                    break
 362                page += 1
 363
 364            items = [DatasetItemClient(i, langfuse=self) for i in dataset_items]
 365
 366            return DatasetClient(dataset, items=items)
 367        except Exception as e:
 368            handle_fern_exception(e)
 369            raise e
 370
 371    def get_dataset_item(self, id: str) -> "DatasetItemClient":
 372        """Get the dataset item with the given id."""
 373        try:
 374            self.log.debug(f"Getting dataset item {id}")
 375            dataset_item = self.client.dataset_items.get(id=id)
 376            return DatasetItemClient(dataset_item, langfuse=self)
 377        except Exception as e:
 378            handle_fern_exception(e)
 379            raise e
 380
 381    def auth_check(self) -> bool:
 382        """Check if the provided credentials (public and secret key) are valid.
 383
 384        Raises:
 385            Exception: If no projects were found for the provided credentials.
 386
 387        Note:
 388            This method is blocking. It is discouraged to use it in production code.
 389        """
 390        try:
 391            projects = self.client.projects.get()
 392            self.log.debug(
 393                f"Auth check successful, found {len(projects.data)} projects"
 394            )
 395            if len(projects.data) == 0:
 396                raise Exception(
 397                    "Auth check failed, no project found for the keys provided."
 398                )
 399            return True
 400
 401        except Exception as e:
 402            handle_fern_exception(e)
 403            raise e
 404
 405    def get_dataset_runs(
 406        self,
 407        dataset_name: str,
 408        *,
 409        page: typing.Optional[int] = None,
 410        limit: typing.Optional[int] = None,
 411    ) -> PaginatedDatasetRuns:
 412        """Get all dataset runs.
 413
 414        Args:
 415            dataset_name (str): Name of the dataset.
 416            page (Optional[int]): Page number of the dataset runs to return, starts at 1. Defaults to None.
 417            limit (Optional[int]): Maximum number of dataset runs to return. Defaults to 50.
 418
 419        Returns:
 420            PaginatedDatasetRuns: The dataset runs.
 421        """
 422        try:
 423            self.log.debug("Getting dataset runs")
 424            return self.client.datasets.get_runs(
 425                dataset_name=self._url_encode(dataset_name), page=page, limit=limit
 426            )
 427        except Exception as e:
 428            handle_fern_exception(e)
 429            raise e
 430
 431    def get_dataset_run(
 432        self,
 433        dataset_name: str,
 434        dataset_run_name: str,
 435    ) -> DatasetRunWithItems:
 436        """Get a dataset run.
 437
 438        Args:
 439            dataset_name: Name of the dataset.
 440            dataset_run_name: Name of the dataset run.
 441
 442        Returns:
 443            DatasetRunWithItems: The dataset run.
 444        """
 445        try:
 446            self.log.debug(
 447                f"Getting dataset runs for dataset {dataset_name} and run {dataset_run_name}"
 448            )
 449            return self.client.datasets.get_run(
 450                dataset_name=self._url_encode(dataset_name),
 451                run_name=self._url_encode(dataset_run_name),
 452            )
 453        except Exception as e:
 454            handle_fern_exception(e)
 455            raise e
 456
 457    def create_dataset(
 458        self,
 459        name: str,
 460        description: Optional[str] = None,
 461        metadata: Optional[Any] = None,
 462    ) -> Dataset:
 463        """Create a dataset with the given name on Langfuse.
 464
 465        Args:
 466            name: Name of the dataset to create.
 467            description: Description of the dataset. Defaults to None.
 468            metadata: Additional metadata. Defaults to None.
 469
 470        Returns:
 471            Dataset: The created dataset as returned by the Langfuse API.
 472        """
 473        try:
 474            body = CreateDatasetRequest(
 475                name=name, description=description, metadata=metadata
 476            )
 477            self.log.debug(f"Creating datasets {body}")
 478            return self.client.datasets.create(request=body)
 479        except Exception as e:
 480            handle_fern_exception(e)
 481            raise e
 482
 483    def create_dataset_item(
 484        self,
 485        dataset_name: str,
 486        input: Optional[Any] = None,
 487        expected_output: Optional[Any] = None,
 488        metadata: Optional[Any] = None,
 489        source_trace_id: Optional[str] = None,
 490        source_observation_id: Optional[str] = None,
 491        status: Optional[DatasetStatus] = None,
 492        id: Optional[str] = None,
 493    ) -> DatasetItem:
 494        """Create a dataset item.
 495
 496        Upserts if an item with id already exists.
 497
 498        Args:
 499            dataset_name: Name of the dataset in which the dataset item should be created.
 500            input: Input data. Defaults to None. Can contain any dict, list or scalar.
 501            expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar.
 502            metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar.
 503            source_trace_id: Id of the source trace. Defaults to None.
 504            source_observation_id: Id of the source observation. Defaults to None.
 505            status: Status of the dataset item. Defaults to ACTIVE for newly created items.
 506            id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets.
 507
 508        Returns:
 509            DatasetItem: The created dataset item as returned by the Langfuse API.
 510
 511        Example:
 512            ```python
 513            from langfuse import Langfuse
 514
 515            langfuse = Langfuse()
 516
 517            # Uploading items to the Langfuse dataset named "capital_cities"
 518            langfuse.create_dataset_item(
 519                dataset_name="capital_cities",
 520                input={"input": {"country": "Italy"}},
 521                expected_output={"expected_output": "Rome"},
 522                metadata={"foo": "bar"}
 523            )
 524            ```
 525        """
 526        try:
 527            body = CreateDatasetItemRequest(
 528                datasetName=dataset_name,
 529                input=input,
 530                expectedOutput=expected_output,
 531                metadata=metadata,
 532                sourceTraceId=source_trace_id,
 533                sourceObservationId=source_observation_id,
 534                status=status,
 535                id=id,
 536            )
 537            self.log.debug(f"Creating dataset item {body}")
 538            return self.client.dataset_items.create(request=body)
 539        except Exception as e:
 540            handle_fern_exception(e)
 541            raise e
 542
 543    def fetch_trace(
 544        self,
 545        id: str,
 546    ) -> FetchTraceResponse:
 547        """Fetch a trace via the Langfuse API by its id.
 548
 549        Args:
 550            id: The id of the trace to fetch.
 551
 552        Returns:
 553            FetchTraceResponse: The trace with full details as returned by the Langfuse API on `data`.
 554
 555        Raises:
 556            Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
 557        """
 558        try:
 559            self.log.debug(f"Getting trace {id}")
 560            trace = self.client.trace.get(id)
 561            return FetchTraceResponse(data=trace)
 562        except Exception as e:
 563            handle_fern_exception(e)
 564            raise e
 565
 566    def get_trace(
 567        self,
 568        id: str,
 569    ) -> TraceWithFullDetails:
 570        """Get a trace via the Langfuse API by its id. Deprecated, use fetch_trace instead.
 571
 572        Args:
 573            id: The id of the trace to fetch.
 574
 575        Returns:
 576            TraceWithFullDetails: The trace with full details as returned by the Langfuse API.
 577
 578        Raises:
 579            Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
 580        """
 581        warnings.warn(
 582            "get_trace is deprecated, use fetch_trace instead.",
 583            DeprecationWarning,
 584        )
 585
 586        try:
 587            self.log.debug(f"Getting trace {id}")
 588            return self.client.trace.get(id)
 589        except Exception as e:
 590            handle_fern_exception(e)
 591            raise e
 592
 593    def fetch_traces(
 594        self,
 595        *,
 596        page: Optional[int] = None,
 597        limit: Optional[int] = None,
 598        user_id: Optional[str] = None,
 599        name: Optional[str] = None,
 600        session_id: Optional[str] = None,
 601        from_timestamp: Optional[dt.datetime] = None,
 602        to_timestamp: Optional[dt.datetime] = None,
 603        order_by: Optional[str] = None,
 604        tags: Optional[Union[str, Sequence[str]]] = None,
 605    ) -> FetchTracesResponse:
 606        """Fetch a list of traces in the current project matching the given parameters.
 607
 608        Args:
 609            page (Optional[int]): Page number, starts at 1. Defaults to None.
 610            limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None.
 611            name (Optional[str]): Filter by name of traces. Defaults to None.
 612            user_id (Optional[str]): Filter by user_id. Defaults to None.
 613            session_id (Optional[str]): Filter by session_id. Defaults to None.
 614            from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None.
 615            to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None.
 616            order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None.
 617            tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None.
 618
 619        Returns:
 620            FetchTracesResponse, list of traces on `data` and metadata on `meta`.
 621
 622        Raises:
 623            Exception: If an error occurred during the request.
 624        """
 625        try:
 626            self.log.debug(
 627                f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}"
 628            )
 629            res = self.client.trace.list(
 630                page=page,
 631                limit=limit,
 632                name=name,
 633                user_id=user_id,
 634                session_id=session_id,
 635                from_timestamp=from_timestamp,
 636                to_timestamp=to_timestamp,
 637                order_by=order_by,
 638                tags=tags,
 639            )
 640            return FetchTracesResponse(data=res.data, meta=res.meta)
 641        except Exception as e:
 642            handle_fern_exception(e)
 643            raise e
 644
 645    def get_traces(
 646        self,
 647        *,
 648        page: Optional[int] = None,
 649        limit: Optional[int] = None,
 650        user_id: Optional[str] = None,
 651        name: Optional[str] = None,
 652        session_id: Optional[str] = None,
 653        from_timestamp: Optional[dt.datetime] = None,
 654        to_timestamp: Optional[dt.datetime] = None,
 655        order_by: Optional[str] = None,
 656        tags: Optional[Union[str, Sequence[str]]] = None,
 657    ) -> Traces:
 658        """Get a list of traces in the current project matching the given parameters. Deprecated, use fetch_traces instead.
 659
 660        Args:
 661            page (Optional[int]): Page number, starts at 1. Defaults to None.
 662            limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None.
 663            name (Optional[str]): Filter by name of traces. Defaults to None.
 664            user_id (Optional[str]): Filter by user_id. Defaults to None.
 665            session_id (Optional[str]): Filter by session_id. Defaults to None.
 666            from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None.
 667            to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None.
 668            order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None.
 669            tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None.
 670
 671        Returns:
 672            List of Traces
 673
 674        Raises:
 675            Exception: If an error occurred during the request.
 676        """
 677        warnings.warn(
 678            "get_traces is deprecated, use fetch_traces instead.",
 679            DeprecationWarning,
 680        )
 681        try:
 682            self.log.debug(
 683                f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}"
 684            )
 685            return self.client.trace.list(
 686                page=page,
 687                limit=limit,
 688                name=name,
 689                user_id=user_id,
 690                session_id=session_id,
 691                from_timestamp=from_timestamp,
 692                to_timestamp=to_timestamp,
 693                order_by=order_by,
 694                tags=tags,
 695            )
 696        except Exception as e:
 697            handle_fern_exception(e)
 698            raise e
 699
 700    def fetch_observations(
 701        self,
 702        *,
 703        page: typing.Optional[int] = None,
 704        limit: typing.Optional[int] = None,
 705        name: typing.Optional[str] = None,
 706        user_id: typing.Optional[str] = None,
 707        trace_id: typing.Optional[str] = None,
 708        parent_observation_id: typing.Optional[str] = None,
 709        from_start_time: typing.Optional[dt.datetime] = None,
 710        to_start_time: typing.Optional[dt.datetime] = None,
 711        type: typing.Optional[str] = None,
 712    ) -> FetchObservationsResponse:
 713        """Get a list of observations in the current project matching the given parameters.
 714
 715        Args:
 716            page (Optional[int]): Page number of the observations to return. Defaults to None.
 717            limit (Optional[int]): Maximum number of observations to return. Defaults to None.
 718            name (Optional[str]): Name of the observations to return. Defaults to None.
 719            user_id (Optional[str]): User identifier. Defaults to None.
 720            trace_id (Optional[str]): Trace identifier. Defaults to None.
 721            parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
 722            from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
 723            to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
 724            type (Optional[str]): Type of the observation. Defaults to None.
 725
 726        Returns:
 727            FetchObservationsResponse, list of observations on `data` and metadata on `meta`.
 728
 729        Raises:
 730            Exception: If an error occurred during the request.
 731        """
 732        try:
 733            self.log.debug(
 734                f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}"
 735            )
 736            res = self.client.observations.get_many(
 737                page=page,
 738                limit=limit,
 739                name=name,
 740                user_id=user_id,
 741                trace_id=trace_id,
 742                parent_observation_id=parent_observation_id,
 743                from_start_time=from_start_time,
 744                to_start_time=to_start_time,
 745                type=type,
 746            )
 747            return FetchObservationsResponse(data=res.data, meta=res.meta)
 748        except Exception as e:
 749            self.log.exception(e)
 750            raise e
 751
 752    def get_observations(
 753        self,
 754        *,
 755        page: typing.Optional[int] = None,
 756        limit: typing.Optional[int] = None,
 757        name: typing.Optional[str] = None,
 758        user_id: typing.Optional[str] = None,
 759        trace_id: typing.Optional[str] = None,
 760        parent_observation_id: typing.Optional[str] = None,
 761        from_start_time: typing.Optional[dt.datetime] = None,
 762        to_start_time: typing.Optional[dt.datetime] = None,
 763        type: typing.Optional[str] = None,
 764    ) -> ObservationsViews:
 765        """Get a list of observations in the current project matching the given parameters. Deprecated, use fetch_observations instead.
 766
 767        Args:
 768            page (Optional[int]): Page number of the observations to return. Defaults to None.
 769            limit (Optional[int]): Maximum number of observations to return. Defaults to None.
 770            name (Optional[str]): Name of the observations to return. Defaults to None.
 771            user_id (Optional[str]): User identifier. Defaults to None.
 772            trace_id (Optional[str]): Trace identifier. Defaults to None.
 773            parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
 774            from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
 775            to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
 776            type (Optional[str]): Type of the observation. Defaults to None.
 777
 778        Returns:
 779            List of ObservationsViews: List of observations in the project matching the given parameters.
 780
 781        Raises:
 782            Exception: If an error occurred during the request.
 783        """
 784        warnings.warn(
 785            "get_observations is deprecated, use fetch_observations instead.",
 786            DeprecationWarning,
 787        )
 788        try:
 789            self.log.debug(
 790                f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}"
 791            )
 792            return self.client.observations.get_many(
 793                page=page,
 794                limit=limit,
 795                name=name,
 796                user_id=user_id,
 797                trace_id=trace_id,
 798                parent_observation_id=parent_observation_id,
 799                from_start_time=from_start_time,
 800                to_start_time=to_start_time,
 801                type=type,
 802            )
 803        except Exception as e:
 804            handle_fern_exception(e)
 805            raise e
 806
 807    def get_generations(
 808        self,
 809        *,
 810        page: typing.Optional[int] = None,
 811        limit: typing.Optional[int] = None,
 812        name: typing.Optional[str] = None,
 813        user_id: typing.Optional[str] = None,
 814        trace_id: typing.Optional[str] = None,
 815        from_start_time: typing.Optional[dt.datetime] = None,
 816        to_start_time: typing.Optional[dt.datetime] = None,
 817        parent_observation_id: typing.Optional[str] = None,
 818    ) -> ObservationsViews:
 819        """Get a list of generations in the current project matching the given parameters. Deprecated, use fetch_observations(type='GENERATION') instead.
 820
 821        Args:
 822            page (Optional[int]): Page number of the generations to return. Defaults to None.
 823            limit (Optional[int]): Maximum number of generations to return. Defaults to None.
 824            name (Optional[str]): Name of the generations to return. Defaults to None.
 825            user_id (Optional[str]): User identifier of the generations to return. Defaults to None.
 826            trace_id (Optional[str]): Trace identifier of the generations to return. Defaults to None.
 827            from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
 828            to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
 829            parent_observation_id (Optional[str]): Parent observation identifier of the generations to return. Defaults to None.
 830
 831        Returns:
 832            List of ObservationsViews: List of generations in the project matching the given parameters.
 833
 834        Raises:
 835            Exception: If an error occurred during the request.
 836        """
 837        warnings.warn(
 838            "get_generations is deprecated, use `fetch_observations(type='GENERATION')` instead.",
 839            DeprecationWarning,
 840        )
 841        return self.get_observations(
 842            page=page,
 843            limit=limit,
 844            name=name,
 845            user_id=user_id,
 846            trace_id=trace_id,
 847            parent_observation_id=parent_observation_id,
 848            from_start_time=from_start_time,
 849            to_start_time=to_start_time,
 850            type="GENERATION",
 851        )
 852
 853    def fetch_observation(
 854        self,
 855        id: str,
 856    ) -> FetchObservationResponse:
 857        """Get an observation in the current project with the given identifier.
 858
 859        Args:
 860            id: The identifier of the observation to fetch.
 861
 862        Returns:
 863            FetchObservationResponse: The observation with the given id on `data`.
 864
 865        Raises:
 866            Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request.
 867        """
 868        try:
 869            self.log.debug(f"Getting observation {id}")
 870            observation = self.client.observations.get(id)
 871            return FetchObservationResponse(data=observation)
 872        except Exception as e:
 873            handle_fern_exception(e)
 874            raise e
 875
 876    def get_observation(
 877        self,
 878        id: str,
 879    ) -> Observation:
 880        """Get an observation in the current project with the given identifier. Deprecated, use fetch_observation instead.
 881
 882        Args:
 883            id: The identifier of the observation to fetch.
 884
 885        Raises:
 886            Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request.
 887        """
 888        warnings.warn(
 889            "get_observation is deprecated, use fetch_observation instead.",
 890            DeprecationWarning,
 891        )
 892        try:
 893            self.log.debug(f"Getting observation {id}")
 894            return self.client.observations.get(id)
 895        except Exception as e:
 896            handle_fern_exception(e)
 897            raise e
 898
 899    def fetch_sessions(
 900        self,
 901        *,
 902        page: typing.Optional[int] = None,
 903        limit: typing.Optional[int] = None,
 904        from_timestamp: typing.Optional[dt.datetime] = None,
 905        to_timestamp: typing.Optional[dt.datetime] = None,
 906    ) -> FetchSessionsResponse:
 907        """Get a list of sessions in the current project.
 908
 909        Args:
 910            page (Optional[int]): Page number of the sessions to return. Defaults to None.
 911            limit (Optional[int]): Maximum number of sessions to return. Defaults to None.
 912            from_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp on or after this datetime. Defaults to None.
 913            to_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp before this datetime. Defaults to None.
 914
 915        Returns:
 916            FetchSessionsResponse, list of sessions on `data` and metadata on `meta`.
 917
 918        Raises:
 919            Exception: If an error occurred during the request.
 920        """
 921        try:
 922            self.log.debug(
 923                f"Getting sessions... {page}, {limit}, {from_timestamp}, {to_timestamp}"
 924            )
 925            res = self.client.sessions.list(
 926                page=page,
 927                limit=limit,
 928                from_timestamp=from_timestamp,
 929                to_timestamp=to_timestamp,
 930            )
 931            return FetchSessionsResponse(data=res.data, meta=res.meta)
 932        except Exception as e:
 933            handle_fern_exception(e)
 934            raise e
 935
    # Overload: requesting type="chat" returns a ChatPromptClient and the
    # fallback must be a list of chat messages.
    @overload
    def get_prompt(
        self,
        name: str,
        version: Optional[int] = None,
        *,
        label: Optional[str] = None,
        type: Literal["chat"],
        cache_ttl_seconds: Optional[int] = None,
        fallback: Optional[List[ChatMessageDict]] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> ChatPromptClient: ...

    # Overload: the default type="text" returns a TextPromptClient and the
    # fallback must be a plain prompt string.
    @overload
    def get_prompt(
        self,
        name: str,
        version: Optional[int] = None,
        *,
        label: Optional[str] = None,
        type: Literal["text"] = "text",
        cache_ttl_seconds: Optional[int] = None,
        fallback: Optional[str] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> TextPromptClient: ...
 963
    def get_prompt(
        self,
        name: str,
        version: Optional[int] = None,
        *,
        label: Optional[str] = None,
        type: Literal["chat", "text"] = "text",
        cache_ttl_seconds: Optional[int] = None,
        fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> PromptClient:
        """Get a prompt.

        This method attempts to fetch the requested prompt from the local cache. If the prompt is not found
        in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again
        and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will
        return the expired prompt as a fallback.

        Args:
            name (str): The name of the prompt to retrieve.
            version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.

        Keyword Args:
            label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
            cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a
            keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0.
            type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text".
            fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None.
            max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries use a constant backoff between attempts.
            fetch_timeout_seconds: Optional[int]: The timeout in seconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 10 seconds per default.

        Returns:
            The prompt object retrieved from the cache or directly fetched if not cached or expired of type
            - TextPromptClient, if type argument is 'text'.
            - ChatPromptClient, if type argument is 'chat'.

        Raises:
            Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an
            expired prompt in the cache, in which case it logs a warning and returns the expired prompt.
        """
        # version and label are mutually exclusive selectors for a prompt revision.
        if version is not None and label is not None:
            raise ValueError("Cannot specify both version and label at the same time.")

        if not name:
            raise ValueError("Prompt name cannot be empty.")

        cache_key = PromptCache.generate_cache_key(name, version=version, label=label)
        # Retries are clamped to [0, 4]; 2 attempts when the caller passes None.
        bounded_max_retries = self._get_bounded_max_retries(
            max_retries, default_max_retries=2, max_retries_upper_bound=4
        )

        self.log.debug(f"Getting prompt '{cache_key}'")
        cached_prompt = self.prompt_cache.get(cache_key)

        # Cache miss, or caching explicitly disabled via cache_ttl_seconds=0:
        # fetch synchronously; on failure fall back to the caller-supplied prompt.
        if cached_prompt is None or cache_ttl_seconds == 0:
            self.log.debug(
                f"Prompt '{cache_key}' not found in cache or caching disabled."
            )
            try:
                return self._fetch_prompt_and_update_cache(
                    name,
                    version=version,
                    label=label,
                    ttl_seconds=cache_ttl_seconds,
                    max_retries=bounded_max_retries,
                    fetch_timeout_seconds=fetch_timeout_seconds,
                )
            except Exception as e:
                if fallback:
                    self.log.warning(
                        f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}"
                    )

                    # Synthesize a local prompt object (version 0, empty config)
                    # so the caller can keep working without a server round trip.
                    fallback_client_args = {
                        "name": name,
                        "prompt": fallback,
                        "type": type,
                        "version": version or 0,
                        "config": {},
                        "labels": [label] if label else [],
                        "tags": [],
                    }

                    if type == "text":
                        return TextPromptClient(
                            prompt=Prompt_Text(**fallback_client_args),
                            is_fallback=True,
                        )

                    if type == "chat":
                        return ChatPromptClient(
                            prompt=Prompt_Chat(**fallback_client_args),
                            is_fallback=True,
                        )

                # No fallback available: surface the fetch error to the caller.
                raise e

        # Cache hit but stale: serve the stale value immediately and refresh
        # asynchronously so callers never block on a revalidation.
        if cached_prompt.is_expired():
            self.log.debug(f"Stale prompt '{cache_key}' found in cache.")
            try:
                # refresh prompt in background thread, refresh_prompt deduplicates tasks
                self.log.debug(f"Refreshing prompt '{cache_key}' in background.")
                self.prompt_cache.add_refresh_prompt_task(
                    cache_key,
                    lambda: self._fetch_prompt_and_update_cache(
                        name,
                        version=version,
                        label=label,
                        ttl_seconds=cache_ttl_seconds,
                        max_retries=bounded_max_retries,
                        fetch_timeout_seconds=fetch_timeout_seconds,
                    ),
                )
                self.log.debug(f"Returning stale prompt '{cache_key}' from cache.")
                # return stale prompt
                return cached_prompt.value

            except Exception as e:
                self.log.warning(
                    f"Error when refreshing cached prompt '{cache_key}', returning cached version. Error: {e}"
                )
                # creation of refresh prompt task failed, return stale prompt
                return cached_prompt.value

        # Fresh cache hit.
        return cached_prompt.value
1090
1091    def _fetch_prompt_and_update_cache(
1092        self,
1093        name: str,
1094        *,
1095        version: Optional[int] = None,
1096        label: Optional[str] = None,
1097        ttl_seconds: Optional[int] = None,
1098        max_retries: int,
1099        fetch_timeout_seconds,
1100    ) -> PromptClient:
1101        try:
1102            cache_key = PromptCache.generate_cache_key(
1103                name, version=version, label=label
1104            )
1105
1106            self.log.debug(f"Fetching prompt '{cache_key}' from server...")
1107
1108            @backoff.on_exception(
1109                backoff.constant, Exception, max_tries=max_retries, logger=None
1110            )
1111            def fetch_prompts():
1112                return self.client.prompts.get(
1113                    self._url_encode(name),
1114                    version=version,
1115                    label=label,
1116                    request_options={
1117                        "timeout_in_seconds": fetch_timeout_seconds,
1118                    }
1119                    if fetch_timeout_seconds is not None
1120                    else None,
1121                )
1122
1123            prompt_response = fetch_prompts()
1124
1125            if prompt_response.type == "chat":
1126                prompt = ChatPromptClient(prompt_response)
1127            else:
1128                prompt = TextPromptClient(prompt_response)
1129
1130            self.prompt_cache.set(cache_key, prompt, ttl_seconds)
1131
1132            return prompt
1133
1134        except Exception as e:
1135            self.log.error(f"Error while fetching prompt '{cache_key}': {str(e)}")
1136            raise e
1137
1138    def _get_bounded_max_retries(
1139        self,
1140        max_retries: Optional[int],
1141        *,
1142        default_max_retries: int = 2,
1143        max_retries_upper_bound: int = 4,
1144    ) -> int:
1145        if max_retries is None:
1146            return default_max_retries
1147
1148        bounded_max_retries = min(
1149            max(max_retries, 0),
1150            max_retries_upper_bound,
1151        )
1152
1153        return bounded_max_retries
1154
    # Overload: creating a "chat" prompt takes a list of chat messages and
    # returns a ChatPromptClient.
    # NOTE(review): the mutable default `labels=[]` is safe here because the
    # implementation never mutates it in place, but a None default would be
    # the more defensive convention.
    @overload
    def create_prompt(
        self,
        *,
        name: str,
        prompt: List[ChatMessageDict],
        is_active: Optional[bool] = None,  # deprecated
        labels: List[str] = [],
        tags: Optional[List[str]] = None,
        type: Optional[Literal["chat"]],
        config: Optional[Any] = None,
    ) -> ChatPromptClient: ...

    # Overload: creating the default "text" prompt takes a plain string and
    # returns a TextPromptClient.
    @overload
    def create_prompt(
        self,
        *,
        name: str,
        prompt: str,
        is_active: Optional[bool] = None,  # deprecated
        labels: List[str] = [],
        tags: Optional[List[str]] = None,
        type: Optional[Literal["text"]] = "text",
        config: Optional[Any] = None,
    ) -> TextPromptClient: ...
1180
1181    def create_prompt(
1182        self,
1183        *,
1184        name: str,
1185        prompt: Union[str, List[ChatMessageDict]],
1186        is_active: Optional[bool] = None,  # deprecated
1187        labels: List[str] = [],
1188        tags: Optional[List[str]] = None,
1189        type: Optional[Literal["chat", "text"]] = "text",
1190        config: Optional[Any] = None,
1191    ) -> PromptClient:
1192        """Create a new prompt in Langfuse.
1193
1194        Keyword Args:
1195            name : The name of the prompt to be created.
1196            prompt : The content of the prompt to be created.
1197            is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead.
1198            labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label.
1199            tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt.
1200            config: Additional structured data to be saved with the prompt. Defaults to None.
1201            type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text".
1202
1203        Returns:
1204            TextPromptClient: The prompt if type argument is 'text'.
1205            ChatPromptClient: The prompt if type argument is 'chat'.
1206        """
1207        try:
1208            self.log.debug(f"Creating prompt {name=}, {version=}, {labels=}")
1209
1210            # Handle deprecated is_active flag
1211            if is_active:
1212                self.log.warning(
1213                    "The 'is_active' flag is deprecated and will be removed in a future release. Please use the 'production' label instead."
1214                )
1215
1216                labels = labels if "production" in labels else labels + ["production"]
1217
1218            if type == "chat":
1219                if not isinstance(prompt, list):
1220                    raise ValueError(
1221                        "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes."
1222                    )
1223                request = CreatePromptRequest_Chat(
1224                    name=name,
1225                    prompt=prompt,
1226                    labels=labels,
1227                    tags=tags,
1228                    config=config or {},
1229                    type="chat",
1230                )
1231                server_prompt = self.client.prompts.create(request=request)
1232
1233                return ChatPromptClient(prompt=server_prompt)
1234
1235            if not isinstance(prompt, str):
1236                raise ValueError("For 'text' type, 'prompt' must be a string.")
1237
1238            request = CreatePromptRequest_Text(
1239                name=name,
1240                prompt=prompt,
1241                labels=labels,
1242                tags=tags,
1243                config=config or {},
1244                type="text",
1245            )
1246
1247            server_prompt = self.client.prompts.create(request=request)
1248            return TextPromptClient(prompt=server_prompt)
1249
1250        except Exception as e:
1251            handle_fern_exception(e)
1252            raise e
1253
1254    def _url_encode(self, url: str) -> str:
1255        return urllib.parse.quote(url)
1256
1257    def trace(
1258        self,
1259        *,
1260        id: typing.Optional[str] = None,
1261        name: typing.Optional[str] = None,
1262        user_id: typing.Optional[str] = None,
1263        session_id: typing.Optional[str] = None,
1264        version: typing.Optional[str] = None,
1265        input: typing.Optional[typing.Any] = None,
1266        output: typing.Optional[typing.Any] = None,
1267        metadata: typing.Optional[typing.Any] = None,
1268        tags: typing.Optional[typing.List[str]] = None,
1269        timestamp: typing.Optional[dt.datetime] = None,
1270        public: typing.Optional[bool] = None,
1271        **kwargs,
1272    ) -> "StatefulTraceClient":
1273        """Create a trace.
1274
1275        Args:
1276            id: The id of the trace can be set, defaults to a random id. Set it to link traces to external systems or when creating a distributed trace. Traces are upserted on id.
1277            name: Identifier of the trace. Useful for sorting/filtering in the UI.
1278            input: The input of the trace. Can be any JSON object.
1279            output: The output of the trace. Can be any JSON object.
1280            metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
1281            user_id: The id of the user that triggered the execution. Used to provide user-level analytics.
1282            session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
1283            version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
1284            release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
1285            tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.
1286            timestamp: The timestamp of the trace. Defaults to the current time if not provided.
1287            public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
1288            **kwargs: Additional keyword arguments that can be included in the trace.
1289
1290        Returns:
1291            StatefulTraceClient: The created trace.
1292
1293        Example:
1294            ```python
1295            from langfuse import Langfuse
1296
1297            langfuse = Langfuse()
1298
1299            trace = langfuse.trace(
1300                name="example-application",
1301                user_id="user-1234")
1302            )
1303            ```
1304        """
1305        new_id = id or str(uuid.uuid4())
1306        self.trace_id = new_id
1307        try:
1308            new_dict = {
1309                "id": new_id,
1310                "name": name,
1311                "userId": user_id,
1312                "sessionId": session_id
1313                or kwargs.get("sessionId", None),  # backward compatibility
1314                "release": self.release,
1315                "version": version,
1316                "metadata": metadata,
1317                "input": input,
1318                "output": output,
1319                "tags": tags,
1320                "timestamp": timestamp or _get_timestamp(),
1321                "public": public,
1322            }
1323            if kwargs is not None:
1324                new_dict.update(kwargs)
1325
1326            new_body = TraceBody(**new_dict)
1327
1328            self.log.debug(f"Creating trace {_filter_io_from_event_body(new_dict)}")
1329            event = {
1330                "id": str(uuid.uuid4()),
1331                "type": "trace-create",
1332                "body": new_body,
1333            }
1334
1335            self.task_manager.add_task(
1336                event,
1337            )
1338
1339        except Exception as e:
1340            self.log.exception(e)
1341        finally:
1342            self._log_memory_usage()
1343
1344            return StatefulTraceClient(
1345                self.client, new_id, StateType.TRACE, new_id, self.task_manager
1346            )
1347
1348    def _log_memory_usage(self):
1349        try:
1350            is_malloc_tracing_enabled = bool(int(os.getenv("PYTHONTRACEMALLOC", 0)))
1351            report_interval = int(os.getenv("LANGFUSE_DEBUG_MEMORY_REPORT_INTERVAL", 0))
1352            top_k_items = int(os.getenv("LANGFUSE_DEBUG_MEMORY_TOP_K", 10))
1353
1354            if (
1355                not is_malloc_tracing_enabled
1356                or report_interval <= 0
1357                or round(time.monotonic()) % report_interval != 0
1358            ):
1359                return
1360
1361            snapshot = tracemalloc.take_snapshot().statistics("lineno")
1362
1363            total_memory_usage = sum([stat.size for stat in snapshot]) / 1024 / 1024
1364            memory_usage_total_items = [f"{stat}" for stat in snapshot]
1365            memory_usage_langfuse_items = [
1366                stat for stat in memory_usage_total_items if "/langfuse/" in stat
1367            ]
1368
1369            logged_memory_usage = {
1370                "all_files": [f"{stat}" for stat in memory_usage_total_items][
1371                    :top_k_items
1372                ],
1373                "langfuse_files": [f"{stat}" for stat in memory_usage_langfuse_items][
1374                    :top_k_items
1375                ],
1376                "total_usage": f"{total_memory_usage:.2f} MB",
1377                "langfuse_queue_length": self.task_manager._ingestion_queue.qsize(),
1378            }
1379
1380            self.log.debug("Memory usage: ", logged_memory_usage)
1381
1382            event = SdkLogBody(log=logged_memory_usage)
1383            self.task_manager.add_task(
1384                {
1385                    "id": str(uuid.uuid4()),
1386                    "type": "sdk-log",
1387                    "timestamp": _get_timestamp(),
1388                    "body": event.dict(),
1389                }
1390            )
1391
1392        except Exception as e:
1393            self.log.exception(e)
1394
    # Overload: numeric/boolean scores take a float value.
    @overload
    def score(
        self,
        *,
        name: str,
        value: float,
        data_type: typing.Optional[Literal["NUMERIC", "BOOLEAN"]] = None,
        trace_id: typing.Optional[str] = None,
        id: typing.Optional[str] = None,
        comment: typing.Optional[str] = None,
        observation_id: typing.Optional[str] = None,
        config_id: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient": ...

    # Overload: categorical scores take a string value.
    @overload
    def score(
        self,
        *,
        name: str,
        value: str,
        data_type: typing.Optional[Literal["CATEGORICAL"]] = "CATEGORICAL",
        trace_id: typing.Optional[str] = None,
        id: typing.Optional[str] = None,
        comment: typing.Optional[str] = None,
        observation_id: typing.Optional[str] = None,
        config_id: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient": ...
1424
1425    def score(
1426        self,
1427        *,
1428        name: str,
1429        value: typing.Union[float, str],
1430        data_type: typing.Optional[ScoreDataType] = None,
1431        trace_id: typing.Optional[str] = None,
1432        id: typing.Optional[str] = None,
1433        comment: typing.Optional[str] = None,
1434        observation_id: typing.Optional[str] = None,
1435        config_id: typing.Optional[str] = None,
1436        **kwargs,
1437    ) -> "StatefulClient":
1438        """Create a score attached to a trace (and optionally an observation).
1439
1440        Args:
1441            name (str): Identifier of the score.
1442            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores.
1443            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
1444              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
1445            trace_id (str): The id of the trace to which the score should be attached.
1446            id (Optional[str]): The id of the score. If not provided, a new UUID is generated.
1447            comment (Optional[str]): Additional context/explanation of the score.
1448            observation_id (Optional[str]): The id of the observation to which the score should be attached.
1449            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
1450            **kwargs: Additional keyword arguments to include in the score.
1451
1452        Returns:
1453            StatefulClient: Either the associated observation (if observation_id is provided) or the trace (if observation_id is not provided).
1454
1455        Example:
1456            ```python
1457            from langfuse import Langfuse
1458
1459            langfuse = Langfuse()
1460
1461            # Create a trace
1462            trace = langfuse.trace(name="example-application")
1463
1464            # Get id of created trace
1465            trace_id = trace.id
1466
1467            # Add score to the trace
1468            trace = langfuse.score(
1469                trace_id=trace_id,
1470                name="user-explicit-feedback",
1471                value=0.9,
1472                comment="I like how personalized the response is"
1473            )
1474            ```
1475        """
1476        trace_id = trace_id or self.trace_id or str(uuid.uuid4())
1477        new_id = id or str(uuid.uuid4())
1478        try:
1479            new_dict = {
1480                "id": new_id,
1481                "trace_id": trace_id,
1482                "observation_id": observation_id,
1483                "name": name,
1484                "value": value,
1485                "data_type": data_type,
1486                "comment": comment,
1487                "config_id": config_id,
1488                **kwargs,
1489            }
1490
1491            self.log.debug(f"Creating score {new_dict}...")
1492            new_body = ScoreBody(**new_dict)
1493
1494            event = {
1495                "id": str(uuid.uuid4()),
1496                "type": "score-create",
1497                "body": new_body,
1498            }
1499            self.task_manager.add_task(event)
1500
1501        except Exception as e:
1502            self.log.exception(e)
1503        finally:
1504            if observation_id is not None:
1505                return StatefulClient(
1506                    self.client,
1507                    observation_id,
1508                    StateType.OBSERVATION,
1509                    trace_id,
1510                    self.task_manager,
1511                )
1512            else:
1513                return StatefulClient(
1514                    self.client, new_id, StateType.TRACE, new_id, self.task_manager
1515                )
1516
1517    def span(
1518        self,
1519        *,
1520        id: typing.Optional[str] = None,
1521        trace_id: typing.Optional[str] = None,
1522        parent_observation_id: typing.Optional[str] = None,
1523        name: typing.Optional[str] = None,
1524        start_time: typing.Optional[dt.datetime] = None,
1525        end_time: typing.Optional[dt.datetime] = None,
1526        metadata: typing.Optional[typing.Any] = None,
1527        level: typing.Optional[SpanLevel] = None,
1528        status_message: typing.Optional[str] = None,
1529        input: typing.Optional[typing.Any] = None,
1530        output: typing.Optional[typing.Any] = None,
1531        version: typing.Optional[str] = None,
1532        **kwargs,
1533    ) -> "StatefulSpanClient":
1534        """Create a span.
1535
1536        A span represents durations of units of work in a trace.
1537        Usually, you want to add a span nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.
1538
1539        If no trace_id is provided, a new trace is created just for this span.
1540
1541        Args:
1542            id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id.
1543            trace_id (Optional[str]): The trace ID associated with this span. If not provided, a new UUID is generated.
1544            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
1545            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
1546            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
1547            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
1548            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
1549            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
1550            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
1551            input (Optional[dict]): The input to the span. Can be any JSON object.
1552            output (Optional[dict]): The output to the span. Can be any JSON object.
1553            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
1554            **kwargs: Additional keyword arguments to include in the span.
1555
1556        Returns:
1557            StatefulSpanClient: The created span.
1558
1559        Example:
1560            ```python
1561            from langfuse import Langfuse
1562
1563            langfuse = Langfuse()
1564
1565            trace = langfuse.trace(name = "llm-feature")
1566
1567            # Create a span
1568            retrieval = langfuse.span(name = "retrieval", trace_id = trace.id)
1569
1570            # Create a nested span
1571            nested_span = langfuse.span(name = "retrieval", trace_id = trace.id, parent_observation_id = retrieval.id)
1572            ```
1573        """
1574        new_span_id = id or str(uuid.uuid4())
1575        new_trace_id = trace_id or str(uuid.uuid4())
1576        self.trace_id = new_trace_id
1577        try:
1578            span_body = {
1579                "id": new_span_id,
1580                "trace_id": new_trace_id,
1581                "name": name,
1582                "start_time": start_time or _get_timestamp(),
1583                "metadata": metadata,
1584                "input": input,
1585                "output": output,
1586                "level": level,
1587                "status_message": status_message,
1588                "parent_observation_id": parent_observation_id,
1589                "version": version,
1590                "end_time": end_time,
1591                "trace": {"release": self.release},
1592                **kwargs,
1593            }
1594
1595            if trace_id is None:
1596                self._generate_trace(new_trace_id, name or new_trace_id)
1597
1598            self.log.debug(f"Creating span {_filter_io_from_event_body(span_body)}...")
1599
1600            span_body = CreateSpanBody(**span_body)
1601
1602            event = {
1603                "id": str(uuid.uuid4()),
1604                "type": "span-create",
1605                "body": span_body,
1606            }
1607
1608            self.task_manager.add_task(event)
1609
1610        except Exception as e:
1611            self.log.exception(e)
1612        finally:
1613            self._log_memory_usage()
1614
1615            return StatefulSpanClient(
1616                self.client,
1617                new_span_id,
1618                StateType.OBSERVATION,
1619                new_trace_id,
1620                self.task_manager,
1621            )
1622
1623    def event(
1624        self,
1625        *,
1626        id: typing.Optional[str] = None,
1627        trace_id: typing.Optional[str] = None,
1628        parent_observation_id: typing.Optional[str] = None,
1629        name: typing.Optional[str] = None,
1630        start_time: typing.Optional[dt.datetime] = None,
1631        metadata: typing.Optional[typing.Any] = None,
1632        input: typing.Optional[typing.Any] = None,
1633        output: typing.Optional[typing.Any] = None,
1634        level: typing.Optional[SpanLevel] = None,
1635        status_message: typing.Optional[str] = None,
1636        version: typing.Optional[str] = None,
1637        **kwargs,
1638    ) -> "StatefulSpanClient":
1639        """Create an event.
1640
1641        An event represents a discrete event in a trace.
1642        Usually, you want to add a event nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.
1643
1644        If no trace_id is provided, a new trace is created just for this event.
1645
1646        Args:
1647            id (Optional[str]): The id of the event can be set, otherwise a random id is generated.
1648            trace_id (Optional[str]): The trace ID associated with this event. If not provided, a new trace is created just for this event.
1649            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
1650            name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI.
1651            start_time (Optional[datetime]): The time at which the event started, defaults to the current time.
1652            metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API.
1653            input (Optional[Any]): The input to the event. Can be any JSON object.
1654            output (Optional[Any]): The output to the event. Can be any JSON object.
1655            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
1656            status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event.
1657            version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. Useful in debugging.
1658            **kwargs: Additional keyword arguments to include in the event.
1659
1660        Returns:
1661            StatefulSpanClient: The created event.
1662
1663        Example:
1664            ```python
1665            from langfuse import Langfuse
1666
1667            langfuse = Langfuse()
1668
1669            trace = langfuse.trace(name = "llm-feature")
1670
1671            # Create an event
1672            retrieval = langfuse.event(name = "retrieval", trace_id = trace.id)
1673            ```
1674        """
1675        event_id = id or str(uuid.uuid4())
1676        new_trace_id = trace_id or str(uuid.uuid4())
1677        self.trace_id = new_trace_id
1678        try:
1679            event_body = {
1680                "id": event_id,
1681                "trace_id": new_trace_id,
1682                "name": name,
1683                "start_time": start_time or _get_timestamp(),
1684                "metadata": metadata,
1685                "input": input,
1686                "output": output,
1687                "level": level,
1688                "status_message": status_message,
1689                "parent_observation_id": parent_observation_id,
1690                "version": version,
1691                "trace": {"release": self.release},
1692                **kwargs,
1693            }
1694
1695            if trace_id is None:
1696                self._generate_trace(new_trace_id, name or new_trace_id)
1697
1698            request = CreateEventBody(**event_body)
1699
1700            event = {
1701                "id": str(uuid.uuid4()),
1702                "type": "event-create",
1703                "body": request,
1704            }
1705
1706            self.log.debug(
1707                f"Creating event {_filter_io_from_event_body(event_body)} ..."
1708            )
1709            self.task_manager.add_task(event)
1710
1711        except Exception as e:
1712            self.log.exception(e)
1713        finally:
1714            return StatefulSpanClient(
1715                self.client,
1716                event_id,
1717                StateType.OBSERVATION,
1718                new_trace_id,
1719                self.task_manager,
1720            )
1721
1722    def generation(
1723        self,
1724        *,
1725        id: typing.Optional[str] = None,
1726        trace_id: typing.Optional[str] = None,
1727        parent_observation_id: typing.Optional[str] = None,
1728        name: typing.Optional[str] = None,
1729        start_time: typing.Optional[dt.datetime] = None,
1730        end_time: typing.Optional[dt.datetime] = None,
1731        completion_start_time: typing.Optional[dt.datetime] = None,
1732        metadata: typing.Optional[typing.Any] = None,
1733        level: typing.Optional[SpanLevel] = None,
1734        status_message: typing.Optional[str] = None,
1735        version: typing.Optional[str] = None,
1736        model: typing.Optional[str] = None,
1737        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
1738        input: typing.Optional[typing.Any] = None,
1739        output: typing.Optional[typing.Any] = None,
1740        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
1741        prompt: typing.Optional[PromptClient] = None,
1742        **kwargs,
1743    ) -> "StatefulGenerationClient":
1744        """Create a generation.
1745
1746        A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI.
1747
1748        Usually, you want to add a generation nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.
1749
1750        If no trace_id is provided, a new trace is created just for this generation.
1751
1752        Args:
1753            id (Optional[str]): The id of the generation can be set, defaults to random id.
1754            trace_id (Optional[str]): The trace ID associated with this generation. If not provided, a new trace is created
1755            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
1756            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
1757            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
1758            end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`.
1759            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
1760            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
1761            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
1762            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
1763            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
1764            model (Optional[str]): The name of the model used for the generation.
1765            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
1766            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
1767            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
1768            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
1769            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
1770            **kwargs: Additional keyword arguments to include in the generation.
1771
1772        Returns:
1773            StatefulGenerationClient: The created generation.
1774
1775        Example:
1776            ```python
1777            from langfuse import Langfuse
1778
1779            langfuse = Langfuse()
1780
1781            # Create a generation in Langfuse
1782            generation = langfuse.generation(
1783                name="summary-generation",
1784                model="gpt-3.5-turbo",
1785                model_parameters={"maxTokens": "1000", "temperature": "0.9"},
1786                input=[{"role": "system", "content": "You are a helpful assistant."},
1787                       {"role": "user", "content": "Please generate a summary of the following documents ..."}],
1788                metadata={"interface": "whatsapp"}
1789            )
1790            ```
1791        """
1792        new_trace_id = trace_id or str(uuid.uuid4())
1793        new_generation_id = id or str(uuid.uuid4())
1794        self.trace_id = new_trace_id
1795        try:
1796            generation_body = {
1797                "id": new_generation_id,
1798                "trace_id": new_trace_id,
1799                "release": self.release,
1800                "name": name,
1801                "start_time": start_time or _get_timestamp(),
1802                "metadata": metadata,
1803                "input": input,
1804                "output": output,
1805                "level": level,
1806                "status_message": status_message,
1807                "parent_observation_id": parent_observation_id,
1808                "version": version,
1809                "end_time": end_time,
1810                "completion_start_time": completion_start_time,
1811                "model": model,
1812                "model_parameters": model_parameters,
1813                "usage": _convert_usage_input(usage) if usage is not None else None,
1814                "trace": {"release": self.release},
1815                **_create_prompt_context(prompt),
1816                **kwargs,
1817            }
1818
1819            if trace_id is None:
1820                trace = {
1821                    "id": new_trace_id,
1822                    "release": self.release,
1823                    "name": name,
1824                }
1825                request = TraceBody(**trace)
1826
1827                event = {
1828                    "id": str(uuid.uuid4()),
1829                    "type": "trace-create",
1830                    "body": request,
1831                }
1832
1833                self.log.debug("Creating trace...")
1834
1835                self.task_manager.add_task(event)
1836
1837            self.log.debug(
1838                f"Creating generation max {_filter_io_from_event_body(generation_body)}..."
1839            )
1840            request = CreateGenerationBody(**generation_body)
1841
1842            event = {
1843                "id": str(uuid.uuid4()),
1844                "type": "generation-create",
1845                "body": request,
1846            }
1847
1848            self.task_manager.add_task(event)
1849
1850        except Exception as e:
1851            self.log.exception(e)
1852        finally:
1853            return StatefulGenerationClient(
1854                self.client,
1855                new_generation_id,
1856                StateType.OBSERVATION,
1857                new_trace_id,
1858                self.task_manager,
1859            )
1860
1861    def _generate_trace(self, trace_id: str, name: str):
1862        trace_dict = {
1863            "id": trace_id,
1864            "release": self.release,
1865            "name": name,
1866        }
1867
1868        trace_body = TraceBody(**trace_dict)
1869
1870        event = {
1871            "id": str(uuid.uuid4()),
1872            "type": "trace-create",
1873            "body": trace_body,
1874        }
1875
1876        self.log.debug(f"Creating trace {_filter_io_from_event_body(trace_dict)}...")
1877        self.task_manager.add_task(event)
1878
    def join(self):
        """Blocks until all consumer Threads are terminated. The SDK calls this upon termination of the Python Interpreter.

        If called before flushing, consumers might terminate before sending all events to Langfuse API. This method is called at exit of the SDK, right before the Python interpreter closes.
        To guarantee all messages have been delivered, you still need to call flush().
        """
        try:
            return self.task_manager.join()
        except Exception as e:
            self.log.exception(e)
1889
1890    def flush(self):
1891        """Flush the internal event queue to the Langfuse API. It blocks until the queue is empty. It should be called when the application shuts down.
1892
1893        Example:
1894            ```python
1895            from langfuse import Langfuse
1896
1897            langfuse = Langfuse()
1898
1899            # Some operations with Langfuse
1900
1901            # Flushing all events to end Langfuse cleanly
1902            langfuse.flush()
1903            ```
1904        """
1905        try:
1906            return self.task_manager.flush()
1907        except Exception as e:
1908            self.log.exception(e)
1909
1910    def shutdown(self):
1911        """Initiate a graceful shutdown of the Langfuse SDK, ensuring all events are sent to Langfuse API and all consumer Threads are terminated.
1912
1913        This function calls flush() and join() consecutively resulting in a complete shutdown of the SDK. On success of this function, no more events will be sent to Langfuse API.
1914        As the SDK calls join() already on shutdown, refer to flush() to ensure all events arive at the Langfuse API.
1915        """
1916        try:
1917            return self.task_manager.shutdown()
1918        except Exception as e:
1919            self.log.exception(e)
1920
1921
class StateType(Enum):
    """Discriminates whether a stateful client wraps an observation or a trace.

    Attributes:
        OBSERVATION (int): The client points at an observation.
        TRACE (int): The client points at a trace.
    """

    OBSERVATION = 1
    TRACE = 0
1932
1933
1934class StatefulClient(object):
1935    """Base class for handling stateful operations in the Langfuse system.
1936
1937    This client is capable of creating different nested Langfuse objects like spans, generations, scores, and events,
1938    associating them with either an observation or a trace based on the specified state type.
1939
1940    Attributes:
1941        client (FernLangfuse): Core interface for Langfuse API interactions.
1942        id (str): Unique identifier of the stateful client (either observation or trace).
1943        state_type (StateType): Enum indicating whether the client is an observation or a trace.
1944        trace_id (str): Id of the trace associated with the stateful client.
1945        task_manager (TaskManager): Manager handling asynchronous tasks for the client.
1946    """
1947
1948    log = logging.getLogger("langfuse")
1949
1950    def __init__(
1951        self,
1952        client: FernLangfuse,
1953        id: str,
1954        state_type: StateType,
1955        trace_id: str,
1956        task_manager: TaskManager,
1957    ):
1958        """Initialize the StatefulClient.
1959
1960        Args:
1961            client (FernLangfuse): Core interface for Langfuse API interactions.
1962            id (str): Unique identifier of the stateful client (either observation or trace).
1963            state_type (StateType): Enum indicating whether the client is an observation or a trace.
1964            trace_id (str): Id of the trace associated with the stateful client.
1965            task_manager (TaskManager): Manager handling asynchronous tasks for the client.
1966        """
1967        self.client = client
1968        self.trace_id = trace_id
1969        self.id = id
1970        self.state_type = state_type
1971        self.task_manager = task_manager
1972
1973    def _add_state_to_event(self, body: dict):
1974        if self.state_type == StateType.OBSERVATION:
1975            body["parent_observation_id"] = self.id
1976            body["trace_id"] = self.trace_id
1977        else:
1978            body["trace_id"] = self.id
1979        return body
1980
1981    def _add_default_values(self, body: dict):
1982        if body.get("start_time") is None:
1983            body["start_time"] = _get_timestamp()
1984        return body
1985
1986    def generation(
1987        self,
1988        *,
1989        id: typing.Optional[str] = None,
1990        name: typing.Optional[str] = None,
1991        start_time: typing.Optional[dt.datetime] = None,
1992        end_time: typing.Optional[dt.datetime] = None,
1993        metadata: typing.Optional[typing.Any] = None,
1994        level: typing.Optional[SpanLevel] = None,
1995        status_message: typing.Optional[str] = None,
1996        version: typing.Optional[str] = None,
1997        completion_start_time: typing.Optional[dt.datetime] = None,
1998        model: typing.Optional[str] = None,
1999        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
2000        input: typing.Optional[typing.Any] = None,
2001        output: typing.Optional[typing.Any] = None,
2002        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
2003        prompt: typing.Optional[PromptClient] = None,
2004        **kwargs,
2005    ) -> "StatefulGenerationClient":
2006        """Create a generation nested within the current observation or trace.
2007
2008        A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI.
2009
2010        Args:
2011            id (Optional[str]): The id of the generation can be set, defaults to random id.
2012            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
2013            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
2014            end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`.
2015            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
2016            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
2017            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
2018            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
2019            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
2020            model (Optional[str]): The name of the model used for the generation.
2021            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
2022            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
2023            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
2024            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
2025            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
2026            **kwargs: Additional keyword arguments to include in the generation.
2027
2028        Returns:
2029            StatefulGenerationClient: The created generation. Use this client to update the generation or create additional nested observations.
2030
2031        Example:
2032            ```python
2033            from langfuse import Langfuse
2034
2035            langfuse = Langfuse()
2036
2037            # Create a trace
2038            trace = langfuse.trace(name = "llm-feature")
2039
2040            # Create a nested generation in Langfuse
2041            generation = trace.generation(
2042                name="summary-generation",
2043                model="gpt-3.5-turbo",
2044                model_parameters={"maxTokens": "1000", "temperature": "0.9"},
2045                input=[{"role": "system", "content": "You are a helpful assistant."},
2046                       {"role": "user", "content": "Please generate a summary of the following documents ..."}],
2047                metadata={"interface": "whatsapp"}
2048            )
2049            ```
2050        """
2051        generation_id = id or str(uuid.uuid4())
2052        try:
2053            generation_body = {
2054                "id": generation_id,
2055                "name": name,
2056                "start_time": start_time or _get_timestamp(),
2057                "metadata": metadata,
2058                "level": level,
2059                "status_message": status_message,
2060                "version": version,
2061                "end_time": end_time,
2062                "completion_start_time": completion_start_time,
2063                "model": model,
2064                "model_parameters": model_parameters,
2065                "input": input,
2066                "output": output,
2067                "usage": _convert_usage_input(usage) if usage is not None else None,
2068                **_create_prompt_context(prompt),
2069                **kwargs,
2070            }
2071
2072            generation_body = self._add_state_to_event(generation_body)
2073            new_body = self._add_default_values(generation_body)
2074
2075            new_body = CreateGenerationBody(**new_body)
2076
2077            event = {
2078                "id": str(uuid.uuid4()),
2079                "type": "generation-create",
2080                "body": new_body.dict(exclude_none=True, exclude_unset=False),
2081            }
2082
2083            self.log.debug(
2084                f"Creating generation {_filter_io_from_event_body(generation_body)}..."
2085            )
2086            self.task_manager.add_task(event)
2087
2088        except Exception as e:
2089            self.log.exception(e)
2090        finally:
2091            return StatefulGenerationClient(
2092                self.client,
2093                generation_id,
2094                StateType.OBSERVATION,
2095                self.trace_id,
2096                task_manager=self.task_manager,
2097            )
2098
2099    def span(
2100        self,
2101        *,
2102        id: typing.Optional[str] = None,
2103        name: typing.Optional[str] = None,
2104        start_time: typing.Optional[dt.datetime] = None,
2105        end_time: typing.Optional[dt.datetime] = None,
2106        metadata: typing.Optional[typing.Any] = None,
2107        input: typing.Optional[typing.Any] = None,
2108        output: typing.Optional[typing.Any] = None,
2109        level: typing.Optional[SpanLevel] = None,
2110        status_message: typing.Optional[str] = None,
2111        version: typing.Optional[str] = None,
2112        **kwargs,
2113    ) -> "StatefulSpanClient":
2114        """Create a span nested within the current observation or trace.
2115
2116        A span represents durations of units of work in a trace.
2117
2118        Args:
2119            id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id.
2120            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
2121            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
2122            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
2123            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
2124            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
2125            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
2126            input (Optional[dict]): The input to the span. Can be any JSON object.
2127            output (Optional[dict]): The output to the span. Can be any JSON object.
2128            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
2129            **kwargs: Additional keyword arguments to include in the span.
2130
2131        Returns:
2132            StatefulSpanClient: The created span. Use this client to update the span or create additional nested observations.
2133
2134        Example:
2135            ```python
2136            from langfuse import Langfuse
2137
2138            langfuse = Langfuse()
2139
2140            # Create a trace
2141            trace = langfuse.trace(name = "llm-feature")
2142
2143            # Create a span
2144            retrieval = langfuse.span(name = "retrieval")
2145            ```
2146        """
2147        span_id = id or str(uuid.uuid4())
2148        try:
2149            span_body = {
2150                "id": span_id,
2151                "name": name,
2152                "start_time": start_time or _get_timestamp(),
2153                "metadata": metadata,
2154                "input": input,
2155                "output": output,
2156                "level": level,
2157                "status_message": status_message,
2158                "version": version,
2159                "end_time": end_time,
2160                **kwargs,
2161            }
2162
2163            self.log.debug(f"Creating span {_filter_io_from_event_body(span_body)}...")
2164
2165            new_dict = self._add_state_to_event(span_body)
2166            new_body = self._add_default_values(new_dict)
2167
2168            event = CreateSpanBody(**new_body)
2169
2170            event = {
2171                "id": str(uuid.uuid4()),
2172                "type": "span-create",
2173                "body": event,
2174            }
2175
2176            self.task_manager.add_task(event)
2177        except Exception as e:
2178            self.log.exception(e)
2179        finally:
2180            return StatefulSpanClient(
2181                self.client,
2182                span_id,
2183                StateType.OBSERVATION,
2184                self.trace_id,
2185                task_manager=self.task_manager,
2186            )
2187
    # Overload: numeric and boolean scores take a float value.
    @overload
    def score(
        self,
        *,
        id: typing.Optional[str] = None,
        name: str,
        value: float,
        data_type: typing.Optional[Literal["NUMERIC", "BOOLEAN"]] = None,
        comment: typing.Optional[str] = None,
        config_id: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient": ...
2200
    # Overload: categorical scores take a string value.
    @overload
    def score(
        self,
        *,
        id: typing.Optional[str] = None,
        name: str,
        value: str,
        data_type: typing.Optional[Literal["CATEGORICAL"]] = "CATEGORICAL",
        comment: typing.Optional[str] = None,
        config_id: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient": ...
2213
2214    def score(
2215        self,
2216        *,
2217        id: typing.Optional[str] = None,
2218        name: str,
2219        value: typing.Union[float, str],
2220        data_type: typing.Optional[ScoreDataType] = None,
2221        comment: typing.Optional[str] = None,
2222        config_id: typing.Optional[str] = None,
2223        **kwargs,
2224    ) -> "StatefulClient":
2225        """Create a score attached for the current observation or trace.
2226
2227        Args:
2228            name (str): Identifier of the score.
2229            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores.
2230            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
2231              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
2232            comment (Optional[str]): Additional context/explanation of the score.
2233            id (Optional[str]): The id of the score. If not provided, a new UUID is generated.
2234            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
2235            **kwargs: Additional keyword arguments to include in the score.
2236
2237        Returns:
2238            StatefulClient: The current observation or trace for which the score was created. Passthrough for chaining.
2239
2240        Example:
2241            ```python
2242            from langfuse import Langfuse
2243
2244            langfuse = Langfuse()
2245
2246            # Create a trace
2247            trace = langfuse.trace(name="example-application")
2248
2249            # Add score to the trace
2250            trace = trace.score(
2251                name="user-explicit-feedback",
2252                value=0.8,
2253                comment="I like how personalized the response is"
2254            )
2255            ```
2256        """
2257        score_id = id or str(uuid.uuid4())
2258        try:
2259            new_score = {
2260                "id": score_id,
2261                "trace_id": self.trace_id,
2262                "name": name,
2263                "value": value,
2264                "data_type": data_type,
2265                "comment": comment,
2266                "config_id": config_id,
2267                **kwargs,
2268            }
2269
2270            self.log.debug(f"Creating score {new_score}...")
2271
2272            new_dict = self._add_state_to_event(new_score)
2273
2274            if self.state_type == StateType.OBSERVATION:
2275                new_dict["observationId"] = self.id
2276
2277            request = ScoreBody(**new_dict)
2278
2279            event = {
2280                "id": str(uuid.uuid4()),
2281                "type": "score-create",
2282                "body": request,
2283            }
2284
2285            self.task_manager.add_task(event)
2286
2287        except Exception as e:
2288            self.log.exception(e)
2289        finally:
2290            return StatefulClient(
2291                self.client,
2292                self.id,
2293                self.state_type,
2294                self.trace_id,
2295                task_manager=self.task_manager,
2296            )
2297
2298    def event(
2299        self,
2300        *,
2301        id: typing.Optional[str] = None,
2302        name: typing.Optional[str] = None,
2303        start_time: typing.Optional[dt.datetime] = None,
2304        metadata: typing.Optional[typing.Any] = None,
2305        input: typing.Optional[typing.Any] = None,
2306        output: typing.Optional[typing.Any] = None,
2307        level: typing.Optional[SpanLevel] = None,
2308        status_message: typing.Optional[str] = None,
2309        version: typing.Optional[str] = None,
2310        **kwargs,
2311    ) -> "StatefulClient":
2312        """Create an event nested within the current observation or trace.
2313
2314        An event represents a discrete event in a trace.
2315
2316        Args:
2317            id (Optional[str]): The id of the event can be set, otherwise a random id is generated.
2318            name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI.
2319            start_time (Optional[datetime]): The time at which the event started, defaults to the current time.
2320            metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API.
2321            input (Optional[Any]): The input to the event. Can be any JSON object.
2322            output (Optional[Any]): The output to the event. Can be any JSON object.
2323            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
2324            status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event.
2325            version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. Useful in debugging.
2326            **kwargs: Additional keyword arguments to include in the event.
2327
2328        Returns:
2329            StatefulSpanClient: The created event. Use this client to update the event or create additional nested observations.
2330
2331        Example:
2332            ```python
2333            from langfuse import Langfuse
2334
2335            langfuse = Langfuse()
2336
2337            # Create a trace
2338            trace = langfuse.trace(name = "llm-feature")
2339
2340            # Create an event
2341            retrieval = trace.event(name = "retrieval")
2342            ```
2343        """
2344        event_id = id or str(uuid.uuid4())
2345        try:
2346            event_body = {
2347                "id": event_id,
2348                "name": name,
2349                "start_time": start_time or _get_timestamp(),
2350                "metadata": metadata,
2351                "input": input,
2352                "output": output,
2353                "level": level,
2354                "status_message": status_message,
2355                "version": version,
2356                **kwargs,
2357            }
2358
2359            new_dict = self._add_state_to_event(event_body)
2360            new_body = self._add_default_values(new_dict)
2361
2362            request = CreateEventBody(**new_body)
2363
2364            event = {
2365                "id": str(uuid.uuid4()),
2366                "type": "event-create",
2367                "body": request,
2368            }
2369
2370            self.log.debug(
2371                f"Creating event {_filter_io_from_event_body(event_body)}..."
2372            )
2373            self.task_manager.add_task(event)
2374
2375        except Exception as e:
2376            self.log.exception(e)
2377        finally:
2378            return StatefulClient(
2379                self.client,
2380                event_id,
2381                StateType.OBSERVATION,
2382                self.trace_id,
2383                self.task_manager,
2384            )
2385
2386    def get_trace_url(self):
2387        """Get the URL to see the current trace in the Langfuse UI."""
2388        return f"{self.client._client_wrapper._base_url}/trace/{self.trace_id}"
2389
2390
class StatefulGenerationClient(StatefulClient):
    """Class for handling stateful operations of generations in the Langfuse system. Inherits from StatefulClient.

    This client extends the capabilities of the StatefulClient to specifically handle generation,
    allowing for the creation, update, and termination of generation processes in Langfuse.

    Attributes:
        client (FernLangfuse): Core interface for Langfuse API interaction.
        id (str): Unique identifier of the generation.
        state_type (StateType): Type of the stateful entity (observation or trace).
        trace_id (str): Id of trace associated with the generation.
        task_manager (TaskManager): Manager for handling asynchronous tasks.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulGenerationClient."""
        super().__init__(client, id, state_type, trace_id, task_manager)

    # WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY
    def update(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        completion_start_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        model: typing.Optional[str] = None,
        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
        prompt: typing.Optional[PromptClient] = None,
        **kwargs,
    ) -> "StatefulGenerationClient":
        """Update the generation.

        Args:
            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
            end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`.
            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            model (Optional[str]): The name of the model used for the generation.
            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
            **kwargs: Additional keyword arguments to include in the generation.

        Returns:
            StatefulGenerationClient: The updated generation. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested generation in Langfuse
            generation = trace.generation(name="summary-generation")

            # Update the generation
            generation = generation.update(metadata={"interface": "whatsapp"})
            ```
        """
        try:
            generation_body = {
                "id": self.id,
                "trace_id": self.trace_id,  # Included to avoid relying on the order of events sent to the API
                "name": name,
                "start_time": start_time,
                "metadata": metadata,
                "level": level,
                "status_message": status_message,
                "version": version,
                "end_time": end_time,
                "completion_start_time": completion_start_time,
                "model": model,
                "model_parameters": model_parameters,
                "input": input,
                "output": output,
                "usage": _convert_usage_input(usage) if usage is not None else None,
                **_create_prompt_context(prompt),
                **kwargs,
            }

            request = UpdateGenerationBody(**generation_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "generation-update",
                "body": request.dict(exclude_none=True, exclude_unset=False),
            }

            # Log once right before enqueueing (a previous version logged this
            # message twice per update).
            self.log.debug(
                f"Update generation {_filter_io_from_event_body(generation_body)}..."
            )
            self.task_manager.add_task(event)

        except Exception as e:
            self.log.exception(e)
        finally:
            # Always return a client so callers can chain, even if the update failed.
            return StatefulGenerationClient(
                self.client,
                self.id,
                StateType.OBSERVATION,
                self.trace_id,
                task_manager=self.task_manager,
            )

    def end(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        completion_start_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        model: typing.Optional[str] = None,
        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
        prompt: typing.Optional[PromptClient] = None,
        **kwargs,
    ) -> "StatefulGenerationClient":
        """End the generation, optionally updating its properties.

        Args:
            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
            end_time (Optional[datetime.datetime]): Automatically set to the current time. Can be overridden to set a custom end time.
            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            model (Optional[str]): The name of the model used for the generation.
            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
            **kwargs: Additional keyword arguments to include in the generation.

        Returns:
            StatefulGenerationClient: The ended generation. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested generation in Langfuse
            generation = trace.generation(name="summary-generation")

            # End the generation and update its properties
            generation = generation.end(metadata={"interface": "whatsapp"})
            ```
        """
        # Identical to update(), except end_time defaults to now.
        return self.update(
            name=name,
            start_time=start_time,
            end_time=end_time or _get_timestamp(),
            metadata=metadata,
            level=level,
            status_message=status_message,
            version=version,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            input=input,
            output=output,
            usage=usage,
            prompt=prompt,
            **kwargs,
        )
2599
2600
2601class StatefulSpanClient(StatefulClient):
2602    """Class for handling stateful operations of spans in the Langfuse system. Inherits from StatefulClient.
2603
2604    Attributes:
2605        client (FernLangfuse): Core interface for Langfuse API interaction.
2606        id (str): Unique identifier of the span.
2607        state_type (StateType): Type of the stateful entity (observation or trace).
2608        trace_id (str): Id of trace associated with the span.
2609        task_manager (TaskManager): Manager for handling asynchronous tasks.
2610    """
2611
2612    log = logging.getLogger("langfuse")
2613
    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulSpanClient.

        Args:
            client: Core interface for Langfuse API interaction.
            id: Unique identifier of the span.
            state_type: Type of the stateful entity (observation or trace).
            trace_id: Id of the trace associated with the span.
            task_manager: Manager for handling asynchronous tasks.
        """
        super().__init__(client, id, state_type, trace_id, task_manager)
2624
2625    # WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY
2626    def update(
2627        self,
2628        *,
2629        name: typing.Optional[str] = None,
2630        start_time: typing.Optional[dt.datetime] = None,
2631        end_time: typing.Optional[dt.datetime] = None,
2632        metadata: typing.Optional[typing.Any] = None,
2633        input: typing.Optional[typing.Any] = None,
2634        output: typing.Optional[typing.Any] = None,
2635        level: typing.Optional[SpanLevel] = None,
2636        status_message: typing.Optional[str] = None,
2637        version: typing.Optional[str] = None,
2638        **kwargs,
2639    ) -> "StatefulSpanClient":
2640        """Update the span.
2641
2642        Args:
2643            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
2644            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
2645            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
2646            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
2647            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
2648            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
2649            input (Optional[dict]): The input to the span. Can be any JSON object.
2650            output (Optional[dict]): The output to the span. Can be any JSON object.
2651            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
2652            **kwargs: Additional keyword arguments to include in the span.
2653
2654        Returns:
2655            StatefulSpanClient: The updated span. Passthrough for chaining.
2656
2657        Example:
2658            ```python
2659            from langfuse import Langfuse
2660
2661            langfuse = Langfuse()
2662
2663            # Create a trace
2664            trace = langfuse.trace(name = "llm-feature")
2665
2666            # Create a nested span in Langfuse
2667            span = trace.span(name="retrieval")
2668
2669            # Update the span
2670            span = span.update(metadata={"interface": "whatsapp"})
2671            ```
2672        """
2673        try:
2674            span_body = {
2675                "id": self.id,
2676                "trace_id": self.trace_id,  # Included to avoid relying on the order of events sent to the API
2677                "name": name,
2678                "start_time": start_time,
2679                "metadata": metadata,
2680                "input": input,
2681                "output": output,
2682                "level": level,
2683                "status_message": status_message,
2684                "version": version,
2685                "end_time": end_time,
2686                **kwargs,
2687            }
2688            self.log.debug(f"Update span {_filter_io_from_event_body(span_body)}...")
2689
2690            request = UpdateSpanBody(**span_body)
2691
2692            event = {
2693                "id": str(uuid.uuid4()),
2694                "type": "span-update",
2695                "body": request,
2696            }
2697
2698            self.task_manager.add_task(event)
2699        except Exception as e:
2700            self.log.exception(e)
2701        finally:
2702            return StatefulSpanClient(
2703                self.client,
2704                self.id,
2705                StateType.OBSERVATION,
2706                self.trace_id,
2707                task_manager=self.task_manager,
2708            )
2709
2710    def end(
2711        self,
2712        *,
2713        name: typing.Optional[str] = None,
2714        start_time: typing.Optional[dt.datetime] = None,
2715        end_time: typing.Optional[dt.datetime] = None,
2716        metadata: typing.Optional[typing.Any] = None,
2717        input: typing.Optional[typing.Any] = None,
2718        output: typing.Optional[typing.Any] = None,
2719        level: typing.Optional[SpanLevel] = None,
2720        status_message: typing.Optional[str] = None,
2721        version: typing.Optional[str] = None,
2722        **kwargs,
2723    ) -> "StatefulSpanClient":
2724        """End the span, optionally updating its properties.
2725
2726        Args:
2727            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
2728            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
2729            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
2730            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
2731            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
2732            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
2733            input (Optional[dict]): The input to the span. Can be any JSON object.
2734            output (Optional[dict]): The output to the span. Can be any JSON object.
2735            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
2736            **kwargs: Additional keyword arguments to include in the span.
2737
2738        Returns:
2739            StatefulSpanClient: The updated span. Passthrough for chaining.
2740
2741        Example:
2742            ```python
2743            from langfuse import Langfuse
2744
2745            langfuse = Langfuse()
2746
2747            # Create a trace
2748            trace = langfuse.trace(name = "llm-feature")
2749
2750            # Create a nested span in Langfuse
2751            span = trace.span(name="retrieval")
2752
2753            # End the span and update its properties
2754            span = span.end(metadata={"interface": "whatsapp"})
2755            ```
2756        """
2757        try:
2758            span_body = {
2759                "name": name,
2760                "start_time": start_time,
2761                "metadata": metadata,
2762                "input": input,
2763                "output": output,
2764                "level": level,
2765                "status_message": status_message,
2766                "version": version,
2767                "end_time": end_time or _get_timestamp(),
2768                **kwargs,
2769            }
2770            return self.update(**span_body)
2771
2772        except Exception as e:
2773            self.log.warning(e)
2774        finally:
2775            return StatefulSpanClient(
2776                self.client,
2777                self.id,
2778                StateType.OBSERVATION,
2779                self.trace_id,
2780                task_manager=self.task_manager,
2781            )
2782
2783    def get_langchain_handler(self, update_parent: bool = False):
2784        """Get langchain callback handler associated with the current span.
2785
2786        Args:
2787            update_parent (bool): If set to True, the parent observation will be updated with the outcome of the Langchain run.
2788
2789        Returns:
2790            CallbackHandler: An instance of CallbackHandler linked to this StatefulSpanClient.
2791        """
2792        from langfuse.callback import CallbackHandler
2793
2794        return CallbackHandler(
2795            stateful_client=self, update_stateful_client=update_parent
2796        )
2797
2798
class StatefulTraceClient(StatefulClient):
    """Class for handling stateful operations of traces in the Langfuse system. Inherits from StatefulClient.

    Attributes:
        client (FernLangfuse): Core interface for Langfuse API interaction.
        id (str): Unique identifier of the trace.
        state_type (StateType): Type of the stateful entity (observation or trace).
        trace_id (str): The trace ID associated with this client.
        task_manager (TaskManager): Manager for handling asynchronous tasks.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulTraceClient."""
        super().__init__(client, id, state_type, trace_id, task_manager)
        self.task_manager = task_manager

    def update(
        self,
        *,
        name: typing.Optional[str] = None,
        user_id: typing.Optional[str] = None,
        session_id: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        release: typing.Optional[str] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        metadata: typing.Optional[typing.Any] = None,
        tags: typing.Optional[typing.List[str]] = None,
        public: typing.Optional[bool] = None,
        **kwargs,
    ) -> "StatefulTraceClient":
        """Update the trace.

        Args:
            name: Identifier of the trace. Useful for sorting/filtering in the UI.
            input: The input of the trace. Can be any JSON object.
            output: The output of the trace. Can be any JSON object.
            metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
            user_id: The id of the user that triggered the execution. Used to provide user-level analytics.
            session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
            version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
            release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
            tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.
            public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
            **kwargs: Additional keyword arguments that can be included in the trace.

        Returns:
            StatefulTraceClient: The updated trace. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(
                name="example-application",
                user_id="user-1234"
            )

            # Update the trace
            trace = trace.update(
                output={"result": "success"},
                metadata={"interface": "whatsapp"}
            )
            ```
        """
        try:
            # Backward compatibility: accept camelCase `sessionId` via kwargs.
            # Pop it so the later `**kwargs` spread cannot silently override
            # an explicitly passed `session_id` parameter.
            legacy_session_id = kwargs.pop("sessionId", None)

            trace_body = {
                "id": self.id,
                "name": name,
                "userId": user_id,
                "sessionId": session_id or legacy_session_id,
                "version": version,
                "release": release,
                "input": input,
                "output": output,
                "metadata": metadata,
                "public": public,
                "tags": tags,
                **kwargs,
            }
            # Strip potentially large input/output/metadata from the log line.
            self.log.debug(f"Update trace {_filter_io_from_event_body(trace_body)}...")

            request = TraceBody(**trace_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "trace-create",
                "body": request,
            }

            self.task_manager.add_task(event)

        except Exception as e:
            self.log.exception(e)

        # Always return a fresh client for chaining. Returning here instead of
        # inside a `finally` block avoids swallowing BaseExceptions such as
        # KeyboardInterrupt.
        return StatefulTraceClient(
            self.client,
            self.id,
            StateType.TRACE,
            self.trace_id,
            task_manager=self.task_manager,
        )

    def get_langchain_handler(self, update_parent: bool = False):
        """Get langchain callback handler associated with the current trace.

        This method creates and returns a CallbackHandler instance, linking it with the current
        trace. Use this if you want to group multiple Langchain runs within a single trace.

        Args:
            update_parent (bool): If set to True, the parent trace will be updated with the outcome of the Langchain run.

        Returns:
            CallbackHandler: Langchain callback handler linked to the current trace,
            or None if handler creation failed (the error is logged).

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Get a langchain callback handler
            handler = trace.get_langchain_handler()
            ```
        """
        try:
            # Imported lazily so langchain stays an optional dependency.
            from langfuse.callback import CallbackHandler

            self.log.debug(f"Creating new handler for trace {self.id}")

            return CallbackHandler(
                stateful_client=self,
                debug=self.log.level == logging.DEBUG,
                update_stateful_client=update_parent,
            )
        except Exception as e:
            self.log.exception(e)

    def getNewHandler(self):
        """Alias for the `get_langchain_handler` method. Retrieves a callback handler for the trace. Deprecated."""
        return self.get_langchain_handler()
2959
2960
class DatasetItemClient:
    """Class for managing dataset items in Langfuse.

    Args:
        id (str): Unique identifier of the dataset item.
        status (DatasetStatus): The status of the dataset item. Can be either 'ACTIVE' or 'ARCHIVED'.
        input (Any): Input data of the dataset item.
        expected_output (Optional[Any]): Expected output of the dataset item.
        metadata (Optional[Any]): Additional metadata of the dataset item.
        source_trace_id (Optional[str]): Identifier of the source trace.
        source_observation_id (Optional[str]): Identifier of the source observation.
        dataset_id (str): Identifier of the dataset to which this item belongs.
        dataset_name (str): Name of the dataset to which this item belongs.
        created_at (datetime): Timestamp of dataset item creation.
        updated_at (datetime): Timestamp of the last update to the dataset item.
        langfuse (Langfuse): Instance of Langfuse client for API interactions.

    Example:
        ```python
        from langfuse import Langfuse

        langfuse = Langfuse()

        dataset = langfuse.get_dataset("<dataset_name>")

        for item in dataset.items:
            # Generate a completion using the input of every item
            completion, generation = llm_app.run(item.input)

            # Evaluate the completion
            generation.score(
                name="example-score",
                value=1
            )
        ```
    """

    log = logging.getLogger("langfuse")

    id: str
    status: DatasetStatus
    input: typing.Any
    expected_output: typing.Optional[typing.Any]
    metadata: Optional[Any]
    source_trace_id: typing.Optional[str]
    source_observation_id: typing.Optional[str]
    dataset_id: str
    dataset_name: str
    created_at: dt.datetime
    updated_at: dt.datetime

    langfuse: Langfuse

    def __init__(self, dataset_item: DatasetItem, langfuse: Langfuse):
        """Initialize the DatasetItemClient."""
        self.id = dataset_item.id
        self.status = dataset_item.status
        self.input = dataset_item.input
        self.expected_output = dataset_item.expected_output
        self.metadata = dataset_item.metadata
        self.source_trace_id = dataset_item.source_trace_id
        self.source_observation_id = dataset_item.source_observation_id
        self.dataset_id = dataset_item.dataset_id
        self.dataset_name = dataset_item.dataset_name
        self.created_at = dataset_item.created_at
        self.updated_at = dataset_item.updated_at

        self.langfuse = langfuse

    def flush(self, observation: StatefulClient, run_name: str):
        """Flushes an observations task manager's queue.

        Used before creating a dataset run item to ensure all events are persistent.

        Args:
            observation (StatefulClient): The observation or trace client associated with the dataset item.
            run_name (str): The name of the dataset run.
        """
        observation.task_manager.flush()

    def link(
        self,
        trace_or_observation: typing.Union[StatefulClient, str, None],
        run_name: str,
        run_metadata: Optional[Any] = None,
        run_description: Optional[str] = None,
        trace_id: Optional[str] = None,
        observation_id: Optional[str] = None,
    ):
        """Link the dataset item to observation within a specific dataset run. Creates a dataset run item.

        Args:
            trace_or_observation (Union[StatefulClient, str, None]): The trace or observation object to link. Deprecated: can also be an observation ID.
            run_name (str): The name of the dataset run.
            run_metadata (Optional[Any]): Additional metadata to include in dataset run.
            run_description (Optional[str]): Description of the dataset run.
            trace_id (Optional[str]): The trace ID to link to the dataset item. Set trace_or_observation to None if trace_id is provided.
            observation_id (Optional[str]): The observation ID to link to the dataset item (optional). Set trace_or_observation to None if trace_id is provided.

        Raises:
            ValueError: If neither a StatefulClient/observation id nor a trace_id is provided.
        """
        # Both may legitimately remain None (e.g. legacy observation-id-only
        # linking leaves the trace id unset), hence Optional.
        parsed_trace_id: Optional[str] = None
        parsed_observation_id: Optional[str] = None

        if isinstance(trace_or_observation, StatefulClient):
            # NOTE(review): no flush happens here despite an older comment
            # suggesting it; call `self.flush(...)` beforehand if events must
            # be persisted before linking — confirm intended behavior.
            if trace_or_observation.state_type == StateType.TRACE:
                parsed_trace_id = trace_or_observation.trace_id
            elif trace_or_observation.state_type == StateType.OBSERVATION:
                parsed_observation_id = trace_or_observation.id
                parsed_trace_id = trace_or_observation.trace_id
        # legacy support for observation_id
        elif isinstance(trace_or_observation, str):
            parsed_observation_id = trace_or_observation
        elif trace_or_observation is None:
            if trace_id is not None:
                parsed_trace_id = trace_id
                if observation_id is not None:
                    parsed_observation_id = observation_id
            else:
                raise ValueError(
                    "trace_id must be provided if trace_or_observation is None"
                )
        else:
            raise ValueError(
                "trace_or_observation (arg) or trace_id (kwarg) must be provided to link the dataset item"
            )

        self.log.debug(
            f"Creating dataset run item: {run_name} {self.id} {parsed_trace_id} {parsed_observation_id}"
        )
        self.langfuse.client.dataset_run_items.create(
            request=CreateDatasetRunItemRequest(
                runName=run_name,
                datasetItemId=self.id,
                traceId=parsed_trace_id,
                observationId=parsed_observation_id,
                metadata=run_metadata,
                runDescription=run_description,
            )
        )

    def get_langchain_handler(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
    ):
        """Create and get a langchain callback handler linked to this dataset item.

        Args:
            run_name (str): The name of the dataset run to be used in the callback handler.
            run_description (Optional[str]): Description of the dataset run.
            run_metadata (Optional[Any]): Additional metadata to include in dataset run.

        Returns:
            CallbackHandler: An instance of CallbackHandler linked to the dataset item.
        """
        metadata = {
            "dataset_item_id": self.id,
            "run_name": run_name,
            "dataset_id": self.dataset_id,
        }
        trace = self.langfuse.trace(name="dataset-run", metadata=metadata)

        self.link(
            trace, run_name, run_metadata=run_metadata, run_description=run_description
        )

        return trace.get_langchain_handler(update_parent=True)

    @contextmanager
    def observe(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
        trace_id: Optional[str] = None,
    ):
        """Observes a dataset run within the Langfuse client.

        Args:
            run_name (str): The name of the dataset run.
            run_description (Optional[str]): The description of the dataset run.
            run_metadata (Optional[Any]): Additional metadata for the dataset run.
            trace_id (Optional[str]): The root trace ID to use for the dataset run. A new ID is generated if not provided.

        Yields:
            str: The root trace ID associated with the dataset run.
        """
        from langfuse.decorators import langfuse_context

        root_trace_id = trace_id or str(uuid.uuid4())

        langfuse_context._set_root_trace_id(root_trace_id)

        try:
            yield root_trace_id

        finally:
            # Link in `finally` so the item is attached to the run even when
            # the observed block raises.
            self.link(
                run_name=run_name,
                run_metadata=run_metadata,
                run_description=run_description,
                trace_or_observation=None,
                trace_id=root_trace_id,
            )

    @contextmanager
    def observe_llama_index(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
        llama_index_integration_constructor_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """Context manager for observing LlamaIndex operations linked to this dataset item.

        This method sets up a LlamaIndex callback handler that integrates with Langfuse, allowing detailed logging
        and tracing of LlamaIndex operations within the context of a specific dataset run. It ensures that all
        operations performed within the context are linked to the appropriate dataset item and run in Langfuse.

        Args:
            run_name (str): The name of the dataset run.
            run_description (Optional[str]): Description of the dataset run. Defaults to None.
            run_metadata (Optional[Any]): Additional metadata for the dataset run. Defaults to None.
            llama_index_integration_constructor_kwargs (Optional[Dict[str, Any]]): Keyword arguments to pass
                to the LlamaIndex integration constructor. Defaults to None (treated as empty).

        Yields:
            LlamaIndexCallbackHandler: The callback handler for LlamaIndex operations,
            or None if the handler setup failed (the error is logged).

        Example:
            ```python
            dataset_item = dataset.items[0]

            with dataset_item.observe_llama_index(run_name="example-run", run_description="Example LlamaIndex run") as handler:
                # Perform LlamaIndex operations here
                some_llama_index_operation()
            ```

        Raises:
            ImportError: If required modules for LlamaIndex integration are not available.
        """
        metadata = {
            "dataset_item_id": self.id,
            "run_name": run_name,
            "dataset_id": self.dataset_id,
        }
        trace = self.langfuse.trace(name="dataset-run", metadata=metadata)
        self.link(
            trace, run_name, run_metadata=run_metadata, run_description=run_description
        )

        # Pre-initialize so the yield/finally below cannot hit a NameError if
        # setup fails partway through (mutable-default `{}` also removed).
        callback_handler = None
        prev_global_handler = None
        prev_langfuse_handler = None

        try:
            import llama_index.core
            from llama_index.core import Settings
            from llama_index.core.callbacks import CallbackManager

            from langfuse.llama_index import LlamaIndexCallbackHandler

            # Capture the previous global handler before any mutation so it
            # can always be restored faithfully.
            prev_global_handler = llama_index.core.global_handler

            callback_handler = LlamaIndexCallbackHandler(
                **(llama_index_integration_constructor_kwargs or {}),
            )
            callback_handler.set_root(trace, update_root=True)

            # Temporarily unset the global handler if it is a
            # LlamaIndexCallbackHandler, to avoid double-registration.
            if isinstance(prev_global_handler, LlamaIndexCallbackHandler):
                llama_index.core.global_handler = None

            if Settings.callback_manager is None:
                Settings.callback_manager = CallbackManager([callback_handler])
            else:
                for handler in Settings.callback_manager.handlers:
                    if isinstance(handler, LlamaIndexCallbackHandler):
                        prev_langfuse_handler = handler
                        Settings.callback_manager.remove_handler(handler)

                Settings.callback_manager.add_handler(callback_handler)

        except Exception as e:
            self.log.exception(e)

        try:
            yield callback_handler
        finally:
            # Reset the handlers; guards ensure this is safe even when setup
            # failed before the names above were (re)assigned.
            if callback_handler is not None:
                Settings.callback_manager.remove_handler(callback_handler)
                if prev_langfuse_handler is not None:
                    Settings.callback_manager.add_handler(prev_langfuse_handler)

            if prev_global_handler is not None:
                llama_index.core.global_handler = prev_global_handler

    def get_llama_index_handler(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
        llama_index_integration_constructor_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """Create and get a llama-index callback handler linked to this dataset item.

        Args:
            run_name (str): The name of the dataset run to be used in the callback handler.
            run_description (Optional[str]): Description of the dataset run.
            run_metadata (Optional[Any]): Additional metadata to include in dataset run.
            llama_index_integration_constructor_kwargs (Optional[Dict[str, Any]]): Additional keyword arguments to pass to the LlamaIndex integration constructor. Defaults to None (treated as empty).

        Returns:
            LlamaIndexCallbackHandler: An instance of LlamaIndexCallbackHandler linked to the dataset item,
            or None if handler creation failed (the error is logged).
        """
        metadata = {
            "dataset_item_id": self.id,
            "run_name": run_name,
            "dataset_id": self.dataset_id,
        }
        trace = self.langfuse.trace(name="dataset-run", metadata=metadata)

        self.link(
            trace, run_name, run_metadata=run_metadata, run_description=run_description
        )

        try:
            from langfuse.llama_index.llama_index import LlamaIndexCallbackHandler

            callback_handler = LlamaIndexCallbackHandler(
                # Mutable-default `{}` removed; None is treated as empty.
                **(llama_index_integration_constructor_kwargs or {}),
            )
            callback_handler.set_root(trace, update_root=True)

            return callback_handler
        except Exception as e:
            self.log.exception(e)
3301
3302
class DatasetClient:
    """Class for managing datasets in Langfuse.

    Attributes:
        id (str): Unique identifier of the dataset.
        name (str): Name of the dataset.
        description (Optional[str]): Description of the dataset.
        metadata (Optional[typing.Any]): Additional metadata of the dataset.
        project_id (str): Identifier of the project to which the dataset belongs.
        dataset_name (str): Name of the dataset.
        created_at (datetime): Timestamp of dataset creation.
        updated_at (datetime): Timestamp of the last update to the dataset.
        items (List[DatasetItemClient]): List of dataset items associated with the dataset.
        runs (List[str]): List of dataset runs associated with the dataset. Deprecated.

    Example:
        Print the input of each dataset item in a dataset.
        ```python
        from langfuse import Langfuse

        langfuse = Langfuse()

        dataset = langfuse.get_dataset("<dataset_name>")

        for item in dataset.items:
            print(item.input)
        ```
    """

    id: str
    name: str
    description: Optional[str]
    project_id: str
    dataset_name: str  # for backward compatibility, to be deprecated
    metadata: Optional[Any]
    created_at: dt.datetime
    updated_at: dt.datetime
    items: typing.List[DatasetItemClient]
    runs: typing.List[str] = []  # deprecated; kept for backward compatibility

    def __init__(self, dataset: Dataset, items: typing.List[DatasetItemClient]):
        """Initialize the DatasetClient."""
        self.id = dataset.id
        self.name = dataset.name
        self.description = dataset.description
        self.project_id = dataset.project_id
        self.metadata = dataset.metadata
        self.dataset_name = dataset.name  # for backward compatibility, to be deprecated
        self.created_at = dataset.created_at
        self.updated_at = dataset.updated_at
        self.items = items
        # Give each instance its own list: the class-level `runs = []` above
        # is a mutable class attribute that would otherwise be shared across
        # all instances.
        self.runs = []
3354
3355
3356def _filter_io_from_event_body(event_body: Dict[str, Any]):
3357    return {
3358        k: v for k, v in event_body.items() if k not in ("input", "output", "metadata")
3359    }
@dataclass
class FetchTracesResponse:
    """Response object for fetch_traces method."""

    data: typing.List[TraceWithDetails]
    meta: MetaResponse
@dataclass
class FetchTraceResponse:
    """Response object for fetch_trace method."""

    data: TraceWithFullDetails
@dataclass
class FetchObservationsResponse:
    """Response object for fetch_observations method."""

    data: typing.List[ObservationsView]
    meta: MetaResponse
@dataclass
class FetchObservationResponse:
    """Response object for fetch_observation method."""

    data: Observation
@dataclass
class FetchSessionsResponse:
    """Response object for fetch_sessions method."""

    data: typing.List[Session]
    meta: MetaResponse
class Langfuse:
 123class Langfuse(object):
 124    """Langfuse Python client.
 125
 126    Attributes:
 127        log (logging.Logger): Logger for the Langfuse client.
 128        base_url (str): Base URL of the Langfuse API, serving as the root address for API endpoint construction.
 129        httpx_client (httpx.Client): HTTPX client utilized for executing requests to the Langfuse API.
 130        client (FernLangfuse): Core interface for Langfuse API interaction.
 131        task_manager (TaskManager): Task Manager dedicated to handling asynchronous tasks.
 132        release (str): Identifies the release number or hash of the application.
 133        prompt_cache (PromptCache): A cache for efficiently storing and retrieving PromptClient instances.
 134
 135    Example:
 136        Initiating the Langfuse client should always be first step to use Langfuse.
 137        ```python
 138        import os
 139        from langfuse import Langfuse
 140
 141        # Set the public and secret keys as environment variables
 142        os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
 143        os.environ['LANGFUSE_SECRET_KEY'] = secret_key
 144
 145        # Initialize the Langfuse client using the credentials
 146        langfuse = Langfuse()
 147        ```
 148    """
 149
 150    log = logging.getLogger("langfuse")
 151    """Logger for the Langfuse client."""
 152
 153    host: str
 154    """Host of Langfuse API."""
 155
    def __init__(
        self,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        release: Optional[str] = None,
        debug: bool = False,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[float] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,  # seconds
        sdk_integration: Optional[str] = "default",
        httpx_client: Optional[httpx.Client] = None,
        enabled: Optional[bool] = True,
        sample_rate: Optional[float] = None,
        mask: Optional[MaskFunction] = None,
    ):
        """Initialize the Langfuse client.

        Args:
            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
            flush_at: Max batch size that's sent to the API.
            flush_interval: Max delay until a new batch is sent to the API.
            max_retries: Max number of retries in case of API/network errors.
            timeout: Timeout of API requests in seconds. Defaults to 20 seconds.
            httpx_client: Pass your own httpx client for more customizability of requests.
            sdk_integration: Used by integrations that wrap the Langfuse SDK to add context for debugging and support. Not to be used directly.
            enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops.
            sample_rate: Sampling rate for tracing. If set to 0.2, only 20% of the data will be sent to the backend. Can be set via `LANGFUSE_SAMPLE_RATE` environment variable.
            mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data.

        Note:
            If public_key or secret_key are not set and not found in environment
            variables, the client is disabled with a warning (no exception is raised).

        Example:
            Initiating the Langfuse client should always be first step to use Langfuse.
            ```python
            import os
            from langfuse import Langfuse

            # Set the public and secret keys as environment variables
            os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
            os.environ['LANGFUSE_SECRET_KEY'] = secret_key

            # Initialize the Langfuse client using the credentials
            langfuse = Langfuse()
            ```
        """
        self.enabled = enabled
        # Explicit arguments take precedence over environment variables.
        public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
        secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
        sample_rate = (
            sample_rate
            if sample_rate
            is not None  # needs explicit None check, as 0 is a valid value
            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
        )

        # An out-of-range sample rate disables the client rather than clamping.
        if sample_rate is not None and (
            sample_rate > 1 or sample_rate < 0
        ):  # default value 1 will be set in the taskmanager
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since the sample rate provided is not between 0 and 1."
            )

        # Batching/retry settings fall back to environment variables, then defaults.
        threads = threads or int(os.environ.get("LANGFUSE_THREADS", 1))
        flush_at = flush_at or int(os.environ.get("LANGFUSE_FLUSH_AT", 15))
        flush_interval = flush_interval or float(
            os.environ.get("LANGFUSE_FLUSH_INTERVAL", 0.5)
        )

        max_retries = max_retries or int(os.environ.get("LANGFUSE_MAX_RETRIES", 3))
        timeout = timeout or int(os.environ.get("LANGFUSE_TIMEOUT", 20))

        # Missing credentials disable the client with a warning instead of raising.
        if not self.enabled:
            self.log.warning(
                "Langfuse client is disabled. No observability data will be sent."
            )

        elif not public_key:
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since no public_key was provided as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
            )

        elif not secret_key:
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since no secret_key was provided as a parameter or environment variable 'LANGFUSE_SECRET_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
            )

        set_debug = debug if debug else (os.getenv("LANGFUSE_DEBUG", "False") == "True")

        if set_debug is True:
            # Ensures that debug level messages are logged when debug mode is on.
            # Otherwise, defaults to WARNING level.
            # See https://docs.python.org/3/howto/logging.html#what-happens-if-no-configuration-is-provided
            logging.basicConfig()
            # Set level for all loggers under langfuse package
            logging.getLogger("langfuse").setLevel(logging.DEBUG)

            clean_logger()
        else:
            logging.getLogger("langfuse").setLevel(logging.WARNING)
            clean_logger()

        self.base_url = (
            host
            if host
            else os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
        )

        self.httpx_client = httpx_client or httpx.Client(timeout=timeout)

        # Fern-generated API client; reuses the httpx client configured above.
        self.client = FernLangfuse(
            base_url=self.base_url,
            username=public_key,
            password=secret_key,
            x_langfuse_sdk_name="python",
            x_langfuse_sdk_version=version,
            x_langfuse_public_key=public_key,
            httpx_client=self.httpx_client,
        )

        # Client handed to the TaskManager for ingestion requests.
        langfuse_client = LangfuseClient(
            public_key=public_key,
            secret_key=secret_key,
            base_url=self.base_url,
            version=version,
            timeout=timeout,
            session=self.httpx_client,
        )

        args = {
            "threads": threads,
            "flush_at": flush_at,
            "flush_interval": flush_interval,
            "max_retries": max_retries,
            "client": langfuse_client,
            "api_client": self.client,
            "public_key": public_key,
            "sdk_name": "python",
            "sdk_version": version,
            "sdk_integration": sdk_integration,
            "enabled": self.enabled,
            "sample_rate": sample_rate,
            "mask": mask,
        }

        # Background task manager that batches and sends events asynchronously.
        self.task_manager = TaskManager(**args)

        # Id of the most recently created trace; None until a trace is created.
        self.trace_id = None

        self.release = self._get_release_value(release)

        self.prompt_cache = PromptCache()
 319
 320    def _get_release_value(self, release: Optional[str] = None) -> Optional[str]:
 321        if release:
 322            return release
 323        elif "LANGFUSE_RELEASE" in os.environ:
 324            return os.environ["LANGFUSE_RELEASE"]
 325        else:
 326            return get_common_release_envs()
 327
 328    def get_trace_id(self) -> str:
 329        """Get the current trace id."""
 330        return self.trace_id
 331
 332    def get_trace_url(self) -> str:
 333        """Get the URL of the current trace to view it in the Langfuse UI."""
 334        return f"{self.base_url}/trace/{self.trace_id}"
 335
 336    def get_dataset(
 337        self, name: str, *, fetch_items_page_size: Optional[int] = 50
 338    ) -> "DatasetClient":
 339        """Fetch a dataset by its name.
 340
 341        Args:
 342            name (str): The name of the dataset to fetch.
 343            fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.
 344
 345        Returns:
 346            DatasetClient: The dataset with the given name.
 347        """
 348        try:
 349            self.log.debug(f"Getting datasets {name}")
 350            dataset = self.client.datasets.get(dataset_name=name)
 351
 352            dataset_items = []
 353            page = 1
 354            while True:
 355                new_items = self.client.dataset_items.list(
 356                    dataset_name=self._url_encode(name),
 357                    page=page,
 358                    limit=fetch_items_page_size,
 359                )
 360                dataset_items.extend(new_items.data)
 361                if new_items.meta.total_pages <= page:
 362                    break
 363                page += 1
 364
 365            items = [DatasetItemClient(i, langfuse=self) for i in dataset_items]
 366
 367            return DatasetClient(dataset, items=items)
 368        except Exception as e:
 369            handle_fern_exception(e)
 370            raise e
 371
 372    def get_dataset_item(self, id: str) -> "DatasetItemClient":
 373        """Get the dataset item with the given id."""
 374        try:
 375            self.log.debug(f"Getting dataset item {id}")
 376            dataset_item = self.client.dataset_items.get(id=id)
 377            return DatasetItemClient(dataset_item, langfuse=self)
 378        except Exception as e:
 379            handle_fern_exception(e)
 380            raise e
 381
 382    def auth_check(self) -> bool:
 383        """Check if the provided credentials (public and secret key) are valid.
 384
 385        Raises:
 386            Exception: If no projects were found for the provided credentials.
 387
 388        Note:
 389            This method is blocking. It is discouraged to use it in production code.
 390        """
 391        try:
 392            projects = self.client.projects.get()
 393            self.log.debug(
 394                f"Auth check successful, found {len(projects.data)} projects"
 395            )
 396            if len(projects.data) == 0:
 397                raise Exception(
 398                    "Auth check failed, no project found for the keys provided."
 399                )
 400            return True
 401
 402        except Exception as e:
 403            handle_fern_exception(e)
 404            raise e
 405
 406    def get_dataset_runs(
 407        self,
 408        dataset_name: str,
 409        *,
 410        page: typing.Optional[int] = None,
 411        limit: typing.Optional[int] = None,
 412    ) -> PaginatedDatasetRuns:
 413        """Get all dataset runs.
 414
 415        Args:
 416            dataset_name (str): Name of the dataset.
 417            page (Optional[int]): Page number of the dataset runs to return, starts at 1. Defaults to None.
 418            limit (Optional[int]): Maximum number of dataset runs to return. Defaults to 50.
 419
 420        Returns:
 421            PaginatedDatasetRuns: The dataset runs.
 422        """
 423        try:
 424            self.log.debug("Getting dataset runs")
 425            return self.client.datasets.get_runs(
 426                dataset_name=self._url_encode(dataset_name), page=page, limit=limit
 427            )
 428        except Exception as e:
 429            handle_fern_exception(e)
 430            raise e
 431
 432    def get_dataset_run(
 433        self,
 434        dataset_name: str,
 435        dataset_run_name: str,
 436    ) -> DatasetRunWithItems:
 437        """Get a dataset run.
 438
 439        Args:
 440            dataset_name: Name of the dataset.
 441            dataset_run_name: Name of the dataset run.
 442
 443        Returns:
 444            DatasetRunWithItems: The dataset run.
 445        """
 446        try:
 447            self.log.debug(
 448                f"Getting dataset runs for dataset {dataset_name} and run {dataset_run_name}"
 449            )
 450            return self.client.datasets.get_run(
 451                dataset_name=self._url_encode(dataset_name),
 452                run_name=self._url_encode(dataset_run_name),
 453            )
 454        except Exception as e:
 455            handle_fern_exception(e)
 456            raise e
 457
 458    def create_dataset(
 459        self,
 460        name: str,
 461        description: Optional[str] = None,
 462        metadata: Optional[Any] = None,
 463    ) -> Dataset:
 464        """Create a dataset with the given name on Langfuse.
 465
 466        Args:
 467            name: Name of the dataset to create.
 468            description: Description of the dataset. Defaults to None.
 469            metadata: Additional metadata. Defaults to None.
 470
 471        Returns:
 472            Dataset: The created dataset as returned by the Langfuse API.
 473        """
 474        try:
 475            body = CreateDatasetRequest(
 476                name=name, description=description, metadata=metadata
 477            )
 478            self.log.debug(f"Creating datasets {body}")
 479            return self.client.datasets.create(request=body)
 480        except Exception as e:
 481            handle_fern_exception(e)
 482            raise e
 483
 484    def create_dataset_item(
 485        self,
 486        dataset_name: str,
 487        input: Optional[Any] = None,
 488        expected_output: Optional[Any] = None,
 489        metadata: Optional[Any] = None,
 490        source_trace_id: Optional[str] = None,
 491        source_observation_id: Optional[str] = None,
 492        status: Optional[DatasetStatus] = None,
 493        id: Optional[str] = None,
 494    ) -> DatasetItem:
 495        """Create a dataset item.
 496
 497        Upserts if an item with id already exists.
 498
 499        Args:
 500            dataset_name: Name of the dataset in which the dataset item should be created.
 501            input: Input data. Defaults to None. Can contain any dict, list or scalar.
 502            expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar.
 503            metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar.
 504            source_trace_id: Id of the source trace. Defaults to None.
 505            source_observation_id: Id of the source observation. Defaults to None.
 506            status: Status of the dataset item. Defaults to ACTIVE for newly created items.
 507            id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets.
 508
 509        Returns:
 510            DatasetItem: The created dataset item as returned by the Langfuse API.
 511
 512        Example:
 513            ```python
 514            from langfuse import Langfuse
 515
 516            langfuse = Langfuse()
 517
 518            # Uploading items to the Langfuse dataset named "capital_cities"
 519            langfuse.create_dataset_item(
 520                dataset_name="capital_cities",
 521                input={"input": {"country": "Italy"}},
 522                expected_output={"expected_output": "Rome"},
 523                metadata={"foo": "bar"}
 524            )
 525            ```
 526        """
 527        try:
 528            body = CreateDatasetItemRequest(
 529                datasetName=dataset_name,
 530                input=input,
 531                expectedOutput=expected_output,
 532                metadata=metadata,
 533                sourceTraceId=source_trace_id,
 534                sourceObservationId=source_observation_id,
 535                status=status,
 536                id=id,
 537            )
 538            self.log.debug(f"Creating dataset item {body}")
 539            return self.client.dataset_items.create(request=body)
 540        except Exception as e:
 541            handle_fern_exception(e)
 542            raise e
 543
 544    def fetch_trace(
 545        self,
 546        id: str,
 547    ) -> FetchTraceResponse:
 548        """Fetch a trace via the Langfuse API by its id.
 549
 550        Args:
 551            id: The id of the trace to fetch.
 552
 553        Returns:
 554            FetchTraceResponse: The trace with full details as returned by the Langfuse API on `data`.
 555
 556        Raises:
 557            Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
 558        """
 559        try:
 560            self.log.debug(f"Getting trace {id}")
 561            trace = self.client.trace.get(id)
 562            return FetchTraceResponse(data=trace)
 563        except Exception as e:
 564            handle_fern_exception(e)
 565            raise e
 566
 567    def get_trace(
 568        self,
 569        id: str,
 570    ) -> TraceWithFullDetails:
 571        """Get a trace via the Langfuse API by its id. Deprecated, use fetch_trace instead.
 572
 573        Args:
 574            id: The id of the trace to fetch.
 575
 576        Returns:
 577            TraceWithFullDetails: The trace with full details as returned by the Langfuse API.
 578
 579        Raises:
 580            Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
 581        """
 582        warnings.warn(
 583            "get_trace is deprecated, use fetch_trace instead.",
 584            DeprecationWarning,
 585        )
 586
 587        try:
 588            self.log.debug(f"Getting trace {id}")
 589            return self.client.trace.get(id)
 590        except Exception as e:
 591            handle_fern_exception(e)
 592            raise e
 593
 594    def fetch_traces(
 595        self,
 596        *,
 597        page: Optional[int] = None,
 598        limit: Optional[int] = None,
 599        user_id: Optional[str] = None,
 600        name: Optional[str] = None,
 601        session_id: Optional[str] = None,
 602        from_timestamp: Optional[dt.datetime] = None,
 603        to_timestamp: Optional[dt.datetime] = None,
 604        order_by: Optional[str] = None,
 605        tags: Optional[Union[str, Sequence[str]]] = None,
 606    ) -> FetchTracesResponse:
 607        """Fetch a list of traces in the current project matching the given parameters.
 608
 609        Args:
 610            page (Optional[int]): Page number, starts at 1. Defaults to None.
 611            limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None.
 612            name (Optional[str]): Filter by name of traces. Defaults to None.
 613            user_id (Optional[str]): Filter by user_id. Defaults to None.
 614            session_id (Optional[str]): Filter by session_id. Defaults to None.
 615            from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None.
 616            to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None.
 617            order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None.
 618            tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None.
 619
 620        Returns:
 621            FetchTracesResponse, list of traces on `data` and metadata on `meta`.
 622
 623        Raises:
 624            Exception: If an error occurred during the request.
 625        """
 626        try:
 627            self.log.debug(
 628                f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}"
 629            )
 630            res = self.client.trace.list(
 631                page=page,
 632                limit=limit,
 633                name=name,
 634                user_id=user_id,
 635                session_id=session_id,
 636                from_timestamp=from_timestamp,
 637                to_timestamp=to_timestamp,
 638                order_by=order_by,
 639                tags=tags,
 640            )
 641            return FetchTracesResponse(data=res.data, meta=res.meta)
 642        except Exception as e:
 643            handle_fern_exception(e)
 644            raise e
 645
 646    def get_traces(
 647        self,
 648        *,
 649        page: Optional[int] = None,
 650        limit: Optional[int] = None,
 651        user_id: Optional[str] = None,
 652        name: Optional[str] = None,
 653        session_id: Optional[str] = None,
 654        from_timestamp: Optional[dt.datetime] = None,
 655        to_timestamp: Optional[dt.datetime] = None,
 656        order_by: Optional[str] = None,
 657        tags: Optional[Union[str, Sequence[str]]] = None,
 658    ) -> Traces:
 659        """Get a list of traces in the current project matching the given parameters. Deprecated, use fetch_traces instead.
 660
 661        Args:
 662            page (Optional[int]): Page number, starts at 1. Defaults to None.
 663            limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None.
 664            name (Optional[str]): Filter by name of traces. Defaults to None.
 665            user_id (Optional[str]): Filter by user_id. Defaults to None.
 666            session_id (Optional[str]): Filter by session_id. Defaults to None.
 667            from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None.
 668            to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None.
 669            order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None.
 670            tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None.
 671
 672        Returns:
 673            List of Traces
 674
 675        Raises:
 676            Exception: If an error occurred during the request.
 677        """
 678        warnings.warn(
 679            "get_traces is deprecated, use fetch_traces instead.",
 680            DeprecationWarning,
 681        )
 682        try:
 683            self.log.debug(
 684                f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}"
 685            )
 686            return self.client.trace.list(
 687                page=page,
 688                limit=limit,
 689                name=name,
 690                user_id=user_id,
 691                session_id=session_id,
 692                from_timestamp=from_timestamp,
 693                to_timestamp=to_timestamp,
 694                order_by=order_by,
 695                tags=tags,
 696            )
 697        except Exception as e:
 698            handle_fern_exception(e)
 699            raise e
 700
 701    def fetch_observations(
 702        self,
 703        *,
 704        page: typing.Optional[int] = None,
 705        limit: typing.Optional[int] = None,
 706        name: typing.Optional[str] = None,
 707        user_id: typing.Optional[str] = None,
 708        trace_id: typing.Optional[str] = None,
 709        parent_observation_id: typing.Optional[str] = None,
 710        from_start_time: typing.Optional[dt.datetime] = None,
 711        to_start_time: typing.Optional[dt.datetime] = None,
 712        type: typing.Optional[str] = None,
 713    ) -> FetchObservationsResponse:
 714        """Get a list of observations in the current project matching the given parameters.
 715
 716        Args:
 717            page (Optional[int]): Page number of the observations to return. Defaults to None.
 718            limit (Optional[int]): Maximum number of observations to return. Defaults to None.
 719            name (Optional[str]): Name of the observations to return. Defaults to None.
 720            user_id (Optional[str]): User identifier. Defaults to None.
 721            trace_id (Optional[str]): Trace identifier. Defaults to None.
 722            parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
 723            from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
 724            to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
 725            type (Optional[str]): Type of the observation. Defaults to None.
 726
 727        Returns:
 728            FetchObservationsResponse, list of observations on `data` and metadata on `meta`.
 729
 730        Raises:
 731            Exception: If an error occurred during the request.
 732        """
 733        try:
 734            self.log.debug(
 735                f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}"
 736            )
 737            res = self.client.observations.get_many(
 738                page=page,
 739                limit=limit,
 740                name=name,
 741                user_id=user_id,
 742                trace_id=trace_id,
 743                parent_observation_id=parent_observation_id,
 744                from_start_time=from_start_time,
 745                to_start_time=to_start_time,
 746                type=type,
 747            )
 748            return FetchObservationsResponse(data=res.data, meta=res.meta)
 749        except Exception as e:
 750            self.log.exception(e)
 751            raise e
 752
 753    def get_observations(
 754        self,
 755        *,
 756        page: typing.Optional[int] = None,
 757        limit: typing.Optional[int] = None,
 758        name: typing.Optional[str] = None,
 759        user_id: typing.Optional[str] = None,
 760        trace_id: typing.Optional[str] = None,
 761        parent_observation_id: typing.Optional[str] = None,
 762        from_start_time: typing.Optional[dt.datetime] = None,
 763        to_start_time: typing.Optional[dt.datetime] = None,
 764        type: typing.Optional[str] = None,
 765    ) -> ObservationsViews:
 766        """Get a list of observations in the current project matching the given parameters. Deprecated, use fetch_observations instead.
 767
 768        Args:
 769            page (Optional[int]): Page number of the observations to return. Defaults to None.
 770            limit (Optional[int]): Maximum number of observations to return. Defaults to None.
 771            name (Optional[str]): Name of the observations to return. Defaults to None.
 772            user_id (Optional[str]): User identifier. Defaults to None.
 773            trace_id (Optional[str]): Trace identifier. Defaults to None.
 774            parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
 775            from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
 776            to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
 777            type (Optional[str]): Type of the observation. Defaults to None.
 778
 779        Returns:
 780            List of ObservationsViews: List of observations in the project matching the given parameters.
 781
 782        Raises:
 783            Exception: If an error occurred during the request.
 784        """
 785        warnings.warn(
 786            "get_observations is deprecated, use fetch_observations instead.",
 787            DeprecationWarning,
 788        )
 789        try:
 790            self.log.debug(
 791                f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}"
 792            )
 793            return self.client.observations.get_many(
 794                page=page,
 795                limit=limit,
 796                name=name,
 797                user_id=user_id,
 798                trace_id=trace_id,
 799                parent_observation_id=parent_observation_id,
 800                from_start_time=from_start_time,
 801                to_start_time=to_start_time,
 802                type=type,
 803            )
 804        except Exception as e:
 805            handle_fern_exception(e)
 806            raise e
 807
 808    def get_generations(
 809        self,
 810        *,
 811        page: typing.Optional[int] = None,
 812        limit: typing.Optional[int] = None,
 813        name: typing.Optional[str] = None,
 814        user_id: typing.Optional[str] = None,
 815        trace_id: typing.Optional[str] = None,
 816        from_start_time: typing.Optional[dt.datetime] = None,
 817        to_start_time: typing.Optional[dt.datetime] = None,
 818        parent_observation_id: typing.Optional[str] = None,
 819    ) -> ObservationsViews:
 820        """Get a list of generations in the current project matching the given parameters. Deprecated, use fetch_observations(type='GENERATION') instead.
 821
 822        Args:
 823            page (Optional[int]): Page number of the generations to return. Defaults to None.
 824            limit (Optional[int]): Maximum number of generations to return. Defaults to None.
 825            name (Optional[str]): Name of the generations to return. Defaults to None.
 826            user_id (Optional[str]): User identifier of the generations to return. Defaults to None.
 827            trace_id (Optional[str]): Trace identifier of the generations to return. Defaults to None.
 828            from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
 829            to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
 830            parent_observation_id (Optional[str]): Parent observation identifier of the generations to return. Defaults to None.
 831
 832        Returns:
 833            List of ObservationsViews: List of generations in the project matching the given parameters.
 834
 835        Raises:
 836            Exception: If an error occurred during the request.
 837        """
 838        warnings.warn(
 839            "get_generations is deprecated, use `fetch_observations(type='GENERATION')` instead.",
 840            DeprecationWarning,
 841        )
 842        return self.get_observations(
 843            page=page,
 844            limit=limit,
 845            name=name,
 846            user_id=user_id,
 847            trace_id=trace_id,
 848            parent_observation_id=parent_observation_id,
 849            from_start_time=from_start_time,
 850            to_start_time=to_start_time,
 851            type="GENERATION",
 852        )
 853
 854    def fetch_observation(
 855        self,
 856        id: str,
 857    ) -> FetchObservationResponse:
 858        """Get an observation in the current project with the given identifier.
 859
 860        Args:
 861            id: The identifier of the observation to fetch.
 862
 863        Returns:
 864            FetchObservationResponse: The observation with the given id on `data`.
 865
 866        Raises:
 867            Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request.
 868        """
 869        try:
 870            self.log.debug(f"Getting observation {id}")
 871            observation = self.client.observations.get(id)
 872            return FetchObservationResponse(data=observation)
 873        except Exception as e:
 874            handle_fern_exception(e)
 875            raise e
 876
 877    def get_observation(
 878        self,
 879        id: str,
 880    ) -> Observation:
 881        """Get an observation in the current project with the given identifier. Deprecated, use fetch_observation instead.
 882
 883        Args:
 884            id: The identifier of the observation to fetch.
 885
 886        Raises:
 887            Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request.
 888        """
 889        warnings.warn(
 890            "get_observation is deprecated, use fetch_observation instead.",
 891            DeprecationWarning,
 892        )
 893        try:
 894            self.log.debug(f"Getting observation {id}")
 895            return self.client.observations.get(id)
 896        except Exception as e:
 897            handle_fern_exception(e)
 898            raise e
 899
 900    def fetch_sessions(
 901        self,
 902        *,
 903        page: typing.Optional[int] = None,
 904        limit: typing.Optional[int] = None,
 905        from_timestamp: typing.Optional[dt.datetime] = None,
 906        to_timestamp: typing.Optional[dt.datetime] = None,
 907    ) -> FetchSessionsResponse:
 908        """Get a list of sessions in the current project.
 909
 910        Args:
 911            page (Optional[int]): Page number of the sessions to return. Defaults to None.
 912            limit (Optional[int]): Maximum number of sessions to return. Defaults to None.
 913            from_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp on or after this datetime. Defaults to None.
 914            to_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp before this datetime. Defaults to None.
 915
 916        Returns:
 917            FetchSessionsResponse, list of sessions on `data` and metadata on `meta`.
 918
 919        Raises:
 920            Exception: If an error occurred during the request.
 921        """
 922        try:
 923            self.log.debug(
 924                f"Getting sessions... {page}, {limit}, {from_timestamp}, {to_timestamp}"
 925            )
 926            res = self.client.sessions.list(
 927                page=page,
 928                limit=limit,
 929                from_timestamp=from_timestamp,
 930                to_timestamp=to_timestamp,
 931            )
 932            return FetchSessionsResponse(data=res.data, meta=res.meta)
 933        except Exception as e:
 934            handle_fern_exception(e)
 935            raise e
 936
    # Overloads: the `type` argument selects the concrete prompt client type.
    @overload
    def get_prompt(
        self,
        name: str,
        version: Optional[int] = None,
        *,
        label: Optional[str] = None,
        type: Literal["chat"],
        cache_ttl_seconds: Optional[int] = None,
        fallback: Optional[List[ChatMessageDict]] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> ChatPromptClient: ...

    @overload
    def get_prompt(
        self,
        name: str,
        version: Optional[int] = None,
        *,
        label: Optional[str] = None,
        type: Literal["text"] = "text",
        cache_ttl_seconds: Optional[int] = None,
        fallback: Optional[str] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> TextPromptClient: ...

    def get_prompt(
        self,
        name: str,
        version: Optional[int] = None,
        *,
        label: Optional[str] = None,
        type: Literal["chat", "text"] = "text",
        cache_ttl_seconds: Optional[int] = None,
        fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> PromptClient:
        """Get a prompt.

        This method attempts to fetch the requested prompt from the local cache. If the prompt is not found
        in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again
        and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will
        return the expired prompt as a fallback.

        Args:
            name (str): The name of the prompt to retrieve.

        Keyword Args:
            version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
            label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
            cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a
            keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0.
            type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text".
            fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None.
            max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries have an exponential backoff with a maximum delay of 10 seconds.
            fetch_timeout_seconds: Optional[int]: The timeout in seconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 10 seconds per default.

        Returns:
            The prompt object retrieved from the cache or directly fetched if not cached or expired of type
            - TextPromptClient, if type argument is 'text'.
            - ChatPromptClient, if type argument is 'chat'.

        Raises:
            Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an
            expired prompt in the cache, in which case it logs a warning and returns the expired prompt.
        """
        if version is not None and label is not None:
            raise ValueError("Cannot specify both version and label at the same time.")

        if not name:
            raise ValueError("Prompt name cannot be empty.")

        cache_key = PromptCache.generate_cache_key(name, version=version, label=label)
        # Clamp user-provided retries to [0, 4], defaulting to 2.
        bounded_max_retries = self._get_bounded_max_retries(
            max_retries, default_max_retries=2, max_retries_upper_bound=4
        )

        self.log.debug(f"Getting prompt '{cache_key}'")
        cached_prompt = self.prompt_cache.get(cache_key)

        # Cache miss, or caching explicitly disabled (ttl == 0): fetch synchronously.
        if cached_prompt is None or cache_ttl_seconds == 0:
            self.log.debug(
                f"Prompt '{cache_key}' not found in cache or caching disabled."
            )
            try:
                return self._fetch_prompt_and_update_cache(
                    name,
                    version=version,
                    label=label,
                    ttl_seconds=cache_ttl_seconds,
                    max_retries=bounded_max_retries,
                    fetch_timeout_seconds=fetch_timeout_seconds,
                )
            except Exception as e:
                # Fetch failed with nothing cached: serve the caller-provided
                # fallback (marked is_fallback=True) rather than failing hard.
                if fallback:
                    self.log.warning(
                        f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}"
                    )

                    fallback_client_args = {
                        "name": name,
                        "prompt": fallback,
                        "type": type,
                        "version": version or 0,
                        "config": {},
                        "labels": [label] if label else [],
                        "tags": [],
                    }

                    if type == "text":
                        return TextPromptClient(
                            prompt=Prompt_Text(**fallback_client_args),
                            is_fallback=True,
                        )

                    if type == "chat":
                        return ChatPromptClient(
                            prompt=Prompt_Chat(**fallback_client_args),
                            is_fallback=True,
                        )

                # No fallback available: propagate the fetch error.
                raise e

        # Cache hit but expired: serve stale value and refresh asynchronously.
        if cached_prompt.is_expired():
            self.log.debug(f"Stale prompt '{cache_key}' found in cache.")
            try:
                # refresh prompt in background thread, refresh_prompt deduplicates tasks
                self.log.debug(f"Refreshing prompt '{cache_key}' in background.")
                self.prompt_cache.add_refresh_prompt_task(
                    cache_key,
                    lambda: self._fetch_prompt_and_update_cache(
                        name,
                        version=version,
                        label=label,
                        ttl_seconds=cache_ttl_seconds,
                        max_retries=bounded_max_retries,
                        fetch_timeout_seconds=fetch_timeout_seconds,
                    ),
                )
                self.log.debug(f"Returning stale prompt '{cache_key}' from cache.")
                # return stale prompt
                return cached_prompt.value

            except Exception as e:
                self.log.warning(
                    f"Error when refreshing cached prompt '{cache_key}', returning cached version. Error: {e}"
                )
                # creation of refresh prompt task failed, return stale prompt
                return cached_prompt.value

        # Fresh cache hit.
        return cached_prompt.value
1091
1092    def _fetch_prompt_and_update_cache(
1093        self,
1094        name: str,
1095        *,
1096        version: Optional[int] = None,
1097        label: Optional[str] = None,
1098        ttl_seconds: Optional[int] = None,
1099        max_retries: int,
1100        fetch_timeout_seconds,
1101    ) -> PromptClient:
1102        try:
1103            cache_key = PromptCache.generate_cache_key(
1104                name, version=version, label=label
1105            )
1106
1107            self.log.debug(f"Fetching prompt '{cache_key}' from server...")
1108
1109            @backoff.on_exception(
1110                backoff.constant, Exception, max_tries=max_retries, logger=None
1111            )
1112            def fetch_prompts():
1113                return self.client.prompts.get(
1114                    self._url_encode(name),
1115                    version=version,
1116                    label=label,
1117                    request_options={
1118                        "timeout_in_seconds": fetch_timeout_seconds,
1119                    }
1120                    if fetch_timeout_seconds is not None
1121                    else None,
1122                )
1123
1124            prompt_response = fetch_prompts()
1125
1126            if prompt_response.type == "chat":
1127                prompt = ChatPromptClient(prompt_response)
1128            else:
1129                prompt = TextPromptClient(prompt_response)
1130
1131            self.prompt_cache.set(cache_key, prompt, ttl_seconds)
1132
1133            return prompt
1134
1135        except Exception as e:
1136            self.log.error(f"Error while fetching prompt '{cache_key}': {str(e)}")
1137            raise e
1138
1139    def _get_bounded_max_retries(
1140        self,
1141        max_retries: Optional[int],
1142        *,
1143        default_max_retries: int = 2,
1144        max_retries_upper_bound: int = 4,
1145    ) -> int:
1146        if max_retries is None:
1147            return default_max_retries
1148
1149        bounded_max_retries = min(
1150            max(max_retries, 0),
1151            max_retries_upper_bound,
1152        )
1153
1154        return bounded_max_retries
1155
1156    @overload
1157    def create_prompt(
1158        self,
1159        *,
1160        name: str,
1161        prompt: List[ChatMessageDict],
1162        is_active: Optional[bool] = None,  # deprecated
1163        labels: List[str] = [],
1164        tags: Optional[List[str]] = None,
1165        type: Optional[Literal["chat"]],
1166        config: Optional[Any] = None,
1167    ) -> ChatPromptClient: ...
1168
1169    @overload
1170    def create_prompt(
1171        self,
1172        *,
1173        name: str,
1174        prompt: str,
1175        is_active: Optional[bool] = None,  # deprecated
1176        labels: List[str] = [],
1177        tags: Optional[List[str]] = None,
1178        type: Optional[Literal["text"]] = "text",
1179        config: Optional[Any] = None,
1180    ) -> TextPromptClient: ...
1181
1182    def create_prompt(
1183        self,
1184        *,
1185        name: str,
1186        prompt: Union[str, List[ChatMessageDict]],
1187        is_active: Optional[bool] = None,  # deprecated
1188        labels: List[str] = [],
1189        tags: Optional[List[str]] = None,
1190        type: Optional[Literal["chat", "text"]] = "text",
1191        config: Optional[Any] = None,
1192    ) -> PromptClient:
1193        """Create a new prompt in Langfuse.
1194
1195        Keyword Args:
1196            name : The name of the prompt to be created.
1197            prompt : The content of the prompt to be created.
1198            is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead.
1199            labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label.
1200            tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt.
1201            config: Additional structured data to be saved with the prompt. Defaults to None.
1202            type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text".
1203
1204        Returns:
1205            TextPromptClient: The prompt if type argument is 'text'.
1206            ChatPromptClient: The prompt if type argument is 'chat'.
1207        """
1208        try:
1209            self.log.debug(f"Creating prompt {name=}, {version=}, {labels=}")
1210
1211            # Handle deprecated is_active flag
1212            if is_active:
1213                self.log.warning(
1214                    "The 'is_active' flag is deprecated and will be removed in a future release. Please use the 'production' label instead."
1215                )
1216
1217                labels = labels if "production" in labels else labels + ["production"]
1218
1219            if type == "chat":
1220                if not isinstance(prompt, list):
1221                    raise ValueError(
1222                        "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes."
1223                    )
1224                request = CreatePromptRequest_Chat(
1225                    name=name,
1226                    prompt=prompt,
1227                    labels=labels,
1228                    tags=tags,
1229                    config=config or {},
1230                    type="chat",
1231                )
1232                server_prompt = self.client.prompts.create(request=request)
1233
1234                return ChatPromptClient(prompt=server_prompt)
1235
1236            if not isinstance(prompt, str):
1237                raise ValueError("For 'text' type, 'prompt' must be a string.")
1238
1239            request = CreatePromptRequest_Text(
1240                name=name,
1241                prompt=prompt,
1242                labels=labels,
1243                tags=tags,
1244                config=config or {},
1245                type="text",
1246            )
1247
1248            server_prompt = self.client.prompts.create(request=request)
1249            return TextPromptClient(prompt=server_prompt)
1250
1251        except Exception as e:
1252            handle_fern_exception(e)
1253            raise e
1254
1255    def _url_encode(self, url: str) -> str:
1256        return urllib.parse.quote(url)
1257
1258    def trace(
1259        self,
1260        *,
1261        id: typing.Optional[str] = None,
1262        name: typing.Optional[str] = None,
1263        user_id: typing.Optional[str] = None,
1264        session_id: typing.Optional[str] = None,
1265        version: typing.Optional[str] = None,
1266        input: typing.Optional[typing.Any] = None,
1267        output: typing.Optional[typing.Any] = None,
1268        metadata: typing.Optional[typing.Any] = None,
1269        tags: typing.Optional[typing.List[str]] = None,
1270        timestamp: typing.Optional[dt.datetime] = None,
1271        public: typing.Optional[bool] = None,
1272        **kwargs,
1273    ) -> "StatefulTraceClient":
1274        """Create a trace.
1275
1276        Args:
1277            id: The id of the trace can be set, defaults to a random id. Set it to link traces to external systems or when creating a distributed trace. Traces are upserted on id.
1278            name: Identifier of the trace. Useful for sorting/filtering in the UI.
1279            input: The input of the trace. Can be any JSON object.
1280            output: The output of the trace. Can be any JSON object.
1281            metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
1282            user_id: The id of the user that triggered the execution. Used to provide user-level analytics.
1283            session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
1284            version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
1285            release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
1286            tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.
1287            timestamp: The timestamp of the trace. Defaults to the current time if not provided.
1288            public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
1289            **kwargs: Additional keyword arguments that can be included in the trace.
1290
1291        Returns:
1292            StatefulTraceClient: The created trace.
1293
1294        Example:
1295            ```python
1296            from langfuse import Langfuse
1297
1298            langfuse = Langfuse()
1299
1300            trace = langfuse.trace(
1301                name="example-application",
1302                user_id="user-1234")
1303            )
1304            ```
1305        """
1306        new_id = id or str(uuid.uuid4())
1307        self.trace_id = new_id
1308        try:
1309            new_dict = {
1310                "id": new_id,
1311                "name": name,
1312                "userId": user_id,
1313                "sessionId": session_id
1314                or kwargs.get("sessionId", None),  # backward compatibility
1315                "release": self.release,
1316                "version": version,
1317                "metadata": metadata,
1318                "input": input,
1319                "output": output,
1320                "tags": tags,
1321                "timestamp": timestamp or _get_timestamp(),
1322                "public": public,
1323            }
1324            if kwargs is not None:
1325                new_dict.update(kwargs)
1326
1327            new_body = TraceBody(**new_dict)
1328
1329            self.log.debug(f"Creating trace {_filter_io_from_event_body(new_dict)}")
1330            event = {
1331                "id": str(uuid.uuid4()),
1332                "type": "trace-create",
1333                "body": new_body,
1334            }
1335
1336            self.task_manager.add_task(
1337                event,
1338            )
1339
1340        except Exception as e:
1341            self.log.exception(e)
1342        finally:
1343            self._log_memory_usage()
1344
1345            return StatefulTraceClient(
1346                self.client, new_id, StateType.TRACE, new_id, self.task_manager
1347            )
1348
1349    def _log_memory_usage(self):
1350        try:
1351            is_malloc_tracing_enabled = bool(int(os.getenv("PYTHONTRACEMALLOC", 0)))
1352            report_interval = int(os.getenv("LANGFUSE_DEBUG_MEMORY_REPORT_INTERVAL", 0))
1353            top_k_items = int(os.getenv("LANGFUSE_DEBUG_MEMORY_TOP_K", 10))
1354
1355            if (
1356                not is_malloc_tracing_enabled
1357                or report_interval <= 0
1358                or round(time.monotonic()) % report_interval != 0
1359            ):
1360                return
1361
1362            snapshot = tracemalloc.take_snapshot().statistics("lineno")
1363
1364            total_memory_usage = sum([stat.size for stat in snapshot]) / 1024 / 1024
1365            me