langfuse.client

   1from contextlib import contextmanager
   2import datetime as dt
   3import logging
   4import os
   5import typing
   6import uuid
   7import backoff
   8import httpx
   9from enum import Enum
  10import time
  11import tracemalloc
  12from typing import (
  13    Any,
  14    Dict,
  15    Optional,
  16    Literal,
  17    Union,
  18    List,
  19    Sequence,
  20    overload,
  21)
  22import urllib.parse
  23import warnings
  24from dataclasses import dataclass
  25
  26
  27from langfuse.api.resources.commons.types.dataset_run_with_items import (
  28    DatasetRunWithItems,
  29)
  30from langfuse.api.resources.commons.types.observations_view import ObservationsView
  31from langfuse.api.resources.commons.types.session import Session
  32from langfuse.api.resources.commons.types.trace_with_details import TraceWithDetails
  33from langfuse.api.resources.datasets.types.paginated_dataset_runs import (
  34    PaginatedDatasetRuns,
  35)
  36from langfuse.api.resources.ingestion.types.create_event_body import CreateEventBody
  37from langfuse.api.resources.ingestion.types.create_generation_body import (
  38    CreateGenerationBody,
  39)
  40from langfuse.api.resources.ingestion.types.create_span_body import CreateSpanBody
  41from langfuse.api.resources.ingestion.types.score_body import ScoreBody
  42from langfuse.api.resources.ingestion.types.trace_body import TraceBody
  43from langfuse.api.resources.ingestion.types.sdk_log_body import SdkLogBody
  44from langfuse.api.resources.ingestion.types.update_generation_body import (
  45    UpdateGenerationBody,
  46)
  47from langfuse.api.resources.ingestion.types.update_span_body import UpdateSpanBody
  48from langfuse.api.resources.observations.types.observations_views import (
  49    ObservationsViews,
  50)
  51from langfuse.api.resources.prompts.types import (
  52    CreatePromptRequest_Chat,
  53    CreatePromptRequest_Text,
  54    Prompt_Text,
  55    Prompt_Chat,
  56)
  57from langfuse.api.resources.trace.types.traces import Traces
  58from langfuse.api.resources.utils.resources.pagination.types.meta_response import (
  59    MetaResponse,
  60)
  61from langfuse.model import (
  62    CreateDatasetItemRequest,
  63    CreateDatasetRequest,
  64    CreateDatasetRunItemRequest,
  65    ChatMessageDict,
  66    DatasetItem,
  67    DatasetStatus,
  68    ModelUsage,
  69    PromptClient,
  70    ChatPromptClient,
  71    TextPromptClient,
  72)
  73from langfuse.parse_error import (
  74    handle_fern_exception,
  75)
  76from langfuse.prompt_cache import PromptCache
  77
  78try:
  79    import pydantic.v1 as pydantic  # type: ignore
  80except ImportError:
  81    import pydantic  # type: ignore
  82
  83from langfuse.api.client import FernLangfuse
  84from langfuse.environment import get_common_release_envs
  85from langfuse.logging import clean_logger
  86from langfuse.model import Dataset, MapValue, Observation, TraceWithFullDetails
  87from langfuse.request import LangfuseClient
  88from langfuse.task_manager import TaskManager
  89from langfuse.types import SpanLevel, ScoreDataType
  90from langfuse.utils import _convert_usage_input, _create_prompt_context, _get_timestamp
  91
  92from .version import __version__ as version
  93
  94
  95@dataclass
  96class FetchTracesResponse:
  97    """Response object for fetch_traces method."""
  98
  99    data: typing.List[TraceWithDetails]
 100    meta: MetaResponse
 101
 102
 103@dataclass
 104class FetchTraceResponse:
 105    """Response object for fetch_trace method."""
 106
 107    data: TraceWithFullDetails
 108
 109
 110@dataclass
 111class FetchObservationsResponse:
 112    """Response object for fetch_observations method."""
 113
 114    data: typing.List[ObservationsView]
 115    meta: MetaResponse
 116
 117
 118@dataclass
 119class FetchObservationResponse:
 120    """Response object for fetch_observation method."""
 121
 122    data: Observation
 123
 124
 125@dataclass
 126class FetchSessionsResponse:
 127    """Response object for fetch_sessions method."""
 128
 129    data: typing.List[Session]
 130    meta: MetaResponse
 131
 132
 133class Langfuse(object):
 134    """Langfuse Python client.
 135
 136    Attributes:
 137        log (logging.Logger): Logger for the Langfuse client.
 138        base_url (str): Base URL of the Langfuse API, serving as the root address for API endpoint construction.
 139        httpx_client (httpx.Client): HTTPX client utilized for executing requests to the Langfuse API.
 140        client (FernLangfuse): Core interface for Langfuse API interaction.
 141        task_manager (TaskManager): Task Manager dedicated to handling asynchronous tasks.
 142        release (str): Identifies the release number or hash of the application.
 143        prompt_cache (PromptCache): A cache for efficiently storing and retrieving PromptClient instances.
 144
 145    Example:
 146        Initiating the Langfuse client should always be first step to use Langfuse.
 147        ```python
 148        import os
 149        from langfuse import Langfuse
 150
 151        # Set the public and secret keys as environment variables
 152        os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
 153        os.environ['LANGFUSE_SECRET_KEY'] = secret_key
 154
 155        # Initialize the Langfuse client using the credentials
 156        langfuse = Langfuse()
 157        ```
 158    """
 159
    # Class-level logger: shared by all Langfuse client instances.
    log = logging.getLogger("langfuse")
    """Logger for the Langfuse client."""

    # NOTE(review): annotation only — no value is ever assigned to `self.host` in
    # __init__, which stores the resolved host on `self.base_url` instead; verify
    # nothing relies on `self.host` being set.
    host: str
    """Host of Langfuse API."""
 165
    def __init__(
        self,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        release: Optional[str] = None,
        debug: bool = False,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[float] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,  # seconds
        sdk_integration: Optional[str] = "default",
        httpx_client: Optional[httpx.Client] = None,
        enabled: Optional[bool] = True,
        sample_rate: Optional[float] = None,
    ):
        """Initialize the Langfuse client.

        Args:
            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
            flush_at: Max batch size that's sent to the API.
            flush_interval: Max delay until a new batch is sent to the API.
            max_retries: Max number of retries in case of API/network errors.
            timeout: Timeout of API requests in seconds. Defaults to 20 seconds.
            httpx_client: Pass your own httpx client for more customizability of requests.
            sdk_integration: Used by integrations that wrap the Langfuse SDK to add context for debugging and support. Not to be used directly.
            enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops.
            sample_rate: Sampling rate for tracing. If set to 0.2, only 20% of the data will be sent to the backend. Can be set via `LANGFUSE_SAMPLE_RATE` environment variable.

        Note:
            Missing or invalid credentials/configuration do not raise: if no
            public_key or secret_key can be resolved (or sample_rate is out of
            range), the client logs a warning and disables itself instead.

        Example:
            Initiating the Langfuse client should always be first step to use Langfuse.
            ```python
            import os
            from langfuse import Langfuse

            # Set the public and secret keys as environment variables
            os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
            os.environ['LANGFUSE_SECRET_KEY'] = secret_key

            # Initialize the Langfuse client using the credentials
            langfuse = Langfuse()
            ```
        """
        self.enabled = enabled
        # Explicit arguments take precedence; fall back to environment variables.
        public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
        secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
        sample_rate = (
            sample_rate
            if sample_rate
            is not None  # needs explicit None check, as 0 is a valid value
            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
        )

        # NOTE(review): sample_rate can no longer be None at this point (the env
        # fallback above always yields a float), so the None guard is redundant.
        if sample_rate is not None and (
            sample_rate > 1 or sample_rate < 0
        ):  # default value 1 will be set in the taskmanager
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since the sample rate provided is not between 0 and 1."
            )

        # Batching/network knobs, each overridable via environment variables.
        threads = threads or int(os.environ.get("LANGFUSE_THREADS", 1))
        flush_at = flush_at or int(os.environ.get("LANGFUSE_FLUSH_AT", 15))
        flush_interval = flush_interval or float(
            os.environ.get("LANGFUSE_FLUSH_INTERVAL", 0.5)
        )

        max_retries = max_retries or int(os.environ.get("LANGFUSE_MAX_RETRIES", 3))
        timeout = timeout or int(os.environ.get("LANGFUSE_TIMEOUT", 20))

        # Missing credentials disable the client (warn + no-op) instead of raising.
        if not self.enabled:
            self.log.warning(
                "Langfuse client is disabled. No observability data will be sent."
            )

        elif not public_key:
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since no public_key was provided as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
            )

        elif not secret_key:
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since no secret_key was provided as a parameter or environment variable 'LANGFUSE_SECRET_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
            )

        set_debug = debug if debug else (os.getenv("LANGFUSE_DEBUG", "False") == "True")

        if set_debug is True:
            # Ensures that debug level messages are logged when debug mode is on.
            # Otherwise, defaults to WARNING level.
            # See https://docs.python.org/3/howto/logging.html#what-happens-if-no-configuration-is-provided
            logging.basicConfig()
            self.log.setLevel(logging.DEBUG)

            clean_logger()
        else:
            self.log.setLevel(logging.WARNING)
            clean_logger()

        self.base_url = (
            host
            if host
            else os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
        )

        # A caller-supplied httpx client is reused for both API interfaces below.
        self.httpx_client = httpx_client or httpx.Client(timeout=timeout)

        # Fern-generated API client (reads/queries).
        self.client = FernLangfuse(
            base_url=self.base_url,
            username=public_key,
            password=secret_key,
            x_langfuse_sdk_name="python",
            x_langfuse_sdk_version=version,
            x_langfuse_public_key=public_key,
            httpx_client=self.httpx_client,
        )

        # Lightweight client used by the TaskManager for batched ingestion.
        langfuse_client = LangfuseClient(
            public_key=public_key,
            secret_key=secret_key,
            base_url=self.base_url,
            version=version,
            timeout=timeout,
            session=self.httpx_client,
        )

        args = {
            "threads": threads,
            "flush_at": flush_at,
            "flush_interval": flush_interval,
            "max_retries": max_retries,
            "client": langfuse_client,
            "public_key": public_key,
            "sdk_name": "python",
            "sdk_version": version,
            "sdk_integration": sdk_integration,
            "enabled": self.enabled,
            "sample_rate": sample_rate,
        }

        # Background task manager that batches and ships events asynchronously.
        self.task_manager = TaskManager(**args)

        # Id of the most recent trace; None until a trace is created.
        self.trace_id = None

        self.release = self._get_release_value(release)

        self.prompt_cache = PromptCache()
 324
 325    def _get_release_value(self, release: Optional[str] = None) -> Optional[str]:
 326        if release:
 327            return release
 328        elif "LANGFUSE_RELEASE" in os.environ:
 329            return os.environ["LANGFUSE_RELEASE"]
 330        else:
 331            return get_common_release_envs()
 332
    def get_trace_id(self) -> Optional[str]:
        """Get the current trace id (None if this client has not created a trace yet)."""
        return self.trace_id
 336
 337    def get_trace_url(self) -> str:
 338        """Get the URL of the current trace to view it in the Langfuse UI."""
 339        return f"{self.base_url}/trace/{self.trace_id}"
 340
 341    def get_dataset(
 342        self, name: str, *, fetch_items_page_size: Optional[int] = 50
 343    ) -> "DatasetClient":
 344        """Fetch a dataset by its name.
 345
 346        Args:
 347            name (str): The name of the dataset to fetch.
 348            fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.
 349
 350        Returns:
 351            DatasetClient: The dataset with the given name.
 352        """
 353        try:
 354            self.log.debug(f"Getting datasets {name}")
 355            dataset = self.client.datasets.get(dataset_name=name)
 356
 357            dataset_items = []
 358            page = 1
 359            while True:
 360                new_items = self.client.dataset_items.list(
 361                    dataset_name=name, page=page, limit=fetch_items_page_size
 362                )
 363                dataset_items.extend(new_items.data)
 364                if new_items.meta.total_pages <= page:
 365                    break
 366                page += 1
 367
 368            items = [DatasetItemClient(i, langfuse=self) for i in dataset_items]
 369
 370            return DatasetClient(dataset, items=items)
 371        except Exception as e:
 372            handle_fern_exception(e)
 373            raise e
 374
 375    def get_dataset_item(self, id: str) -> "DatasetItemClient":
 376        """Get the dataset item with the given id."""
 377        try:
 378            self.log.debug(f"Getting dataset item {id}")
 379            dataset_item = self.client.dataset_items.get(id=id)
 380            return DatasetItemClient(dataset_item, langfuse=self)
 381        except Exception as e:
 382            handle_fern_exception(e)
 383            raise e
 384
 385    def auth_check(self) -> bool:
 386        """Check if the provided credentials (public and secret key) are valid.
 387
 388        Raises:
 389            Exception: If no projects were found for the provided credentials.
 390
 391        Note:
 392            This method is blocking. It is discouraged to use it in production code.
 393        """
 394        try:
 395            projects = self.client.projects.get()
 396            self.log.debug(
 397                f"Auth check successful, found {len(projects.data)} projects"
 398            )
 399            if len(projects.data) == 0:
 400                raise Exception(
 401                    "Auth check failed, no project found for the keys provided."
 402                )
 403            return True
 404
 405        except Exception as e:
 406            handle_fern_exception(e)
 407            raise e
 408
 409    def get_dataset_runs(
 410        self,
 411        dataset_name: str,
 412        *,
 413        page: typing.Optional[int] = None,
 414        limit: typing.Optional[int] = None,
 415    ) -> PaginatedDatasetRuns:
 416        """Get all dataset runs.
 417
 418        Args:
 419            dataset_name (str): Name of the dataset.
 420            page (Optional[int]): Page number of the dataset runs to return, starts at 1. Defaults to None.
 421            limit (Optional[int]): Maximum number of dataset runs to return. Defaults to 50.
 422
 423        Returns:
 424            PaginatedDatasetRuns: The dataset runs.
 425        """
 426        try:
 427            self.log.debug("Getting dataset runs")
 428            return self.client.datasets.get_runs(
 429                dataset_name=dataset_name, page=page, limit=limit
 430            )
 431        except Exception as e:
 432            handle_fern_exception(e)
 433            raise e
 434
 435    def get_dataset_run(
 436        self,
 437        dataset_name: str,
 438        dataset_run_name: str,
 439    ) -> DatasetRunWithItems:
 440        """Get a dataset run.
 441
 442        Args:
 443            dataset_name: Name of the dataset.
 444            dataset_run_name: Name of the dataset run.
 445
 446        Returns:
 447            DatasetRunWithItems: The dataset run.
 448        """
 449        try:
 450            self.log.debug(
 451                f"Getting dataset runs for dataset {dataset_name} and run {dataset_run_name}"
 452            )
 453            return self.client.datasets.get_run(
 454                dataset_name=dataset_name, run_name=dataset_run_name
 455            )
 456        except Exception as e:
 457            handle_fern_exception(e)
 458            raise e
 459
 460    def create_dataset(
 461        self,
 462        name: str,
 463        description: Optional[str] = None,
 464        metadata: Optional[Any] = None,
 465    ) -> Dataset:
 466        """Create a dataset with the given name on Langfuse.
 467
 468        Args:
 469            name: Name of the dataset to create.
 470            description: Description of the dataset. Defaults to None.
 471            metadata: Additional metadata. Defaults to None.
 472
 473        Returns:
 474            Dataset: The created dataset as returned by the Langfuse API.
 475        """
 476        try:
 477            body = CreateDatasetRequest(
 478                name=name, description=description, metadata=metadata
 479            )
 480            self.log.debug(f"Creating datasets {body}")
 481            return self.client.datasets.create(request=body)
 482        except Exception as e:
 483            handle_fern_exception(e)
 484            raise e
 485
 486    def create_dataset_item(
 487        self,
 488        dataset_name: str,
 489        input: Optional[Any] = None,
 490        expected_output: Optional[Any] = None,
 491        metadata: Optional[Any] = None,
 492        source_trace_id: Optional[str] = None,
 493        source_observation_id: Optional[str] = None,
 494        status: Optional[DatasetStatus] = None,
 495        id: Optional[str] = None,
 496    ) -> DatasetItem:
 497        """Create a dataset item.
 498
 499        Upserts if an item with id already exists.
 500
 501        Args:
 502            dataset_name: Name of the dataset in which the dataset item should be created.
 503            input: Input data. Defaults to None. Can contain any dict, list or scalar.
 504            expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar.
 505            metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar.
 506            source_trace_id: Id of the source trace. Defaults to None.
 507            source_observation_id: Id of the source observation. Defaults to None.
 508            status: Status of the dataset item. Defaults to ACTIVE for newly created items.
 509            id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets.
 510
 511        Returns:
 512            DatasetItem: The created dataset item as returned by the Langfuse API.
 513
 514        Example:
 515            ```python
 516            from langfuse import Langfuse
 517
 518            langfuse = Langfuse()
 519
 520            # Uploading items to the Langfuse dataset named "capital_cities"
 521            langfuse.create_dataset_item(
 522                dataset_name="capital_cities",
 523                input={"input": {"country": "Italy"}},
 524                expected_output={"expected_output": "Rome"},
 525                metadata={"foo": "bar"}
 526            )
 527            ```
 528        """
 529        try:
 530            body = CreateDatasetItemRequest(
 531                datasetName=dataset_name,
 532                input=input,
 533                expectedOutput=expected_output,
 534                metadata=metadata,
 535                sourceTraceId=source_trace_id,
 536                sourceObservationId=source_observation_id,
 537                status=status,
 538                id=id,
 539            )
 540            self.log.debug(f"Creating dataset item {body}")
 541            return self.client.dataset_items.create(request=body)
 542        except Exception as e:
 543            handle_fern_exception(e)
 544            raise e
 545
 546    def fetch_trace(
 547        self,
 548        id: str,
 549    ) -> FetchTraceResponse:
 550        """Fetch a trace via the Langfuse API by its id.
 551
 552        Args:
 553            id: The id of the trace to fetch.
 554
 555        Returns:
 556            FetchTraceResponse: The trace with full details as returned by the Langfuse API on `data`.
 557
 558        Raises:
 559            Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
 560        """
 561        try:
 562            self.log.debug(f"Getting trace {id}")
 563            trace = self.client.trace.get(id)
 564            return FetchTraceResponse(data=trace)
 565        except Exception as e:
 566            handle_fern_exception(e)
 567            raise e
 568
 569    def get_trace(
 570        self,
 571        id: str,
 572    ) -> TraceWithFullDetails:
 573        """Get a trace via the Langfuse API by its id. Deprecated, use fetch_trace instead.
 574
 575        Args:
 576            id: The id of the trace to fetch.
 577
 578        Returns:
 579            TraceWithFullDetails: The trace with full details as returned by the Langfuse API.
 580
 581        Raises:
 582            Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
 583        """
 584        warnings.warn(
 585            "get_trace is deprecated, use fetch_trace instead.",
 586            DeprecationWarning,
 587        )
 588
 589        try:
 590            self.log.debug(f"Getting trace {id}")
 591            return self.client.trace.get(id)
 592        except Exception as e:
 593            handle_fern_exception(e)
 594            raise e
 595
 596    def fetch_traces(
 597        self,
 598        *,
 599        page: Optional[int] = None,
 600        limit: Optional[int] = None,
 601        user_id: Optional[str] = None,
 602        name: Optional[str] = None,
 603        session_id: Optional[str] = None,
 604        from_timestamp: Optional[dt.datetime] = None,
 605        to_timestamp: Optional[dt.datetime] = None,
 606        order_by: Optional[str] = None,
 607        tags: Optional[Union[str, Sequence[str]]] = None,
 608    ) -> FetchTracesResponse:
 609        """Fetch a list of traces in the current project matching the given parameters.
 610
 611        Args:
 612            page (Optional[int]): Page number, starts at 1. Defaults to None.
 613            limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None.
 614            name (Optional[str]): Filter by name of traces. Defaults to None.
 615            user_id (Optional[str]): Filter by user_id. Defaults to None.
 616            session_id (Optional[str]): Filter by session_id. Defaults to None.
 617            from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None.
 618            to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None.
 619            order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None.
 620            tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None.
 621
 622        Returns:
 623            FetchTracesResponse, list of traces on `data` and metadata on `meta`.
 624
 625        Raises:
 626            Exception: If an error occurred during the request.
 627        """
 628        try:
 629            self.log.debug(
 630                f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}"
 631            )
 632            res = self.client.trace.list(
 633                page=page,
 634                limit=limit,
 635                name=name,
 636                user_id=user_id,
 637                session_id=session_id,
 638                from_timestamp=from_timestamp,
 639                to_timestamp=to_timestamp,
 640                order_by=order_by,
 641                tags=tags,
 642            )
 643            return FetchTracesResponse(data=res.data, meta=res.meta)
 644        except Exception as e:
 645            handle_fern_exception(e)
 646            raise e
 647
 648    def get_traces(
 649        self,
 650        *,
 651        page: Optional[int] = None,
 652        limit: Optional[int] = None,
 653        user_id: Optional[str] = None,
 654        name: Optional[str] = None,
 655        session_id: Optional[str] = None,
 656        from_timestamp: Optional[dt.datetime] = None,
 657        to_timestamp: Optional[dt.datetime] = None,
 658        order_by: Optional[str] = None,
 659        tags: Optional[Union[str, Sequence[str]]] = None,
 660    ) -> Traces:
 661        """Get a list of traces in the current project matching the given parameters. Deprecated, use fetch_traces instead.
 662
 663        Args:
 664            page (Optional[int]): Page number, starts at 1. Defaults to None.
 665            limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None.
 666            name (Optional[str]): Filter by name of traces. Defaults to None.
 667            user_id (Optional[str]): Filter by user_id. Defaults to None.
 668            session_id (Optional[str]): Filter by session_id. Defaults to None.
 669            from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None.
 670            to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None.
 671            order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None.
 672            tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None.
 673
 674        Returns:
 675            List of Traces
 676
 677        Raises:
 678            Exception: If an error occurred during the request.
 679        """
 680        warnings.warn(
 681            "get_traces is deprecated, use fetch_traces instead.",
 682            DeprecationWarning,
 683        )
 684        try:
 685            self.log.debug(
 686                f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}"
 687            )
 688            return self.client.trace.list(
 689                page=page,
 690                limit=limit,
 691                name=name,
 692                user_id=user_id,
 693                session_id=session_id,
 694                from_timestamp=from_timestamp,
 695                to_timestamp=to_timestamp,
 696                order_by=order_by,
 697                tags=tags,
 698            )
 699        except Exception as e:
 700            handle_fern_exception(e)
 701            raise e
 702
 703    def fetch_observations(
 704        self,
 705        *,
 706        page: typing.Optional[int] = None,
 707        limit: typing.Optional[int] = None,
 708        name: typing.Optional[str] = None,
 709        user_id: typing.Optional[str] = None,
 710        trace_id: typing.Optional[str] = None,
 711        parent_observation_id: typing.Optional[str] = None,
 712        from_start_time: typing.Optional[dt.datetime] = None,
 713        to_start_time: typing.Optional[dt.datetime] = None,
 714        type: typing.Optional[str] = None,
 715    ) -> FetchObservationsResponse:
 716        """Get a list of observations in the current project matching the given parameters.
 717
 718        Args:
 719            page (Optional[int]): Page number of the observations to return. Defaults to None.
 720            limit (Optional[int]): Maximum number of observations to return. Defaults to None.
 721            name (Optional[str]): Name of the observations to return. Defaults to None.
 722            user_id (Optional[str]): User identifier. Defaults to None.
 723            trace_id (Optional[str]): Trace identifier. Defaults to None.
 724            parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
 725            from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
 726            to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
 727            type (Optional[str]): Type of the observation. Defaults to None.
 728
 729        Returns:
 730            FetchObservationsResponse, list of observations on `data` and metadata on `meta`.
 731
 732        Raises:
 733            Exception: If an error occurred during the request.
 734        """
 735        try:
 736            self.log.debug(
 737                f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}"
 738            )
 739            res = self.client.observations.get_many(
 740                page=page,
 741                limit=limit,
 742                name=name,
 743                user_id=user_id,
 744                trace_id=trace_id,
 745                parent_observation_id=parent_observation_id,
 746                from_start_time=from_start_time,
 747                to_start_time=to_start_time,
 748                type=type,
 749            )
 750            return FetchObservationsResponse(data=res.data, meta=res.meta)
 751        except Exception as e:
 752            self.log.exception(e)
 753            raise e
 754
 755    def get_observations(
 756        self,
 757        *,
 758        page: typing.Optional[int] = None,
 759        limit: typing.Optional[int] = None,
 760        name: typing.Optional[str] = None,
 761        user_id: typing.Optional[str] = None,
 762        trace_id: typing.Optional[str] = None,
 763        parent_observation_id: typing.Optional[str] = None,
 764        from_start_time: typing.Optional[dt.datetime] = None,
 765        to_start_time: typing.Optional[dt.datetime] = None,
 766        type: typing.Optional[str] = None,
 767    ) -> ObservationsViews:
 768        """Get a list of observations in the current project matching the given parameters. Deprecated, use fetch_observations instead.
 769
 770        Args:
 771            page (Optional[int]): Page number of the observations to return. Defaults to None.
 772            limit (Optional[int]): Maximum number of observations to return. Defaults to None.
 773            name (Optional[str]): Name of the observations to return. Defaults to None.
 774            user_id (Optional[str]): User identifier. Defaults to None.
 775            trace_id (Optional[str]): Trace identifier. Defaults to None.
 776            parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
 777            from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
 778            to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
 779            type (Optional[str]): Type of the observation. Defaults to None.
 780
 781        Returns:
 782            List of ObservationsViews: List of observations in the project matching the given parameters.
 783
 784        Raises:
 785            Exception: If an error occurred during the request.
 786        """
 787        warnings.warn(
 788            "get_observations is deprecated, use fetch_observations instead.",
 789            DeprecationWarning,
 790        )
 791        try:
 792            self.log.debug(
 793                f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}"
 794            )
 795            return self.client.observations.get_many(
 796                page=page,
 797                limit=limit,
 798                name=name,
 799                user_id=user_id,
 800                trace_id=trace_id,
 801                parent_observation_id=parent_observation_id,
 802                from_start_time=from_start_time,
 803                to_start_time=to_start_time,
 804                type=type,
 805            )
 806        except Exception as e:
 807            handle_fern_exception(e)
 808            raise e
 809
 810    def get_generations(
 811        self,
 812        *,
 813        page: typing.Optional[int] = None,
 814        limit: typing.Optional[int] = None,
 815        name: typing.Optional[str] = None,
 816        user_id: typing.Optional[str] = None,
 817        trace_id: typing.Optional[str] = None,
 818        from_start_time: typing.Optional[dt.datetime] = None,
 819        to_start_time: typing.Optional[dt.datetime] = None,
 820        parent_observation_id: typing.Optional[str] = None,
 821    ) -> ObservationsViews:
 822        """Get a list of generations in the current project matching the given parameters. Deprecated, use fetch_observations(type='GENERATION') instead.
 823
 824        Args:
 825            page (Optional[int]): Page number of the generations to return. Defaults to None.
 826            limit (Optional[int]): Maximum number of generations to return. Defaults to None.
 827            name (Optional[str]): Name of the generations to return. Defaults to None.
 828            user_id (Optional[str]): User identifier of the generations to return. Defaults to None.
 829            trace_id (Optional[str]): Trace identifier of the generations to return. Defaults to None.
 830            from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
 831            to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
 832            parent_observation_id (Optional[str]): Parent observation identifier of the generations to return. Defaults to None.
 833
 834        Returns:
 835            List of ObservationsViews: List of generations in the project matching the given parameters.
 836
 837        Raises:
 838            Exception: If an error occurred during the request.
 839        """
 840        warnings.warn(
 841            "get_generations is deprecated, use `fetch_observations(type='GENERATION')` instead.",
 842            DeprecationWarning,
 843        )
 844        return self.get_observations(
 845            page=page,
 846            limit=limit,
 847            name=name,
 848            user_id=user_id,
 849            trace_id=trace_id,
 850            parent_observation_id=parent_observation_id,
 851            from_start_time=from_start_time,
 852            to_start_time=to_start_time,
 853            type="GENERATION",
 854        )
 855
 856    def fetch_observation(
 857        self,
 858        id: str,
 859    ) -> FetchObservationResponse:
 860        """Get an observation in the current project with the given identifier.
 861
 862        Args:
 863            id: The identifier of the observation to fetch.
 864
 865        Returns:
 866            FetchObservationResponse: The observation with the given id on `data`.
 867
 868        Raises:
 869            Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request.
 870        """
 871        try:
 872            self.log.debug(f"Getting observation {id}")
 873            observation = self.client.observations.get(id)
 874            return FetchObservationResponse(data=observation)
 875        except Exception as e:
 876            handle_fern_exception(e)
 877            raise e
 878
 879    def get_observation(
 880        self,
 881        id: str,
 882    ) -> Observation:
 883        """Get an observation in the current project with the given identifier. Deprecated, use fetch_observation instead.
 884
 885        Args:
 886            id: The identifier of the observation to fetch.
 887
 888        Raises:
 889            Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request.
 890        """
 891        warnings.warn(
 892            "get_observation is deprecated, use fetch_observation instead.",
 893            DeprecationWarning,
 894        )
 895        try:
 896            self.log.debug(f"Getting observation {id}")
 897            return self.client.observations.get(id)
 898        except Exception as e:
 899            handle_fern_exception(e)
 900            raise e
 901
 902    def fetch_sessions(
 903        self,
 904        *,
 905        page: typing.Optional[int] = None,
 906        limit: typing.Optional[int] = None,
 907        from_timestamp: typing.Optional[dt.datetime] = None,
 908        to_timestamp: typing.Optional[dt.datetime] = None,
 909    ) -> FetchSessionsResponse:
 910        """Get a list of sessions in the current project.
 911
 912        Args:
 913            page (Optional[int]): Page number of the sessions to return. Defaults to None.
 914            limit (Optional[int]): Maximum number of sessions to return. Defaults to None.
 915            from_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp on or after this datetime. Defaults to None.
 916            to_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp before this datetime. Defaults to None.
 917
 918        Returns:
 919            FetchSessionsResponse, list of sessions on `data` and metadata on `meta`.
 920
 921        Raises:
 922            Exception: If an error occurred during the request.
 923        """
 924        try:
 925            self.log.debug(
 926                f"Getting sessions... {page}, {limit}, {from_timestamp}, {to_timestamp}"
 927            )
 928            res = self.client.sessions.list(
 929                page=page,
 930                limit=limit,
 931                from_timestamp=from_timestamp,
 932                to_timestamp=to_timestamp,
 933            )
 934            return FetchSessionsResponse(data=res.data, meta=res.meta)
 935        except Exception as e:
 936            handle_fern_exception(e)
 937            raise e
 938
    # Typing overload: type="chat" returns a ChatPromptClient; the fallback is
    # a list of chat messages.
    @overload
    def get_prompt(
        self,
        name: str,
        version: Optional[int] = None,
        *,
        label: Optional[str] = None,
        type: Literal["chat"],
        cache_ttl_seconds: Optional[int] = None,
        fallback: Optional[List[ChatMessageDict]] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> ChatPromptClient: ...

    # Typing overload: default type="text" returns a TextPromptClient; the
    # fallback is a plain prompt string.
    @overload
    def get_prompt(
        self,
        name: str,
        version: Optional[int] = None,
        *,
        label: Optional[str] = None,
        type: Literal["text"] = "text",
        cache_ttl_seconds: Optional[int] = None,
        fallback: Optional[str] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> TextPromptClient: ...
 966
    def get_prompt(
        self,
        name: str,
        version: Optional[int] = None,
        *,
        label: Optional[str] = None,
        type: Literal["chat", "text"] = "text",
        cache_ttl_seconds: Optional[int] = None,
        fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> PromptClient:
        """Get a prompt.

        This method attempts to fetch the requested prompt from the local cache. If the prompt is not found
        in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again
        and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will
        return the expired prompt as a fallback.

        Args:
            name (str): The name of the prompt to retrieve.

        Keyword Args:
            version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
            label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
            cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a
            keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0.
            type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text".
            fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None.
            max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries have an exponential backoff with a maximum delay of 10 seconds.
            fetch_timeout_seconds: Optional[int]: The timeout in seconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 10 seconds per default.

        Returns:
            The prompt object retrieved from the cache or directly fetched if not cached or expired of type
            - TextPromptClient, if type argument is 'text'.
            - ChatPromptClient, if type argument is 'chat'.

        Raises:
            Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an
            expired prompt in the cache, in which case it logs a warning and returns the expired prompt.
        """
        if version is not None and label is not None:
            raise ValueError("Cannot specify both version and label at the same time.")

        if not name:
            raise ValueError("Prompt name cannot be empty.")

        cache_key = PromptCache.generate_cache_key(name, version=version, label=label)
        # Retry count is clamped to [0, 4]; unset means 2 retries.
        bounded_max_retries = self._get_bounded_max_retries(
            max_retries, default_max_retries=2, max_retries_upper_bound=4
        )

        self.log.debug(f"Getting prompt '{cache_key}'")
        cached_prompt = self.prompt_cache.get(cache_key)

        # Cache miss, or caching explicitly disabled via cache_ttl_seconds=0:
        # fetch synchronously from the server.
        if cached_prompt is None or cache_ttl_seconds == 0:
            self.log.debug(
                f"Prompt '{cache_key}' not found in cache or caching disabled."
            )
            try:
                return self._fetch_prompt_and_update_cache(
                    name,
                    version=version,
                    label=label,
                    ttl_seconds=cache_ttl_seconds,
                    max_retries=bounded_max_retries,
                    fetch_timeout_seconds=fetch_timeout_seconds,
                )
            except Exception as e:
                # Fetch failed with nothing cached: serve the caller-supplied
                # fallback (if any) as a synthetic version-0 prompt.
                if fallback:
                    self.log.warning(
                        f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}"
                    )

                    fallback_client_args = {
                        "name": name,
                        "prompt": fallback,
                        "type": type,
                        "version": version or 0,
                        "config": {},
                        "labels": [label] if label else [],
                        "tags": [],
                    }

                    if type == "text":
                        return TextPromptClient(
                            prompt=Prompt_Text(**fallback_client_args),
                            is_fallback=True,
                        )

                    if type == "chat":
                        return ChatPromptClient(
                            prompt=Prompt_Chat(**fallback_client_args),
                            is_fallback=True,
                        )

                # No fallback available: surface the fetch error to the caller.
                raise e

        # Cache hit but expired: serve the stale prompt immediately and refresh
        # it asynchronously so the caller is never blocked on the network.
        if cached_prompt.is_expired():
            self.log.debug(f"Stale prompt '{cache_key}' found in cache.")
            try:
                # refresh prompt in background thread, refresh_prompt deduplicates tasks
                self.log.debug(f"Refreshing prompt '{cache_key}' in background.")
                self.prompt_cache.add_refresh_prompt_task(
                    cache_key,
                    lambda: self._fetch_prompt_and_update_cache(
                        name,
                        version=version,
                        label=label,
                        ttl_seconds=cache_ttl_seconds,
                        max_retries=bounded_max_retries,
                        fetch_timeout_seconds=fetch_timeout_seconds,
                    ),
                )
                self.log.debug(f"Returning stale prompt '{cache_key}' from cache.")
                # return stale prompt
                return cached_prompt.value

            except Exception as e:
                self.log.warning(
                    f"Error when refreshing cached prompt '{cache_key}', returning cached version. Error: {e}"
                )
                # creation of refresh prompt task failed, return stale prompt
                return cached_prompt.value

        # Fresh cache hit.
        return cached_prompt.value
1093
1094    def _fetch_prompt_and_update_cache(
1095        self,
1096        name: str,
1097        *,
1098        version: Optional[int] = None,
1099        label: Optional[str] = None,
1100        ttl_seconds: Optional[int] = None,
1101        max_retries: int,
1102        fetch_timeout_seconds,
1103    ) -> PromptClient:
1104        try:
1105            cache_key = PromptCache.generate_cache_key(
1106                name, version=version, label=label
1107            )
1108
1109            self.log.debug(f"Fetching prompt '{cache_key}' from server...")
1110
1111            @backoff.on_exception(
1112                backoff.constant, Exception, max_tries=max_retries, logger=None
1113            )
1114            def fetch_prompts():
1115                return self.client.prompts.get(
1116                    self._url_encode(name),
1117                    version=version,
1118                    label=label,
1119                    request_options={
1120                        "timeout_in_seconds": fetch_timeout_seconds,
1121                    }
1122                    if fetch_timeout_seconds is not None
1123                    else None,
1124                )
1125
1126            prompt_response = fetch_prompts()
1127
1128            if prompt_response.type == "chat":
1129                prompt = ChatPromptClient(prompt_response)
1130            else:
1131                prompt = TextPromptClient(prompt_response)
1132
1133            self.prompt_cache.set(cache_key, prompt, ttl_seconds)
1134
1135            return prompt
1136
1137        except Exception as e:
1138            self.log.error(f"Error while fetching prompt '{cache_key}': {str(e)}")
1139            raise e
1140
1141    def _get_bounded_max_retries(
1142        self,
1143        max_retries: Optional[int],
1144        *,
1145        default_max_retries: int = 2,
1146        max_retries_upper_bound: int = 4,
1147    ) -> int:
1148        if max_retries is None:
1149            return default_max_retries
1150
1151        bounded_max_retries = min(
1152            max(max_retries, 0),
1153            max_retries_upper_bound,
1154        )
1155
1156        return bounded_max_retries
1157
1158    @overload
1159    def create_prompt(
1160        self,
1161        *,
1162        name: str,
1163        prompt: List[ChatMessageDict],
1164        is_active: Optional[bool] = None,  # deprecated
1165        labels: List[str] = [],
1166        tags: Optional[List[str]] = None,
1167        type: Optional[Literal["chat"]],
1168        config: Optional[Any] = None,
1169    ) -> ChatPromptClient: ...
1170
1171    @overload
1172    def create_prompt(
1173        self,
1174        *,
1175        name: str,
1176        prompt: str,
1177        is_active: Optional[bool] = None,  # deprecated
1178        labels: List[str] = [],
1179        tags: Optional[List[str]] = None,
1180        type: Optional[Literal["text"]] = "text",
1181        config: Optional[Any] = None,
1182    ) -> TextPromptClient: ...
1183
1184    def create_prompt(
1185        self,
1186        *,
1187        name: str,
1188        prompt: Union[str, List[ChatMessageDict]],
1189        is_active: Optional[bool] = None,  # deprecated
1190        labels: List[str] = [],
1191        tags: Optional[List[str]] = None,
1192        type: Optional[Literal["chat", "text"]] = "text",
1193        config: Optional[Any] = None,
1194    ) -> PromptClient:
1195        """Create a new prompt in Langfuse.
1196
1197        Keyword Args:
1198            name : The name of the prompt to be created.
1199            prompt : The content of the prompt to be created.
1200            is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead.
1201            labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label.
1202            tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt.
1203            config: Additional structured data to be saved with the prompt. Defaults to None.
1204            type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text".
1205
1206        Returns:
1207            TextPromptClient: The prompt if type argument is 'text'.
1208            ChatPromptClient: The prompt if type argument is 'chat'.
1209        """
1210        try:
1211            self.log.debug(f"Creating prompt {name=}, {version=}, {labels=}")
1212
1213            # Handle deprecated is_active flag
1214            if is_active:
1215                self.log.warning(
1216                    "The 'is_active' flag is deprecated and will be removed in a future release. Please use the 'production' label instead."
1217                )
1218
1219                labels = labels if "production" in labels else labels + ["production"]
1220
1221            if type == "chat":
1222                if not isinstance(prompt, list):
1223                    raise ValueError(
1224                        "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes."
1225                    )
1226                request = CreatePromptRequest_Chat(
1227                    name=name,
1228                    prompt=prompt,
1229                    labels=labels,
1230                    tags=tags,
1231                    config=config or {},
1232                    type="chat",
1233                )
1234                server_prompt = self.client.prompts.create(request=request)
1235
1236                return ChatPromptClient(prompt=server_prompt)
1237
1238            if not isinstance(prompt, str):
1239                raise ValueError("For 'text' type, 'prompt' must be a string.")
1240
1241            request = CreatePromptRequest_Text(
1242                name=name,
1243                prompt=prompt,
1244                labels=labels,
1245                tags=tags,
1246                config=config or {},
1247                type="text",
1248            )
1249
1250            server_prompt = self.client.prompts.create(request=request)
1251            return TextPromptClient(prompt=server_prompt)
1252
1253        except Exception as e:
1254            handle_fern_exception(e)
1255            raise e
1256
1257    def _url_encode(self, url: str) -> str:
1258        return urllib.parse.quote(url)
1259
1260    def trace(
1261        self,
1262        *,
1263        id: typing.Optional[str] = None,
1264        name: typing.Optional[str] = None,
1265        user_id: typing.Optional[str] = None,
1266        session_id: typing.Optional[str] = None,
1267        version: typing.Optional[str] = None,
1268        input: typing.Optional[typing.Any] = None,
1269        output: typing.Optional[typing.Any] = None,
1270        metadata: typing.Optional[typing.Any] = None,
1271        tags: typing.Optional[typing.List[str]] = None,
1272        timestamp: typing.Optional[dt.datetime] = None,
1273        public: typing.Optional[bool] = None,
1274        **kwargs,
1275    ) -> "StatefulTraceClient":
1276        """Create a trace.
1277
1278        Args:
1279            id: The id of the trace can be set, defaults to a random id. Set it to link traces to external systems or when creating a distributed trace. Traces are upserted on id.
1280            name: Identifier of the trace. Useful for sorting/filtering in the UI.
1281            input: The input of the trace. Can be any JSON object.
1282            output: The output of the trace. Can be any JSON object.
1283            metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
1284            user_id: The id of the user that triggered the execution. Used to provide user-level analytics.
1285            session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
1286            version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
1287            release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
1288            tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.
1289            timestamp: The timestamp of the trace. Defaults to the current time if not provided.
1290            public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
1291            **kwargs: Additional keyword arguments that can be included in the trace.
1292
1293        Returns:
1294            StatefulTraceClient: The created trace.
1295
1296        Example:
1297            ```python
1298            from langfuse import Langfuse
1299
1300            langfuse = Langfuse()
1301
1302            trace = langfuse.trace(
1303                name="example-application",
1304                user_id="user-1234")
1305            )
1306            ```
1307        """
1308        new_id = id or str(uuid.uuid4())
1309        self.trace_id = new_id
1310        try:
1311            new_dict = {
1312                "id": new_id,
1313                "name": name,
1314                "userId": user_id,
1315                "sessionId": session_id
1316                or kwargs.get("sessionId", None),  # backward compatibility
1317                "release": self.release,
1318                "version": version,
1319                "metadata": metadata,
1320                "input": input,
1321                "output": output,
1322                "tags": tags,
1323                "timestamp": timestamp or _get_timestamp(),
1324                "public": public,
1325            }
1326            if kwargs is not None:
1327                new_dict.update(kwargs)
1328
1329            new_body = TraceBody(**new_dict)
1330
1331            self.log.debug(f"Creating trace {new_body}")
1332            event = {
1333                "id": str(uuid.uuid4()),
1334                "type": "trace-create",
1335                "body": new_body.dict(exclude_none=True),
1336            }
1337
1338            self.task_manager.add_task(
1339                event,
1340            )
1341
1342        except Exception as e:
1343            self.log.exception(e)
1344        finally:
1345            self._log_memory_usage()
1346
1347            return StatefulTraceClient(
1348                self.client, new_id, StateType.TRACE, new_id, self.task_manager
1349            )
1350
1351    def _log_memory_usage(self):
1352        try:
1353            is_malloc_tracing_enabled = bool(int(os.getenv("PYTHONTRACEMALLOC", 0)))
1354            report_interval = int(os.getenv("LANGFUSE_DEBUG_MEMORY_REPORT_INTERVAL", 0))
1355            top_k_items = int(os.getenv("LANGFUSE_DEBUG_MEMORY_TOP_K", 10))
1356
1357            if (
1358                not is_malloc_tracing_enabled
1359                or report_interval <= 0
1360                or round(time.monotonic()) % report_interval != 0
1361            ):
1362                return
1363
1364            snapshot = tracemalloc.take_snapshot().statistics("lineno")
1365
1366            total_memory_usage = sum([stat.size for stat in snapshot]) / 1024 / 1024
1367            memory_usage_total_items = [f"{stat}" for stat in snapshot]
1368            memory_usage_langfuse_items = [
1369                stat for stat in memory_usage_total_items if "/langfuse/" in stat
1370            ]
1371
1372            logged_memory_usage = {
1373                "all_files": [f"{stat}" for stat in memory_usage_total_items][
1374                    :top_k_items
1375                ],
1376                "langfuse_files": [f"{stat}" for stat in memory_usage_langfuse_items][
1377                    :top_k_items
1378                ],
1379                "total_usage": f"{total_memory_usage:.2f} MB",
1380                "langfuse_queue_length": self.task_manager._queue.qsize(),
1381            }
1382
1383            self.log.debug("Memory usage: ", logged_memory_usage)
1384
1385            event = SdkLogBody(log=logged_memory_usage)
1386            self.task_manager.add_task(
1387                {
1388                    "id": str(uuid.uuid4()),
1389                    "type": "sdk-log",
1390                    "timestamp": _get_timestamp(),
1391                    "body": event.dict(),
1392                }
1393            )
1394
1395        except Exception as e:
1396            self.log.exception(e)
1397
    # Typing overload: numeric and boolean scores take a float value.
    @overload
    def score(
        self,
        *,
        name: str,
        value: float,
        data_type: typing.Optional[Literal["NUMERIC", "BOOLEAN"]] = None,
        trace_id: typing.Optional[str] = None,
        id: typing.Optional[str] = None,
        comment: typing.Optional[str] = None,
        observation_id: typing.Optional[str] = None,
        config_id: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient": ...

    # Typing overload: categorical scores take a string value.
    @overload
    def score(
        self,
        *,
        name: str,
        value: str,
        data_type: typing.Optional[Literal["CATEGORICAL"]] = "CATEGORICAL",
        trace_id: typing.Optional[str] = None,
        id: typing.Optional[str] = None,
        comment: typing.Optional[str] = None,
        observation_id: typing.Optional[str] = None,
        config_id: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient": ...
1427
1428    def score(
1429        self,
1430        *,
1431        name: str,
1432        value: typing.Union[float, str],
1433        data_type: typing.Optional[ScoreDataType] = None,
1434        trace_id: typing.Optional[str] = None,
1435        id: typing.Optional[str] = None,
1436        comment: typing.Optional[str] = None,
1437        observation_id: typing.Optional[str] = None,
1438        config_id: typing.Optional[str] = None,
1439        **kwargs,
1440    ) -> "StatefulClient":
1441        """Create a score attached to a trace (and optionally an observation).
1442
1443        Args:
1444            name (str): Identifier of the score.
1445            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores.
1446            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
1447              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
1448            trace_id (str): The id of the trace to which the score should be attached.
1449            id (Optional[str]): The id of the score. If not provided, a new UUID is generated.
1450            comment (Optional[str]): Additional context/explanation of the score.
1451            observation_id (Optional[str]): The id of the observation to which the score should be attached.
1452            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
1453            **kwargs: Additional keyword arguments to include in the score.
1454
1455        Returns:
1456            StatefulClient: Either the associated observation (if observation_id is provided) or the trace (if observation_id is not provided).
1457
1458        Example:
1459            ```python
1460            from langfuse import Langfuse
1461
1462            langfuse = Langfuse()
1463
1464            # Create a trace
1465            trace = langfuse.trace(name="example-application")
1466
1467            # Get id of created trace
1468            trace_id = trace.id
1469
1470            # Add score to the trace
1471            trace = langfuse.score(
1472                trace_id=trace_id,
1473                name="user-explicit-feedback",
1474                value=0.9,
1475                comment="I like how personalized the response is"
1476            )
1477            ```
1478        """
1479        trace_id = trace_id or self.trace_id or str(uuid.uuid4())
1480        new_id = id or str(uuid.uuid4())
1481        try:
1482            new_dict = {
1483                "id": new_id,
1484                "trace_id": trace_id,
1485                "observation_id": observation_id,
1486                "name": name,
1487                "value": value,
1488                "data_type": data_type,
1489                "comment": comment,
1490                "config_id": config_id,
1491                **kwargs,
1492            }
1493
1494            self.log.debug(f"Creating score {new_dict}...")
1495            new_body = ScoreBody(**new_dict)
1496
1497            event = {
1498                "id": str(uuid.uuid4()),
1499                "type": "score-create",
1500                "body": new_body.dict(exclude_none=True),
1501            }
1502            self.task_manager.add_task(event)
1503
1504        except Exception as e:
1505            self.log.exception(e)
1506        finally:
1507            if observation_id is not None:
1508                return StatefulClient(
1509                    self.client,
1510                    observation_id,
1511                    StateType.OBSERVATION,
1512                    trace_id,
1513                    self.task_manager,
1514                )
1515            else:
1516                return StatefulClient(
1517                    self.client, new_id, StateType.TRACE, new_id, self.task_manager
1518                )
1519
1520    def span(
1521        self,
1522        *,
1523        id: typing.Optional[str] = None,
1524        trace_id: typing.Optional[str] = None,
1525        parent_observation_id: typing.Optional[str] = None,
1526        name: typing.Optional[str] = None,
1527        start_time: typing.Optional[dt.datetime] = None,
1528        end_time: typing.Optional[dt.datetime] = None,
1529        metadata: typing.Optional[typing.Any] = None,
1530        level: typing.Optional[SpanLevel] = None,
1531        status_message: typing.Optional[str] = None,
1532        input: typing.Optional[typing.Any] = None,
1533        output: typing.Optional[typing.Any] = None,
1534        version: typing.Optional[str] = None,
1535        **kwargs,
1536    ) -> "StatefulSpanClient":
1537        """Create a span.
1538
1539        A span represents durations of units of work in a trace.
1540        Usually, you want to add a span nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.
1541
1542        If no trace_id is provided, a new trace is created just for this span.
1543
1544        Args:
1545            id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id.
1546            trace_id (Optional[str]): The trace ID associated with this span. If not provided, a new UUID is generated.
1547            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
1548            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
1549            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
1550            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
1551            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
1552            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
1553            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
1554            input (Optional[dict]): The input to the span. Can be any JSON object.
1555            output (Optional[dict]): The output to the span. Can be any JSON object.
1556            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
1557            **kwargs: Additional keyword arguments to include in the span.
1558
1559        Returns:
1560            StatefulSpanClient: The created span.
1561
1562        Example:
1563            ```python
1564            from langfuse import Langfuse
1565
1566            langfuse = Langfuse()
1567
1568            trace = langfuse.trace(name = "llm-feature")
1569
1570            # Create a span
1571            retrieval = langfuse.span(name = "retrieval", trace_id = trace.id)
1572
1573            # Create a nested span
1574            nested_span = langfuse.span(name = "retrieval", trace_id = trace.id, parent_observation_id = retrieval.id)
1575            ```
1576        """
1577        new_span_id = id or str(uuid.uuid4())
1578        new_trace_id = trace_id or str(uuid.uuid4())
1579        self.trace_id = new_trace_id
1580        try:
1581            span_body = {
1582                "id": new_span_id,
1583                "trace_id": new_trace_id,
1584                "name": name,
1585                "start_time": start_time or _get_timestamp(),
1586                "metadata": metadata,
1587                "input": input,
1588                "output": output,
1589                "level": level,
1590                "status_message": status_message,
1591                "parent_observation_id": parent_observation_id,
1592                "version": version,
1593                "end_time": end_time,
1594                "trace": {"release": self.release},
1595                **kwargs,
1596            }
1597
1598            if trace_id is None:
1599                self._generate_trace(new_trace_id, name or new_trace_id)
1600
1601            self.log.debug(f"Creating span {span_body}...")
1602
1603            span_body = CreateSpanBody(**span_body)
1604
1605            event = {
1606                "id": str(uuid.uuid4()),
1607                "type": "span-create",
1608                "body": span_body.dict(exclude_none=True),
1609            }
1610
1611            self.log.debug(f"Creating span {event}...")
1612            self.task_manager.add_task(event)
1613
1614        except Exception as e:
1615            self.log.exception(e)
1616        finally:
1617            self._log_memory_usage()
1618
1619            return StatefulSpanClient(
1620                self.client,
1621                new_span_id,
1622                StateType.OBSERVATION,
1623                new_trace_id,
1624                self.task_manager,
1625            )
1626
1627    def event(
1628        self,
1629        *,
1630        id: typing.Optional[str] = None,
1631        trace_id: typing.Optional[str] = None,
1632        parent_observation_id: typing.Optional[str] = None,
1633        name: typing.Optional[str] = None,
1634        start_time: typing.Optional[dt.datetime] = None,
1635        metadata: typing.Optional[typing.Any] = None,
1636        input: typing.Optional[typing.Any] = None,
1637        output: typing.Optional[typing.Any] = None,
1638        level: typing.Optional[SpanLevel] = None,
1639        status_message: typing.Optional[str] = None,
1640        version: typing.Optional[str] = None,
1641        **kwargs,
1642    ) -> "StatefulSpanClient":
1643        """Create an event.
1644
1645        An event represents a discrete event in a trace.
1646        Usually, you want to add a event nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.
1647
1648        If no trace_id is provided, a new trace is created just for this event.
1649
1650        Args:
1651            id (Optional[str]): The id of the event can be set, otherwise a random id is generated.
1652            trace_id (Optional[str]): The trace ID associated with this event. If not provided, a new trace is created just for this event.
1653            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
1654            name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI.
1655            start_time (Optional[datetime]): The time at which the event started, defaults to the current time.
1656            metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API.
1657            input (Optional[Any]): The input to the event. Can be any JSON object.
1658            output (Optional[Any]): The output to the event. Can be any JSON object.
1659            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
1660            status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event.
1661            version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. Useful in debugging.
1662            **kwargs: Additional keyword arguments to include in the event.
1663
1664        Returns:
1665            StatefulSpanClient: The created event.
1666
1667        Example:
1668            ```python
1669            from langfuse import Langfuse
1670
1671            langfuse = Langfuse()
1672
1673            trace = langfuse.trace(name = "llm-feature")
1674
1675            # Create an event
1676            retrieval = langfuse.event(name = "retrieval", trace_id = trace.id)
1677            ```
1678        """
1679        event_id = id or str(uuid.uuid4())
1680        new_trace_id = trace_id or str(uuid.uuid4())
1681        self.trace_id = new_trace_id
1682        try:
1683            event_body = {
1684                "id": event_id,
1685                "trace_id": new_trace_id,
1686                "name": name,
1687                "start_time": start_time or _get_timestamp(),
1688                "metadata": metadata,
1689                "input": input,
1690                "output": output,
1691                "level": level,
1692                "status_message": status_message,
1693                "parent_observation_id": parent_observation_id,
1694                "version": version,
1695                "trace": {"release": self.release},
1696                **kwargs,
1697            }
1698
1699            if trace_id is None:
1700                self._generate_trace(new_trace_id, name or new_trace_id)
1701
1702            request = CreateEventBody(**event_body)
1703
1704            event = {
1705                "id": str(uuid.uuid4()),
1706                "type": "event-create",
1707                "body": request.dict(exclude_none=True),
1708            }
1709
1710            self.log.debug(f"Creating event {event}...")
1711            self.task_manager.add_task(event)
1712
1713        except Exception as e:
1714            self.log.exception(e)
1715        finally:
1716            return StatefulSpanClient(
1717                self.client,
1718                event_id,
1719                StateType.OBSERVATION,
1720                new_trace_id,
1721                self.task_manager,
1722            )
1723
1724    def generation(
1725        self,
1726        *,
1727        id: typing.Optional[str] = None,
1728        trace_id: typing.Optional[str] = None,
1729        parent_observation_id: typing.Optional[str] = None,
1730        name: typing.Optional[str] = None,
1731        start_time: typing.Optional[dt.datetime] = None,
1732        end_time: typing.Optional[dt.datetime] = None,
1733        completion_start_time: typing.Optional[dt.datetime] = None,
1734        metadata: typing.Optional[typing.Any] = None,
1735        level: typing.Optional[SpanLevel] = None,
1736        status_message: typing.Optional[str] = None,
1737        version: typing.Optional[str] = None,
1738        model: typing.Optional[str] = None,
1739        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
1740        input: typing.Optional[typing.Any] = None,
1741        output: typing.Optional[typing.Any] = None,
1742        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
1743        prompt: typing.Optional[PromptClient] = None,
1744        **kwargs,
1745    ) -> "StatefulGenerationClient":
1746        """Create a generation.
1747
1748        A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI.
1749
1750        Usually, you want to add a generation nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.
1751
1752        If no trace_id is provided, a new trace is created just for this generation.
1753
1754        Args:
1755            id (Optional[str]): The id of the generation can be set, defaults to random id.
1756            trace_id (Optional[str]): The trace ID associated with this generation. If not provided, a new trace is created
1757            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
1758            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
1759            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
1760            end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`.
1761            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
1762            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
1763            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
1764            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
1765            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
1766            model (Optional[str]): The name of the model used for the generation.
1767            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
1768            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
1769            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
1770            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
1771            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
1772            **kwargs: Additional keyword arguments to include in the generation.
1773
1774        Returns:
1775            StatefulGenerationClient: The created generation.
1776
1777        Example:
1778            ```python
1779            from langfuse import Langfuse
1780
1781            langfuse = Langfuse()
1782
1783            # Create a generation in Langfuse
1784            generation = langfuse.generation(
1785                name="summary-generation",
1786                model="gpt-3.5-turbo",
1787                model_parameters={"maxTokens": "1000", "temperature": "0.9"},
1788                input=[{"role": "system", "content": "You are a helpful assistant."},
1789                       {"role": "user", "content": "Please generate a summary of the following documents ..."}],
1790                metadata={"interface": "whatsapp"}
1791            )
1792            ```
1793        """
1794        new_trace_id = trace_id or str(uuid.uuid4())
1795        new_generation_id = id or str(uuid.uuid4())
1796        self.trace_id = new_trace_id
1797        try:
1798            generation_body = {
1799                "id": new_generation_id,
1800                "trace_id": new_trace_id,
1801                "release": self.release,
1802                "name": name,
1803                "start_time": start_time or _get_timestamp(),
1804                "metadata": metadata,
1805                "input": input,
1806                "output": output,
1807                "level": level,
1808                "status_message": status_message,
1809                "parent_observation_id": parent_observation_id,
1810                "version": version,
1811                "end_time": end_time,
1812                "completion_start_time": completion_start_time,
1813                "model": model,
1814                "model_parameters": model_parameters,
1815                "usage": _convert_usage_input(usage) if usage is not None else None,
1816                "trace": {"release": self.release},
1817                **_create_prompt_context(prompt),
1818                **kwargs,
1819            }
1820
1821            if trace_id is None:
1822                trace = {
1823                    "id": new_trace_id,
1824                    "release": self.release,
1825                    "name": name,
1826                }
1827                request = TraceBody(**trace)
1828
1829                event = {
1830                    "id": str(uuid.uuid4()),
1831                    "type": "trace-create",
1832                    "body": request.dict(exclude_none=True),
1833                }
1834
1835                self.log.debug(f"Creating trace {event}...")
1836
1837                self.task_manager.add_task(event)
1838
1839            self.log.debug(f"Creating generation max {generation_body} {usage}...")
1840            request = CreateGenerationBody(**generation_body)
1841
1842            event = {
1843                "id": str(uuid.uuid4()),
1844                "type": "generation-create",
1845                "body": request.dict(exclude_none=True),
1846            }
1847
1848            self.log.debug(f"Creating top-level generation {event} ...")
1849            self.task_manager.add_task(event)
1850
1851        except Exception as e:
1852            self.log.exception(e)
1853        finally:
1854            return StatefulGenerationClient(
1855                self.client,
1856                new_generation_id,
1857                StateType.OBSERVATION,
1858                new_trace_id,
1859                self.task_manager,
1860            )
1861
1862    def _generate_trace(self, trace_id: str, name: str):
1863        trace_dict = {
1864            "id": trace_id,
1865            "release": self.release,
1866            "name": name,
1867        }
1868
1869        trace_body = TraceBody(**trace_dict)
1870
1871        event = {
1872            "id": str(uuid.uuid4()),
1873            "type": "trace-create",
1874            "body": trace_body.dict(exclude_none=True),
1875        }
1876
1877        self.log.debug(f"Creating trace {event}...")
1878        self.task_manager.add_task(event)
1879
    def join(self):
        """Blocks until all consumer Threads are terminated. The SDK calls this upon termination of the Python Interpreter.

        If called before flushing, consumers might terminate before sending all events to Langfuse API. This method is called at exit of the SDK, right before the Python interpreter closes.
        To guarantee all messages have been delivered, you still need to call flush().
        """
        try:
            return self.task_manager.join()
        except Exception as e:
            self.log.exception(e)
1890
1891    def flush(self):
1892        """Flush the internal event queue to the Langfuse API. It blocks until the queue is empty. It should be called when the application shuts down.
1893
1894        Example:
1895            ```python
1896            from langfuse import Langfuse
1897
1898            langfuse = Langfuse()
1899
1900            # Some operations with Langfuse
1901
1902            # Flushing all events to end Langfuse cleanly
1903            langfuse.flush()
1904            ```
1905        """
1906        try:
1907            return self.task_manager.flush()
1908        except Exception as e:
1909            self.log.exception(e)
1910
    def shutdown(self):
        """Initiate a graceful shutdown of the Langfuse SDK, ensuring all events are sent to Langfuse API and all consumer Threads are terminated.

        This function calls flush() and join() consecutively resulting in a complete shutdown of the SDK. On success of this function, no more events will be sent to Langfuse API.
        As the SDK calls join() already on shutdown, refer to flush() to ensure all events arrive at the Langfuse API.
        """
        try:
            return self.task_manager.shutdown()
        except Exception as e:
            self.log.exception(e)
1921
1922
class StateType(Enum):
    """Enum to distinguish observation and trace states.

    Used by stateful clients to decide whether nested objects (spans,
    generations, events, scores) attach to an observation or to a trace.

    Attributes:
        OBSERVATION (int): Observation state.
        TRACE (int): Trace state.
    """

    OBSERVATION = 1
    TRACE = 0
1933
1934
1935class StatefulClient(object):
1936    """Base class for handling stateful operations in the Langfuse system.
1937
1938    This client is capable of creating different nested Langfuse objects like spans, generations, scores, and events,
1939    associating them with either an observation or a trace based on the specified state type.
1940
1941    Attributes:
1942        client (FernLangfuse): Core interface for Langfuse API interactions.
1943        id (str): Unique identifier of the stateful client (either observation or trace).
1944        state_type (StateType): Enum indicating whether the client is an observation or a trace.
1945        trace_id (str): Id of the trace associated with the stateful client.
1946        task_manager (TaskManager): Manager handling asynchronous tasks for the client.
1947    """
1948
1949    log = logging.getLogger("langfuse")
1950
1951    def __init__(
1952        self,
1953        client: FernLangfuse,
1954        id: str,
1955        state_type: StateType,
1956        trace_id: str,
1957        task_manager: TaskManager,
1958    ):
1959        """Initialize the StatefulClient.
1960
1961        Args:
1962            client (FernLangfuse): Core interface for Langfuse API interactions.
1963            id (str): Unique identifier of the stateful client (either observation or trace).
1964            state_type (StateType): Enum indicating whether the client is an observation or a trace.
1965            trace_id (str): Id of the trace associated with the stateful client.
1966            task_manager (TaskManager): Manager handling asynchronous tasks for the client.
1967        """
1968        self.client = client
1969        self.trace_id = trace_id
1970        self.id = id
1971        self.state_type = state_type
1972        self.task_manager = task_manager
1973
1974    def _add_state_to_event(self, body: dict):
1975        if self.state_type == StateType.OBSERVATION:
1976            body["parent_observation_id"] = self.id
1977            body["trace_id"] = self.trace_id
1978        else:
1979            body["trace_id"] = self.id
1980        return body
1981
1982    def _add_default_values(self, body: dict):
1983        if body.get("start_time") is None:
1984            body["start_time"] = _get_timestamp()
1985        return body
1986
1987    def generation(
1988        self,
1989        *,
1990        id: typing.Optional[str] = None,
1991        name: typing.Optional[str] = None,
1992        start_time: typing.Optional[dt.datetime] = None,
1993        end_time: typing.Optional[dt.datetime] = None,
1994        metadata: typing.Optional[typing.Any] = None,
1995        level: typing.Optional[SpanLevel] = None,
1996        status_message: typing.Optional[str] = None,
1997        version: typing.Optional[str] = None,
1998        completion_start_time: typing.Optional[dt.datetime] = None,
1999        model: typing.Optional[str] = None,
2000        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
2001        input: typing.Optional[typing.Any] = None,
2002        output: typing.Optional[typing.Any] = None,
2003        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
2004        prompt: typing.Optional[PromptClient] = None,
2005        **kwargs,
2006    ) -> "StatefulGenerationClient":
2007        """Create a generation nested within the current observation or trace.
2008
2009        A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI.
2010
2011        Args:
2012            id (Optional[str]): The id of the generation can be set, defaults to random id.
2013            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
2014            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
2015            end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`.
2016            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
2017            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
2018            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
2019            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
2020            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
2021            model (Optional[str]): The name of the model used for the generation.
2022            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
2023            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
2024            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
2025            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
2026            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
2027            **kwargs: Additional keyword arguments to include in the generation.
2028
2029        Returns:
2030            StatefulGenerationClient: The created generation. Use this client to update the generation or create additional nested observations.
2031
2032        Example:
2033            ```python
2034            from langfuse import Langfuse
2035
2036            langfuse = Langfuse()
2037
2038            # Create a trace
2039            trace = langfuse.trace(name = "llm-feature")
2040
2041            # Create a nested generation in Langfuse
2042            generation = trace.generation(
2043                name="summary-generation",
2044                model="gpt-3.5-turbo",
2045                model_parameters={"maxTokens": "1000", "temperature": "0.9"},
2046                input=[{"role": "system", "content": "You are a helpful assistant."},
2047                       {"role": "user", "content": "Please generate a summary of the following documents ..."}],
2048                metadata={"interface": "whatsapp"}
2049            )
2050            ```
2051        """
2052        generation_id = id or str(uuid.uuid4())
2053        try:
2054            generation_body = {
2055                "id": generation_id,
2056                "name": name,
2057                "start_time": start_time or _get_timestamp(),
2058                "metadata": metadata,
2059                "level": level,
2060                "status_message": status_message,
2061                "version": version,
2062                "end_time": end_time,
2063                "completion_start_time": completion_start_time,
2064                "model": model,
2065                "model_parameters": model_parameters,
2066                "input": input,
2067                "output": output,
2068                "usage": _convert_usage_input(usage) if usage is not None else None,
2069                **_create_prompt_context(prompt),
2070                **kwargs,
2071            }
2072
2073            generation_body = self._add_state_to_event(generation_body)
2074            new_body = self._add_default_values(generation_body)
2075
2076            new_body = CreateGenerationBody(**new_body)
2077
2078            event = {
2079                "id": str(uuid.uuid4()),
2080                "type": "generation-create",
2081                "body": new_body.dict(exclude_none=True, exclude_unset=False),
2082            }
2083
2084            self.log.debug(f"Creating generation {new_body}...")
2085            self.task_manager.add_task(event)
2086
2087        except Exception as e:
2088            self.log.exception(e)
2089        finally:
2090            return StatefulGenerationClient(
2091                self.client,
2092                generation_id,
2093                StateType.OBSERVATION,
2094                self.trace_id,
2095                task_manager=self.task_manager,
2096            )
2097
2098    def span(
2099        self,
2100        *,
2101        id: typing.Optional[str] = None,
2102        name: typing.Optional[str] = None,
2103        start_time: typing.Optional[dt.datetime] = None,
2104        end_time: typing.Optional[dt.datetime] = None,
2105        metadata: typing.Optional[typing.Any] = None,
2106        input: typing.Optional[typing.Any] = None,
2107        output: typing.Optional[typing.Any] = None,
2108        level: typing.Optional[SpanLevel] = None,
2109        status_message: typing.Optional[str] = None,
2110        version: typing.Optional[str] = None,
2111        **kwargs,
2112    ) -> "StatefulSpanClient":
2113        """Create a span nested within the current observation or trace.
2114
2115        A span represents durations of units of work in a trace.
2116
2117        Args:
2118            id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id.
2119            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
2120            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
2121            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
2122            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
2123            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
2124            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
2125            input (Optional[dict]): The input to the span. Can be any JSON object.
2126            output (Optional[dict]): The output to the span. Can be any JSON object.
2127            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
2128            **kwargs: Additional keyword arguments to include in the span.
2129
2130        Returns:
2131            StatefulSpanClient: The created span. Use this client to update the span or create additional nested observations.
2132
2133        Example:
2134            ```python
2135            from langfuse import Langfuse
2136
2137            langfuse = Langfuse()
2138
2139            # Create a trace
2140            trace = langfuse.trace(name = "llm-feature")
2141
2142            # Create a span
2143            retrieval = langfuse.span(name = "retrieval")
2144            ```
2145        """
2146        span_id = id or str(uuid.uuid4())
2147        try:
2148            span_body = {
2149                "id": span_id,
2150                "name": name,
2151                "start_time": start_time or _get_timestamp(),
2152                "metadata": metadata,
2153                "input": input,
2154                "output": output,
2155                "level": level,
2156                "status_message": status_message,
2157                "version": version,
2158                "end_time": end_time,
2159                **kwargs,
2160            }
2161
2162            self.log.debug(f"Creating span {span_body}...")
2163
2164            new_dict = self._add_state_to_event(span_body)
2165            new_body = self._add_default_values(new_dict)
2166
2167            event = CreateSpanBody(**new_body)
2168
2169            event = {
2170                "id": str(uuid.uuid4()),
2171                "type": "span-create",
2172                "body": event.dict(exclude_none=True),
2173            }
2174
2175            self.task_manager.add_task(event)
2176        except Exception as e:
2177            self.log.exception(e)
2178        finally:
2179            return StatefulSpanClient(
2180                self.client,
2181                span_id,
2182                StateType.OBSERVATION,
2183                self.trace_id,
2184                task_manager=self.task_manager,
2185            )
2186
    # Overload for numeric and boolean scores: `value` must be a float.
    @overload
    def score(
        self,
        *,
        id: typing.Optional[str] = None,
        name: str,
        value: float,
        data_type: typing.Optional[Literal["NUMERIC", "BOOLEAN"]] = None,
        comment: typing.Optional[str] = None,
        config_id: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient": ...
2199
    # Overload for categorical scores: `value` must be a string.
    @overload
    def score(
        self,
        *,
        id: typing.Optional[str] = None,
        name: str,
        value: str,
        data_type: typing.Optional[Literal["CATEGORICAL"]] = "CATEGORICAL",
        comment: typing.Optional[str] = None,
        config_id: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient": ...
2212
2213    def score(
2214        self,
2215        *,
2216        id: typing.Optional[str] = None,
2217        name: str,
2218        value: typing.Union[float, str],
2219        data_type: typing.Optional[ScoreDataType] = None,
2220        comment: typing.Optional[str] = None,
2221        config_id: typing.Optional[str] = None,
2222        **kwargs,
2223    ) -> "StatefulClient":
2224        """Create a score attached for the current observation or trace.
2225
2226        Args:
2227            name (str): Identifier of the score.
2228            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores.
2229            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
2230              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
2231            comment (Optional[str]): Additional context/explanation of the score.
2232            id (Optional[str]): The id of the score. If not provided, a new UUID is generated.
2233            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
2234            **kwargs: Additional keyword arguments to include in the score.
2235
2236        Returns:
2237            StatefulClient: The current observation or trace for which the score was created. Passthrough for chaining.
2238
2239        Example:
2240            ```python
2241            from langfuse import Langfuse
2242
2243            langfuse = Langfuse()
2244
2245            # Create a trace
2246            trace = langfuse.trace(name="example-application")
2247
2248            # Add score to the trace
2249            trace = trace.score(
2250                name="user-explicit-feedback",
2251                value=0.8,
2252                comment="I like how personalized the response is"
2253            )
2254            ```
2255        """
2256        score_id = id or str(uuid.uuid4())
2257        try:
2258            new_score = {
2259                "id": score_id,
2260                "trace_id": self.trace_id,
2261                "name": name,
2262                "value": value,
2263                "data_type": data_type,
2264                "comment": comment,
2265                "config_id": config_id,
2266                **kwargs,
2267            }
2268
2269            self.log.debug(f"Creating score {new_score}...")
2270
2271            new_dict = self._add_state_to_event(new_score)
2272
2273            if self.state_type == StateType.OBSERVATION:
2274                new_dict["observationId"] = self.id
2275
2276            request = ScoreBody(**new_dict)
2277
2278            event = {
2279                "id": str(uuid.uuid4()),
2280                "type": "score-create",
2281                "body": request.dict(exclude_none=True),
2282            }
2283
2284            self.task_manager.add_task(event)
2285
2286        except Exception as e:
2287            self.log.exception(e)
2288        finally:
2289            return StatefulClient(
2290                self.client,
2291                self.id,
2292                self.state_type,
2293                self.trace_id,
2294                task_manager=self.task_manager,
2295            )
2296
2297    def event(
2298        self,
2299        *,
2300        id: typing.Optional[str] = None,
2301        name: typing.Optional[str] = None,
2302        start_time: typing.Optional[dt.datetime] = None,
2303        metadata: typing.Optional[typing.Any] = None,
2304        input: typing.Optional[typing.Any] = None,
2305        output: typing.Optional[typing.Any] = None,
2306        level: typing.Optional[SpanLevel] = None,
2307        status_message: typing.Optional[str] = None,
2308        version: typing.Optional[str] = None,
2309        **kwargs,
2310    ) -> "StatefulClient":
2311        """Create an event nested within the current observation or trace.
2312
2313        An event represents a discrete event in a trace.
2314
2315        Args:
2316            id (Optional[str]): The id of the event can be set, otherwise a random id is generated.
2317            name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI.
2318            start_time (Optional[datetime]): The time at which the event started, defaults to the current time.
2319            metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API.
2320            input (Optional[Any]): The input to the event. Can be any JSON object.
2321            output (Optional[Any]): The output to the event. Can be any JSON object.
2322            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
2323            status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event.
2324            version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. Useful in debugging.
2325            **kwargs: Additional keyword arguments to include in the event.
2326
2327        Returns:
2328            StatefulSpanClient: The created event. Use this client to update the event or create additional nested observations.
2329
2330        Example:
2331            ```python
2332            from langfuse import Langfuse
2333
2334            langfuse = Langfuse()
2335
2336            # Create a trace
2337            trace = langfuse.trace(name = "llm-feature")
2338
2339            # Create an event
2340            retrieval = trace.event(name = "retrieval")
2341            ```
2342        """
2343        event_id = id or str(uuid.uuid4())
2344        try:
2345            event_body = {
2346                "id": event_id,
2347                "name": name,
2348                "start_time": start_time or _get_timestamp(),
2349                "metadata": metadata,
2350                "input": input,
2351                "output": output,
2352                "level": level,
2353                "status_message": status_message,
2354                "version": version,
2355                **kwargs,
2356            }
2357
2358            new_dict = self._add_state_to_event(event_body)
2359            new_body = self._add_default_values(new_dict)
2360
2361            request = CreateEventBody(**new_body)
2362
2363            event = {
2364                "id": str(uuid.uuid4()),
2365                "type": "event-create",
2366                "body": request.dict(exclude_none=True),
2367            }
2368
2369            self.log.debug(f"Creating event {event}...")
2370            self.task_manager.add_task(event)
2371
2372        except Exception as e:
2373            self.log.exception(e)
2374        finally:
2375            return StatefulClient(
2376                self.client,
2377                event_id,
2378                StateType.OBSERVATION,
2379                self.trace_id,
2380                self.task_manager,
2381            )
2382
2383    def get_trace_url(self):
2384        """Get the URL to see the current trace in the Langfuse UI."""
2385        return f"{self.client._client_wrapper._base_url}/trace/{self.trace_id}"
2386
2387
class StatefulGenerationClient(StatefulClient):
    """Class for handling stateful operations of generations in the Langfuse system. Inherits from StatefulClient.

    This client extends the capabilities of the StatefulClient to specifically handle generation,
    allowing for the creation, update, and termination of generation processes in Langfuse.

    Attributes:
        client (FernLangfuse): Core interface for Langfuse API interaction.
        id (str): Unique identifier of the generation.
        state_type (StateType): Type of the stateful entity (observation or trace).
        trace_id (str): Id of trace associated with the generation.
        task_manager (TaskManager): Manager for handling asynchronous tasks.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulGenerationClient."""
        super().__init__(client, id, state_type, trace_id, task_manager)

    # WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY
    def update(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        completion_start_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        model: typing.Optional[str] = None,
        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
        prompt: typing.Optional[PromptClient] = None,
        **kwargs,
    ) -> "StatefulGenerationClient":
        """Update the generation.

        Args:
            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
            end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`.
            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            model (Optional[str]): The name of the model used for the generation.
            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
            **kwargs: Additional keyword arguments to include in the generation.

        Returns:
            StatefulGenerationClient: The updated generation. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested generation in Langfuse
            generation = trace.generation(name="summary-generation")

            # Update the generation
            generation = generation.update(metadata={"interface": "whatsapp"})
            ```
        """
        try:
            generation_body = {
                "id": self.id,
                "trace_id": self.trace_id,  # Included to avoid relying on the order of events sent to the API
                "name": name,
                "start_time": start_time,
                "metadata": metadata,
                "level": level,
                "status_message": status_message,
                "version": version,
                "end_time": end_time,
                "completion_start_time": completion_start_time,
                "model": model,
                "model_parameters": model_parameters,
                "input": input,
                "output": output,
                "usage": _convert_usage_input(usage) if usage is not None else None,
                **_create_prompt_context(prompt),
                **kwargs,
            }

            request = UpdateGenerationBody(**generation_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "generation-update",
                "body": request.dict(exclude_none=True, exclude_unset=False),
            }

            # Log once at the event level, consistent with the create paths.
            self.log.debug(f"Update generation {event}...")
            self.task_manager.add_task(event)

        except Exception as e:
            self.log.exception(e)
        finally:
            # Always return a client so callers can keep chaining, even when
            # queueing the update failed.
            return StatefulGenerationClient(
                self.client,
                self.id,
                StateType.OBSERVATION,
                self.trace_id,
                task_manager=self.task_manager,
            )

    def end(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        completion_start_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        model: typing.Optional[str] = None,
        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
        prompt: typing.Optional[PromptClient] = None,
        **kwargs,
    ) -> "StatefulGenerationClient":
        """End the generation, optionally updating its properties.

        Args:
            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
            end_time (Optional[datetime.datetime]): Automatically set to the current time. Can be overridden to set a custom end time.
            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            model (Optional[str]): The name of the model used for the generation.
            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
            **kwargs: Additional keyword arguments to include in the generation.

        Returns:
            StatefulGenerationClient: The ended generation. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested generation in Langfuse
            generation = trace.generation(name="summary-generation")

            # End the generation and update its properties
            generation = generation.end(metadata={"interface": "whatsapp"})
            ```
        """
        # Delegates to update(); only difference is defaulting end_time to now.
        return self.update(
            name=name,
            start_time=start_time,
            end_time=end_time or _get_timestamp(),
            metadata=metadata,
            level=level,
            status_message=status_message,
            version=version,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            input=input,
            output=output,
            usage=usage,
            prompt=prompt,
            **kwargs,
        )
2592
2593
class StatefulSpanClient(StatefulClient):
    """Class for handling stateful operations of spans in the Langfuse system. Inherits from StatefulClient.

    Attributes:
        client (FernLangfuse): Core interface for Langfuse API interaction.
        id (str): Unique identifier of the span.
        state_type (StateType): Type of the stateful entity (observation or trace).
        trace_id (str): Id of trace associated with the span.
        task_manager (TaskManager): Manager for handling asynchronous tasks.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulSpanClient."""
        super().__init__(client, id, state_type, trace_id, task_manager)

    # WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY
    def update(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulSpanClient":
        """Update the span.

        Args:
            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
            input (Optional[dict]): The input to the span. Can be any JSON object.
            output (Optional[dict]): The output to the span. Can be any JSON object.
            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            **kwargs: Additional keyword arguments to include in the span.

        Returns:
            StatefulSpanClient: The updated span. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested span in Langfuse
            span = trace.span(name="retrieval")

            # Update the span
            span = span.update(metadata={"interface": "whatsapp"})
            ```
        """
        try:
            span_body = {
                "id": self.id,
                "trace_id": self.trace_id,  # Included to avoid relying on the order of events sent to the API
                "name": name,
                "start_time": start_time,
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "version": version,
                "end_time": end_time,
                **kwargs,
            }
            self.log.debug(f"Update span {span_body}...")

            request = UpdateSpanBody(**span_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "span-update",
                "body": request.dict(exclude_none=True),
            }

            self.task_manager.add_task(event)
        except Exception as e:
            self.log.exception(e)
        finally:
            # Always return a client so callers can keep chaining, even when
            # queueing the update failed.
            return StatefulSpanClient(
                self.client,
                self.id,
                StateType.OBSERVATION,
                self.trace_id,
                task_manager=self.task_manager,
            )

    def end(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulSpanClient":
        """End the span, optionally updating its properties.

        Args:
            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
            input (Optional[dict]): The input to the span. Can be any JSON object.
            output (Optional[dict]): The output to the span. Can be any JSON object.
            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            **kwargs: Additional keyword arguments to include in the span.

        Returns:
            StatefulSpanClient: The updated span. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested span in Langfuse
            span = trace.span(name="retrieval")

            # End the span and update its properties
            span = span.end(metadata={"interface": "whatsapp"})
            ```
        """
        try:
            span_body = {
                "name": name,
                "start_time": start_time,
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "version": version,
                "end_time": end_time or _get_timestamp(),
                **kwargs,
            }
            # NOTE: the `finally` block below supersedes this return value;
            # the call still matters for its side effect of queueing the update.
            return self.update(**span_body)

        except Exception as e:
            # Use exception() for consistency with the other handlers and to
            # preserve the traceback in the log output.
            self.log.exception(e)
        finally:
            return StatefulSpanClient(
                self.client,
                self.id,
                StateType.OBSERVATION,
                self.trace_id,
                task_manager=self.task_manager,
            )

    def get_langchain_handler(self, update_parent: bool = False):
        """Get langchain callback handler associated with the current span.

        Args:
            update_parent (bool): If set to True, the parent observation will be updated with the outcome of the Langchain run.

        Returns:
            CallbackHandler: An instance of CallbackHandler linked to this StatefulSpanClient.
        """
        # Imported lazily to avoid a hard dependency on langchain at import time.
        from langfuse.callback import CallbackHandler

        return CallbackHandler(
            stateful_client=self, update_stateful_client=update_parent
        )
2790
2791
class StatefulTraceClient(StatefulClient):
    """Class for handling stateful operations of traces in the Langfuse system. Inherits from StatefulClient.

    Attributes:
        client (FernLangfuse): Core interface for Langfuse API interaction.
        id (str): Unique identifier of the trace.
        state_type (StateType): Type of the stateful entity (observation or trace).
        trace_id (str): The trace ID associated with this client.
        task_manager (TaskManager): Manager for handling asynchronous tasks.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulTraceClient."""
        super().__init__(client, id, state_type, trace_id, task_manager)
        self.task_manager = task_manager

    def update(
        self,
        *,
        name: typing.Optional[str] = None,
        user_id: typing.Optional[str] = None,
        session_id: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        release: typing.Optional[str] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        metadata: typing.Optional[typing.Any] = None,
        tags: typing.Optional[typing.List[str]] = None,
        public: typing.Optional[bool] = None,
        **kwargs,
    ) -> "StatefulTraceClient":
        """Update the trace.

        Queues a `trace-create` ingestion event; the update is applied
        asynchronously by the task manager. Never raises: failures are
        logged and a chainable client is returned regardless.

        Args:
            name: Identifier of the trace. Useful for sorting/filtering in the UI.
            input: The input of the trace. Can be any JSON object.
            output: The output of the trace. Can be any JSON object.
            metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
            user_id: The id of the user that triggered the execution. Used to provide user-level analytics.
            session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
            version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
            release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
            tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.
            public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
            **kwargs: Additional keyword arguments that can be included in the trace.

        Returns:
            StatefulTraceClient: The updated trace. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(
                name="example-application",
                user_id="user-1234",
            )

            # Update the trace
            trace = trace.update(
                output={"result": "success"},
                metadata={"interface": "whatsapp"},
            )
            ```
        """
        try:
            body = {
                "id": self.id,
                "name": name,
                "userId": user_id,
                # `sessionId` may also arrive via kwargs (backward compatibility).
                "sessionId": session_id or kwargs.get("sessionId", None),
                "version": version,
                "release": release,
                "input": input,
                "output": output,
                "metadata": metadata,
                "public": public,
                "tags": tags,
            }
            # Extra kwargs override the named fields above, matching the
            # semantics of spreading them last in a dict literal.
            body.update(kwargs)

            self.log.debug(f"Update trace {body}...")

            request = TraceBody(**body)

            self.task_manager.add_task(
                {
                    "id": str(uuid.uuid4()),
                    "type": "trace-create",
                    "body": request.dict(exclude_none=True),
                }
            )

        except Exception as e:
            self.log.exception(e)
        finally:
            # Always hand back a fresh chainable client, even on failure.
            return StatefulTraceClient(
                self.client,
                self.id,
                StateType.TRACE,
                self.trace_id,
                task_manager=self.task_manager,
            )

    def get_langchain_handler(self, update_parent: bool = False):
        """Get langchain callback handler associated with the current trace.

        This method creates and returns a CallbackHandler instance, linking it with the current
        trace. Use this if you want to group multiple Langchain runs within a single trace.

        Args:
            update_parent (bool): If set to True, the parent trace will be updated with the outcome of the Langchain run.

        Raises:
            ImportError: If the 'langchain' module is not installed, indicating missing functionality.

        Returns:
            CallbackHandler: Langchain callback handler linked to the current trace.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Get a langchain callback handler
            handler = trace.get_langchain_handler()
            ```
        """
        try:
            from langfuse.callback import CallbackHandler

            self.log.debug(f"Creating new handler for trace {self.id}")

            debug_enabled = self.log.level == logging.DEBUG

            return CallbackHandler(
                stateful_client=self,
                debug=debug_enabled,
                update_stateful_client=update_parent,
            )
        except Exception as e:
            self.log.exception(e)

    def getNewHandler(self):
        """Alias for the `get_langchain_handler` method. Retrieves a callback handler for the trace. Deprecated."""
        return self.get_langchain_handler()
2952
2953
class DatasetItemClient:
    """Class for managing dataset items in Langfuse.

    Args:
        id (str): Unique identifier of the dataset item.
        status (DatasetStatus): The status of the dataset item. Can be either 'ACTIVE' or 'ARCHIVED'.
        input (Any): Input data of the dataset item.
        expected_output (Optional[Any]): Expected output of the dataset item.
        metadata (Optional[Any]): Additional metadata of the dataset item.
        source_trace_id (Optional[str]): Identifier of the source trace.
        source_observation_id (Optional[str]): Identifier of the source observation.
        dataset_id (str): Identifier of the dataset to which this item belongs.
        dataset_name (str): Name of the dataset to which this item belongs.
        created_at (datetime): Timestamp of dataset item creation.
        updated_at (datetime): Timestamp of the last update to the dataset item.
        langfuse (Langfuse): Instance of Langfuse client for API interactions.

    Example:
        ```python
        from langfuse import Langfuse

        langfuse = Langfuse()

        dataset = langfuse.get_dataset("<dataset_name>")

        for item in dataset.items:
            # Generate a completion using the input of every item
            completion, generation = llm_app.run(item.input)

            # Evaluate the completion
            generation.score(
                name="example-score",
                value=1
            )
        ```
    """

    log = logging.getLogger("langfuse")

    id: str
    status: DatasetStatus
    input: typing.Any
    expected_output: typing.Optional[typing.Any]
    metadata: Optional[Any]
    source_trace_id: typing.Optional[str]
    source_observation_id: typing.Optional[str]
    dataset_id: str
    dataset_name: str
    created_at: dt.datetime
    updated_at: dt.datetime

    langfuse: Langfuse

    def __init__(self, dataset_item: DatasetItem, langfuse: Langfuse):
        """Initialize the DatasetItemClient by copying fields from the API object."""
        self.id = dataset_item.id
        self.status = dataset_item.status
        self.input = dataset_item.input
        self.expected_output = dataset_item.expected_output
        self.metadata = dataset_item.metadata
        self.source_trace_id = dataset_item.source_trace_id
        self.source_observation_id = dataset_item.source_observation_id
        self.dataset_id = dataset_item.dataset_id
        self.dataset_name = dataset_item.dataset_name
        self.created_at = dataset_item.created_at
        self.updated_at = dataset_item.updated_at

        self.langfuse = langfuse

    def flush(self, observation: StatefulClient, run_name: str):
        """Flushes an observations task manager's queue.

        Used before creating a dataset run item to ensure all events are persistent.

        Args:
            observation (StatefulClient): The observation or trace client associated with the dataset item.
            run_name (str): The name of the dataset run.
        """
        observation.task_manager.flush()

    def link(
        self,
        trace_or_observation: typing.Union[StatefulClient, str, None],
        run_name: str,
        run_metadata: Optional[Any] = None,
        run_description: Optional[str] = None,
        trace_id: Optional[str] = None,
        observation_id: Optional[str] = None,
    ):
        """Link the dataset item to observation within a specific dataset run. Creates a dataset run item.

        Args:
            trace_or_observation (Union[StatefulClient, str, None]): The trace or observation object to link. Deprecated: can also be an observation ID.
            run_name (str): The name of the dataset run.
            run_metadata (Optional[Any]): Additional metadata to include in dataset run.
            run_description (Optional[str]): Description of the dataset run.
            trace_id (Optional[str]): The trace ID to link to the dataset item. Set trace_or_observation to None if trace_id is provided.
            observation_id (Optional[str]): The observation ID to link to the dataset item (optional). Set trace_or_observation to None if trace_id is provided.

        Raises:
            ValueError: If neither `trace_or_observation` nor `trace_id` identifies a trace.
        """
        # These may legitimately remain None (e.g. when only a trace is
        # linked), hence Optional — the previous `str = None` annotation lied.
        parsed_trace_id: typing.Optional[str] = None
        parsed_observation_id: typing.Optional[str] = None

        if isinstance(trace_or_observation, StatefulClient):
            # flush the queue before creating the dataset run item
            # to ensure that all events are persisted.
            if trace_or_observation.state_type == StateType.TRACE:
                parsed_trace_id = trace_or_observation.trace_id
            elif trace_or_observation.state_type == StateType.OBSERVATION:
                parsed_observation_id = trace_or_observation.id
                parsed_trace_id = trace_or_observation.trace_id
        # legacy support for observation_id
        elif isinstance(trace_or_observation, str):
            parsed_observation_id = trace_or_observation
        elif trace_or_observation is None:
            if trace_id is not None:
                parsed_trace_id = trace_id
                if observation_id is not None:
                    parsed_observation_id = observation_id
            else:
                raise ValueError(
                    "trace_id must be provided if trace_or_observation is None"
                )
        else:
            raise ValueError(
                "trace_or_observation (arg) or trace_id (kwarg) must be provided to link the dataset item"
            )

        self.log.debug(
            f"Creating dataset run item: {run_name} {self.id} {parsed_trace_id} {parsed_observation_id}"
        )
        self.langfuse.client.dataset_run_items.create(
            request=CreateDatasetRunItemRequest(
                runName=run_name,
                datasetItemId=self.id,
                traceId=parsed_trace_id,
                observationId=parsed_observation_id,
                metadata=run_metadata,
                runDescription=run_description,
            )
        )

    def get_langchain_handler(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
    ):
        """Create and get a langchain callback handler linked to this dataset item.

        Args:
            run_name (str): The name of the dataset run to be used in the callback handler.
            run_description (Optional[str]): Description of the dataset run.
            run_metadata (Optional[Any]): Additional metadata to include in dataset run.

        Returns:
            CallbackHandler: An instance of CallbackHandler linked to the dataset item.
        """
        metadata = {
            "dataset_item_id": self.id,
            "run_name": run_name,
            "dataset_id": self.dataset_id,
        }
        trace = self.langfuse.trace(name="dataset-run", metadata=metadata)

        self.link(
            trace, run_name, run_metadata=run_metadata, run_description=run_description
        )

        return trace.get_langchain_handler(update_parent=True)

    @contextmanager
    def observe(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
        trace_id: Optional[str] = None,
    ):
        """Observes a dataset run within the Langfuse client.

        Args:
            run_name (str): The name of the dataset run.
            run_description (Optional[str]): The description of the dataset run.
            run_metadata (Optional[Any]): Additional metadata for the dataset run.
            trace_id (Optional[str]): The trace ID to use as root trace for the dataset run. If not provided, a new one is generated.

        Yields:
            str: The ID of the root trace associated with the dataset run.
        """
        from langfuse.decorators import langfuse_context

        root_trace_id = trace_id or str(uuid.uuid4())

        langfuse_context._set_root_trace_id(root_trace_id)

        try:
            yield root_trace_id

        finally:
            # Link even when the observed block raised, so the (partial)
            # trace is still attached to the dataset run.
            self.link(
                run_name=run_name,
                run_metadata=run_metadata,
                run_description=run_description,
                trace_or_observation=None,
                trace_id=root_trace_id,
            )

    @contextmanager
    def observe_llama_index(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
        llama_index_integration_constructor_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """Context manager for observing LlamaIndex operations linked to this dataset item.

        This method sets up a LlamaIndex callback handler that integrates with Langfuse, allowing detailed logging
        and tracing of LlamaIndex operations within the context of a specific dataset run. It ensures that all
        operations performed within the context are linked to the appropriate dataset item and run in Langfuse.

        Args:
            run_name (str): The name of the dataset run.
            run_description (Optional[str]): Description of the dataset run. Defaults to None.
            run_metadata (Optional[Any]): Additional metadata for the dataset run. Defaults to None.
            llama_index_integration_constructor_kwargs (Optional[Dict[str, Any]]): Keyword arguments to pass
                to the LlamaIndex integration constructor. Defaults to None (treated as an empty dictionary).

        Yields:
            LlamaIndexCallbackHandler: The callback handler for LlamaIndex operations, or None if setup failed.

        Example:
            ```python
            dataset_item = dataset.items[0]

            with dataset_item.observe_llama_index(run_name="example-run", run_description="Example LlamaIndex run") as handler:
                # Perform LlamaIndex operations here
                some_llama_index_operation()
            ```

        Raises:
            ImportError: If required modules for LlamaIndex integration are not available.
        """
        metadata = {
            "dataset_item_id": self.id,
            "run_name": run_name,
            "dataset_id": self.dataset_id,
        }
        trace = self.langfuse.trace(name="dataset-run", metadata=metadata)
        self.link(
            trace, run_name, run_metadata=run_metadata, run_description=run_description
        )

        # Pre-bind everything referenced after the setup `try` so a failed
        # setup degrades gracefully instead of raising NameError on unbound
        # locals in the yield/finally below.
        callback_handler = None
        prev_global_handler = None
        prev_langfuse_handler = None
        registered = False

        try:
            import llama_index.core
            from llama_index.core import Settings
            from llama_index.core.callbacks import CallbackManager

            from langfuse.llama_index import LlamaIndexCallbackHandler

            callback_handler = LlamaIndexCallbackHandler(
                # `or {}` keeps the default immutable (no shared mutable default arg).
                **(llama_index_integration_constructor_kwargs or {}),
            )
            callback_handler.set_root(trace, update_root=True)

            # Temporarily unset the global handler if it is a LlamaIndexCallbackHandler:
            # two handlers of the same type must not be active at once, so the
            # previous one is removed for the duration of this context.
            prev_global_handler = llama_index.core.global_handler

            if isinstance(prev_global_handler, LlamaIndexCallbackHandler):
                llama_index.core.global_handler = None

            if Settings.callback_manager is None:
                Settings.callback_manager = CallbackManager([callback_handler])
            else:
                for handler in Settings.callback_manager.handlers:
                    if isinstance(handler, LlamaIndexCallbackHandler):
                        prev_langfuse_handler = handler
                        Settings.callback_manager.remove_handler(handler)

                Settings.callback_manager.add_handler(callback_handler)

            registered = True

        except Exception as e:
            self.log.exception(e)

        try:
            yield callback_handler
        finally:
            # Restore the previous handler configuration, but only if setup
            # actually completed (otherwise Settings may be unbound).
            if registered:
                Settings.callback_manager.remove_handler(callback_handler)
                if prev_langfuse_handler is not None:
                    Settings.callback_manager.add_handler(prev_langfuse_handler)

                llama_index.core.global_handler = prev_global_handler

    def get_llama_index_handler(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
        llama_index_integration_constructor_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """Create and get a llama-index callback handler linked to this dataset item.

        Args:
            run_name (str): The name of the dataset run to be used in the callback handler.
            run_description (Optional[str]): Description of the dataset run.
            run_metadata (Optional[Any]): Additional metadata to include in dataset run.
            llama_index_integration_constructor_kwargs (Optional[Dict[str, Any]]): Additional keyword arguments to pass to the LlamaIndex integration constructor. Defaults to None (treated as an empty dictionary).

        Returns:
            LlamaIndexCallbackHandler: An instance of LlamaIndexCallbackHandler linked to the dataset item.
        """
        metadata = {
            "dataset_item_id": self.id,
            "run_name": run_name,
            "dataset_id": self.dataset_id,
        }
        trace = self.langfuse.trace(name="dataset-run", metadata=metadata)

        self.link(
            trace, run_name, run_metadata=run_metadata, run_description=run_description
        )

        try:
            from langfuse.llama_index.llama_index import LlamaIndexCallbackHandler

            callback_handler = LlamaIndexCallbackHandler(
                # `or {}` keeps the default immutable (no shared mutable default arg).
                **(llama_index_integration_constructor_kwargs or {}),
            )
            callback_handler.set_root(trace, update_root=True)

            return callback_handler
        except Exception as e:
            self.log.exception(e)
3294
3295
class DatasetClient:
    """Class for managing datasets in Langfuse.

    Attributes:
        id (str): Unique identifier of the dataset.
        name (str): Name of the dataset.
        description (Optional[str]): Description of the dataset.
        metadata (Optional[typing.Any]): Additional metadata of the dataset.
        project_id (str): Identifier of the project to which the dataset belongs.
        dataset_name (str): Name of the dataset. Mirror of `name`, kept for backward compatibility.
        created_at (datetime): Timestamp of dataset creation.
        updated_at (datetime): Timestamp of the last update to the dataset.
        items (List[DatasetItemClient]): List of dataset items associated with the dataset.
        runs (List[str]): List of dataset runs associated with the dataset. Deprecated.

    Example:
        Print the input of each dataset item in a dataset.
        ```python
        from langfuse import Langfuse

        langfuse = Langfuse()

        dataset = langfuse.get_dataset("<dataset_name>")

        for item in dataset.items:
            print(item.input)
        ```
    """

    id: str
    name: str
    description: Optional[str]
    project_id: str
    dataset_name: str  # for backward compatibility, to be deprecated
    metadata: Optional[Any]
    created_at: dt.datetime
    updated_at: dt.datetime
    items: typing.List[DatasetItemClient]
    runs: typing.List[str] = []  # deprecated

    def __init__(self, dataset: Dataset, items: typing.List[DatasetItemClient]):
        """Initialize the DatasetClient from an API `Dataset` and its item clients."""
        # Copy the identically-named fields straight from the API object.
        for attribute in (
            "id",
            "name",
            "description",
            "project_id",
            "metadata",
            "created_at",
            "updated_at",
        ):
            setattr(self, attribute, getattr(dataset, attribute))

        # Mirror of `name`; kept for backward compatibility, to be deprecated.
        self.dataset_name = dataset.name
        self.items = items
@dataclass
class FetchTracesResponse:
    """Response object for fetch_traces method."""

    # Page of traces returned by the API.
    data: typing.List[TraceWithDetails]
    # Pagination metadata (page, limit, total counts).
    meta: MetaResponse
@dataclass
class FetchTraceResponse:
    """Response object for fetch_trace method."""

    # The single trace, including full details.
    data: TraceWithFullDetails
@dataclass
class FetchObservationsResponse:
    """Response object for fetch_observations method."""

    # Page of observations returned by the API.
    data: typing.List[ObservationsView]
    # Pagination metadata (page, limit, total counts).
    meta: MetaResponse
@dataclass
class FetchObservationResponse:
    """Response object for fetch_observation method."""

    # The single observation returned by the API.
    data: Observation
@dataclass
class FetchSessionsResponse:
    """Response object for fetch_sessions method."""

    # Page of sessions returned by the API.
    data: typing.List[Session]
    # Pagination metadata (page, limit, total counts).
    meta: MetaResponse
class Langfuse:
 134class Langfuse(object):
 135    """Langfuse Python client.
 136
 137    Attributes:
 138        log (logging.Logger): Logger for the Langfuse client.
 139        base_url (str): Base URL of the Langfuse API, serving as the root address for API endpoint construction.
 140        httpx_client (httpx.Client): HTTPX client utilized for executing requests to the Langfuse API.
 141        client (FernLangfuse): Core interface for Langfuse API interaction.
 142        task_manager (TaskManager): Task Manager dedicated to handling asynchronous tasks.
 143        release (str): Identifies the release number or hash of the application.
 144        prompt_cache (PromptCache): A cache for efficiently storing and retrieving PromptClient instances.
 145
 146    Example:
 147        Initiating the Langfuse client should always be first step to use Langfuse.
 148        ```python
 149        import os
 150        from langfuse import Langfuse
 151
 152        # Set the public and secret keys as environment variables
 153        os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
 154        os.environ['LANGFUSE_SECRET_KEY'] = secret_key
 155
 156        # Initialize the Langfuse client using the credentials
 157        langfuse = Langfuse()
 158        ```
 159    """
 160
 161    log = logging.getLogger("langfuse")
 162    """Logger for the Langfuse client."""
 163
 164    host: str
 165    """Host of Langfuse API."""
 166
 167    def __init__(
 168        self,
 169        public_key: Optional[str] = None,
 170        secret_key: Optional[str] = None,
 171        host: Optional[str] = None,
 172        release: Optional[str] = None,
 173        debug: bool = False,
 174        threads: Optional[int] = None,
 175        flush_at: Optional[int] = None,
 176        flush_interval: Optional[float] = None,
 177        max_retries: Optional[int] = None,
 178        timeout: Optional[int] = None,  # seconds
 179        sdk_integration: Optional[str] = "default",
 180        httpx_client: Optional[httpx.Client] = None,
 181        enabled: Optional[bool] = True,
 182        sample_rate: Optional[float] = None,
 183    ):
 184        """Initialize the Langfuse client.
 185
 186        Args:
 187            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
 188            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
 189            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
 190            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
 191            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
 192            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
 193            flush_at: Max batch size that's sent to the API.
 194            flush_interval: Max delay until a new batch is sent to the API.
 195            max_retries: Max number of retries in case of API/network errors.
 196            timeout: Timeout of API requests in seconds. Defaults to 20 seconds.
 197            httpx_client: Pass your own httpx client for more customizability of requests.
 198            sdk_integration: Used by intgerations that wrap the Langfuse SDK to add context for debugging and support. Not to be used directly.
 199            enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops.
 200            sample_rate: Sampling rate for tracing. If set to 0.2, only 20% of the data will be sent to the backend. Can be set via `LANGFUSE_SAMPLE_RATE` environment variable.
 201
 202        Raises:
 203            ValueError: If public_key or secret_key are not set and not found in environment variables.
 204
 205        Example:
 206            Initiating the Langfuse client should always be first step to use Langfuse.
 207            ```python
 208            import os
 209            from langfuse import Langfuse
 210
 211            # Set the public and secret keys as environment variables
 212            os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
 213            os.environ['LANGFUSE_SECRET_KEY'] = secret_key
 214
 215            # Initialize the Langfuse client using the credentials
 216            langfuse = Langfuse()
 217            ```
 218        """
 219        self.enabled = enabled
 220        public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
 221        secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
 222        sample_rate = (
 223            sample_rate
 224            if sample_rate
 225            is not None  # needs explicit None check, as 0 is a valid value
 226            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
 227        )
 228
 229        if sample_rate is not None and (
 230            sample_rate > 1 or sample_rate < 0
 231        ):  # default value 1 will be set in the taskmanager
 232            self.enabled = False
 233            self.log.warning(
 234                "Langfuse client is disabled since the sample rate provided is not between 0 and 1."
 235            )
 236
 237        threads = threads or int(os.environ.get("LANGFUSE_THREADS", 1))
 238        flush_at = flush_at or int(os.environ.get("LANGFUSE_FLUSH_AT", 15))
 239        flush_interval = flush_interval or float(
 240            os.environ.get("LANGFUSE_FLUSH_INTERVAL", 0.5)
 241        )
 242
 243        max_retries = max_retries or int(os.environ.get("LANGFUSE_MAX_RETRIES", 3))
 244        timeout = timeout or int(os.environ.get("LANGFUSE_TIMEOUT", 20))
 245
 246        if not self.enabled:
 247            self.log.warning(
 248                "Langfuse client is disabled. No observability data will be sent."
 249            )
 250
 251        elif not public_key:
 252            self.enabled = False
 253            self.log.warning(
 254                "Langfuse client is disabled since no public_key was provided as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
 255            )
 256
 257        elif not secret_key:
 258            self.enabled = False
 259            self.log.warning(
 260                "Langfuse client is disabled since no secret_key was provided as a parameter or environment variable 'LANGFUSE_SECRET_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
 261            )
 262
 263        set_debug = debug if debug else (os.getenv("LANGFUSE_DEBUG", "False") == "True")
 264
 265        if set_debug is True:
 266            # Ensures that debug level messages are logged when debug mode is on.
 267            # Otherwise, defaults to WARNING level.
 268            # See https://docs.python.org/3/howto/logging.html#what-happens-if-no-configuration-is-provided
 269            logging.basicConfig()
 270            self.log.setLevel(logging.DEBUG)
 271
 272            clean_logger()
 273        else:
 274            self.log.setLevel(logging.WARNING)
 275            clean_logger()
 276
 277        self.base_url = (
 278            host
 279            if host
 280            else os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
 281        )
 282
 283        self.httpx_client = httpx_client or httpx.Client(timeout=timeout)
 284
 285        self.client = FernLangfuse(
 286            base_url=self.base_url,
 287            username=public_key,
 288            password=secret_key,
 289            x_langfuse_sdk_name="python",
 290            x_langfuse_sdk_version=version,
 291            x_langfuse_public_key=public_key,
 292            httpx_client=self.httpx_client,
 293        )
 294
 295        langfuse_client = LangfuseClient(
 296            public_key=public_key,
 297            secret_key=secret_key,
 298            base_url=self.base_url,
 299            version=version,
 300            timeout=timeout,
 301            session=self.httpx_client,
 302        )
 303
 304        args = {
 305            "threads": threads,
 306            "flush_at": flush_at,
 307            "flush_interval": flush_interval,
 308            "max_retries": max_retries,
 309            "client": langfuse_client,
 310            "public_key": public_key,
 311            "sdk_name": "python",
 312            "sdk_version": version,
 313            "sdk_integration": sdk_integration,
 314            "enabled": self.enabled,
 315            "sample_rate": sample_rate,
 316        }
 317
 318        self.task_manager = TaskManager(**args)
 319
 320        self.trace_id = None
 321
 322        self.release = self._get_release_value(release)
 323
 324        self.prompt_cache = PromptCache()
 325
 326    def _get_release_value(self, release: Optional[str] = None) -> Optional[str]:
 327        if release:
 328            return release
 329        elif "LANGFUSE_RELEASE" in os.environ:
 330            return os.environ["LANGFUSE_RELEASE"]
 331        else:
 332            return get_common_release_envs()
 333
    def get_trace_id(self) -> Optional[str]:
        """Get the current trace id (None if no trace has been created yet)."""
        return self.trace_id
 337
 338    def get_trace_url(self) -> str:
 339        """Get the URL of the current trace to view it in the Langfuse UI."""
 340        return f"{self.base_url}/trace/{self.trace_id}"
 341
 342    def get_dataset(
 343        self, name: str, *, fetch_items_page_size: Optional[int] = 50
 344    ) -> "DatasetClient":
 345        """Fetch a dataset by its name.
 346
 347        Args:
 348            name (str): The name of the dataset to fetch.
 349            fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.
 350
 351        Returns:
 352            DatasetClient: The dataset with the given name.
 353        """
 354        try:
 355            self.log.debug(f"Getting datasets {name}")
 356            dataset = self.client.datasets.get(dataset_name=name)
 357
 358            dataset_items = []
 359            page = 1
 360            while True:
 361                new_items = self.client.dataset_items.list(
 362                    dataset_name=name, page=page, limit=fetch_items_page_size
 363                )
 364                dataset_items.extend(new_items.data)
 365                if new_items.meta.total_pages <= page:
 366                    break
 367                page += 1
 368
 369            items = [DatasetItemClient(i, langfuse=self) for i in dataset_items]
 370
 371            return DatasetClient(dataset, items=items)
 372        except Exception as e:
 373            handle_fern_exception(e)
 374            raise e
 375
 376    def get_dataset_item(self, id: str) -> "DatasetItemClient":
 377        """Get the dataset item with the given id."""
 378        try:
 379            self.log.debug(f"Getting dataset item {id}")
 380            dataset_item = self.client.dataset_items.get(id=id)
 381            return DatasetItemClient(dataset_item, langfuse=self)
 382        except Exception as e:
 383            handle_fern_exception(e)
 384            raise e
 385
 386    def auth_check(self) -> bool:
 387        """Check if the provided credentials (public and secret key) are valid.
 388
 389        Raises:
 390            Exception: If no projects were found for the provided credentials.
 391
 392        Note:
 393            This method is blocking. It is discouraged to use it in production code.
 394        """
 395        try:
 396            projects = self.client.projects.get()
 397            self.log.debug(
 398                f"Auth check successful, found {len(projects.data)} projects"
 399            )
 400            if len(projects.data) == 0:
 401                raise Exception(
 402                    "Auth check failed, no project found for the keys provided."
 403                )
 404            return True
 405
 406        except Exception as e:
 407            handle_fern_exception(e)
 408            raise e
 409
 410    def get_dataset_runs(
 411        self,
 412        dataset_name: str,
 413        *,
 414        page: typing.Optional[int] = None,
 415        limit: typing.Optional[int] = None,
 416    ) -> PaginatedDatasetRuns:
 417        """Get all dataset runs.
 418
 419        Args:
 420            dataset_name (str): Name of the dataset.
 421            page (Optional[int]): Page number of the dataset runs to return, starts at 1. Defaults to None.
 422            limit (Optional[int]): Maximum number of dataset runs to return. Defaults to 50.
 423
 424        Returns:
 425            PaginatedDatasetRuns: The dataset runs.
 426        """
 427        try:
 428            self.log.debug("Getting dataset runs")
 429            return self.client.datasets.get_runs(
 430                dataset_name=dataset_name, page=page, limit=limit
 431            )
 432        except Exception as e:
 433            handle_fern_exception(e)
 434            raise e
 435
 436    def get_dataset_run(
 437        self,
 438        dataset_name: str,
 439        dataset_run_name: str,
 440    ) -> DatasetRunWithItems:
 441        """Get a dataset run.
 442
 443        Args:
 444            dataset_name: Name of the dataset.
 445            dataset_run_name: Name of the dataset run.
 446
 447        Returns:
 448            DatasetRunWithItems: The dataset run.
 449        """
 450        try:
 451            self.log.debug(
 452                f"Getting dataset runs for dataset {dataset_name} and run {dataset_run_name}"
 453            )
 454            return self.client.datasets.get_run(
 455                dataset_name=dataset_name, run_name=dataset_run_name
 456            )
 457        except Exception as e:
 458            handle_fern_exception(e)
 459            raise e
 460
 461    def create_dataset(
 462        self,
 463        name: str,
 464        description: Optional[str] = None,
 465        metadata: Optional[Any] = None,
 466    ) -> Dataset:
 467        """Create a dataset with the given name on Langfuse.
 468
 469        Args:
 470            name: Name of the dataset to create.
 471            description: Description of the dataset. Defaults to None.
 472            metadata: Additional metadata. Defaults to None.
 473
 474        Returns:
 475            Dataset: The created dataset as returned by the Langfuse API.
 476        """
 477        try:
 478            body = CreateDatasetRequest(
 479                name=name, description=description, metadata=metadata
 480            )
 481            self.log.debug(f"Creating datasets {body}")
 482            return self.client.datasets.create(request=body)
 483        except Exception as e:
 484            handle_fern_exception(e)
 485            raise e
 486
 487    def create_dataset_item(
 488        self,
 489        dataset_name: str,
 490        input: Optional[Any] = None,
 491        expected_output: Optional[Any] = None,
 492        metadata: Optional[Any] = None,
 493        source_trace_id: Optional[str] = None,
 494        source_observation_id: Optional[str] = None,
 495        status: Optional[DatasetStatus] = None,
 496        id: Optional[str] = None,
 497    ) -> DatasetItem:
 498        """Create a dataset item.
 499
 500        Upserts if an item with id already exists.
 501
 502        Args:
 503            dataset_name: Name of the dataset in which the dataset item should be created.
 504            input: Input data. Defaults to None. Can contain any dict, list or scalar.
 505            expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar.
 506            metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar.
 507            source_trace_id: Id of the source trace. Defaults to None.
 508            source_observation_id: Id of the source observation. Defaults to None.
 509            status: Status of the dataset item. Defaults to ACTIVE for newly created items.
 510            id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets.
 511
 512        Returns:
 513            DatasetItem: The created dataset item as returned by the Langfuse API.
 514
 515        Example:
 516            ```python
 517            from langfuse import Langfuse
 518
 519            langfuse = Langfuse()
 520
 521            # Uploading items to the Langfuse dataset named "capital_cities"
 522            langfuse.create_dataset_item(
 523                dataset_name="capital_cities",
 524                input={"input": {"country": "Italy"}},
 525                expected_output={"expected_output": "Rome"},
 526                metadata={"foo": "bar"}
 527            )
 528            ```
 529        """
 530        try:
 531            body = CreateDatasetItemRequest(
 532                datasetName=dataset_name,
 533                input=input,
 534                expectedOutput=expected_output,
 535                metadata=metadata,
 536                sourceTraceId=source_trace_id,
 537                sourceObservationId=source_observation_id,
 538                status=status,
 539                id=id,
 540            )
 541            self.log.debug(f"Creating dataset item {body}")
 542            return self.client.dataset_items.create(request=body)
 543        except Exception as e:
 544            handle_fern_exception(e)
 545            raise e
 546
 547    def fetch_trace(
 548        self,
 549        id: str,
 550    ) -> FetchTraceResponse:
 551        """Fetch a trace via the Langfuse API by its id.
 552
 553        Args:
 554            id: The id of the trace to fetch.
 555
 556        Returns:
 557            FetchTraceResponse: The trace with full details as returned by the Langfuse API on `data`.
 558
 559        Raises:
 560            Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
 561        """
 562        try:
 563            self.log.debug(f"Getting trace {id}")
 564            trace = self.client.trace.get(id)
 565            return FetchTraceResponse(data=trace)
 566        except Exception as e:
 567            handle_fern_exception(e)
 568            raise e
 569
 570    def get_trace(
 571        self,
 572        id: str,
 573    ) -> TraceWithFullDetails:
 574        """Get a trace via the Langfuse API by its id. Deprecated, use fetch_trace instead.
 575
 576        Args:
 577            id: The id of the trace to fetch.
 578
 579        Returns:
 580            TraceWithFullDetails: The trace with full details as returned by the Langfuse API.
 581
 582        Raises:
 583            Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
 584        """
 585        warnings.warn(
 586            "get_trace is deprecated, use fetch_trace instead.",
 587            DeprecationWarning,
 588        )
 589
 590        try:
 591            self.log.debug(f"Getting trace {id}")
 592            return self.client.trace.get(id)
 593        except Exception as e:
 594            handle_fern_exception(e)
 595            raise e
 596
 597    def fetch_traces(
 598        self,
 599        *,
 600        page: Optional[int] = None,
 601        limit: Optional[int] = None,
 602        user_id: Optional[str] = None,
 603        name: Optional[str] = None,
 604        session_id: Optional[str] = None,
 605        from_timestamp: Optional[dt.datetime] = None,
 606        to_timestamp: Optional[dt.datetime] = None,
 607        order_by: Optional[str] = None,
 608        tags: Optional[Union[str, Sequence[str]]] = None,
 609    ) -> FetchTracesResponse:
 610        """Fetch a list of traces in the current project matching the given parameters.
 611
 612        Args:
 613            page (Optional[int]): Page number, starts at 1. Defaults to None.
 614            limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None.
 615            name (Optional[str]): Filter by name of traces. Defaults to None.
 616            user_id (Optional[str]): Filter by user_id. Defaults to None.
 617            session_id (Optional[str]): Filter by session_id. Defaults to None.
 618            from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None.
 619            to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None.
 620            order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None.
 621            tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None.
 622
 623        Returns:
 624            FetchTracesResponse, list of traces on `data` and metadata on `meta`.
 625
 626        Raises:
 627            Exception: If an error occurred during the request.
 628        """
 629        try:
 630            self.log.debug(
 631                f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}"
 632            )
 633            res = self.client.trace.list(
 634                page=page,
 635                limit=limit,
 636                name=name,
 637                user_id=user_id,
 638                session_id=session_id,
 639                from_timestamp=from_timestamp,
 640                to_timestamp=to_timestamp,
 641                order_by=order_by,
 642                tags=tags,
 643            )
 644            return FetchTracesResponse(data=res.data, meta=res.meta)
 645        except Exception as e:
 646            handle_fern_exception(e)
 647            raise e
 648
 649    def get_traces(
 650        self,
 651        *,
 652        page: Optional[int] = None,
 653        limit: Optional[int] = None,
 654        user_id: Optional[str] = None,
 655        name: Optional[str] = None,
 656        session_id: Optional[str] = None,
 657        from_timestamp: Optional[dt.datetime] = None,
 658        to_timestamp: Optional[dt.datetime] = None,
 659        order_by: Optional[str] = None,
 660        tags: Optional[Union[str, Sequence[str]]] = None,
 661    ) -> Traces:
 662        """Get a list of traces in the current project matching the given parameters. Deprecated, use fetch_traces instead.
 663
 664        Args:
 665            page (Optional[int]): Page number, starts at 1. Defaults to None.
 666            limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None.
 667            name (Optional[str]): Filter by name of traces. Defaults to None.
 668            user_id (Optional[str]): Filter by user_id. Defaults to None.
 669            session_id (Optional[str]): Filter by session_id. Defaults to None.
 670            from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None.
 671            to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None.
 672            order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None.
 673            tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None.
 674
 675        Returns:
 676            List of Traces
 677
 678        Raises:
 679            Exception: If an error occurred during the request.
 680        """
 681        warnings.warn(
 682            "get_traces is deprecated, use fetch_traces instead.",
 683            DeprecationWarning,
 684        )
 685        try:
 686            self.log.debug(
 687                f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}"
 688            )
 689            return self.client.trace.list(
 690                page=page,
 691                limit=limit,
 692                name=name,
 693                user_id=user_id,
 694                session_id=session_id,
 695                from_timestamp=from_timestamp,
 696                to_timestamp=to_timestamp,
 697                order_by=order_by,
 698                tags=tags,
 699            )
 700        except Exception as e:
 701            handle_fern_exception(e)
 702            raise e
 703
 704    def fetch_observations(
 705        self,
 706        *,
 707        page: typing.Optional[int] = None,
 708        limit: typing.Optional[int] = None,
 709        name: typing.Optional[str] = None,
 710        user_id: typing.Optional[str] = None,
 711        trace_id: typing.Optional[str] = None,
 712        parent_observation_id: typing.Optional[str] = None,
 713        from_start_time: typing.Optional[dt.datetime] = None,
 714        to_start_time: typing.Optional[dt.datetime] = None,
 715        type: typing.Optional[str] = None,
 716    ) -> FetchObservationsResponse:
 717        """Get a list of observations in the current project matching the given parameters.
 718
 719        Args:
 720            page (Optional[int]): Page number of the observations to return. Defaults to None.
 721            limit (Optional[int]): Maximum number of observations to return. Defaults to None.
 722            name (Optional[str]): Name of the observations to return. Defaults to None.
 723            user_id (Optional[str]): User identifier. Defaults to None.
 724            trace_id (Optional[str]): Trace identifier. Defaults to None.
 725            parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
 726            from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
 727            to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
 728            type (Optional[str]): Type of the observation. Defaults to None.
 729
 730        Returns:
 731            FetchObservationsResponse, list of observations on `data` and metadata on `meta`.
 732
 733        Raises:
 734            Exception: If an error occurred during the request.
 735        """
 736        try:
 737            self.log.debug(
 738                f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}"
 739            )
 740            res = self.client.observations.get_many(
 741                page=page,
 742                limit=limit,
 743                name=name,
 744                user_id=user_id,
 745                trace_id=trace_id,
 746                parent_observation_id=parent_observation_id,
 747                from_start_time=from_start_time,
 748                to_start_time=to_start_time,
 749                type=type,
 750            )
 751            return FetchObservationsResponse(data=res.data, meta=res.meta)
 752        except Exception as e:
 753            self.log.exception(e)
 754            raise e
 755
 756    def get_observations(
 757        self,
 758        *,
 759        page: typing.Optional[int] = None,
 760        limit: typing.Optional[int] = None,
 761        name: typing.Optional[str] = None,
 762        user_id: typing.Optional[str] = None,
 763        trace_id: typing.Optional[str] = None,
 764        parent_observation_id: typing.Optional[str] = None,
 765        from_start_time: typing.Optional[dt.datetime] = None,
 766        to_start_time: typing.Optional[dt.datetime] = None,
 767        type: typing.Optional[str] = None,
 768    ) -> ObservationsViews:
 769        """Get a list of observations in the current project matching the given parameters. Deprecated, use fetch_observations instead.
 770
 771        Args:
 772            page (Optional[int]): Page number of the observations to return. Defaults to None.
 773            limit (Optional[int]): Maximum number of observations to return. Defaults to None.
 774            name (Optional[str]): Name of the observations to return. Defaults to None.
 775            user_id (Optional[str]): User identifier. Defaults to None.
 776            trace_id (Optional[str]): Trace identifier. Defaults to None.
 777            parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
 778            from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
 779            to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
 780            type (Optional[str]): Type of the observation. Defaults to None.
 781
 782        Returns:
 783            List of ObservationsViews: List of observations in the project matching the given parameters.
 784
 785        Raises:
 786            Exception: If an error occurred during the request.
 787        """
 788        warnings.warn(
 789            "get_observations is deprecated, use fetch_observations instead.",
 790            DeprecationWarning,
 791        )
 792        try:
 793            self.log.debug(
 794                f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}"
 795            )
 796            return self.client.observations.get_many(
 797                page=page,
 798                limit=limit,
 799                name=name,
 800                user_id=user_id,
 801                trace_id=trace_id,
 802                parent_observation_id=parent_observation_id,
 803                from_start_time=from_start_time,
 804                to_start_time=to_start_time,
 805                type=type,
 806            )
 807        except Exception as e:
 808            handle_fern_exception(e)
 809            raise e
 810
 811    def get_generations(
 812        self,
 813        *,
 814        page: typing.Optional[int] = None,
 815        limit: typing.Optional[int] = None,
 816        name: typing.Optional[str] = None,
 817        user_id: typing.Optional[str] = None,
 818        trace_id: typing.Optional[str] = None,
 819        from_start_time: typing.Optional[dt.datetime] = None,
 820        to_start_time: typing.Optional[dt.datetime] = None,
 821        parent_observation_id: typing.Optional[str] = None,
 822    ) -> ObservationsViews:
 823        """Get a list of generations in the current project matching the given parameters. Deprecated, use fetch_observations(type='GENERATION') instead.
 824
 825        Args:
 826            page (Optional[int]): Page number of the generations to return. Defaults to None.
 827            limit (Optional[int]): Maximum number of generations to return. Defaults to None.
 828            name (Optional[str]): Name of the generations to return. Defaults to None.
 829            user_id (Optional[str]): User identifier of the generations to return. Defaults to None.
 830            trace_id (Optional[str]): Trace identifier of the generations to return. Defaults to None.
 831            from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
 832            to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
 833            parent_observation_id (Optional[str]): Parent observation identifier of the generations to return. Defaults to None.
 834
 835        Returns:
 836            List of ObservationsViews: List of generations in the project matching the given parameters.
 837
 838        Raises:
 839            Exception: If an error occurred during the request.
 840        """
 841        warnings.warn(
 842            "get_generations is deprecated, use `fetch_observations(type='GENERATION')` instead.",
 843            DeprecationWarning,
 844        )
 845        return self.get_observations(
 846            page=page,
 847            limit=limit,
 848            name=name,
 849            user_id=user_id,
 850            trace_id=trace_id,
 851            parent_observation_id=parent_observation_id,
 852            from_start_time=from_start_time,
 853            to_start_time=to_start_time,
 854            type="GENERATION",
 855        )
 856
 857    def fetch_observation(
 858        self,
 859        id: str,
 860    ) -> FetchObservationResponse:
 861        """Get an observation in the current project with the given identifier.
 862
 863        Args:
 864            id: The identifier of the observation to fetch.
 865
 866        Returns:
 867            FetchObservationResponse: The observation with the given id on `data`.
 868
 869        Raises:
 870            Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request.
 871        """
 872        try:
 873            self.log.debug(f"Getting observation {id}")
 874            observation = self.client.observations.get(id)
 875            return FetchObservationResponse(data=observation)
 876        except Exception as e:
 877            handle_fern_exception(e)
 878            raise e
 879
 880    def get_observation(
 881        self,
 882        id: str,
 883    ) -> Observation:
 884        """Get an observation in the current project with the given identifier. Deprecated, use fetch_observation instead.
 885
 886        Args:
 887            id: The identifier of the observation to fetch.
 888
 889        Raises:
 890            Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request.
 891        """
 892        warnings.warn(
 893            "get_observation is deprecated, use fetch_observation instead.",
 894            DeprecationWarning,
 895        )
 896        try:
 897            self.log.debug(f"Getting observation {id}")
 898            return self.client.observations.get(id)
 899        except Exception as e:
 900            handle_fern_exception(e)
 901            raise e
 902
 903    def fetch_sessions(
 904        self,
 905        *,
 906        page: typing.Optional[int] = None,
 907        limit: typing.Optional[int] = None,
 908        from_timestamp: typing.Optional[dt.datetime] = None,
 909        to_timestamp: typing.Optional[dt.datetime] = None,
 910    ) -> FetchSessionsResponse:
 911        """Get a list of sessions in the current project.
 912
 913        Args:
 914            page (Optional[int]): Page number of the sessions to return. Defaults to None.
 915            limit (Optional[int]): Maximum number of sessions to return. Defaults to None.
 916            from_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp on or after this datetime. Defaults to None.
 917            to_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp before this datetime. Defaults to None.
 918
 919        Returns:
 920            FetchSessionsResponse, list of sessions on `data` and metadata on `meta`.
 921
 922        Raises:
 923            Exception: If an error occurred during the request.
 924        """
 925        try:
 926            self.log.debug(
 927                f"Getting sessions... {page}, {limit}, {from_timestamp}, {to_timestamp}"
 928            )
 929            res = self.client.sessions.list(
 930                page=page,
 931                limit=limit,
 932                from_timestamp=from_timestamp,
 933                to_timestamp=to_timestamp,
 934            )
 935            return FetchSessionsResponse(data=res.data, meta=res.meta)
 936        except Exception as e:
 937            handle_fern_exception(e)
 938            raise e
 939
    # Overload: type="chat" — `fallback` is a list of chat messages and the
    # resolved prompt is returned as a ChatPromptClient.
    @overload
    def get_prompt(
        self,
        name: str,
        version: Optional[int] = None,
        *,
        label: Optional[str] = None,
        type: Literal["chat"],
        cache_ttl_seconds: Optional[int] = None,
        fallback: Optional[List[ChatMessageDict]] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> ChatPromptClient: ...
 953
    # Overload: type="text" (the default) — `fallback` is a plain string and the
    # resolved prompt is returned as a TextPromptClient.
    @overload
    def get_prompt(
        self,
        name: str,
        version: Optional[int] = None,
        *,
        label: Optional[str] = None,
        type: Literal["text"] = "text",
        cache_ttl_seconds: Optional[int] = None,
        fallback: Optional[str] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> TextPromptClient: ...
 967
    def get_prompt(
        self,
        name: str,
        version: Optional[int] = None,
        *,
        label: Optional[str] = None,
        type: Literal["chat", "text"] = "text",
        cache_ttl_seconds: Optional[int] = None,
        fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None,
        max_retries: Optional[int] = None,
        fetch_timeout_seconds: Optional[int] = None,
    ) -> PromptClient:
        """Get a prompt.

        This method attempts to fetch the requested prompt from the local cache. If the prompt is not found
        in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again
        and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will
        return the expired prompt as a fallback.

        Args:
            name (str): The name of the prompt to retrieve.

        Keyword Args:
            version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
            label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
            cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a
            keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0.
            type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text".
            fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None.
            max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries use a constant backoff between attempts.
            fetch_timeout_seconds: Optional[int]: The timeout in seconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 10 seconds per default.

        Returns:
            The prompt object retrieved from the cache or directly fetched if not cached or expired of type
            - TextPromptClient, if type argument is 'text'.
            - ChatPromptClient, if type argument is 'chat'.

        Raises:
            Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an
            expired prompt in the cache, in which case it logs a warning and returns the expired prompt.
        """
        if version is not None and label is not None:
            raise ValueError("Cannot specify both version and label at the same time.")

        if not name:
            raise ValueError("Prompt name cannot be empty.")

        cache_key = PromptCache.generate_cache_key(name, version=version, label=label)
        bounded_max_retries = self._get_bounded_max_retries(
            max_retries, default_max_retries=2, max_retries_upper_bound=4
        )

        self.log.debug(f"Getting prompt '{cache_key}'")
        cached_prompt = self.prompt_cache.get(cache_key)

        # Cache miss, or caching explicitly disabled via cache_ttl_seconds=0:
        # fetch synchronously from the server.
        if cached_prompt is None or cache_ttl_seconds == 0:
            self.log.debug(
                f"Prompt '{cache_key}' not found in cache or caching disabled."
            )
            try:
                return self._fetch_prompt_and_update_cache(
                    name,
                    version=version,
                    label=label,
                    ttl_seconds=cache_ttl_seconds,
                    max_retries=bounded_max_retries,
                    fetch_timeout_seconds=fetch_timeout_seconds,
                )
            except Exception as e:
                if fallback:
                    self.log.warning(
                        f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}"
                    )

                    # Build an offline prompt client from the caller-supplied
                    # fallback so the application keeps working during outages.
                    fallback_client_args = {
                        "name": name,
                        "prompt": fallback,
                        "type": type,
                        "version": version or 0,
                        "config": {},
                        "labels": [label] if label else [],
                        "tags": [],
                    }

                    if type == "text":
                        return TextPromptClient(
                            prompt=Prompt_Text(**fallback_client_args),
                            is_fallback=True,
                        )

                    if type == "chat":
                        return ChatPromptClient(
                            prompt=Prompt_Chat(**fallback_client_args),
                            is_fallback=True,
                        )

                # No fallback available: surface the fetch error to the caller.
                raise e

        # Cache hit but expired: serve the stale value and refresh asynchronously.
        if cached_prompt.is_expired():
            self.log.debug(f"Stale prompt '{cache_key}' found in cache.")
            try:
                # refresh prompt in background thread, refresh_prompt deduplicates tasks
                self.log.debug(f"Refreshing prompt '{cache_key}' in background.")
                self.prompt_cache.add_refresh_prompt_task(
                    cache_key,
                    lambda: self._fetch_prompt_and_update_cache(
                        name,
                        version=version,
                        label=label,
                        ttl_seconds=cache_ttl_seconds,
                        max_retries=bounded_max_retries,
                        fetch_timeout_seconds=fetch_timeout_seconds,
                    ),
                )
                self.log.debug(f"Returning stale prompt '{cache_key}' from cache.")
                # return stale prompt
                return cached_prompt.value

            except Exception as e:
                self.log.warning(
                    f"Error when refreshing cached prompt '{cache_key}', returning cached version. Error: {e}"
                )
                # creation of refresh prompt task failed, return stale prompt
                return cached_prompt.value

        # Fresh cache hit: serve it directly.
        return cached_prompt.value
1094
1095    def _fetch_prompt_and_update_cache(
1096        self,
1097        name: str,
1098        *,
1099        version: Optional[int] = None,
1100        label: Optional[str] = None,
1101        ttl_seconds: Optional[int] = None,
1102        max_retries: int,
1103        fetch_timeout_seconds,
1104    ) -> PromptClient:
1105        try:
1106            cache_key = PromptCache.generate_cache_key(
1107                name, version=version, label=label
1108            )
1109
1110            self.log.debug(f"Fetching prompt '{cache_key}' from server...")
1111
1112            @backoff.on_exception(
1113                backoff.constant, Exception, max_tries=max_retries, logger=None
1114            )
1115            def fetch_prompts():
1116                return self.client.prompts.get(
1117                    self._url_encode(name),
1118                    version=version,
1119                    label=label,
1120                    request_options={
1121                        "timeout_in_seconds": fetch_timeout_seconds,
1122                    }
1123                    if fetch_timeout_seconds is not None
1124                    else None,
1125                )
1126
1127            prompt_response = fetch_prompts()
1128
1129            if prompt_response.type == "chat":
1130                prompt = ChatPromptClient(prompt_response)
1131            else:
1132                prompt = TextPromptClient(prompt_response)
1133
1134            self.prompt_cache.set(cache_key, prompt, ttl_seconds)
1135
1136            return prompt
1137
1138        except Exception as e:
1139            self.log.error(f"Error while fetching prompt '{cache_key}': {str(e)}")
1140            raise e
1141
1142    def _get_bounded_max_retries(
1143        self,
1144        max_retries: Optional[int],
1145        *,
1146        default_max_retries: int = 2,
1147        max_retries_upper_bound: int = 4,
1148    ) -> int:
1149        if max_retries is None:
1150            return default_max_retries
1151
1152        bounded_max_retries = min(
1153            max(max_retries, 0),
1154            max_retries_upper_bound,
1155        )
1156
1157        return bounded_max_retries
1158
    # Overload: type="chat" — `prompt` is a list of chat messages; the created
    # prompt is returned as a ChatPromptClient.
    @overload
    def create_prompt(
        self,
        *,
        name: str,
        prompt: List[ChatMessageDict],
        is_active: Optional[bool] = None,  # deprecated
        labels: List[str] = [],
        tags: Optional[List[str]] = None,
        type: Optional[Literal["chat"]],
        config: Optional[Any] = None,
    ) -> ChatPromptClient: ...
1171
    # Overload: type="text" (the default) — `prompt` is a plain string; the
    # created prompt is returned as a TextPromptClient.
    @overload
    def create_prompt(
        self,
        *,
        name: str,
        prompt: str,
        is_active: Optional[bool] = None,  # deprecated
        labels: List[str] = [],
        tags: Optional[List[str]] = None,
        type: Optional[Literal["text"]] = "text",
        config: Optional[Any] = None,
    ) -> TextPromptClient: ...
1184
1185    def create_prompt(
1186        self,
1187        *,
1188        name: str,
1189        prompt: Union[str, List[ChatMessageDict]],
1190        is_active: Optional[bool] = None,  # deprecated
1191        labels: List[str] = [],
1192        tags: Optional[List[str]] = None,
1193        type: Optional[Literal["chat", "text"]] = "text",
1194        config: Optional[Any] = None,
1195    ) -> PromptClient:
1196        """Create a new prompt in Langfuse.
1197
1198        Keyword Args:
1199            name : The name of the prompt to be created.
1200            prompt : The content of the prompt to be created.
1201            is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead.
1202            labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label.
1203            tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt.
1204            config: Additional structured data to be saved with the prompt. Defaults to None.
1205            type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text".
1206
1207        Returns:
1208            TextPromptClient: The prompt if type argument is 'text'.
1209            ChatPromptClient: The prompt if type argument is 'chat'.
1210        """
1211        try:
1212            self.log.debug(f"Creating prompt {name=}, {version=}, {labels=}")
1213
1214            # Handle deprecated is_active flag
1215            if is_active:
1216                self.log.warning(
1217                    "The 'is_active' flag is deprecated and will be removed in a future release. Please use the 'production' label instead."
1218                )
1219
1220                labels = labels if "production" in labels else labels + ["production"]
1221
1222            if type == "chat":
1223                if not isinstance(prompt, list):
1224                    raise ValueError(
1225                        "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes."
1226                    )
1227                request = CreatePromptRequest_Chat(
1228                    name=name,
1229                    prompt=prompt,
1230                    labels=labels,
1231                    tags=tags,
1232                    config=config or {},
1233                    type="chat",
1234                )
1235                server_prompt = self.client.prompts.create(request=request)
1236
1237                return ChatPromptClient(prompt=server_prompt)
1238
1239            if not isinstance(prompt, str):
1240                raise ValueError("For 'text' type, 'prompt' must be a string.")
1241
1242            request = CreatePromptRequest_Text(
1243                name=name,
1244                prompt=prompt,
1245                labels=labels,
1246                tags=tags,
1247                config=config or {},
1248                type="text",
1249            )
1250
1251            server_prompt = self.client.prompts.create(request=request)
1252            return TextPromptClient(prompt=server_prompt)
1253
1254        except Exception as e:
1255            handle_fern_exception(e)
1256            raise e
1257
1258    def _url_encode(self, url: str) -> str:
1259        return urllib.parse.quote(url)
1260
1261    def trace(
1262        self,
1263        *,
1264        id: typing.Optional[str] = None,
1265        name: typing.Optional[str] = None,
1266        user_id: typing.Optional[str] = None,
1267        session_id: typing.Optional[str] = None,
1268        version: typing.Optional[str] = None,
1269        input: typing.Optional[typing.Any] = None,
1270        output: typing.Optional[typing.Any] = None,
1271        metadata: typing.Optional[typing.Any] = None,
1272        tags: typing.Optional[typing.List[str]] = None,
1273        timestamp: typing.Optional[dt.datetime] = None,
1274        public: typing.Optional[bool] = None,
1275        **kwargs,
1276    ) -> "StatefulTraceClient":
1277        """Create a trace.
1278
1279        Args:
1280            id: The id of the trace can be set, defaults to a random id. Set it to link traces to external systems or when creating a distributed trace. Traces are upserted on id.
1281            name: Identifier of the trace. Useful for sorting/filtering in the UI.
1282            input: The input of the trace. Can be any JSON object.
1283            output: The output of the trace. Can be any JSON object.
1284            metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
1285            user_id: The id of the user that triggered the execution. Used to provide user-level analytics.
1286            session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
1287            version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
1288            release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
1289            tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.
1290            timestamp: The timestamp of the trace. Defaults to the current time if not provided.
1291            public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
1292            **kwargs: Additional keyword arguments that can be included in the trace.
1293
1294        Returns:
1295            StatefulTraceClient: The created trace.
1296
1297        Example:
1298            ```python
1299            from langfuse import Langfuse
1300
1301            langfuse = Langfuse()
1302
1303            trace = langfuse.trace(
1304                name="example-application",
1305                user_id="user-1234")
1306            )
1307            ```
1308        """
1309        new_id = id or str(uuid.uuid4())
1310        self.trace_id = new_id
1311        try:
1312            new_dict = {
1313                "id": new_id,
1314                "name": name,
1315                "userId": user_id,
1316                "sessionId": session_id
1317                or kwargs.get("sessionId", None),  # backward compatibility
1318                "release": self.release,
1319                "version": version,
1320                "metadata": metadata,
1321                "input": input,
1322                "output": output,
1323                "tags": tags,
1324                "timestamp": timestamp or _get_timestamp(),
1325                "public": public,
1326            }
1327            if kwargs is not None:
1328                new_dict.update(kwargs)
1329
1330            new_body = TraceBody(**new_dict)
1331
1332            self.log.debug(f"Creating trace {new_body}")
1333            event = {
1334                "id": str(uuid.uuid4()),
1335                "type": "trace-create",
1336                "body": new_body.dict(exclude_none=True),
1337            }
1338
1339            self.task_manager.add_task(
1340                event,
1341            )
1342
1343        except Exception as e:
1344            self.log.exception(e)
1345        finally:
1346            self._log_memory_usage()
1347
1348            return StatefulTraceClient(
1349                self.client, new_id, StateType.TRACE, new_id, self.task_manager
1350            )
1351
1352    def _log_memory_usage(self):
1353        try:
1354            is_malloc_tracing_enabled = bool(int(os.getenv("PYTHONTRACEMALLOC", 0)))
1355            report_interval = int(os.getenv("LANGFUSE_DEBUG_MEMORY_REPORT_INTERVAL", 0))
1356            top_k_items = int(os.getenv("LANGFUSE_DEBUG_MEMORY_TOP_K", 10))
1357
1358            if (
1359                not is_malloc_tracing_enabled
1360                or report_interval <= 0
1361                or round(time.monotonic()) % report_interval != 0
1362            ):
1363                return
1364
1365            snapshot = tracemalloc.take_snapshot().statistics("lineno")
1366
1367            total_memory_usage = sum([stat.size for stat in snapshot]) / 1024 / 1024
1368            memory_usage_total_items = [f"{stat}" for stat in snapshot]
1369            memory_usage_langfuse_items = [
1370                stat for stat in memory_usage_total_items if "/langfuse/" in stat
1371            ]
1372
1373            logged_memory_usage = {
1374                "all_files": [f"{stat}" for stat in memory_usage_total_items][
1375                    :top_k_items
1376                ],
1377                "langfuse_files": [f"{stat}" for stat in memory_usage_langfuse_items][
1378                    :top_k_items
1379                ],
1380                "total_usage": f"{total_memory_usage:.2f} MB",
1381                "langfuse_queue_length": self.task_manager._queue.qsize(),
1382            }
1383
1384            self.log.debug<