langfuse.client
1from contextlib import contextmanager 2import datetime as dt 3import logging 4import os 5import typing 6import uuid 7import backoff 8import httpx 9from enum import Enum 10import time 11import tracemalloc 12from typing import ( 13 Any, 14 Dict, 15 Optional, 16 Literal, 17 Union, 18 List, 19 Sequence, 20 overload, 21) 22import urllib.parse 23import warnings 24from dataclasses import dataclass 25 26 27from langfuse.api.resources.commons.types.dataset_run_with_items import ( 28 DatasetRunWithItems, 29) 30from langfuse.api.resources.commons.types.observations_view import ObservationsView 31from langfuse.api.resources.commons.types.session import Session 32from langfuse.api.resources.commons.types.trace_with_details import TraceWithDetails 33from langfuse.api.resources.datasets.types.paginated_dataset_runs import ( 34 PaginatedDatasetRuns, 35) 36from langfuse.api.resources.ingestion.types.create_event_body import CreateEventBody 37from langfuse.api.resources.ingestion.types.create_generation_body import ( 38 CreateGenerationBody, 39) 40from langfuse.api.resources.ingestion.types.create_span_body import CreateSpanBody 41from langfuse.api.resources.ingestion.types.score_body import ScoreBody 42from langfuse.api.resources.ingestion.types.trace_body import TraceBody 43from langfuse.api.resources.ingestion.types.sdk_log_body import SdkLogBody 44from langfuse.api.resources.ingestion.types.update_generation_body import ( 45 UpdateGenerationBody, 46) 47from langfuse.api.resources.ingestion.types.update_span_body import UpdateSpanBody 48from langfuse.api.resources.observations.types.observations_views import ( 49 ObservationsViews, 50) 51from langfuse.api.resources.prompts.types import ( 52 CreatePromptRequest_Chat, 53 CreatePromptRequest_Text, 54 Prompt_Text, 55 Prompt_Chat, 56) 57from langfuse.api.resources.trace.types.traces import Traces 58from langfuse.api.resources.utils.resources.pagination.types.meta_response import ( 59 MetaResponse, 60) 61from langfuse.model import ( 62 CreateDatasetItemRequest, 63 CreateDatasetRequest, 64 CreateDatasetRunItemRequest, 65 ChatMessageDict, 66 DatasetItem, 67 DatasetStatus, 68 ModelUsage, 69 PromptClient, 70 ChatPromptClient, 71 TextPromptClient, 72) 73from langfuse.parse_error import ( 74 handle_fern_exception, 75) 76from langfuse.prompt_cache import PromptCache 77 78try: 79 import pydantic.v1 as pydantic # type: ignore 80except ImportError: 81 import pydantic # type: ignore 82 83from langfuse.api.client import FernLangfuse 84from langfuse.environment import get_common_release_envs 85from langfuse.logging import clean_logger 86from langfuse.model import Dataset, MapValue, Observation, TraceWithFullDetails 87from langfuse.request import LangfuseClient 88from langfuse.task_manager import TaskManager 89from langfuse.types import SpanLevel, ScoreDataType 90from langfuse.utils import _convert_usage_input, _create_prompt_context, _get_timestamp 91 92from .version import __version__ as version 93 94 95@dataclass 96class FetchTracesResponse: 97 """Response object for fetch_traces method.""" 98 99 data: typing.List[TraceWithDetails] 100 meta: MetaResponse 101 102 103@dataclass 104class FetchTraceResponse: 105 """Response object for fetch_trace method.""" 106 107 data: TraceWithFullDetails 108 109 110@dataclass 111class FetchObservationsResponse: 112 """Response object for fetch_observations method.""" 113 114 data: typing.List[ObservationsView] 115 meta: MetaResponse 116 117 118@dataclass 119class FetchObservationResponse: 120 """Response object for fetch_observation method.""" 121 122 data: Observation 123 124 125@dataclass 126class FetchSessionsResponse: 127 """Response object for fetch_sessions method.""" 128 129 data: typing.List[Session] 130 meta: MetaResponse 131 132 133class Langfuse(object): 134 """Langfuse Python client. 135 136 Attributes: 137 log (logging.Logger): Logger for the Langfuse client. 138 base_url (str): Base URL of the Langfuse API, serving as the root address for API endpoint construction. 139 httpx_client (httpx.Client): HTTPX client utilized for executing requests to the Langfuse API. 140 client (FernLangfuse): Core interface for Langfuse API interaction. 141 task_manager (TaskManager): Task Manager dedicated to handling asynchronous tasks. 142 release (str): Identifies the release number or hash of the application. 143 prompt_cache (PromptCache): A cache for efficiently storing and retrieving PromptClient instances. 144 145 Example: 146 Initiating the Langfuse client should always be first step to use Langfuse. 147 ```python 148 import os 149 from langfuse import Langfuse 150 151 # Set the public and secret keys as environment variables 152 os.environ['LANGFUSE_PUBLIC_KEY'] = public_key 153 os.environ['LANGFUSE_SECRET_KEY'] = secret_key 154 155 # Initialize the Langfuse client using the credentials 156 langfuse = Langfuse() 157 ``` 158 """ 159 160 log = logging.getLogger("langfuse") 161 """Logger for the Langfuse client.""" 162 163 host: str 164 """Host of Langfuse API.""" 165 166 def __init__( 167 self, 168 public_key: Optional[str] = None, 169 secret_key: Optional[str] = None, 170 host: Optional[str] = None, 171 release: Optional[str] = None, 172 debug: bool = False, 173 threads: Optional[int] = None, 174 flush_at: Optional[int] = None, 175 flush_interval: Optional[float] = None, 176 max_retries: Optional[int] = None, 177 timeout: Optional[int] = None, # seconds 178 sdk_integration: Optional[str] = "default", 179 httpx_client: Optional[httpx.Client] = None, 180 enabled: Optional[bool] = True, 181 sample_rate: Optional[float] = None, 182 ): 183 """Initialize the Langfuse client. 184 185 Args: 186 public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable. 187 secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable. 188 host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`. 189 release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable. 190 debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable. 191 threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues. 192 flush_at: Max batch size that's sent to the API. 193 flush_interval: Max delay until a new batch is sent to the API. 194 max_retries: Max number of retries in case of API/network errors. 195 timeout: Timeout of API requests in seconds. Defaults to 20 seconds. 196 httpx_client: Pass your own httpx client for more customizability of requests. 197 sdk_integration: Used by intgerations that wrap the Langfuse SDK to add context for debugging and support. Not to be used directly. 198 enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops. 199 sample_rate: Sampling rate for tracing. If set to 0.2, only 20% of the data will be sent to the backend. Can be set via `LANGFUSE_SAMPLE_RATE` environment variable. 200 201 Raises: 202 ValueError: If public_key or secret_key are not set and not found in environment variables. 203 204 Example: 205 Initiating the Langfuse client should always be first step to use Langfuse. 206 ```python 207 import os 208 from langfuse import Langfuse 209 210 # Set the public and secret keys as environment variables 211 os.environ['LANGFUSE_PUBLIC_KEY'] = public_key 212 os.environ['LANGFUSE_SECRET_KEY'] = secret_key 213 214 # Initialize the Langfuse client using the credentials 215 langfuse = Langfuse() 216 ``` 217 """ 218 self.enabled = enabled 219 public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY") 220 secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY") 221 sample_rate = ( 222 sample_rate 223 if sample_rate 224 is not None # needs explicit None check, as 0 is a valid value 225 else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0)) 226 ) 227 228 if sample_rate is not None and ( 229 sample_rate > 1 or sample_rate < 0 230 ): # default value 1 will be set in the taskmanager 231 self.enabled = False 232 self.log.warning( 233 "Langfuse client is disabled since the sample rate provided is not between 0 and 1." 234 ) 235 236 threads = threads or int(os.environ.get("LANGFUSE_THREADS", 1)) 237 flush_at = flush_at or int(os.environ.get("LANGFUSE_FLUSH_AT", 15)) 238 flush_interval = flush_interval or float( 239 os.environ.get("LANGFUSE_FLUSH_INTERVAL", 0.5) 240 ) 241 242 max_retries = max_retries or int(os.environ.get("LANGFUSE_MAX_RETRIES", 3)) 243 timeout = timeout or int(os.environ.get("LANGFUSE_TIMEOUT", 20)) 244 245 if not self.enabled: 246 self.log.warning( 247 "Langfuse client is disabled. No observability data will be sent." 248 ) 249 250 elif not public_key: 251 self.enabled = False 252 self.log.warning( 253 "Langfuse client is disabled since no public_key was provided as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client" 254 ) 255 256 elif not secret_key: 257 self.enabled = False 258 self.log.warning( 259 "Langfuse client is disabled since no secret_key was provided as a parameter or environment variable 'LANGFUSE_SECRET_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client" 260 ) 261 262 set_debug = debug if debug else (os.getenv("LANGFUSE_DEBUG", "False") == "True") 263 264 if set_debug is True: 265 # Ensures that debug level messages are logged when debug mode is on. 266 # Otherwise, defaults to WARNING level. 267 # See https://docs.python.org/3/howto/logging.html#what-happens-if-no-configuration-is-provided 268 logging.basicConfig() 269 self.log.setLevel(logging.DEBUG) 270 271 clean_logger() 272 else: 273 self.log.setLevel(logging.WARNING) 274 clean_logger() 275 276 self.base_url = ( 277 host 278 if host 279 else os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com") 280 ) 281 282 self.httpx_client = httpx_client or httpx.Client(timeout=timeout) 283 284 self.client = FernLangfuse( 285 base_url=self.base_url, 286 username=public_key, 287 password=secret_key, 288 x_langfuse_sdk_name="python", 289 x_langfuse_sdk_version=version, 290 x_langfuse_public_key=public_key, 291 httpx_client=self.httpx_client, 292 ) 293 294 langfuse_client = LangfuseClient( 295 public_key=public_key, 296 secret_key=secret_key, 297 base_url=self.base_url, 298 version=version, 299 timeout=timeout, 300 session=self.httpx_client, 301 ) 302 303 args = { 304 "threads": threads, 305 "flush_at": flush_at, 306 "flush_interval": flush_interval, 307 "max_retries": max_retries, 308 "client": langfuse_client, 309 "public_key": public_key, 310 "sdk_name": "python", 311 "sdk_version": version, 312 "sdk_integration": sdk_integration, 313 "enabled": self.enabled, 314 "sample_rate": sample_rate, 315 } 316 317 self.task_manager = TaskManager(**args) 318 319 self.trace_id = None 320 321 self.release = self._get_release_value(release) 322 323 self.prompt_cache = PromptCache() 324 325 def _get_release_value(self, release: Optional[str] = None) -> Optional[str]: 326 if release: 327 return release 328 elif "LANGFUSE_RELEASE" in os.environ: 329 return os.environ["LANGFUSE_RELEASE"] 330 else: 331 return get_common_release_envs() 332 333 def get_trace_id(self) -> str: 334 """Get the current trace id.""" 335 return self.trace_id 336 337 def get_trace_url(self) -> str: 338 """Get the URL of the current trace to view it in the Langfuse UI.""" 339 return f"{self.base_url}/trace/{self.trace_id}" 340 341 def get_dataset( 342 self, name: str, *, fetch_items_page_size: Optional[int] = 50 343 ) -> "DatasetClient": 344 """Fetch a dataset by its name. 345 346 Args: 347 name (str): The name of the dataset to fetch. 348 fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50. 349 350 Returns: 351 DatasetClient: The dataset with the given name. 352 """ 353 try: 354 self.log.debug(f"Getting datasets {name}") 355 dataset = self.client.datasets.get(dataset_name=name) 356 357 dataset_items = [] 358 page = 1 359 while True: 360 new_items = self.client.dataset_items.list( 361 dataset_name=name, page=page, limit=fetch_items_page_size 362 ) 363 dataset_items.extend(new_items.data) 364 if new_items.meta.total_pages <= page: 365 break 366 page += 1 367 368 items = [DatasetItemClient(i, langfuse=self) for i in dataset_items] 369 370 return DatasetClient(dataset, items=items) 371 except Exception as e: 372 handle_fern_exception(e) 373 raise e 374 375 def get_dataset_item(self, id: str) -> "DatasetItemClient": 376 """Get the dataset item with the given id.""" 377 try: 378 self.log.debug(f"Getting dataset item {id}") 379 dataset_item = self.client.dataset_items.get(id=id) 380 return DatasetItemClient(dataset_item, langfuse=self) 381 except Exception as e: 382 handle_fern_exception(e) 383 raise e 384 385 def auth_check(self) -> bool: 386 """Check if the provided credentials (public and secret key) are valid. 387 388 Raises: 389 Exception: If no projects were found for the provided credentials. 390 391 Note: 392 This method is blocking. It is discouraged to use it in production code. 393 """ 394 try: 395 projects = self.client.projects.get() 396 self.log.debug( 397 f"Auth check successful, found {len(projects.data)} projects" 398 ) 399 if len(projects.data) == 0: 400 raise Exception( 401 "Auth check failed, no project found for the keys provided." 402 ) 403 return True 404 405 except Exception as e: 406 handle_fern_exception(e) 407 raise e 408 409 def get_dataset_runs( 410 self, 411 dataset_name: str, 412 *, 413 page: typing.Optional[int] = None, 414 limit: typing.Optional[int] = None, 415 ) -> PaginatedDatasetRuns: 416 """Get all dataset runs. 417 418 Args: 419 dataset_name (str): Name of the dataset. 420 page (Optional[int]): Page number of the dataset runs to return, starts at 1. Defaults to None. 421 limit (Optional[int]): Maximum number of dataset runs to return. Defaults to 50. 422 423 Returns: 424 PaginatedDatasetRuns: The dataset runs. 425 """ 426 try: 427 self.log.debug("Getting dataset runs") 428 return self.client.datasets.get_runs( 429 dataset_name=dataset_name, page=page, limit=limit 430 ) 431 except Exception as e: 432 handle_fern_exception(e) 433 raise e 434 435 def get_dataset_run( 436 self, 437 dataset_name: str, 438 dataset_run_name: str, 439 ) -> DatasetRunWithItems: 440 """Get a dataset run. 441 442 Args: 443 dataset_name: Name of the dataset. 444 dataset_run_name: Name of the dataset run. 445 446 Returns: 447 DatasetRunWithItems: The dataset run. 448 """ 449 try: 450 self.log.debug( 451 f"Getting dataset runs for dataset {dataset_name} and run {dataset_run_name}" 452 ) 453 return self.client.datasets.get_run( 454 dataset_name=dataset_name, run_name=dataset_run_name 455 ) 456 except Exception as e: 457 handle_fern_exception(e) 458 raise e 459 460 def create_dataset( 461 self, 462 name: str, 463 description: Optional[str] = None, 464 metadata: Optional[Any] = None, 465 ) -> Dataset: 466 """Create a dataset with the given name on Langfuse. 467 468 Args: 469 name: Name of the dataset to create. 470 description: Description of the dataset. Defaults to None. 471 metadata: Additional metadata. Defaults to None. 472 473 Returns: 474 Dataset: The created dataset as returned by the Langfuse API. 475 """ 476 try: 477 body = CreateDatasetRequest( 478 name=name, description=description, metadata=metadata 479 ) 480 self.log.debug(f"Creating datasets {body}") 481 return self.client.datasets.create(request=body) 482 except Exception as e: 483 handle_fern_exception(e) 484 raise e 485 486 def create_dataset_item( 487 self, 488 dataset_name: str, 489 input: Optional[Any] = None, 490 expected_output: Optional[Any] = None, 491 metadata: Optional[Any] = None, 492 source_trace_id: Optional[str] = None, 493 source_observation_id: Optional[str] = None, 494 status: Optional[DatasetStatus] = None, 495 id: Optional[str] = None, 496 ) -> DatasetItem: 497 """Create a dataset item. 498 499 Upserts if an item with id already exists. 500 501 Args: 502 dataset_name: Name of the dataset in which the dataset item should be created. 503 input: Input data. Defaults to None. Can contain any dict, list or scalar. 504 expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar. 505 metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar. 506 source_trace_id: Id of the source trace. Defaults to None. 507 source_observation_id: Id of the source observation. Defaults to None. 508 status: Status of the dataset item. Defaults to ACTIVE for newly created items. 509 id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets. 510 511 Returns: 512 DatasetItem: The created dataset item as returned by the Langfuse API. 513 514 Example: 515 ```python 516 from langfuse import Langfuse 517 518 langfuse = Langfuse() 519 520 # Uploading items to the Langfuse dataset named "capital_cities" 521 langfuse.create_dataset_item( 522 dataset_name="capital_cities", 523 input={"input": {"country": "Italy"}}, 524 expected_output={"expected_output": "Rome"}, 525 metadata={"foo": "bar"} 526 ) 527 ``` 528 """ 529 try: 530 body = CreateDatasetItemRequest( 531 datasetName=dataset_name, 532 input=input, 533 expectedOutput=expected_output, 534 metadata=metadata, 535 sourceTraceId=source_trace_id, 536 sourceObservationId=source_observation_id, 537 status=status, 538 id=id, 539 ) 540 self.log.debug(f"Creating dataset item {body}") 541 return self.client.dataset_items.create(request=body) 542 except Exception as e: 543 handle_fern_exception(e) 544 raise e 545 546 def fetch_trace( 547 self, 548 id: str, 549 ) -> FetchTraceResponse: 550 """Fetch a trace via the Langfuse API by its id. 551 552 Args: 553 id: The id of the trace to fetch. 554 555 Returns: 556 FetchTraceResponse: The trace with full details as returned by the Langfuse API on `data`. 557 558 Raises: 559 Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request. 560 """ 561 try: 562 self.log.debug(f"Getting trace {id}") 563 trace = self.client.trace.get(id) 564 return FetchTraceResponse(data=trace) 565 except Exception as e: 566 handle_fern_exception(e) 567 raise e 568 569 def get_trace( 570 self, 571 id: str, 572 ) -> TraceWithFullDetails: 573 """Get a trace via the Langfuse API by its id. Deprecated, use fetch_trace instead. 574 575 Args: 576 id: The id of the trace to fetch. 577 578 Returns: 579 TraceWithFullDetails: The trace with full details as returned by the Langfuse API. 580 581 Raises: 582 Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request. 583 """ 584 warnings.warn( 585 "get_trace is deprecated, use fetch_trace instead.", 586 DeprecationWarning, 587 ) 588 589 try: 590 self.log.debug(f"Getting trace {id}") 591 return self.client.trace.get(id) 592 except Exception as e: 593 handle_fern_exception(e) 594 raise e 595 596 def fetch_traces( 597 self, 598 *, 599 page: Optional[int] = None, 600 limit: Optional[int] = None, 601 user_id: Optional[str] = None, 602 name: Optional[str] = None, 603 session_id: Optional[str] = None, 604 from_timestamp: Optional[dt.datetime] = None, 605 to_timestamp: Optional[dt.datetime] = None, 606 order_by: Optional[str] = None, 607 tags: Optional[Union[str, Sequence[str]]] = None, 608 ) -> FetchTracesResponse: 609 """Fetch a list of traces in the current project matching the given parameters. 610 611 Args: 612 page (Optional[int]): Page number, starts at 1. Defaults to None. 613 limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None. 614 name (Optional[str]): Filter by name of traces. Defaults to None. 615 user_id (Optional[str]): Filter by user_id. Defaults to None. 616 session_id (Optional[str]): Filter by session_id. Defaults to None. 617 from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None. 618 to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None. 619 order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None. 620 tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None. 621 622 Returns: 623 FetchTracesResponse, list of traces on `data` and metadata on `meta`. 624 625 Raises: 626 Exception: If an error occurred during the request. 627 """ 628 try: 629 self.log.debug( 630 f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}" 631 ) 632 res = self.client.trace.list( 633 page=page, 634 limit=limit, 635 name=name, 636 user_id=user_id, 637 session_id=session_id, 638 from_timestamp=from_timestamp, 639 to_timestamp=to_timestamp, 640 order_by=order_by, 641 tags=tags, 642 ) 643 return FetchTracesResponse(data=res.data, meta=res.meta) 644 except Exception as e: 645 handle_fern_exception(e) 646 raise e 647 648 def get_traces( 649 self, 650 *, 651 page: Optional[int] = None, 652 limit: Optional[int] = None, 653 user_id: Optional[str] = None, 654 name: Optional[str] = None, 655 session_id: Optional[str] = None, 656 from_timestamp: Optional[dt.datetime] = None, 657 to_timestamp: Optional[dt.datetime] = None, 658 order_by: Optional[str] = None, 659 tags: Optional[Union[str, Sequence[str]]] = None, 660 ) -> Traces: 661 """Get a list of traces in the current project matching the given parameters. Deprecated, use fetch_traces instead. 662 663 Args: 664 page (Optional[int]): Page number, starts at 1. Defaults to None. 665 limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None. 666 name (Optional[str]): Filter by name of traces. Defaults to None. 667 user_id (Optional[str]): Filter by user_id. Defaults to None. 668 session_id (Optional[str]): Filter by session_id. Defaults to None. 669 from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None. 670 to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None. 671 order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None. 672 tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None. 673 674 Returns: 675 List of Traces 676 677 Raises: 678 Exception: If an error occurred during the request. 679 """ 680 warnings.warn( 681 "get_traces is deprecated, use fetch_traces instead.", 682 DeprecationWarning, 683 ) 684 try: 685 self.log.debug( 686 f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}" 687 ) 688 return self.client.trace.list( 689 page=page, 690 limit=limit, 691 name=name, 692 user_id=user_id, 693 session_id=session_id, 694 from_timestamp=from_timestamp, 695 to_timestamp=to_timestamp, 696 order_by=order_by, 697 tags=tags, 698 ) 699 except Exception as e: 700 handle_fern_exception(e) 701 raise e 702 703 def fetch_observations( 704 self, 705 *, 706 page: typing.Optional[int] = None, 707 limit: typing.Optional[int] = None, 708 name: typing.Optional[str] = None, 709 user_id: typing.Optional[str] = None, 710 trace_id: typing.Optional[str] = None, 711 parent_observation_id: typing.Optional[str] = None, 712 from_start_time: typing.Optional[dt.datetime] = None, 713 to_start_time: typing.Optional[dt.datetime] = None, 714 type: typing.Optional[str] = None, 715 ) -> FetchObservationsResponse: 716 """Get a list of observations in the current project matching the given parameters. 717 718 Args: 719 page (Optional[int]): Page number of the observations to return. Defaults to None. 720 limit (Optional[int]): Maximum number of observations to return. Defaults to None. 721 name (Optional[str]): Name of the observations to return. Defaults to None. 722 user_id (Optional[str]): User identifier. Defaults to None. 723 trace_id (Optional[str]): Trace identifier. Defaults to None. 724 parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None. 725 from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None. 726 to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None. 727 type (Optional[str]): Type of the observation. Defaults to None. 728 729 Returns: 730 FetchObservationsResponse, list of observations on `data` and metadata on `meta`. 731 732 Raises: 733 Exception: If an error occurred during the request. 734 """ 735 try: 736 self.log.debug( 737 f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}" 738 ) 739 res = self.client.observations.get_many( 740 page=page, 741 limit=limit, 742 name=name, 743 user_id=user_id, 744 trace_id=trace_id, 745 parent_observation_id=parent_observation_id, 746 from_start_time=from_start_time, 747 to_start_time=to_start_time, 748 type=type, 749 ) 750 return FetchObservationsResponse(data=res.data, meta=res.meta) 751 except Exception as e: 752 self.log.exception(e) 753 raise e 754 755 def get_observations( 756 self, 757 *, 758 page: typing.Optional[int] = None, 759 limit: typing.Optional[int] = None, 760 name: typing.Optional[str] = None, 761 user_id: typing.Optional[str] = None, 762 trace_id: typing.Optional[str] = None, 763 parent_observation_id: typing.Optional[str] = None, 764 from_start_time: typing.Optional[dt.datetime] = None, 765 to_start_time: typing.Optional[dt.datetime] = None, 766 type: typing.Optional[str] = None, 767 ) -> ObservationsViews: 768 """Get a list of observations in the current project matching the given parameters. Deprecated, use fetch_observations instead. 769 770 Args: 771 page (Optional[int]): Page number of the observations to return. Defaults to None. 772 limit (Optional[int]): Maximum number of observations to return. Defaults to None. 773 name (Optional[str]): Name of the observations to return. Defaults to None. 774 user_id (Optional[str]): User identifier. Defaults to None. 775 trace_id (Optional[str]): Trace identifier. Defaults to None. 776 parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None. 777 from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None. 778 to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None. 779 type (Optional[str]): Type of the observation. Defaults to None. 780 781 Returns: 782 List of ObservationsViews: List of observations in the project matching the given parameters. 783 784 Raises: 785 Exception: If an error occurred during the request. 786 """ 787 warnings.warn( 788 "get_observations is deprecated, use fetch_observations instead.", 789 DeprecationWarning, 790 ) 791 try: 792 self.log.debug( 793 f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}" 794 ) 795 return self.client.observations.get_many( 796 page=page, 797 limit=limit, 798 name=name, 799 user_id=user_id, 800 trace_id=trace_id, 801 parent_observation_id=parent_observation_id, 802 from_start_time=from_start_time, 803 to_start_time=to_start_time, 804 type=type, 805 ) 806 except Exception as e: 807 handle_fern_exception(e) 808 raise e 809 810 def get_generations( 811 self, 812 *, 813 page: typing.Optional[int] = None, 814 limit: typing.Optional[int] = None, 815 name: typing.Optional[str] = None, 816 user_id: typing.Optional[str] = None, 817 trace_id: typing.Optional[str] = None, 818 from_start_time: typing.Optional[dt.datetime] = None, 819 to_start_time: typing.Optional[dt.datetime] = None, 820 parent_observation_id: typing.Optional[str] = None, 821 ) -> ObservationsViews: 822 """Get a list of generations in the current project matching the given parameters. Deprecated, use fetch_observations(type='GENERATION') instead. 823 824 Args: 825 page (Optional[int]): Page number of the generations to return. Defaults to None. 826 limit (Optional[int]): Maximum number of generations to return. Defaults to None. 827 name (Optional[str]): Name of the generations to return. Defaults to None. 828 user_id (Optional[str]): User identifier of the generations to return. Defaults to None. 829 trace_id (Optional[str]): Trace identifier of the generations to return. Defaults to None. 830 from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None. 831 to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None. 832 parent_observation_id (Optional[str]): Parent observation identifier of the generations to return. Defaults to None. 833 834 Returns: 835 List of ObservationsViews: List of generations in the project matching the given parameters. 836 837 Raises: 838 Exception: If an error occurred during the request. 839 """ 840 warnings.warn( 841 "get_generations is deprecated, use `fetch_observations(type='GENERATION')` instead.", 842 DeprecationWarning, 843 ) 844 return self.get_observations( 845 page=page, 846 limit=limit, 847 name=name, 848 user_id=user_id, 849 trace_id=trace_id, 850 parent_observation_id=parent_observation_id, 851 from_start_time=from_start_time, 852 to_start_time=to_start_time, 853 type="GENERATION", 854 ) 855 856 def fetch_observation( 857 self, 858 id: str, 859 ) -> FetchObservationResponse: 860 """Get an observation in the current project with the given identifier. 861 862 Args: 863 id: The identifier of the observation to fetch. 864 865 Returns: 866 FetchObservationResponse: The observation with the given id on `data`. 867 868 Raises: 869 Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request. 870 """ 871 try: 872 self.log.debug(f"Getting observation {id}") 873 observation = self.client.observations.get(id) 874 return FetchObservationResponse(data=observation) 875 except Exception as e: 876 handle_fern_exception(e) 877 raise e 878 879 def get_observation( 880 self, 881 id: str, 882 ) -> Observation: 883 """Get an observation in the current project with the given identifier. Deprecated, use fetch_observation instead. 884 885 Args: 886 id: The identifier of the observation to fetch. 887 888 Raises: 889 Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request. 890 """ 891 warnings.warn( 892 "get_observation is deprecated, use fetch_observation instead.", 893 DeprecationWarning, 894 ) 895 try: 896 self.log.debug(f"Getting observation {id}") 897 return self.client.observations.get(id) 898 except Exception as e: 899 handle_fern_exception(e) 900 raise e 901 902 def fetch_sessions( 903 self, 904 *, 905 page: typing.Optional[int] = None, 906 limit: typing.Optional[int] = None, 907 from_timestamp: typing.Optional[dt.datetime] = None, 908 to_timestamp: typing.Optional[dt.datetime] = None, 909 ) -> FetchSessionsResponse: 910 """Get a list of sessions in the current project. 911 912 Args: 913 page (Optional[int]): Page number of the sessions to return. Defaults to None. 914 limit (Optional[int]): Maximum number of sessions to return. Defaults to None. 915 from_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp on or after this datetime. Defaults to None. 916 to_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp before this datetime. Defaults to None. 917 918 Returns: 919 FetchSessionsResponse, list of sessions on `data` and metadata on `meta`. 920 921 Raises: 922 Exception: If an error occurred during the request. 923 """ 924 try: 925 self.log.debug( 926 f"Getting sessions... {page}, {limit}, {from_timestamp}, {to_timestamp}" 927 ) 928 res = self.client.sessions.list( 929 page=page, 930 limit=limit, 931 from_timestamp=from_timestamp, 932 to_timestamp=to_timestamp, 933 ) 934 return FetchSessionsResponse(data=res.data, meta=res.meta) 935 except Exception as e: 936 handle_fern_exception(e) 937 raise e 938 939 @overload 940 def get_prompt( 941 self, 942 name: str, 943 version: Optional[int] = None, 944 *, 945 label: Optional[str] = None, 946 type: Literal["chat"], 947 cache_ttl_seconds: Optional[int] = None, 948 fallback: Optional[List[ChatMessageDict]] = None, 949 max_retries: Optional[int] = None, 950 fetch_timeout_seconds: Optional[int] = None, 951 ) -> ChatPromptClient: ... 952 953 @overload 954 def get_prompt( 955 self, 956 name: str, 957 version: Optional[int] = None, 958 *, 959 label: Optional[str] = None, 960 type: Literal["text"] = "text", 961 cache_ttl_seconds: Optional[int] = None, 962 fallback: Optional[str] = None, 963 max_retries: Optional[int] = None, 964 fetch_timeout_seconds: Optional[int] = None, 965 ) -> TextPromptClient: ... 966 967 def get_prompt( 968 self, 969 name: str, 970 version: Optional[int] = None, 971 *, 972 label: Optional[str] = None, 973 type: Literal["chat", "text"] = "text", 974 cache_ttl_seconds: Optional[int] = None, 975 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None, 976 max_retries: Optional[int] = None, 977 fetch_timeout_seconds: Optional[int] = None, 978 ) -> PromptClient: 979 """Get a prompt. 980 981 This method attempts to fetch the requested prompt from the local cache. If the prompt is not found 982 in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again 983 and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will 984 return the expired prompt as a fallback. 985 986 Args: 987 name (str): The name of the prompt to retrieve. 988 989 Keyword Args: 990 version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both. 991 label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both. 992 cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a 993 keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0. 994 type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text". 995 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None. 996 max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries have an exponential backoff with a maximum delay of 10 seconds. 997 fetch_timeout_seconds: Optional[int]: The timeout in milliseconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 10 seconds per default. 998 999 Returns: 1000 The prompt object retrieved from the cache or directly fetched if not cached or expired of type 1001 - TextPromptClient, if type argument is 'text'. 1002 - ChatPromptClient, if type argument is 'chat'. 1003 1004 Raises: 1005 Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an 1006 expired prompt in the cache, in which case it logs a warning and returns the expired prompt. 1007 """ 1008 if version is not None and label is not None: 1009 raise ValueError("Cannot specify both version and label at the same time.") 1010 1011 if not name: 1012 raise ValueError("Prompt name cannot be empty.") 1013 1014 cache_key = PromptCache.generate_cache_key(name, version=version, label=label) 1015 bounded_max_retries = self._get_bounded_max_retries( 1016 max_retries, default_max_retries=2, max_retries_upper_bound=4 1017 ) 1018 1019 self.log.debug(f"Getting prompt '{cache_key}'") 1020 cached_prompt = self.prompt_cache.get(cache_key) 1021 1022 if cached_prompt is None or cache_ttl_seconds == 0: 1023 self.log.debug( 1024 f"Prompt '{cache_key}' not found in cache or caching disabled." 1025 ) 1026 try: 1027 return self._fetch_prompt_and_update_cache( 1028 name, 1029 version=version, 1030 label=label, 1031 ttl_seconds=cache_ttl_seconds, 1032 max_retries=bounded_max_retries, 1033 fetch_timeout_seconds=fetch_timeout_seconds, 1034 ) 1035 except Exception as e: 1036 if fallback: 1037 self.log.warning( 1038 f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}" 1039 ) 1040 1041 fallback_client_args = { 1042 "name": name, 1043 "prompt": fallback, 1044 "type": type, 1045 "version": version or 0, 1046 "config": {}, 1047 "labels": [label] if label else [], 1048 "tags": [], 1049 } 1050 1051 if type == "text": 1052 return TextPromptClient( 1053 prompt=Prompt_Text(**fallback_client_args), 1054 is_fallback=True, 1055 ) 1056 1057 if type == "chat": 1058 return ChatPromptClient( 1059 prompt=Prompt_Chat(**fallback_client_args), 1060 is_fallback=True, 1061 ) 1062 1063 raise e 1064 1065 if cached_prompt.is_expired(): 1066 self.log.debug(f"Stale prompt '{cache_key}' found in cache.") 1067 try: 1068 # refresh prompt in background thread, refresh_prompt deduplicates tasks 1069 self.log.debug(f"Refreshing prompt '{cache_key}' in background.") 1070 self.prompt_cache.add_refresh_prompt_task( 1071 cache_key, 1072 lambda: self._fetch_prompt_and_update_cache( 1073 name, 1074 version=version, 1075 label=label, 1076 ttl_seconds=cache_ttl_seconds, 1077 max_retries=bounded_max_retries, 1078 fetch_timeout_seconds=fetch_timeout_seconds, 1079 ), 1080 ) 1081 self.log.debug(f"Returning stale prompt '{cache_key}' from cache.") 1082 # return stale prompt 1083 return cached_prompt.value 1084 1085 except Exception as e: 1086 self.log.warning( 1087 f"Error when refreshing cached prompt '{cache_key}', returning cached version. Error: {e}" 1088 ) 1089 # creation of refresh prompt task failed, return stale prompt 1090 return cached_prompt.value 1091 1092 return cached_prompt.value 1093 1094 def _fetch_prompt_and_update_cache( 1095 self, 1096 name: str, 1097 *, 1098 version: Optional[int] = None, 1099 label: Optional[str] = None, 1100 ttl_seconds: Optional[int] = None, 1101 max_retries: int, 1102 fetch_timeout_seconds, 1103 ) -> PromptClient: 1104 try: 1105 cache_key = PromptCache.generate_cache_key( 1106 name, version=version, label=label 1107 ) 1108 1109 self.log.debug(f"Fetching prompt '{cache_key}' from server...") 1110 1111 @backoff.on_exception( 1112 backoff.constant, Exception, max_tries=max_retries, logger=None 1113 ) 1114 def fetch_prompts(): 1115 return self.client.prompts.get( 1116 self._url_encode(name), 1117 version=version, 1118 label=label, 1119 request_options={ 1120 "timeout_in_seconds": fetch_timeout_seconds, 1121 } 1122 if fetch_timeout_seconds is not None 1123 else None, 1124 ) 1125 1126 prompt_response = fetch_prompts() 1127 1128 if prompt_response.type == "chat": 1129 prompt = ChatPromptClient(prompt_response) 1130 else: 1131 prompt = TextPromptClient(prompt_response) 1132 1133 self.prompt_cache.set(cache_key, prompt, ttl_seconds) 1134 1135 return prompt 1136 1137 except Exception as e: 1138 self.log.error(f"Error while fetching prompt '{cache_key}': {str(e)}") 1139 raise e 1140 1141 def _get_bounded_max_retries( 1142 self, 1143 max_retries: Optional[int], 1144 *, 1145 default_max_retries: int = 2, 1146 max_retries_upper_bound: int = 4, 1147 ) -> int: 1148 if max_retries is None: 1149 return default_max_retries 1150 1151 bounded_max_retries = min( 1152 max(max_retries, 0), 1153 max_retries_upper_bound, 1154 ) 1155 1156 return bounded_max_retries 1157 1158 @overload 1159 def create_prompt( 1160 self, 1161 *, 1162 name: str, 1163 prompt: List[ChatMessageDict], 1164 is_active: Optional[bool] = None, # deprecated 1165 labels: List[str] = [], 1166 tags: Optional[List[str]] = None, 1167 type: Optional[Literal["chat"]], 1168 config: Optional[Any] = None, 1169 ) -> ChatPromptClient: ... 1170 1171 @overload 1172 def create_prompt( 1173 self, 1174 *, 1175 name: str, 1176 prompt: str, 1177 is_active: Optional[bool] = None, # deprecated 1178 labels: List[str] = [], 1179 tags: Optional[List[str]] = None, 1180 type: Optional[Literal["text"]] = "text", 1181 config: Optional[Any] = None, 1182 ) -> TextPromptClient: ... 1183 1184 def create_prompt( 1185 self, 1186 *, 1187 name: str, 1188 prompt: Union[str, List[ChatMessageDict]], 1189 is_active: Optional[bool] = None, # deprecated 1190 labels: List[str] = [], 1191 tags: Optional[List[str]] = None, 1192 type: Optional[Literal["chat", "text"]] = "text", 1193 config: Optional[Any] = None, 1194 ) -> PromptClient: 1195 """Create a new prompt in Langfuse. 1196 1197 Keyword Args: 1198 name : The name of the prompt to be created. 1199 prompt : The content of the prompt to be created. 1200 is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead. 1201 labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label. 1202 tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt. 1203 config: Additional structured data to be saved with the prompt. Defaults to None. 1204 type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text". 1205 1206 Returns: 1207 TextPromptClient: The prompt if type argument is 'text'. 1208 ChatPromptClient: The prompt if type argument is 'chat'. 1209 """ 1210 try: 1211 self.log.debug(f"Creating prompt {name=}, {version=}, {labels=}") 1212 1213 # Handle deprecated is_active flag 1214 if is_active: 1215 self.log.warning( 1216 "The 'is_active' flag is deprecated and will be removed in a future release. Please use the 'production' label instead." 1217 ) 1218 1219 labels = labels if "production" in labels else labels + ["production"] 1220 1221 if type == "chat": 1222 if not isinstance(prompt, list): 1223 raise ValueError( 1224 "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes." 1225 ) 1226 request = CreatePromptRequest_Chat( 1227 name=name, 1228 prompt=prompt, 1229 labels=labels, 1230 tags=tags, 1231 config=config or {}, 1232 type="chat", 1233 ) 1234 server_prompt = self.client.prompts.create(request=request) 1235 1236 return ChatPromptClient(prompt=server_prompt) 1237 1238 if not isinstance(prompt, str): 1239 raise ValueError("For 'text' type, 'prompt' must be a string.") 1240 1241 request = CreatePromptRequest_Text( 1242 name=name, 1243 prompt=prompt, 1244 labels=labels, 1245 tags=tags, 1246 config=config or {}, 1247 type="text", 1248 ) 1249 1250 server_prompt = self.client.prompts.create(request=request) 1251 return TextPromptClient(prompt=server_prompt) 1252 1253 except Exception as e: 1254 handle_fern_exception(e) 1255 raise e 1256 1257 def _url_encode(self, url: str) -> str: 1258 return urllib.parse.quote(url) 1259 1260 def trace( 1261 self, 1262 *, 1263 id: typing.Optional[str] = None, 1264 name: typing.Optional[str] = None, 1265 user_id: typing.Optional[str] = None, 1266 session_id: typing.Optional[str] = None, 1267 version: typing.Optional[str] = None, 1268 input: typing.Optional[typing.Any] = None, 1269 output: typing.Optional[typing.Any] = None, 1270 metadata: typing.Optional[typing.Any] = None, 1271 tags: typing.Optional[typing.List[str]] = None, 1272 timestamp: typing.Optional[dt.datetime] = None, 1273 public: typing.Optional[bool] = None, 1274 **kwargs, 1275 ) -> "StatefulTraceClient": 1276 """Create a trace. 1277 1278 Args: 1279 id: The id of the trace can be set, defaults to a random id. Set it to link traces to external systems or when creating a distributed trace. Traces are upserted on id. 1280 name: Identifier of the trace. Useful for sorting/filtering in the UI. 1281 input: The input of the trace. Can be any JSON object. 1282 output: The output of the trace. Can be any JSON object. 1283 metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API. 1284 user_id: The id of the user that triggered the execution. Used to provide user-level analytics. 1285 session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier. 1286 version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging. 1287 release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging. 1288 tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API. 1289 timestamp: The timestamp of the trace. Defaults to the current time if not provided. 1290 public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project. 1291 **kwargs: Additional keyword arguments that can be included in the trace. 1292 1293 Returns: 1294 StatefulTraceClient: The created trace. 1295 1296 Example: 1297 ```python 1298 from langfuse import Langfuse 1299 1300 langfuse = Langfuse() 1301 1302 trace = langfuse.trace( 1303 name="example-application", 1304 user_id="user-1234") 1305 ) 1306 ``` 1307 """ 1308 new_id = id or str(uuid.uuid4()) 1309 self.trace_id = new_id 1310 try: 1311 new_dict = { 1312 "id": new_id, 1313 "name": name, 1314 "userId": user_id, 1315 "sessionId": session_id 1316 or kwargs.get("sessionId", None), # backward compatibility 1317 "release": self.release, 1318 "version": version, 1319 "metadata": metadata, 1320 "input": input, 1321 "output": output, 1322 "tags": tags, 1323 "timestamp": timestamp or _get_timestamp(), 1324 "public": public, 1325 } 1326 if kwargs is not None: 1327 new_dict.update(kwargs) 1328 1329 new_body = TraceBody(**new_dict) 1330 1331 self.log.debug(f"Creating trace {new_body}") 1332 event = { 1333 "id": str(uuid.uuid4()), 1334 "type": "trace-create", 1335 "body": new_body.dict(exclude_none=True), 1336 } 1337 1338 self.task_manager.add_task( 1339 event, 1340 ) 1341 1342 except Exception as e: 1343 self.log.exception(e) 1344 finally: 1345 self._log_memory_usage() 1346 1347 return StatefulTraceClient( 1348 self.client, new_id, StateType.TRACE, new_id, self.task_manager 1349 ) 1350 1351 def _log_memory_usage(self): 1352 try: 1353 is_malloc_tracing_enabled = bool(int(os.getenv("PYTHONTRACEMALLOC", 0))) 1354 report_interval = int(os.getenv("LANGFUSE_DEBUG_MEMORY_REPORT_INTERVAL", 0)) 1355 top_k_items = int(os.getenv("LANGFUSE_DEBUG_MEMORY_TOP_K", 10)) 1356 1357 if ( 1358 not is_malloc_tracing_enabled 1359 or report_interval <= 0 1360 or round(time.monotonic()) % report_interval != 0 1361 ): 1362 return 1363 1364 snapshot = tracemalloc.take_snapshot().statistics("lineno") 1365 1366 total_memory_usage = sum([stat.size for stat in snapshot]) / 1024 / 1024 1367 memory_usage_total_items = [f"{stat}" for stat in snapshot] 1368 memory_usage_langfuse_items = [ 1369 stat for stat in memory_usage_total_items if "/langfuse/" in stat 1370 ] 1371 1372 logged_memory_usage = { 1373 "all_files": [f"{stat}" for stat in memory_usage_total_items][ 1374 :top_k_items 1375 ], 1376 "langfuse_files": [f"{stat}" for stat in memory_usage_langfuse_items][ 1377 :top_k_items 1378 ], 1379 "total_usage": f"{total_memory_usage:.2f} MB", 1380 "langfuse_queue_length": self.task_manager._queue.qsize(), 1381 } 1382 1383 self.log.debug("Memory usage: ", logged_memory_usage) 1384 1385 event = SdkLogBody(log=logged_memory_usage) 1386 self.task_manager.add_task( 1387 { 1388 "id": str(uuid.uuid4()), 1389 "type": "sdk-log", 1390 "timestamp": _get_timestamp(), 1391 "body": event.dict(), 1392 } 1393 ) 1394 1395 except Exception as e: 1396 self.log.exception(e) 1397 1398 @overload 1399 def score( 1400 self, 1401 *, 1402 name: str, 1403 value: float, 1404 data_type: typing.Optional[Literal["NUMERIC", "BOOLEAN"]] = None, 1405 trace_id: typing.Optional[str] = None, 1406 id: typing.Optional[str] = None, 1407 comment: typing.Optional[str] = None, 1408 observation_id: typing.Optional[str] = None, 1409 config_id: typing.Optional[str] = None, 1410 **kwargs, 1411 ) -> "StatefulClient": ... 1412 1413 @overload 1414 def score( 1415 self, 1416 *, 1417 name: str, 1418 value: str, 1419 data_type: typing.Optional[Literal["CATEGORICAL"]] = "CATEGORICAL", 1420 trace_id: typing.Optional[str] = None, 1421 id: typing.Optional[str] = None, 1422 comment: typing.Optional[str] = None, 1423 observation_id: typing.Optional[str] = None, 1424 config_id: typing.Optional[str] = None, 1425 **kwargs, 1426 ) -> "StatefulClient": ... 1427 1428 def score( 1429 self, 1430 *, 1431 name: str, 1432 value: typing.Union[float, str], 1433 data_type: typing.Optional[ScoreDataType] = None, 1434 trace_id: typing.Optional[str] = None, 1435 id: typing.Optional[str] = None, 1436 comment: typing.Optional[str] = None, 1437 observation_id: typing.Optional[str] = None, 1438 config_id: typing.Optional[str] = None, 1439 **kwargs, 1440 ) -> "StatefulClient": 1441 """Create a score attached to a trace (and optionally an observation). 1442 1443 Args: 1444 name (str): Identifier of the score. 1445 value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. 1446 data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. 1447 When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores. 1448 trace_id (str): The id of the trace to which the score should be attached. 1449 id (Optional[str]): The id of the score. If not provided, a new UUID is generated. 1450 comment (Optional[str]): Additional context/explanation of the score. 1451 observation_id (Optional[str]): The id of the observation to which the score should be attached. 1452 config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None. 1453 **kwargs: Additional keyword arguments to include in the score. 1454 1455 Returns: 1456 StatefulClient: Either the associated observation (if observation_id is provided) or the trace (if observation_id is not provided). 1457 1458 Example: 1459 ```python 1460 from langfuse import Langfuse 1461 1462 langfuse = Langfuse() 1463 1464 # Create a trace 1465 trace = langfuse.trace(name="example-application") 1466 1467 # Get id of created trace 1468 trace_id = trace.id 1469 1470 # Add score to the trace 1471 trace = langfuse.score( 1472 trace_id=trace_id, 1473 name="user-explicit-feedback", 1474 value=0.9, 1475 comment="I like how personalized the response is" 1476 ) 1477 ``` 1478 """ 1479 trace_id = trace_id or self.trace_id or str(uuid.uuid4()) 1480 new_id = id or str(uuid.uuid4()) 1481 try: 1482 new_dict = { 1483 "id": new_id, 1484 "trace_id": trace_id, 1485 "observation_id": observation_id, 1486 "name": name, 1487 "value": value, 1488 "data_type": data_type, 1489 "comment": comment, 1490 "config_id": config_id, 1491 **kwargs, 1492 } 1493 1494 self.log.debug(f"Creating score {new_dict}...") 1495 new_body = ScoreBody(**new_dict) 1496 1497 event = { 1498 "id": str(uuid.uuid4()), 1499 "type": "score-create", 1500 "body": new_body.dict(exclude_none=True), 1501 } 1502 self.task_manager.add_task(event) 1503 1504 except Exception as e: 1505 self.log.exception(e) 1506 finally: 1507 if observation_id is not None: 1508 return StatefulClient( 1509 self.client, 1510 observation_id, 1511 StateType.OBSERVATION, 1512 trace_id, 1513 self.task_manager, 1514 ) 1515 else: 1516 return StatefulClient( 1517 self.client, new_id, StateType.TRACE, new_id, self.task_manager 1518 ) 1519 1520 def span( 1521 self, 1522 *, 1523 id: typing.Optional[str] = None, 1524 trace_id: typing.Optional[str] = None, 1525 parent_observation_id: typing.Optional[str] = None, 1526 name: typing.Optional[str] = None, 1527 start_time: typing.Optional[dt.datetime] = None, 1528 end_time: typing.Optional[dt.datetime] = None, 1529 metadata: typing.Optional[typing.Any] = None, 1530 level: typing.Optional[SpanLevel] = None, 1531 status_message: typing.Optional[str] = None, 1532 input: typing.Optional[typing.Any] = None, 1533 output: typing.Optional[typing.Any] = None, 1534 version: typing.Optional[str] = None, 1535 **kwargs, 1536 ) -> "StatefulSpanClient": 1537 """Create a span. 1538 1539 A span represents durations of units of work in a trace. 1540 Usually, you want to add a span nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id. 1541 1542 If no trace_id is provided, a new trace is created just for this span. 1543 1544 Args: 1545 id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id. 1546 trace_id (Optional[str]): The trace ID associated with this span. If not provided, a new UUID is generated. 1547 parent_observation_id (Optional[str]): The ID of the parent observation, if applicable. 1548 name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI. 1549 start_time (Optional[datetime]): The time at which the span started, defaults to the current time. 1550 end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`. 1551 metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API. 1552 level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 1553 status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event. 1554 input (Optional[dict]): The input to the span. Can be any JSON object. 1555 output (Optional[dict]): The output to the span. Can be any JSON object. 1556 version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging. 1557 **kwargs: Additional keyword arguments to include in the span. 1558 1559 Returns: 1560 StatefulSpanClient: The created span. 1561 1562 Example: 1563 ```python 1564 from langfuse import Langfuse 1565 1566 langfuse = Langfuse() 1567 1568 trace = langfuse.trace(name = "llm-feature") 1569 1570 # Create a span 1571 retrieval = langfuse.span(name = "retrieval", trace_id = trace.id) 1572 1573 # Create a nested span 1574 nested_span = langfuse.span(name = "retrieval", trace_id = trace.id, parent_observation_id = retrieval.id) 1575 ``` 1576 """ 1577 new_span_id = id or str(uuid.uuid4()) 1578 new_trace_id = trace_id or str(uuid.uuid4()) 1579 self.trace_id = new_trace_id 1580 try: 1581 span_body = { 1582 "id": new_span_id, 1583 "trace_id": new_trace_id, 1584 "name": name, 1585 "start_time": start_time or _get_timestamp(), 1586 "metadata": metadata, 1587 "input": input, 1588 "output": output, 1589 "level": level, 1590 "status_message": status_message, 1591 "parent_observation_id": parent_observation_id, 1592 "version": version, 1593 "end_time": end_time, 1594 "trace": {"release": self.release}, 1595 **kwargs, 1596 } 1597 1598 if trace_id is None: 1599 self._generate_trace(new_trace_id, name or new_trace_id) 1600 1601 self.log.debug(f"Creating span {span_body}...") 1602 1603 span_body = CreateSpanBody(**span_body) 1604 1605 event = { 1606 "id": str(uuid.uuid4()), 1607 "type": "span-create", 1608 "body": span_body.dict(exclude_none=True), 1609 } 1610 1611 self.log.debug(f"Creating span {event}...") 1612 self.task_manager.add_task(event) 1613 1614 except Exception as e: 1615 self.log.exception(e) 1616 finally: 1617 self._log_memory_usage() 1618 1619 return StatefulSpanClient( 1620 self.client, 1621 new_span_id, 1622 StateType.OBSERVATION, 1623 new_trace_id, 1624 self.task_manager, 1625 ) 1626 1627 def event( 1628 self, 1629 *, 1630 id: typing.Optional[str] = None, 1631 trace_id: typing.Optional[str] = None, 1632 parent_observation_id: typing.Optional[str] = None, 1633 name: typing.Optional[str] = None, 1634 start_time: typing.Optional[dt.datetime] = None, 1635 metadata: typing.Optional[typing.Any] = None, 1636 input: typing.Optional[typing.Any] = None, 1637 output: typing.Optional[typing.Any] = None, 1638 level: typing.Optional[SpanLevel] = None, 1639 status_message: typing.Optional[str] = None, 1640 version: typing.Optional[str] = None, 1641 **kwargs, 1642 ) -> "StatefulSpanClient": 1643 """Create an event. 1644 1645 An event represents a discrete event in a trace. 1646 Usually, you want to add a event nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id. 1647 1648 If no trace_id is provided, a new trace is created just for this event. 1649 1650 Args: 1651 id (Optional[str]): The id of the event can be set, otherwise a random id is generated. 1652 trace_id (Optional[str]): The trace ID associated with this event. If not provided, a new trace is created just for this event. 1653 parent_observation_id (Optional[str]): The ID of the parent observation, if applicable. 1654 name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI. 1655 start_time (Optional[datetime]): The time at which the event started, defaults to the current time. 1656 metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API. 1657 input (Optional[Any]): The input to the event. Can be any JSON object. 1658 output (Optional[Any]): The output to the event. Can be any JSON object. 1659 level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 1660 status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event. 1661 version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. Useful in debugging. 1662 **kwargs: Additional keyword arguments to include in the event. 1663 1664 Returns: 1665 StatefulSpanClient: The created event. 1666 1667 Example: 1668 ```python 1669 from langfuse import Langfuse 1670 1671 langfuse = Langfuse() 1672 1673 trace = langfuse.trace(name = "llm-feature") 1674 1675 # Create an event 1676 retrieval = langfuse.event(name = "retrieval", trace_id = trace.id) 1677 ``` 1678 """ 1679 event_id = id or str(uuid.uuid4()) 1680 new_trace_id = trace_id or str(uuid.uuid4()) 1681 self.trace_id = new_trace_id 1682 try: 1683 event_body = { 1684 "id": event_id, 1685 "trace_id": new_trace_id, 1686 "name": name, 1687 "start_time": start_time or _get_timestamp(), 1688 "metadata": metadata, 1689 "input": input, 1690 "output": output, 1691 "level": level, 1692 "status_message": status_message, 1693 "parent_observation_id": parent_observation_id, 1694 "version": version, 1695 "trace": {"release": self.release}, 1696 **kwargs, 1697 } 1698 1699 if trace_id is None: 1700 self._generate_trace(new_trace_id, name or new_trace_id) 1701 1702 request = CreateEventBody(**event_body) 1703 1704 event = { 1705 "id": str(uuid.uuid4()), 1706 "type": "event-create", 1707 "body": request.dict(exclude_none=True), 1708 } 1709 1710 self.log.debug(f"Creating event {event}...") 1711 self.task_manager.add_task(event) 1712 1713 except Exception as e: 1714 self.log.exception(e) 1715 finally: 1716 return StatefulSpanClient( 1717 self.client, 1718 event_id, 1719 StateType.OBSERVATION, 1720 new_trace_id, 1721 self.task_manager, 1722 ) 1723 1724 def generation( 1725 self, 1726 *, 1727 id: typing.Optional[str] = None, 1728 trace_id: typing.Optional[str] = None, 1729 parent_observation_id: typing.Optional[str] = None, 1730 name: typing.Optional[str] = None, 1731 start_time: typing.Optional[dt.datetime] = None, 1732 end_time: typing.Optional[dt.datetime] = None, 1733 completion_start_time: typing.Optional[dt.datetime] = None, 1734 metadata: typing.Optional[typing.Any] = None, 1735 level: typing.Optional[SpanLevel] = None, 1736 status_message: typing.Optional[str] = None, 1737 version: typing.Optional[str] = None, 1738 model: typing.Optional[str] = None, 1739 model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None, 1740 input: typing.Optional[typing.Any] = None, 1741 output: typing.Optional[typing.Any] = None, 1742 usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None, 1743 prompt: typing.Optional[PromptClient] = None, 1744 **kwargs, 1745 ) -> "StatefulGenerationClient": 1746 """Create a generation. 1747 1748 A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI. 1749 1750 Usually, you want to add a generation nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id. 1751 1752 If no trace_id is provided, a new trace is created just for this generation. 1753 1754 Args: 1755 id (Optional[str]): The id of the generation can be set, defaults to random id. 1756 trace_id (Optional[str]): The trace ID associated with this generation. If not provided, a new trace is created 1757 parent_observation_id (Optional[str]): The ID of the parent observation, if applicable. 1758 name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI. 1759 start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time. 1760 end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`. 1761 completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration. 1762 metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API. 1763 level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 1764 status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event. 1765 version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging. 1766 model (Optional[str]): The name of the model used for the generation. 1767 model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs. 1768 input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object. 1769 output (Optional[dict]): The completion generated by the model. Can be any string or JSON object. 1770 usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse. 1771 prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation. 1772 **kwargs: Additional keyword arguments to include in the generation. 1773 1774 Returns: 1775 StatefulGenerationClient: The created generation. 1776 1777 Example: 1778 ```python 1779 from langfuse import Langfuse 1780 1781 langfuse = Langfuse() 1782 1783 # Create a generation in Langfuse 1784 generation = langfuse.generation( 1785 name="summary-generation", 1786 model="gpt-3.5-turbo", 1787 model_parameters={"maxTokens": "1000", "temperature": "0.9"}, 1788 input=[{"role": "system", "content": "You are a helpful assistant."}, 1789 {"role": "user", "content": "Please generate a summary of the following documents ..."}], 1790 metadata={"interface": "whatsapp"} 1791 ) 1792 ``` 1793 """ 1794 new_trace_id = trace_id or str(uuid.uuid4()) 1795 new_generation_id = id or str(uuid.uuid4()) 1796 self.trace_id = new_trace_id 1797 try: 1798 generation_body = { 1799 "id": new_generation_id, 1800 "trace_id": new_trace_id, 1801 "release": self.release, 1802 "name": name, 1803 "start_time": start_time or _get_timestamp(), 1804 "metadata": metadata, 1805 "input": input, 1806 "output": output, 1807 "level": level, 1808 "status_message": status_message, 1809 "parent_observation_id": parent_observation_id, 1810 "version": version, 1811 "end_time": end_time, 1812 "completion_start_time": completion_start_time, 1813 "model": model, 1814 "model_parameters": model_parameters, 1815 "usage": _convert_usage_input(usage) if usage is not None else None, 1816 "trace": {"release": self.release}, 1817 **_create_prompt_context(prompt), 1818 **kwargs, 1819 } 1820 1821 if trace_id is None: 1822 trace = { 1823 "id": new_trace_id, 1824 "release": self.release, 1825 "name": name, 1826 } 1827 request = TraceBody(**trace) 1828 1829 event = { 1830 "id": str(uuid.uuid4()), 1831 "type": "trace-create", 1832 "body": request.dict(exclude_none=True), 1833 } 1834 1835 self.log.debug(f"Creating trace {event}...") 1836 1837 self.task_manager.add_task(event) 1838 1839 self.log.debug(f"Creating generation max {generation_body} {usage}...") 1840 request = CreateGenerationBody(**generation_body) 1841 1842 event = { 1843 "id": str(uuid.uuid4()), 1844 "type": "generation-create", 1845 "body": request.dict(exclude_none=True), 1846 } 1847 1848 self.log.debug(f"Creating top-level generation {event} ...") 1849 self.task_manager.add_task(event) 1850 1851 except Exception as e: 1852 self.log.exception(e) 1853 finally: 1854 return StatefulGenerationClient( 1855 self.client, 1856 new_generation_id, 1857 StateType.OBSERVATION, 1858 new_trace_id, 1859 self.task_manager, 1860 ) 1861 1862 def _generate_trace(self, trace_id: str, name: str): 1863 trace_dict = { 1864 "id": trace_id, 1865 "release": self.release, 1866 "name": name, 1867 } 1868 1869 trace_body = TraceBody(**trace_dict) 1870 1871 event = { 1872 "id": str(uuid.uuid4()), 1873 "type": "trace-create", 1874 "body": trace_body.dict(exclude_none=True), 1875 } 1876 1877 self.log.debug(f"Creating trace {event}...") 1878 self.task_manager.add_task(event) 1879 1880 def join(self): 1881 """Blocks until all consumer Threads are terminated. The SKD calls this upon termination of the Python Interpreter. 1882 1883 If called before flushing, consumers might terminate before sending all events to Langfuse API. This method is called at exit of the SKD, right before the Python interpreter closes. 1884 To guarantee all messages have been delivered, you still need to call flush(). 1885 """ 1886 try: 1887 return self.task_manager.join() 1888 except Exception as e: 1889 self.log.exception(e) 1890 1891 def flush(self): 1892 """Flush the internal event queue to the Langfuse API. It blocks until the queue is empty. It should be called when the application shuts down. 1893 1894 Example: 1895 ```python 1896 from langfuse import Langfuse 1897 1898 langfuse = Langfuse() 1899 1900 # Some operations with Langfuse 1901 1902 # Flushing all events to end Langfuse cleanly 1903 langfuse.flush() 1904 ``` 1905 """ 1906 try: 1907 return self.task_manager.flush() 1908 except Exception as e: 1909 self.log.exception(e) 1910 1911 def shutdown(self): 1912 """Initiate a graceful shutdown of the Langfuse SDK, ensuring all events are sent to Langfuse API and all consumer Threads are terminated. 1913 1914 This function calls flush() and join() consecutively resulting in a complete shutdown of the SDK. On success of this function, no more events will be sent to Langfuse API. 1915 As the SDK calls join() already on shutdown, refer to flush() to ensure all events arive at the Langfuse API. 1916 """ 1917 try: 1918 return self.task_manager.shutdown() 1919 except Exception as e: 1920 self.log.exception(e) 1921 1922 1923class StateType(Enum): 1924 """Enum to distinguish observation and trace states. 1925 1926 Attributes: 1927 OBSERVATION (int): Observation state. 1928 TRACE (int): Trace state. 1929 """ 1930 1931 OBSERVATION = 1 1932 TRACE = 0 1933 1934 1935class StatefulClient(object): 1936 """Base class for handling stateful operations in the Langfuse system. 1937 1938 This client is capable of creating different nested Langfuse objects like spans, generations, scores, and events, 1939 associating them with either an observation or a trace based on the specified state type. 1940 1941 Attributes: 1942 client (FernLangfuse): Core interface for Langfuse API interactions. 1943 id (str): Unique identifier of the stateful client (either observation or trace). 1944 state_type (StateType): Enum indicating whether the client is an observation or a trace. 1945 trace_id (str): Id of the trace associated with the stateful client. 1946 task_manager (TaskManager): Manager handling asynchronous tasks for the client. 1947 """ 1948 1949 log = logging.getLogger("langfuse") 1950 1951 def __init__( 1952 self, 1953 client: FernLangfuse, 1954 id: str, 1955 state_type: StateType, 1956 trace_id: str, 1957 task_manager: TaskManager, 1958 ): 1959 """Initialize the StatefulClient. 1960 1961 Args: 1962 client (FernLangfuse): Core interface for Langfuse API interactions. 1963 id (str): Unique identifier of the stateful client (either observation or trace). 1964 state_type (StateType): Enum indicating whether the client is an observation or a trace. 1965 trace_id (str): Id of the trace associated with the stateful client. 1966 task_manager (TaskManager): Manager handling asynchronous tasks for the client. 1967 """ 1968 self.client = client 1969 self.trace_id = trace_id 1970 self.id = id 1971 self.state_type = state_type 1972 self.task_manager = task_manager 1973 1974 def _add_state_to_event(self, body: dict): 1975 if self.state_type == StateType.OBSERVATION: 1976 body["parent_observation_id"] = self.id 1977 body["trace_id"] = self.trace_id 1978 else: 1979 body["trace_id"] = self.id 1980 return body 1981 1982 def _add_default_values(self, body: dict): 1983 if body.get("start_time") is None: 1984 body["start_time"] = _get_timestamp() 1985 return body 1986 1987 def generation( 1988 self, 1989 *, 1990 id: typing.Optional[str] = None, 1991 name: typing.Optional[str] = None, 1992 start_time: typing.Optional[dt.datetime] = None, 1993 end_time: typing.Optional[dt.datetime] = None, 1994 metadata: typing.Optional[typing.Any] = None, 1995 level: typing.Optional[SpanLevel] = None, 1996 status_message: typing.Optional[str] = None, 1997 version: typing.Optional[str] = None, 1998 completion_start_time: typing.Optional[dt.datetime] = None, 1999 model: typing.Optional[str] = None, 2000 model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None, 2001 input: typing.Optional[typing.Any] = None, 2002 output: typing.Optional[typing.Any] = None, 2003 usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None, 2004 prompt: typing.Optional[PromptClient] = None, 2005 **kwargs, 2006 ) -> "StatefulGenerationClient": 2007 """Create a generation nested within the current observation or trace. 2008 2009 A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI. 2010 2011 Args: 2012 id (Optional[str]): The id of the generation can be set, defaults to random id. 2013 name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI. 2014 start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time. 2015 end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`. 2016 completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration. 2017 metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API. 2018 level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 2019 status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event. 2020 version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging. 2021 model (Optional[str]): The name of the model used for the generation. 2022 model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs. 2023 input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object. 2024 output (Optional[dict]): The completion generated by the model. Can be any string or JSON object. 2025 usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse. 2026 prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation. 2027 **kwargs: Additional keyword arguments to include in the generation. 2028 2029 Returns: 2030 StatefulGenerationClient: The created generation. Use this client to update the generation or create additional nested observations. 2031 2032 Example: 2033 ```python 2034 from langfuse import Langfuse 2035 2036 langfuse = Langfuse() 2037 2038 # Create a trace 2039 trace = langfuse.trace(name = "llm-feature") 2040 2041 # Create a nested generation in Langfuse 2042 generation = trace.generation( 2043 name="summary-generation", 2044 model="gpt-3.5-turbo", 2045 model_parameters={"maxTokens": "1000", "temperature": "0.9"}, 2046 input=[{"role": "system", "content": "You are a helpful assistant."}, 2047 {"role": "user", "content": "Please generate a summary of the following documents ..."}], 2048 metadata={"interface": "whatsapp"} 2049 ) 2050 ``` 2051 """ 2052 generation_id = id or str(uuid.uuid4()) 2053 try: 2054 generation_body = { 2055 "id": generation_id, 2056 "name": name, 2057 "start_time": start_time or _get_timestamp(), 2058 "metadata": metadata, 2059 "level": level, 2060 "status_message": status_message, 2061 "version": version, 2062 "end_time": end_time, 2063 "completion_start_time": completion_start_time, 2064 "model": model, 2065 "model_parameters": model_parameters, 2066 "input": input, 2067 "output": output, 2068 "usage": _convert_usage_input(usage) if usage is not None else None, 2069 **_create_prompt_context(prompt), 2070 **kwargs, 2071 } 2072 2073 generation_body = self._add_state_to_event(generation_body) 2074 new_body = self._add_default_values(generation_body) 2075 2076 new_body = CreateGenerationBody(**new_body) 2077 2078 event = { 2079 "id": str(uuid.uuid4()), 2080 "type": "generation-create", 2081 "body": new_body.dict(exclude_none=True, exclude_unset=False), 2082 } 2083 2084 self.log.debug(f"Creating generation {new_body}...") 2085 self.task_manager.add_task(event) 2086 2087 except Exception as e: 2088 self.log.exception(e) 2089 finally: 2090 return StatefulGenerationClient( 2091 self.client, 2092 generation_id, 2093 StateType.OBSERVATION, 2094 self.trace_id, 2095 task_manager=self.task_manager, 2096 ) 2097 2098 def span( 2099 self, 2100 *, 2101 id: typing.Optional[str] = None, 2102 name: typing.Optional[str] = None, 2103 start_time: typing.Optional[dt.datetime] = None, 2104 end_time: typing.Optional[dt.datetime] = None, 2105 metadata: typing.Optional[typing.Any] = None, 2106 input: typing.Optional[typing.Any] = None, 2107 output: typing.Optional[typing.Any] = None, 2108 level: typing.Optional[SpanLevel] = None, 2109 status_message: typing.Optional[str] = None, 2110 version: typing.Optional[str] = None, 2111 **kwargs, 2112 ) -> "StatefulSpanClient": 2113 """Create a span nested within the current observation or trace. 2114 2115 A span represents durations of units of work in a trace. 2116 2117 Args: 2118 id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id. 2119 name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI. 2120 start_time (Optional[datetime]): The time at which the span started, defaults to the current time. 2121 end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`. 2122 metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API. 2123 level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 2124 status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event. 2125 input (Optional[dict]): The input to the span. Can be any JSON object. 2126 output (Optional[dict]): The output to the span. Can be any JSON object. 2127 version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging. 2128 **kwargs: Additional keyword arguments to include in the span. 2129 2130 Returns: 2131 StatefulSpanClient: The created span. Use this client to update the span or create additional nested observations. 2132 2133 Example: 2134 ```python 2135 from langfuse import Langfuse 2136 2137 langfuse = Langfuse() 2138 2139 # Create a trace 2140 trace = langfuse.trace(name = "llm-feature") 2141 2142 # Create a span 2143 retrieval = langfuse.span(name = "retrieval") 2144 ``` 2145 """ 2146 span_id = id or str(uuid.uuid4()) 2147 try: 2148 span_body = { 2149 "id": span_id, 2150 "name": name, 2151 "start_time": start_time or _get_timestamp(), 2152 "metadata": metadata, 2153 "input": input, 2154 "output": output, 2155 "level": level, 2156 "status_message": status_message, 2157 "version": version, 2158 "end_time": end_time, 2159 **kwargs, 2160 } 2161 2162 self.log.debug(f"Creating span {span_body}...") 2163 2164 new_dict = self._add_state_to_event(span_body) 2165 new_body = self._add_default_values(new_dict) 2166 2167 event = CreateSpanBody(**new_body) 2168 2169 event = { 2170 "id": str(uuid.uuid4()), 2171 "type": "span-create", 2172 "body": event.dict(exclude_none=True), 2173 } 2174 2175 self.task_manager.add_task(event) 2176 except Exception as e: 2177 self.log.exception(e) 2178 finally: 2179 return StatefulSpanClient( 2180 self.client, 2181 span_id, 2182 StateType.OBSERVATION, 2183 self.trace_id, 2184 task_manager=self.task_manager, 2185 ) 2186 2187 @overload 2188 def score( 2189 self, 2190 *, 2191 id: typing.Optional[str] = None, 2192 name: str, 2193 value: float, 2194 data_type: typing.Optional[Literal["NUMERIC", "BOOLEAN"]] = None, 2195 comment: typing.Optional[str] = None, 2196 config_id: typing.Optional[str] = None, 2197 **kwargs, 2198 ) -> "StatefulClient": ... 2199 2200 @overload 2201 def score( 2202 self, 2203 *, 2204 id: typing.Optional[str] = None, 2205 name: str, 2206 value: str, 2207 data_type: typing.Optional[Literal["CATEGORICAL"]] = "CATEGORICAL", 2208 comment: typing.Optional[str] = None, 2209 config_id: typing.Optional[str] = None, 2210 **kwargs, 2211 ) -> "StatefulClient": ... 2212 2213 def score( 2214 self, 2215 *, 2216 id: typing.Optional[str] = None, 2217 name: str, 2218 value: typing.Union[float, str], 2219 data_type: typing.Optional[ScoreDataType] = None, 2220 comment: typing.Optional[str] = None, 2221 config_id: typing.Optional[str] = None, 2222 **kwargs, 2223 ) -> "StatefulClient": 2224 """Create a score attached for the current observation or trace. 2225 2226 Args: 2227 name (str): Identifier of the score. 2228 value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. 2229 data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. 2230 When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores. 2231 comment (Optional[str]): Additional context/explanation of the score. 2232 id (Optional[str]): The id of the score. If not provided, a new UUID is generated. 2233 config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None. 2234 **kwargs: Additional keyword arguments to include in the score. 2235 2236 Returns: 2237 StatefulClient: The current observation or trace for which the score was created. Passthrough for chaining. 2238 2239 Example: 2240 ```python 2241 from langfuse import Langfuse 2242 2243 langfuse = Langfuse() 2244 2245 # Create a trace 2246 trace = langfuse.trace(name="example-application") 2247 2248 # Add score to the trace 2249 trace = trace.score( 2250 name="user-explicit-feedback", 2251 value=0.8, 2252 comment="I like how personalized the response is" 2253 ) 2254 ``` 2255 """ 2256 score_id = id or str(uuid.uuid4()) 2257 try: 2258 new_score = { 2259 "id": score_id, 2260 "trace_id": self.trace_id, 2261 "name": name, 2262 "value": value, 2263 "data_type": data_type, 2264 "comment": comment, 2265 "config_id": config_id, 2266 **kwargs, 2267 } 2268 2269 self.log.debug(f"Creating score {new_score}...") 2270 2271 new_dict = self._add_state_to_event(new_score) 2272 2273 if self.state_type == StateType.OBSERVATION: 2274 new_dict["observationId"] = self.id 2275 2276 request = ScoreBody(**new_dict) 2277 2278 event = { 2279 "id": str(uuid.uuid4()), 2280 "type": "score-create", 2281 "body": request.dict(exclude_none=True), 2282 } 2283 2284 self.task_manager.add_task(event) 2285 2286 except Exception as e: 2287 self.log.exception(e) 2288 finally: 2289 return StatefulClient( 2290 self.client, 2291 self.id, 2292 self.state_type, 2293 self.trace_id, 2294 task_manager=self.task_manager, 2295 ) 2296 2297 def event( 2298 self, 2299 *, 2300 id: typing.Optional[str] = None, 2301 name: typing.Optional[str] = None, 2302 start_time: typing.Optional[dt.datetime] = None, 2303 metadata: typing.Optional[typing.Any] = None, 2304 input: typing.Optional[typing.Any] = None, 2305 output: typing.Optional[typing.Any] = None, 2306 level: typing.Optional[SpanLevel] = None, 2307 status_message: typing.Optional[str] = None, 2308 version: typing.Optional[str] = None, 2309 **kwargs, 2310 ) -> "StatefulClient": 2311 """Create an event nested within the current observation or trace. 2312 2313 An event represents a discrete event in a trace. 2314 2315 Args: 2316 id (Optional[str]): The id of the event can be set, otherwise a random id is generated. 2317 name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI. 2318 start_time (Optional[datetime]): The time at which the event started, defaults to the current time. 2319 metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API. 2320 input (Optional[Any]): The input to the event. Can be any JSON object. 2321 output (Optional[Any]): The output to the event. Can be any JSON object. 2322 level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 2323 status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event. 2324 version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. Useful in debugging. 2325 **kwargs: Additional keyword arguments to include in the event. 2326 2327 Returns: 2328 StatefulSpanClient: The created event. Use this client to update the event or create additional nested observations. 2329 2330 Example: 2331 ```python 2332 from langfuse import Langfuse 2333 2334 langfuse = Langfuse() 2335 2336 # Create a trace 2337 trace = langfuse.trace(name = "llm-feature") 2338 2339 # Create an event 2340 retrieval = trace.event(name = "retrieval") 2341 ``` 2342 """ 2343 event_id = id or str(uuid.uuid4()) 2344 try: 2345 event_body = { 2346 "id": event_id, 2347 "name": name, 2348 "start_time": start_time or _get_timestamp(), 2349 "metadata": metadata, 2350 "input": input, 2351 "output": output, 2352 "level": level, 2353 "status_message": status_message, 2354 "version": version, 2355 **kwargs, 2356 } 2357 2358 new_dict = self._add_state_to_event(event_body) 2359 new_body = self._add_default_values(new_dict) 2360 2361 request = CreateEventBody(**new_body) 2362 2363 event = { 2364 "id": str(uuid.uuid4()), 2365 "type": "event-create", 2366 "body": request.dict(exclude_none=True), 2367 } 2368 2369 self.log.debug(f"Creating event {event}...") 2370 self.task_manager.add_task(event) 2371 2372 except Exception as e: 2373 self.log.exception(e) 2374 finally: 2375 return StatefulClient( 2376 self.client, 2377 event_id, 2378 StateType.OBSERVATION, 2379 self.trace_id, 2380 self.task_manager, 2381 ) 2382 2383 def get_trace_url(self): 2384 """Get the URL to see the current trace in the Langfuse UI.""" 2385 return f"{self.client._client_wrapper._base_url}/trace/{self.trace_id}" 2386 2387 2388class StatefulGenerationClient(StatefulClient): 2389 """Class for handling stateful operations of generations in the Langfuse system. Inherits from StatefulClient. 2390 2391 This client extends the capabilities of the StatefulClient to specifically handle generation, 2392 allowing for the creation, update, and termination of generation processes in Langfuse. 2393 2394 Attributes: 2395 client (FernLangfuse): Core interface for Langfuse API interaction. 2396 id (str): Unique identifier of the generation. 2397 state_type (StateType): Type of the stateful entity (observation or trace). 2398 trace_id (str): Id of trace associated with the generation. 2399 task_manager (TaskManager): Manager for handling asynchronous tasks. 2400 """ 2401 2402 log = logging.getLogger("langfuse") 2403 2404 def __init__( 2405 self, 2406 client: FernLangfuse, 2407 id: str, 2408 state_type: StateType, 2409 trace_id: str, 2410 task_manager: TaskManager, 2411 ): 2412 """Initialize the StatefulGenerationClient.""" 2413 super().__init__(client, id, state_type, trace_id, task_manager) 2414 2415 # WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY 2416 def update( 2417 self, 2418 *, 2419 name: typing.Optional[str] = None, 2420 start_time: typing.Optional[dt.datetime] = None, 2421 end_time: typing.Optional[dt.datetime] = None, 2422 completion_start_time: typing.Optional[dt.datetime] = None, 2423 metadata: typing.Optional[typing.Any] = None, 2424 level: typing.Optional[SpanLevel] = None, 2425 status_message: typing.Optional[str] = None, 2426 version: typing.Optional[str] = None, 2427 model: typing.Optional[str] = None, 2428 model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None, 2429 input: typing.Optional[typing.Any] = None, 2430 output: typing.Optional[typing.Any] = None, 2431 usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None, 2432 prompt: typing.Optional[PromptClient] = None, 2433 **kwargs, 2434 ) -> "StatefulGenerationClient": 2435 """Update the generation. 2436 2437 Args: 2438 name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI. 2439 start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time. 2440 end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`. 2441 completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration. 2442 metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API. 2443 level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 2444 status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event. 2445 version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging. 2446 model (Optional[str]): The name of the model used for the generation. 2447 model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs. 2448 input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object. 2449 output (Optional[dict]): The completion generated by the model. Can be any string or JSON object. 2450 usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse. 2451 prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation. 2452 **kwargs: Additional keyword arguments to include in the generation. 2453 2454 Returns: 2455 StatefulGenerationClient: The updated generation. Passthrough for chaining. 2456 2457 Example: 2458 ```python 2459 from langfuse import Langfuse 2460 2461 langfuse = Langfuse() 2462 2463 # Create a trace 2464 trace = langfuse.trace(name = "llm-feature") 2465 2466 # Create a nested generation in Langfuse 2467 generation = trace.generation(name="summary-generation") 2468 2469 # Update the generation 2470 generation = generation.update(metadata={"interface": "whatsapp"}) 2471 ``` 2472 """ 2473 try: 2474 generation_body = { 2475 "id": self.id, 2476 "trace_id": self.trace_id, # Included to avoid relying on the order of events sent to the API 2477 "name": name, 2478 "start_time": start_time, 2479 "metadata": metadata, 2480 "level": level, 2481 "status_message": status_message, 2482 "version": version, 2483 "end_time": end_time, 2484 "completion_start_time": completion_start_time, 2485 "model": model, 2486 "model_parameters": model_parameters, 2487 "input": input, 2488 "output": output, 2489 "usage": _convert_usage_input(usage) if usage is not None else None, 2490 **_create_prompt_context(prompt), 2491 **kwargs, 2492 } 2493 2494 self.log.debug(f"Update generation {generation_body}...") 2495 2496 request = UpdateGenerationBody(**generation_body) 2497 2498 event = { 2499 "id": str(uuid.uuid4()), 2500 "type": "generation-update", 2501 "body": request.dict(exclude_none=True, exclude_unset=False), 2502 } 2503 2504 self.log.debug(f"Update generation {event}...") 2505 self.task_manager.add_task(event) 2506 2507 except Exception as e: 2508 self.log.exception(e) 2509 finally: 2510 return StatefulGenerationClient( 2511 self.client, 2512 self.id, 2513 StateType.OBSERVATION, 2514 self.trace_id, 2515 task_manager=self.task_manager, 2516 ) 2517 2518 def end( 2519 self, 2520 *, 2521 name: typing.Optional[str] = None, 2522 start_time: typing.Optional[dt.datetime] = None, 2523 end_time: typing.Optional[dt.datetime] = None, 2524 completion_start_time: typing.Optional[dt.datetime] = None, 2525 metadata: typing.Optional[typing.Any] = None, 2526 level: typing.Optional[SpanLevel] = None, 2527 status_message: typing.Optional[str] = None, 2528 version: typing.Optional[str] = None, 2529 model: typing.Optional[str] = None, 2530 model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None, 2531 input: typing.Optional[typing.Any] = None, 2532 output: typing.Optional[typing.Any] = None, 2533 usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None, 2534 prompt: typing.Optional[PromptClient] = None, 2535 **kwargs, 2536 ) -> "StatefulGenerationClient": 2537 """End the generation, optionally updating its properties. 2538 2539 Args: 2540 name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI. 2541 start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time. 2542 end_time (Optional[datetime.datetime]): Automatically set to the current time. Can be overridden to set a custom end time. 2543 completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration. 2544 metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API. 2545 level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 2546 status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event. 2547 version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging. 2548 model (Optional[str]): The name of the model used for the generation. 2549 model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs. 2550 input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object. 2551 output (Optional[dict]): The completion generated by the model. Can be any string or JSON object. 2552 usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse. 2553 prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation. 2554 **kwargs: Additional keyword arguments to include in the generation. 2555 2556 Returns: 2557 StatefulGenerationClient: The ended generation. Passthrough for chaining. 2558 2559 Example: 2560 ```python 2561 from langfuse import Langfuse 2562 2563 langfuse = Langfuse() 2564 2565 # Create a trace 2566 trace = langfuse.trace(name = "llm-feature") 2567 2568 # Create a nested generation in Langfuse 2569 generation = trace.generation(name="summary-generation") 2570 2571 # End the generation and update its properties 2572 generation = generation.end(metadata={"interface": "whatsapp"}) 2573 ``` 2574 """ 2575 return self.update( 2576 name=name, 2577 start_time=start_time, 2578 end_time=end_time or _get_timestamp(), 2579 metadata=metadata, 2580 level=level, 2581 status_message=status_message, 2582 version=version, 2583 completion_start_time=completion_start_time, 2584 model=model, 2585 model_parameters=model_parameters, 2586 input=input, 2587 output=output, 2588 usage=usage, 2589 prompt=prompt, 2590 **kwargs, 2591 ) 2592 2593 2594class StatefulSpanClient(StatefulClient): 2595 """Class for handling stateful operations of spans in the Langfuse system. Inherits from StatefulClient. 2596 2597 Attributes: 2598 client (FernLangfuse): Core interface for Langfuse API interaction. 2599 id (str): Unique identifier of the span. 2600 state_type (StateType): Type of the stateful entity (observation or trace). 2601 trace_id (str): Id of trace associated with the span. 2602 task_manager (TaskManager): Manager for handling asynchronous tasks. 2603 """ 2604 2605 log = logging.getLogger("langfuse") 2606 2607 def __init__( 2608 self, 2609 client: FernLangfuse, 2610 id: str, 2611 state_type: StateType, 2612 trace_id: str, 2613 task_manager: TaskManager, 2614 ): 2615 """Initialize the StatefulSpanClient.""" 2616 super().__init__(client, id, state_type, trace_id, task_manager) 2617 2618 # WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY 2619 def update( 2620 self, 2621 *, 2622 name: typing.Optional[str] = None, 2623 start_time: typing.Optional[dt.datetime] = None, 2624 end_time: typing.Optional[dt.datetime] = None, 2625 metadata: typing.Optional[typing.Any] = None, 2626 input: typing.Optional[typing.Any] = None, 2627 output: typing.Optional[typing.Any] = None, 2628 level: typing.Optional[SpanLevel] = None, 2629 status_message: typing.Optional[str] = None, 2630 version: typing.Optional[str] = None, 2631 **kwargs, 2632 ) -> "StatefulSpanClient": 2633 """Update the span. 2634 2635 Args: 2636 name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI. 2637 start_time (Optional[datetime]): The time at which the span started, defaults to the current time. 2638 end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`. 2639 metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API. 2640 level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 2641 status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event. 2642 input (Optional[dict]): The input to the span. Can be any JSON object. 2643 output (Optional[dict]): The output to the span. Can be any JSON object. 2644 version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging. 2645 **kwargs: Additional keyword arguments to include in the span. 2646 2647 Returns: 2648 StatefulSpanClient: The updated span. Passthrough for chaining. 2649 2650 Example: 2651 ```python 2652 from langfuse import Langfuse 2653 2654 langfuse = Langfuse() 2655 2656 # Create a trace 2657 trace = langfuse.trace(name = "llm-feature") 2658 2659 # Create a nested span in Langfuse 2660 span = trace.span(name="retrieval") 2661 2662 # Update the span 2663 span = span.update(metadata={"interface": "whatsapp"}) 2664 ``` 2665 """ 2666 try: 2667 span_body = { 2668 "id": self.id, 2669 "trace_id": self.trace_id, # Included to avoid relying on the order of events sent to the API 2670 "name": name, 2671 "start_time": start_time, 2672 "metadata": metadata, 2673 "input": input, 2674 "output": output, 2675 "level": level, 2676 "status_message": status_message, 2677 "version": version, 2678 "end_time": end_time, 2679 **kwargs, 2680 } 2681 self.log.debug(f"Update span {span_body}...") 2682 2683 request = UpdateSpanBody(**span_body) 2684 2685 event = { 2686 "id": str(uuid.uuid4()), 2687 "type": "span-update", 2688 "body": request.dict(exclude_none=True), 2689 } 2690 2691 self.task_manager.add_task(event) 2692 except Exception as e: 2693 self.log.exception(e) 2694 finally: 2695 return StatefulSpanClient( 2696 self.client, 2697 self.id, 2698 StateType.OBSERVATION, 2699 self.trace_id, 2700 task_manager=self.task_manager, 2701 ) 2702 2703 def end( 2704 self, 2705 *, 2706 name: typing.Optional[str] = None, 2707 start_time: typing.Optional[dt.datetime] = None, 2708 end_time: typing.Optional[dt.datetime] = None, 2709 metadata: typing.Optional[typing.Any] = None, 2710 input: typing.Optional[typing.Any] = None, 2711 output: typing.Optional[typing.Any] = None, 2712 level: typing.Optional[SpanLevel] = None, 2713 status_message: typing.Optional[str] = None, 2714 version: typing.Optional[str] = None, 2715 **kwargs, 2716 ) -> "StatefulSpanClient": 2717 """End the span, optionally updating its properties. 2718 2719 Args: 2720 name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI. 2721 start_time (Optional[datetime]): The time at which the span started, defaults to the current time. 2722 end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`. 2723 metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API. 2724 level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 2725 status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event. 2726 input (Optional[dict]): The input to the span. Can be any JSON object. 2727 output (Optional[dict]): The output to the span. Can be any JSON object. 2728 version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging. 2729 **kwargs: Additional keyword arguments to include in the span. 2730 2731 Returns: 2732 StatefulSpanClient: The updated span. Passthrough for chaining. 2733 2734 Example: 2735 ```python 2736 from langfuse import Langfuse 2737 2738 langfuse = Langfuse() 2739 2740 # Create a trace 2741 trace = langfuse.trace(name = "llm-feature") 2742 2743 # Create a nested span in Langfuse 2744 span = trace.span(name="retrieval") 2745 2746 # End the span and update its properties 2747 span = span.end(metadata={"interface": "whatsapp"}) 2748 ``` 2749 """ 2750 try: 2751 span_body = { 2752 "name": name, 2753 "start_time": start_time, 2754 "metadata": metadata, 2755 "input": input, 2756 "output": output, 2757 "level": level, 2758 "status_message": status_message, 2759 "version": version, 2760 "end_time": end_time or _get_timestamp(), 2761 **kwargs, 2762 } 2763 return self.update(**span_body) 2764 2765 except Exception as e: 2766 self.log.warning(e) 2767 finally: 2768 return StatefulSpanClient( 2769 self.client, 2770 self.id, 2771 StateType.OBSERVATION, 2772 self.trace_id, 2773 task_manager=self.task_manager, 2774 ) 2775 2776 def get_langchain_handler(self, update_parent: bool = False): 2777 """Get langchain callback handler associated with the current span. 2778 2779 Args: 2780 update_parent (bool): If set to True, the parent observation will be updated with the outcome of the Langchain run. 2781 2782 Returns: 2783 CallbackHandler: An instance of CallbackHandler linked to this StatefulSpanClient. 2784 """ 2785 from langfuse.callback import CallbackHandler 2786 2787 return CallbackHandler( 2788 stateful_client=self, update_stateful_client=update_parent 2789 ) 2790 2791 2792class StatefulTraceClient(StatefulClient): 2793 """Class for handling stateful operations of traces in the Langfuse system. Inherits from StatefulClient. 2794 2795 Attributes: 2796 client (FernLangfuse): Core interface for Langfuse API interaction. 2797 id (str): Unique identifier of the trace. 2798 state_type (StateType): Type of the stateful entity (observation or trace). 2799 trace_id (str): The trace ID associated with this client. 2800 task_manager (TaskManager): Manager for handling asynchronous tasks. 2801 """ 2802 2803 log = logging.getLogger("langfuse") 2804 2805 def __init__( 2806 self, 2807 client: FernLangfuse, 2808 id: str, 2809 state_type: StateType, 2810 trace_id: str, 2811 task_manager: TaskManager, 2812 ): 2813 """Initialize the StatefulTraceClient.""" 2814 super().__init__(client, id, state_type, trace_id, task_manager) 2815 self.task_manager = task_manager 2816 2817 def update( 2818 self, 2819 *, 2820 name: typing.Optional[str] = None, 2821 user_id: typing.Optional[str] = None, 2822 session_id: typing.Optional[str] = None, 2823 version: typing.Optional[str] = None, 2824 release: typing.Optional[str] = None, 2825 input: typing.Optional[typing.Any] = None, 2826 output: typing.Optional[typing.Any] = None, 2827 metadata: typing.Optional[typing.Any] = None, 2828 tags: typing.Optional[typing.List[str]] = None, 2829 public: typing.Optional[bool] = None, 2830 **kwargs, 2831 ) -> "StatefulTraceClient": 2832 """Update the trace. 2833 2834 Args: 2835 name: Identifier of the trace. Useful for sorting/filtering in the UI. 2836 input: The input of the trace. Can be any JSON object. 2837 output: The output of the trace. Can be any JSON object. 2838 metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API. 2839 user_id: The id of the user that triggered the execution. Used to provide user-level analytics. 2840 session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier. 2841 version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging. 2842 release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging. 2843 tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API. 2844 public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project. 2845 **kwargs: Additional keyword arguments that can be included in the trace. 2846 2847 Returns: 2848 StatefulTraceClient: The updated trace. Passthrough for chaining. 2849 2850 Example: 2851 ```python 2852 from langfuse import Langfuse 2853 2854 langfuse = Langfuse() 2855 2856 # Create a trace 2857 trace = langfuse.trace( 2858 name="example-application", 2859 user_id="user-1234") 2860 ) 2861 2862 # Update the trace 2863 trace = trace.update( 2864 output={"result": "success"}, 2865 metadata={"interface": "whatsapp"} 2866 ) 2867 ``` 2868 """ 2869 try: 2870 trace_body = { 2871 "id": self.id, 2872 "name": name, 2873 "userId": user_id, 2874 "sessionId": session_id 2875 or kwargs.get("sessionId", None), # backward compatibility 2876 "version": version, 2877 "release": release, 2878 "input": input, 2879 "output": output, 2880 "metadata": metadata, 2881 "public": public, 2882 "tags": tags, 2883 **kwargs, 2884 } 2885 self.log.debug(f"Update trace {trace_body}...") 2886 2887 request = TraceBody(**trace_body) 2888 2889 event = { 2890 "id": str(uuid.uuid4()), 2891 "type": "trace-create", 2892 "body": request.dict(exclude_none=True), 2893 } 2894 2895 self.task_manager.add_task(event) 2896 2897 except Exception as e: 2898 self.log.exception(e) 2899 finally: 2900 return StatefulTraceClient( 2901 self.client, 2902 self.id, 2903 StateType.TRACE, 2904 self.trace_id, 2905 task_manager=self.task_manager, 2906 ) 2907 2908 def get_langchain_handler(self, update_parent: bool = False): 2909 """Get langchain callback handler associated with the current trace. 2910 2911 This method creates and returns a CallbackHandler instance, linking it with the current 2912 trace. Use this if you want to group multiple Langchain runs within a single trace. 2913 2914 Args: 2915 update_parent (bool): If set to True, the parent trace will be updated with the outcome of the Langchain run. 2916 2917 Raises: 2918 ImportError: If the 'langchain' module is not installed, indicating missing functionality. 2919 2920 Returns: 2921 CallbackHandler: Langchain callback handler linked to the current trace. 2922 2923 Example: 2924 ```python 2925 from langfuse import Langfuse 2926 2927 langfuse = Langfuse() 2928 2929 # Create a trace 2930 trace = langfuse.trace(name = "llm-feature") 2931 2932 # Get a langchain callback handler 2933 handler = trace.get_langchain_handler() 2934 ``` 2935 """ 2936 try: 2937 from langfuse.callback import CallbackHandler 2938 2939 self.log.debug(f"Creating new handler for trace {self.id}") 2940 2941 return CallbackHandler( 2942 stateful_client=self, 2943 debug=self.log.level == logging.DEBUG, 2944 update_stateful_client=update_parent, 2945 ) 2946 except Exception as e: 2947 self.log.exception(e) 2948 2949 def getNewHandler(self): 2950 """Alias for the `get_langchain_handler` method. Retrieves a callback handler for the trace. Deprecated.""" 2951 return self.get_langchain_handler() 2952 2953 2954class DatasetItemClient: 2955 """Class for managing dataset items in Langfuse. 2956 2957 Args: 2958 id (str): Unique identifier of the dataset item. 2959 status (DatasetStatus): The status of the dataset item. Can be either 'ACTIVE' or 'ARCHIVED'. 2960 input (Any): Input data of the dataset item. 2961 expected_output (Optional[Any]): Expected output of the dataset item. 2962 metadata (Optional[Any]): Additional metadata of the dataset item. 2963 source_trace_id (Optional[str]): Identifier of the source trace. 2964 source_observation_id (Optional[str]): Identifier of the source observation. 2965 dataset_id (str): Identifier of the dataset to which this item belongs. 2966 dataset_name (str): Name of the dataset to which this item belongs. 2967 created_at (datetime): Timestamp of dataset item creation. 2968 updated_at (datetime): Timestamp of the last update to the dataset item. 2969 langfuse (Langfuse): Instance of Langfuse client for API interactions. 2970 2971 Example: 2972 ```python 2973 from langfuse import Langfuse 2974 2975 langfuse = Langfuse() 2976 2977 dataset = langfuse.get_dataset("<dataset_name>") 2978 2979 for item in dataset.items: 2980 # Generate a completion using the input of every item 2981 completion, generation = llm_app.run(item.input) 2982 2983 # Evaluate the completion 2984 generation.score( 2985 name="example-score", 2986 value=1 2987 ) 2988 ``` 2989 """ 2990 2991 log = logging.getLogger("langfuse") 2992 2993 id: str 2994 status: DatasetStatus 2995 input: typing.Any 2996 expected_output: typing.Optional[typing.Any] 2997 metadata: Optional[Any] 2998 source_trace_id: typing.Optional[str] 2999 source_observation_id: typing.Optional[str] 3000 dataset_id: str 3001 dataset_name: str 3002 created_at: dt.datetime 3003 updated_at: dt.datetime 3004 3005 langfuse: Langfuse 3006 3007 def __init__(self, dataset_item: DatasetItem, langfuse: Langfuse): 3008 """Initialize the DatasetItemClient.""" 3009 self.id = dataset_item.id 3010 self.status = dataset_item.status 3011 self.input = dataset_item.input 3012 self.expected_output = dataset_item.expected_output 3013 self.metadata = dataset_item.metadata 3014 self.source_trace_id = dataset_item.source_trace_id 3015 self.source_observation_id = dataset_item.source_observation_id 3016 self.dataset_id = dataset_item.dataset_id 3017 self.dataset_name = dataset_item.dataset_name 3018 self.created_at = dataset_item.created_at 3019 self.updated_at = dataset_item.updated_at 3020 3021 self.langfuse = langfuse 3022 3023 def flush(self, observation: StatefulClient, run_name: str): 3024 """Flushes an observations task manager's queue. 3025 3026 Used before creating a dataset run item to ensure all events are persistent. 3027 3028 Args: 3029 observation (StatefulClient): The observation or trace client associated with the dataset item. 3030 run_name (str): The name of the dataset run. 3031 """ 3032 observation.task_manager.flush() 3033 3034 def link( 3035 self, 3036 trace_or_observation: typing.Union[StatefulClient, str, None], 3037 run_name: str, 3038 run_metadata: Optional[Any] = None, 3039 run_description: Optional[str] = None, 3040 trace_id: Optional[str] = None, 3041 observation_id: Optional[str] = None, 3042 ): 3043 """Link the dataset item to observation within a specific dataset run. Creates a dataset run item. 3044 3045 Args: 3046 trace_or_observation (Union[StatefulClient, str, None]): The trace or observation object to link. Deprecated: can also be an observation ID. 3047 run_name (str): The name of the dataset run. 3048 run_metadata (Optional[Any]): Additional metadata to include in dataset run. 3049 run_description (Optional[str]): Description of the dataset run. 3050 trace_id (Optional[str]): The trace ID to link to the dataset item. Set trace_or_observation to None if trace_id is provided. 3051 observation_id (Optional[str]): The observation ID to link to the dataset item (optional). Set trace_or_observation to None if trace_id is provided. 3052 """ 3053 parsed_trace_id: str = None 3054 parsed_observation_id: str = None 3055 3056 if isinstance(trace_or_observation, StatefulClient): 3057 # flush the queue before creating the dataset run item 3058 # to ensure that all events are persisted. 3059 if trace_or_observation.state_type == StateType.TRACE: 3060 parsed_trace_id = trace_or_observation.trace_id 3061 elif trace_or_observation.state_type == StateType.OBSERVATION: 3062 parsed_observation_id = trace_or_observation.id 3063 parsed_trace_id = trace_or_observation.trace_id 3064 # legacy support for observation_id 3065 elif isinstance(trace_or_observation, str): 3066 parsed_observation_id = trace_or_observation 3067 elif trace_or_observation is None: 3068 if trace_id is not None: 3069 parsed_trace_id = trace_id 3070 if observation_id is not None: 3071 parsed_observation_id = observation_id 3072 else: 3073 raise ValueError( 3074 "trace_id must be provided if trace_or_observation is None" 3075 ) 3076 else: 3077 raise ValueError( 3078 "trace_or_observation (arg) or trace_id (kwarg) must be provided to link the dataset item" 3079 ) 3080 3081 self.log.debug( 3082 f"Creating dataset run item: {run_name} {self.id} {parsed_trace_id} {parsed_observation_id}" 3083 ) 3084 self.langfuse.client.dataset_run_items.create( 3085 request=CreateDatasetRunItemRequest( 3086 runName=run_name, 3087 datasetItemId=self.id, 3088 traceId=parsed_trace_id, 3089 observationId=parsed_observation_id, 3090 metadata=run_metadata, 3091 runDescription=run_description, 3092 ) 3093 ) 3094 3095 def get_langchain_handler( 3096 self, 3097 *, 3098 run_name: str, 3099 run_description: Optional[str] = None, 3100 run_metadata: Optional[Any] = None, 3101 ): 3102 """Create and get a langchain callback handler linked to this dataset item. 3103 3104 Args: 3105 run_name (str): The name of the dataset run to be used in the callback handler. 3106 run_description (Optional[str]): Description of the dataset run. 3107 run_metadata (Optional[Any]): Additional metadata to include in dataset run. 3108 3109 Returns: 3110 CallbackHandler: An instance of CallbackHandler linked to the dataset item. 3111 """ 3112 metadata = { 3113 "dataset_item_id": self.id, 3114 "run_name": run_name, 3115 "dataset_id": self.dataset_id, 3116 } 3117 trace = self.langfuse.trace(name="dataset-run", metadata=metadata) 3118 3119 self.link( 3120 trace, run_name, run_metadata=run_metadata, run_description=run_description 3121 ) 3122 3123 return trace.get_langchain_handler(update_parent=True) 3124 3125 @contextmanager 3126 def observe( 3127 self, 3128 *, 3129 run_name: str, 3130 run_description: Optional[str] = None, 3131 run_metadata: Optional[Any] = None, 3132 trace_id: Optional[str] = None, 3133 ): 3134 """Observes a dataset run within the Langfuse client. 3135 3136 Args: 3137 run_name (str): The name of the dataset run. 3138 root_trace (Optional[StatefulTraceClient]): The root trace client to use for the dataset run. If not provided, a new trace client will be created. 3139 run_description (Optional[str]): The description of the dataset run. 3140 run_metadata (Optional[Any]): Additional metadata for the dataset run. 3141 3142 Yields: 3143 StatefulTraceClient: The trace associated with the dataset run. 3144 """ 3145 from langfuse.decorators import langfuse_context 3146 3147 root_trace_id = trace_id or str(uuid.uuid4()) 3148 3149 langfuse_context._set_root_trace_id(root_trace_id) 3150 3151 try: 3152 yield root_trace_id 3153 3154 finally: 3155 self.link( 3156 run_name=run_name, 3157 run_metadata=run_metadata, 3158 run_description=run_description, 3159 trace_or_observation=None, 3160 trace_id=root_trace_id, 3161 ) 3162 3163 @contextmanager 3164 def observe_llama_index( 3165 self, 3166 *, 3167 run_name: str, 3168 run_description: Optional[str] = None, 3169 run_metadata: Optional[Any] = None, 3170 llama_index_integration_constructor_kwargs: Optional[Dict[str, Any]] = {}, 3171 ): 3172 """Context manager for observing LlamaIndex operations linked to this dataset item. 3173 3174 This method sets up a LlamaIndex callback handler that integrates with Langfuse, allowing detailed logging 3175 and tracing of LlamaIndex operations within the context of a specific dataset run. It ensures that all 3176 operations performed within the context are linked to the appropriate dataset item and run in Langfuse. 3177 3178 Args: 3179 run_name (str): The name of the dataset run. 3180 run_description (Optional[str]): Description of the dataset run. Defaults to None. 3181 run_metadata (Optional[Any]): Additional metadata for the dataset run. Defaults to None. 3182 llama_index_integration_constructor_kwargs (Optional[Dict[str, Any]]): Keyword arguments to pass 3183 to the LlamaIndex integration constructor. Defaults to an empty dictionary. 3184 3185 Yields: 3186 LlamaIndexCallbackHandler: The callback handler for LlamaIndex operations. 3187 3188 Example: 3189 ```python 3190 dataset_item = dataset.items[0] 3191 3192 with dataset_item.observe_llama_index(run_name="example-run", run_description="Example LlamaIndex run") as handler: 3193 # Perform LlamaIndex operations here 3194 some_llama_index_operation() 3195 ``` 3196 3197 Raises: 3198 ImportError: If required modules for LlamaIndex integration are not available. 3199 """ 3200 metadata = { 3201 "dataset_item_id": self.id, 3202 "run_name": run_name, 3203 "dataset_id": self.dataset_id, 3204 } 3205 trace = self.langfuse.trace(name="dataset-run", metadata=metadata) 3206 self.link( 3207 trace, run_name, run_metadata=run_metadata, run_description=run_description 3208 ) 3209 3210 try: 3211 import llama_index.core 3212 from llama_index.core import Settings 3213 from llama_index.core.callbacks import CallbackManager 3214 3215 from langfuse.llama_index import LlamaIndexCallbackHandler 3216 3217 callback_handler = LlamaIndexCallbackHandler( 3218 **llama_index_integration_constructor_kwargs, 3219 ) 3220 callback_handler.set_root(trace, update_root=True) 3221 3222 # Temporarily set the global handler to the new handler if previous handler is a LlamaIndexCallbackHandler 3223 # LlamaIndex does not adding two errors of same type, so if global handler is already a LlamaIndexCallbackHandler, we need to remove it 3224 prev_global_handler = llama_index.core.global_handler 3225 prev_langfuse_handler = None 3226 3227 if isinstance(prev_global_handler, LlamaIndexCallbackHandler): 3228 llama_index.core.global_handler = None 3229 3230 if Settings.callback_manager is None: 3231 Settings.callback_manager = CallbackManager([callback_handler]) 3232 else: 3233 for handler in Settings.callback_manager.handlers: 3234 if isinstance(handler, LlamaIndexCallbackHandler): 3235 prev_langfuse_handler = handler 3236 Settings.callback_manager.remove_handler(handler) 3237 3238 Settings.callback_manager.add_handler(callback_handler) 3239 3240 except Exception as e: 3241 self.log.exception(e) 3242 3243 try: 3244 yield callback_handler 3245 finally: 3246 # Reset the handlers 3247 Settings.callback_manager.remove_handler(callback_handler) 3248 if prev_langfuse_handler is not None: 3249 Settings.callback_manager.add_handler(prev_langfuse_handler) 3250 3251 llama_index.core.global_handler = prev_global_handler 3252 3253 def get_llama_index_handler( 3254 self, 3255 *, 3256 run_name: str, 3257 run_description: Optional[str] = None, 3258 run_metadata: Optional[Any] = None, 3259 llama_index_integration_constructor_kwargs: Optional[Dict[str, Any]] = {}, 3260 ): 3261 """Create and get a llama-index callback handler linked to this dataset item. 3262 3263 Args: 3264 run_name (str): The name of the dataset run to be used in the callback handler. 3265 run_description (Optional[str]): Description of the dataset run. 3266 run_metadata (Optional[Any]): Additional metadata to include in dataset run. 3267 llama_index_integration_constructor_kwargs (Optional[Dict[str, Any]]): Additional keyword arguments to pass to the LlamaIndex integration constructor. 3268 3269 Returns: 3270 LlamaIndexCallbackHandler: An instance of LlamaIndexCallbackHandler linked to the dataset item. 3271 """ 3272 metadata = { 3273 "dataset_item_id": self.id, 3274 "run_name": run_name, 3275 "dataset_id": self.dataset_id, 3276 } 3277 trace = self.langfuse.trace(name="dataset-run", metadata=metadata) 3278 3279 self.link( 3280 trace, run_name, run_metadata=run_metadata, run_description=run_description 3281 ) 3282 3283 try: 3284 from langfuse.llama_index.llama_index import LlamaIndexCallbackHandler 3285 3286 callback_handler = LlamaIndexCallbackHandler( 3287 **llama_index_integration_constructor_kwargs, 3288 ) 3289 callback_handler.set_root(trace, update_root=True) 3290 3291 return callback_handler 3292 except Exception as e: 3293 self.log.exception(e) 3294 3295 3296class DatasetClient: 3297 """Class for managing datasets in Langfuse. 3298 3299 Attributes: 3300 id (str): Unique identifier of the dataset. 3301 name (str): Name of the dataset. 3302 description (Optional[str]): Description of the dataset. 3303 metadata (Optional[typing.Any]): Additional metadata of the dataset. 3304 project_id (str): Identifier of the project to which the dataset belongs. 3305 dataset_name (str): Name of the dataset. 3306 created_at (datetime): Timestamp of dataset creation. 3307 updated_at (datetime): Timestamp of the last update to the dataset. 3308 items (List[DatasetItemClient]): List of dataset items associated with the dataset. 3309 runs (List[str]): List of dataset runs associated with the dataset. Deprecated. 3310 3311 Example: 3312 Print the input of each dataset item in a dataset. 3313 ```python 3314 from langfuse import Langfuse 3315 3316 langfuse = Langfuse() 3317 3318 dataset = langfuse.get_dataset("<dataset_name>") 3319 3320 for item in dataset.items: 3321 print(item.input) 3322 ``` 3323 """ 3324 3325 id: str 3326 name: str 3327 description: Optional[str] 3328 project_id: str 3329 dataset_name: str # for backward compatibility, to be deprecated 3330 metadata: Optional[Any] 3331 created_at: dt.datetime 3332 updated_at: dt.datetime 3333 items: typing.List[DatasetItemClient] 3334 runs: typing.List[str] = [] # deprecated 3335 3336 def __init__(self, dataset: Dataset, items: typing.List[DatasetItemClient]): 3337 """Initialize the DatasetClient.""" 3338 self.id = dataset.id 3339 self.name = dataset.name 3340 self.description = dataset.description 3341 self.project_id = dataset.project_id 3342 self.metadata = dataset.metadata 3343 self.dataset_name = dataset.name # for backward compatibility, to be deprecated 3344 self.created_at = dataset.created_at 3345 self.updated_at = dataset.updated_at 3346 self.items = items
@dataclass
class
FetchTracesResponse:
96@dataclass 97class FetchTracesResponse: 98 """Response object for fetch_traces method.""" 99 100 data: typing.List[TraceWithDetails] 101 meta: MetaResponse
Response object for fetch_traces method.
FetchTracesResponse( data: List[langfuse.api.TraceWithDetails], meta: langfuse.api.resources.utils.resources.pagination.types.meta_response.MetaResponse)
data: List[langfuse.api.TraceWithDetails]
@dataclass
class
FetchTraceResponse:
104@dataclass 105class FetchTraceResponse: 106 """Response object for fetch_trace method.""" 107 108 data: TraceWithFullDetails
Response object for fetch_trace method.
FetchTraceResponse( data: langfuse.api.TraceWithFullDetails)
@dataclass
class
FetchObservationsResponse:
111@dataclass 112class FetchObservationsResponse: 113 """Response object for fetch_observations method.""" 114 115 data: typing.List[ObservationsView] 116 meta: MetaResponse
Response object for fetch_observations method.
FetchObservationsResponse( data: List[langfuse.api.ObservationsView], meta: langfuse.api.resources.utils.resources.pagination.types.meta_response.MetaResponse)
data: List[langfuse.api.ObservationsView]
@dataclass
class
FetchObservationResponse:
119@dataclass 120class FetchObservationResponse: 121 """Response object for fetch_observation method.""" 122 123 data: Observation
Response object for fetch_observation method.
FetchObservationResponse(data: langfuse.api.Observation)
data: langfuse.api.Observation
@dataclass
class
FetchSessionsResponse:
126@dataclass 127class FetchSessionsResponse: 128 """Response object for fetch_sessions method.""" 129 130 data: typing.List[Session] 131 meta: MetaResponse
Response object for fetch_sessions method.
FetchSessionsResponse( data: List[langfuse.api.Session], meta: langfuse.api.resources.utils.resources.pagination.types.meta_response.MetaResponse)
data: List[langfuse.api.Session]
class
Langfuse:
134class Langfuse(object): 135 """Langfuse Python client. 136 137 Attributes: 138 log (logging.Logger): Logger for the Langfuse client. 139 base_url (str): Base URL of the Langfuse API, serving as the root address for API endpoint construction. 140 httpx_client (httpx.Client): HTTPX client utilized for executing requests to the Langfuse API. 141 client (FernLangfuse): Core interface for Langfuse API interaction. 142 task_manager (TaskManager): Task Manager dedicated to handling asynchronous tasks. 143 release (str): Identifies the release number or hash of the application. 144 prompt_cache (PromptCache): A cache for efficiently storing and retrieving PromptClient instances. 145 146 Example: 147 Initiating the Langfuse client should always be first step to use Langfuse. 148 ```python 149 import os 150 from langfuse import Langfuse 151 152 # Set the public and secret keys as environment variables 153 os.environ['LANGFUSE_PUBLIC_KEY'] = public_key 154 os.environ['LANGFUSE_SECRET_KEY'] = secret_key 155 156 # Initialize the Langfuse client using the credentials 157 langfuse = Langfuse() 158 ``` 159 """ 160 161 log = logging.getLogger("langfuse") 162 """Logger for the Langfuse client.""" 163 164 host: str 165 """Host of Langfuse API.""" 166 167 def __init__( 168 self, 169 public_key: Optional[str] = None, 170 secret_key: Optional[str] = None, 171 host: Optional[str] = None, 172 release: Optional[str] = None, 173 debug: bool = False, 174 threads: Optional[int] = None, 175 flush_at: Optional[int] = None, 176 flush_interval: Optional[float] = None, 177 max_retries: Optional[int] = None, 178 timeout: Optional[int] = None, # seconds 179 sdk_integration: Optional[str] = "default", 180 httpx_client: Optional[httpx.Client] = None, 181 enabled: Optional[bool] = True, 182 sample_rate: Optional[float] = None, 183 ): 184 """Initialize the Langfuse client. 185 186 Args: 187 public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable. 188 secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable. 189 host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`. 190 release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable. 191 debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable. 192 threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues. 193 flush_at: Max batch size that's sent to the API. 194 flush_interval: Max delay until a new batch is sent to the API. 195 max_retries: Max number of retries in case of API/network errors. 196 timeout: Timeout of API requests in seconds. Defaults to 20 seconds. 197 httpx_client: Pass your own httpx client for more customizability of requests. 198 sdk_integration: Used by intgerations that wrap the Langfuse SDK to add context for debugging and support. Not to be used directly. 199 enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops. 200 sample_rate: Sampling rate for tracing. If set to 0.2, only 20% of the data will be sent to the backend. Can be set via `LANGFUSE_SAMPLE_RATE` environment variable. 201 202 Raises: 203 ValueError: If public_key or secret_key are not set and not found in environment variables. 204 205 Example: 206 Initiating the Langfuse client should always be first step to use Langfuse. 207 ```python 208 import os 209 from langfuse import Langfuse 210 211 # Set the public and secret keys as environment variables 212 os.environ['LANGFUSE_PUBLIC_KEY'] = public_key 213 os.environ['LANGFUSE_SECRET_KEY'] = secret_key 214 215 # Initialize the Langfuse client using the credentials 216 langfuse = Langfuse() 217 ``` 218 """ 219 self.enabled = enabled 220 public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY") 221 secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY") 222 sample_rate = ( 223 sample_rate 224 if sample_rate 225 is not None # needs explicit None check, as 0 is a valid value 226 else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0)) 227 ) 228 229 if sample_rate is not None and ( 230 sample_rate > 1 or sample_rate < 0 231 ): # default value 1 will be set in the taskmanager 232 self.enabled = False 233 self.log.warning( 234 "Langfuse client is disabled since the sample rate provided is not between 0 and 1." 235 ) 236 237 threads = threads or int(os.environ.get("LANGFUSE_THREADS", 1)) 238 flush_at = flush_at or int(os.environ.get("LANGFUSE_FLUSH_AT", 15)) 239 flush_interval = flush_interval or float( 240 os.environ.get("LANGFUSE_FLUSH_INTERVAL", 0.5) 241 ) 242 243 max_retries = max_retries or int(os.environ.get("LANGFUSE_MAX_RETRIES", 3)) 244 timeout = timeout or int(os.environ.get("LANGFUSE_TIMEOUT", 20)) 245 246 if not self.enabled: 247 self.log.warning( 248 "Langfuse client is disabled. No observability data will be sent." 249 ) 250 251 elif not public_key: 252 self.enabled = False 253 self.log.warning( 254 "Langfuse client is disabled since no public_key was provided as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client" 255 ) 256 257 elif not secret_key: 258 self.enabled = False 259 self.log.warning( 260 "Langfuse client is disabled since no secret_key was provided as a parameter or environment variable 'LANGFUSE_SECRET_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client" 261 ) 262 263 set_debug = debug if debug else (os.getenv("LANGFUSE_DEBUG", "False") == "True") 264 265 if set_debug is True: 266 # Ensures that debug level messages are logged when debug mode is on. 267 # Otherwise, defaults to WARNING level. 268 # See https://docs.python.org/3/howto/logging.html#what-happens-if-no-configuration-is-provided 269 logging.basicConfig() 270 self.log.setLevel(logging.DEBUG) 271 272 clean_logger() 273 else: 274 self.log.setLevel(logging.WARNING) 275 clean_logger() 276 277 self.base_url = ( 278 host 279 if host 280 else os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com") 281 ) 282 283 self.httpx_client = httpx_client or httpx.Client(timeout=timeout) 284 285 self.client = FernLangfuse( 286 base_url=self.base_url, 287 username=public_key, 288 password=secret_key, 289 x_langfuse_sdk_name="python", 290 x_langfuse_sdk_version=version, 291 x_langfuse_public_key=public_key, 292 httpx_client=self.httpx_client, 293 ) 294 295 langfuse_client = LangfuseClient( 296 public_key=public_key, 297 secret_key=secret_key, 298 base_url=self.base_url, 299 version=version, 300 timeout=timeout, 301 session=self.httpx_client, 302 ) 303 304 args = { 305 "threads": threads, 306 "flush_at": flush_at, 307 "flush_interval": flush_interval, 308 "max_retries": max_retries, 309 "client": langfuse_client, 310 "public_key": public_key, 311 "sdk_name": "python", 312 "sdk_version": version, 313 "sdk_integration": sdk_integration, 314 "enabled": self.enabled, 315 "sample_rate": sample_rate, 316 } 317 318 self.task_manager = TaskManager(**args) 319 320 self.trace_id = None 321 322 self.release = self._get_release_value(release) 323 324 self.prompt_cache = PromptCache() 325 326 def _get_release_value(self, release: Optional[str] = None) -> Optional[str]: 327 if release: 328 return release 329 elif "LANGFUSE_RELEASE" in os.environ: 330 return os.environ["LANGFUSE_RELEASE"] 331 else: 332 return get_common_release_envs() 333 334 def get_trace_id(self) -> str: 335 """Get the current trace id.""" 336 return self.trace_id 337 338 def get_trace_url(self) -> str: 339 """Get the URL of the current trace to view it in the Langfuse UI.""" 340 return f"{self.base_url}/trace/{self.trace_id}" 341 342 def get_dataset( 343 self, name: str, *, fetch_items_page_size: Optional[int] = 50 344 ) -> "DatasetClient": 345 """Fetch a dataset by its name. 346 347 Args: 348 name (str): The name of the dataset to fetch. 349 fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50. 350 351 Returns: 352 DatasetClient: The dataset with the given name. 353 """ 354 try: 355 self.log.debug(f"Getting datasets {name}") 356 dataset = self.client.datasets.get(dataset_name=name) 357 358 dataset_items = [] 359 page = 1 360 while True: 361 new_items = self.client.dataset_items.list( 362 dataset_name=name, page=page, limit=fetch_items_page_size 363 ) 364 dataset_items.extend(new_items.data) 365 if new_items.meta.total_pages <= page: 366 break 367 page += 1 368 369 items = [DatasetItemClient(i, langfuse=self) for i in dataset_items] 370 371 return DatasetClient(dataset, items=items) 372 except Exception as e: 373 handle_fern_exception(e) 374 raise e 375 376 def get_dataset_item(self, id: str) -> "DatasetItemClient": 377 """Get the dataset item with the given id.""" 378 try: 379 self.log.debug(f"Getting dataset item {id}") 380 dataset_item = self.client.dataset_items.get(id=id) 381 return DatasetItemClient(dataset_item, langfuse=self) 382 except Exception as e: 383 handle_fern_exception(e) 384 raise e 385 386 def auth_check(self) -> bool: 387 """Check if the provided credentials (public and secret key) are valid. 388 389 Raises: 390 Exception: If no projects were found for the provided credentials. 391 392 Note: 393 This method is blocking. It is discouraged to use it in production code. 394 """ 395 try: 396 projects = self.client.projects.get() 397 self.log.debug( 398 f"Auth check successful, found {len(projects.data)} projects" 399 ) 400 if len(projects.data) == 0: 401 raise Exception( 402 "Auth check failed, no project found for the keys provided." 403 ) 404 return True 405 406 except Exception as e: 407 handle_fern_exception(e) 408 raise e 409 410 def get_dataset_runs( 411 self, 412 dataset_name: str, 413 *, 414 page: typing.Optional[int] = None, 415 limit: typing.Optional[int] = None, 416 ) -> PaginatedDatasetRuns: 417 """Get all dataset runs. 418 419 Args: 420 dataset_name (str): Name of the dataset. 421 page (Optional[int]): Page number of the dataset runs to return, starts at 1. Defaults to None. 422 limit (Optional[int]): Maximum number of dataset runs to return. Defaults to 50. 423 424 Returns: 425 PaginatedDatasetRuns: The dataset runs. 426 """ 427 try: 428 self.log.debug("Getting dataset runs") 429 return self.client.datasets.get_runs( 430 dataset_name=dataset_name, page=page, limit=limit 431 ) 432 except Exception as e: 433 handle_fern_exception(e) 434 raise e 435 436 def get_dataset_run( 437 self, 438 dataset_name: str, 439 dataset_run_name: str, 440 ) -> DatasetRunWithItems: 441 """Get a dataset run. 442 443 Args: 444 dataset_name: Name of the dataset. 445 dataset_run_name: Name of the dataset run. 446 447 Returns: 448 DatasetRunWithItems: The dataset run. 449 """ 450 try: 451 self.log.debug( 452 f"Getting dataset runs for dataset {dataset_name} and run {dataset_run_name}" 453 ) 454 return self.client.datasets.get_run( 455 dataset_name=dataset_name, run_name=dataset_run_name 456 ) 457 except Exception as e: 458 handle_fern_exception(e) 459 raise e 460 461 def create_dataset( 462 self, 463 name: str, 464 description: Optional[str] = None, 465 metadata: Optional[Any] = None, 466 ) -> Dataset: 467 """Create a dataset with the given name on Langfuse. 468 469 Args: 470 name: Name of the dataset to create. 471 description: Description of the dataset. Defaults to None. 472 metadata: Additional metadata. Defaults to None. 473 474 Returns: 475 Dataset: The created dataset as returned by the Langfuse API. 476 """ 477 try: 478 body = CreateDatasetRequest( 479 name=name, description=description, metadata=metadata 480 ) 481 self.log.debug(f"Creating datasets {body}") 482 return self.client.datasets.create(request=body) 483 except Exception as e: 484 handle_fern_exception(e) 485 raise e 486 487 def create_dataset_item( 488 self, 489 dataset_name: str, 490 input: Optional[Any] = None, 491 expected_output: Optional[Any] = None, 492 metadata: Optional[Any] = None, 493 source_trace_id: Optional[str] = None, 494 source_observation_id: Optional[str] = None, 495 status: Optional[DatasetStatus] = None, 496 id: Optional[str] = None, 497 ) -> DatasetItem: 498 """Create a dataset item. 499 500 Upserts if an item with id already exists. 501 502 Args: 503 dataset_name: Name of the dataset in which the dataset item should be created. 504 input: Input data. Defaults to None. Can contain any dict, list or scalar. 505 expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar. 506 metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar. 507 source_trace_id: Id of the source trace. Defaults to None. 508 source_observation_id: Id of the source observation. Defaults to None. 509 status: Status of the dataset item. Defaults to ACTIVE for newly created items. 510 id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets. 511 512 Returns: 513 DatasetItem: The created dataset item as returned by the Langfuse API. 514 515 Example: 516 ```python 517 from langfuse import Langfuse 518 519 langfuse = Langfuse() 520 521 # Uploading items to the Langfuse dataset named "capital_cities" 522 langfuse.create_dataset_item( 523 dataset_name="capital_cities", 524 input={"input": {"country": "Italy"}}, 525 expected_output={"expected_output": "Rome"}, 526 metadata={"foo": "bar"} 527 ) 528 ``` 529 """ 530 try: 531 body = CreateDatasetItemRequest( 532 datasetName=dataset_name, 533 input=input, 534 expectedOutput=expected_output, 535 metadata=metadata, 536 sourceTraceId=source_trace_id, 537 sourceObservationId=source_observation_id, 538 status=status, 539 id=id, 540 ) 541 self.log.debug(f"Creating dataset item {body}") 542 return self.client.dataset_items.create(request=body) 543 except Exception as e: 544 handle_fern_exception(e) 545 raise e 546 547 def fetch_trace( 548 self, 549 id: str, 550 ) -> FetchTraceResponse: 551 """Fetch a trace via the Langfuse API by its id. 552 553 Args: 554 id: The id of the trace to fetch. 555 556 Returns: 557 FetchTraceResponse: The trace with full details as returned by the Langfuse API on `data`. 558 559 Raises: 560 Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request. 561 """ 562 try: 563 self.log.debug(f"Getting trace {id}") 564 trace = self.client.trace.get(id) 565 return FetchTraceResponse(data=trace) 566 except Exception as e: 567 handle_fern_exception(e) 568 raise e 569 570 def get_trace( 571 self, 572 id: str, 573 ) -> TraceWithFullDetails: 574 """Get a trace via the Langfuse API by its id. Deprecated, use fetch_trace instead. 575 576 Args: 577 id: The id of the trace to fetch. 578 579 Returns: 580 TraceWithFullDetails: The trace with full details as returned by the Langfuse API. 581 582 Raises: 583 Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request. 584 """ 585 warnings.warn( 586 "get_trace is deprecated, use fetch_trace instead.", 587 DeprecationWarning, 588 ) 589 590 try: 591 self.log.debug(f"Getting trace {id}") 592 return self.client.trace.get(id) 593 except Exception as e: 594 handle_fern_exception(e) 595 raise e 596 597 def fetch_traces( 598 self, 599 *, 600 page: Optional[int] = None, 601 limit: Optional[int] = None, 602 user_id: Optional[str] = None, 603 name: Optional[str] = None, 604 session_id: Optional[str] = None, 605 from_timestamp: Optional[dt.datetime] = None, 606 to_timestamp: Optional[dt.datetime] = None, 607 order_by: Optional[str] = None, 608 tags: Optional[Union[str, Sequence[str]]] = None, 609 ) -> FetchTracesResponse: 610 """Fetch a list of traces in the current project matching the given parameters. 611 612 Args: 613 page (Optional[int]): Page number, starts at 1. Defaults to None. 614 limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None. 615 name (Optional[str]): Filter by name of traces. Defaults to None. 616 user_id (Optional[str]): Filter by user_id. Defaults to None. 617 session_id (Optional[str]): Filter by session_id. Defaults to None. 618 from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None. 619 to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None. 620 order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None. 621 tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None. 622 623 Returns: 624 FetchTracesResponse, list of traces on `data` and metadata on `meta`. 625 626 Raises: 627 Exception: If an error occurred during the request. 628 """ 629 try: 630 self.log.debug( 631 f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}" 632 ) 633 res = self.client.trace.list( 634 page=page, 635 limit=limit, 636 name=name, 637 user_id=user_id, 638 session_id=session_id, 639 from_timestamp=from_timestamp, 640 to_timestamp=to_timestamp, 641 order_by=order_by, 642 tags=tags, 643 ) 644 return FetchTracesResponse(data=res.data, meta=res.meta) 645 except Exception as e: 646 handle_fern_exception(e) 647 raise e 648 649 def get_traces( 650 self, 651 *, 652 page: Optional[int] = None, 653 limit: Optional[int] = None, 654 user_id: Optional[str] = None, 655 name: Optional[str] = None, 656 session_id: Optional[str] = None, 657 from_timestamp: Optional[dt.datetime] = None, 658 to_timestamp: Optional[dt.datetime] = None, 659 order_by: Optional[str] = None, 660 tags: Optional[Union[str, Sequence[str]]] = None, 661 ) -> Traces: 662 """Get a list of traces in the current project matching the given parameters. Deprecated, use fetch_traces instead. 663 664 Args: 665 page (Optional[int]): Page number, starts at 1. Defaults to None. 666 limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None. 667 name (Optional[str]): Filter by name of traces. Defaults to None. 668 user_id (Optional[str]): Filter by user_id. Defaults to None. 669 session_id (Optional[str]): Filter by session_id. Defaults to None. 670 from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None. 671 to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None. 672 order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None. 673 tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None. 674 675 Returns: 676 List of Traces 677 678 Raises: 679 Exception: If an error occurred during the request. 680 """ 681 warnings.warn( 682 "get_traces is deprecated, use fetch_traces instead.", 683 DeprecationWarning, 684 ) 685 try: 686 self.log.debug( 687 f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}" 688 ) 689 return self.client.trace.list( 690 page=page, 691 limit=limit, 692 name=name, 693 user_id=user_id, 694 session_id=session_id, 695 from_timestamp=from_timestamp, 696 to_timestamp=to_timestamp, 697 order_by=order_by, 698 tags=tags, 699 ) 700 except Exception as e: 701 handle_fern_exception(e) 702 raise e 703 704 def fetch_observations( 705 self, 706 *, 707 page: typing.Optional[int] = None, 708 limit: typing.Optional[int] = None, 709 name: typing.Optional[str] = None, 710 user_id: typing.Optional[str] = None, 711 trace_id: typing.Optional[str] = None, 712 parent_observation_id: typing.Optional[str] = None, 713 from_start_time: typing.Optional[dt.datetime] = None, 714 to_start_time: typing.Optional[dt.datetime] = None, 715 type: typing.Optional[str] = None, 716 ) -> FetchObservationsResponse: 717 """Get a list of observations in the current project matching the given parameters. 718 719 Args: 720 page (Optional[int]): Page number of the observations to return. Defaults to None. 721 limit (Optional[int]): Maximum number of observations to return. Defaults to None. 722 name (Optional[str]): Name of the observations to return. Defaults to None. 723 user_id (Optional[str]): User identifier. Defaults to None. 724 trace_id (Optional[str]): Trace identifier. Defaults to None. 725 parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None. 726 from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None. 727 to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None. 728 type (Optional[str]): Type of the observation. Defaults to None. 729 730 Returns: 731 FetchObservationsResponse, list of observations on `data` and metadata on `meta`. 732 733 Raises: 734 Exception: If an error occurred during the request. 735 """ 736 try: 737 self.log.debug( 738 f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}" 739 ) 740 res = self.client.observations.get_many( 741 page=page, 742 limit=limit, 743 name=name, 744 user_id=user_id, 745 trace_id=trace_id, 746 parent_observation_id=parent_observation_id, 747 from_start_time=from_start_time, 748 to_start_time=to_start_time, 749 type=type, 750 ) 751 return FetchObservationsResponse(data=res.data, meta=res.meta) 752 except Exception as e: 753 self.log.exception(e) 754 raise e 755 756 def get_observations( 757 self, 758 *, 759 page: typing.Optional[int] = None, 760 limit: typing.Optional[int] = None, 761 name: typing.Optional[str] = None, 762 user_id: typing.Optional[str] = None, 763 trace_id: typing.Optional[str] = None, 764 parent_observation_id: typing.Optional[str] = None, 765 from_start_time: typing.Optional[dt.datetime] = None, 766 to_start_time: typing.Optional[dt.datetime] = None, 767 type: typing.Optional[str] = None, 768 ) -> ObservationsViews: 769 """Get a list of observations in the current project matching the given parameters. Deprecated, use fetch_observations instead. 770 771 Args: 772 page (Optional[int]): Page number of the observations to return. Defaults to None. 773 limit (Optional[int]): Maximum number of observations to return. Defaults to None. 774 name (Optional[str]): Name of the observations to return. Defaults to None. 775 user_id (Optional[str]): User identifier. Defaults to None. 776 trace_id (Optional[str]): Trace identifier. Defaults to None. 777 parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None. 778 from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None. 779 to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None. 780 type (Optional[str]): Type of the observation. Defaults to None. 781 782 Returns: 783 List of ObservationsViews: List of observations in the project matching the given parameters. 784 785 Raises: 786 Exception: If an error occurred during the request. 787 """ 788 warnings.warn( 789 "get_observations is deprecated, use fetch_observations instead.", 790 DeprecationWarning, 791 ) 792 try: 793 self.log.debug( 794 f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}" 795 ) 796 return self.client.observations.get_many( 797 page=page, 798 limit=limit, 799 name=name, 800 user_id=user_id, 801 trace_id=trace_id, 802 parent_observation_id=parent_observation_id, 803 from_start_time=from_start_time, 804 to_start_time=to_start_time, 805 type=type, 806 ) 807 except Exception as e: 808 handle_fern_exception(e) 809 raise e 810 811 def get_generations( 812 self, 813 *, 814 page: typing.Optional[int] = None, 815 limit: typing.Optional[int] = None, 816 name: typing.Optional[str] = None, 817 user_id: typing.Optional[str] = None, 818 trace_id: typing.Optional[str] = None, 819 from_start_time: typing.Optional[dt.datetime] = None, 820 to_start_time: typing.Optional[dt.datetime] = None, 821 parent_observation_id: typing.Optional[str] = None, 822 ) -> ObservationsViews: 823 """Get a list of generations in the current project matching the given parameters. Deprecated, use fetch_observations(type='GENERATION') instead. 824 825 Args: 826 page (Optional[int]): Page number of the generations to return. Defaults to None. 827 limit (Optional[int]): Maximum number of generations to return. Defaults to None. 828 name (Optional[str]): Name of the generations to return. Defaults to None. 829 user_id (Optional[str]): User identifier of the generations to return. Defaults to None. 830 trace_id (Optional[str]): Trace identifier of the generations to return. Defaults to None. 831 from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None. 832 to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None. 833 parent_observation_id (Optional[str]): Parent observation identifier of the generations to return. Defaults to None. 834 835 Returns: 836 List of ObservationsViews: List of generations in the project matching the given parameters. 837 838 Raises: 839 Exception: If an error occurred during the request. 840 """ 841 warnings.warn( 842 "get_generations is deprecated, use `fetch_observations(type='GENERATION')` instead.", 843 DeprecationWarning, 844 ) 845 return self.get_observations( 846 page=page, 847 limit=limit, 848 name=name, 849 user_id=user_id, 850 trace_id=trace_id, 851 parent_observation_id=parent_observation_id, 852 from_start_time=from_start_time, 853 to_start_time=to_start_time, 854 type="GENERATION", 855 ) 856 857 def fetch_observation( 858 self, 859 id: str, 860 ) -> FetchObservationResponse: 861 """Get an observation in the current project with the given identifier. 862 863 Args: 864 id: The identifier of the observation to fetch. 865 866 Returns: 867 FetchObservationResponse: The observation with the given id on `data`. 868 869 Raises: 870 Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request. 871 """ 872 try: 873 self.log.debug(f"Getting observation {id}") 874 observation = self.client.observations.get(id) 875 return FetchObservationResponse(data=observation) 876 except Exception as e: 877 handle_fern_exception(e) 878 raise e 879 880 def get_observation( 881 self, 882 id: str, 883 ) -> Observation: 884 """Get an observation in the current project with the given identifier. Deprecated, use fetch_observation instead. 885 886 Args: 887 id: The identifier of the observation to fetch. 888 889 Raises: 890 Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request. 891 """ 892 warnings.warn( 893 "get_observation is deprecated, use fetch_observation instead.", 894 DeprecationWarning, 895 ) 896 try: 897 self.log.debug(f"Getting observation {id}") 898 return self.client.observations.get(id) 899 except Exception as e: 900 handle_fern_exception(e) 901 raise e 902 903 def fetch_sessions( 904 self, 905 *, 906 page: typing.Optional[int] = None, 907 limit: typing.Optional[int] = None, 908 from_timestamp: typing.Optional[dt.datetime] = None, 909 to_timestamp: typing.Optional[dt.datetime] = None, 910 ) -> FetchSessionsResponse: 911 """Get a list of sessions in the current project. 912 913 Args: 914 page (Optional[int]): Page number of the sessions to return. Defaults to None. 915 limit (Optional[int]): Maximum number of sessions to return. Defaults to None. 916 from_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp on or after this datetime. Defaults to None. 917 to_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp before this datetime. Defaults to None. 918 919 Returns: 920 FetchSessionsResponse, list of sessions on `data` and metadata on `meta`. 921 922 Raises: 923 Exception: If an error occurred during the request. 924 """ 925 try: 926 self.log.debug( 927 f"Getting sessions... {page}, {limit}, {from_timestamp}, {to_timestamp}" 928 ) 929 res = self.client.sessions.list( 930 page=page, 931 limit=limit, 932 from_timestamp=from_timestamp, 933 to_timestamp=to_timestamp, 934 ) 935 return FetchSessionsResponse(data=res.data, meta=res.meta) 936 except Exception as e: 937 handle_fern_exception(e) 938 raise e 939 940 @overload 941 def get_prompt( 942 self, 943 name: str, 944 version: Optional[int] = None, 945 *, 946 label: Optional[str] = None, 947 type: Literal["chat"], 948 cache_ttl_seconds: Optional[int] = None, 949 fallback: Optional[List[ChatMessageDict]] = None, 950 max_retries: Optional[int] = None, 951 fetch_timeout_seconds: Optional[int] = None, 952 ) -> ChatPromptClient: ... 953 954 @overload 955 def get_prompt( 956 self, 957 name: str, 958 version: Optional[int] = None, 959 *, 960 label: Optional[str] = None, 961 type: Literal["text"] = "text", 962 cache_ttl_seconds: Optional[int] = None, 963 fallback: Optional[str] = None, 964 max_retries: Optional[int] = None, 965 fetch_timeout_seconds: Optional[int] = None, 966 ) -> TextPromptClient: ... 967 968 def get_prompt( 969 self, 970 name: str, 971 version: Optional[int] = None, 972 *, 973 label: Optional[str] = None, 974 type: Literal["chat", "text"] = "text", 975 cache_ttl_seconds: Optional[int] = None, 976 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None, 977 max_retries: Optional[int] = None, 978 fetch_timeout_seconds: Optional[int] = None, 979 ) -> PromptClient: 980 """Get a prompt. 981 982 This method attempts to fetch the requested prompt from the local cache. If the prompt is not found 983 in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again 984 and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will 985 return the expired prompt as a fallback. 986 987 Args: 988 name (str): The name of the prompt to retrieve. 989 990 Keyword Args: 991 version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both. 992 label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both. 993 cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a 994 keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0. 995 type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text". 996 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None. 997 max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries have an exponential backoff with a maximum delay of 10 seconds. 998 fetch_timeout_seconds: Optional[int]: The timeout in milliseconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 10 seconds per default. 999 1000 Returns: 1001 The prompt object retrieved from the cache or directly fetched if not cached or expired of type 1002 - TextPromptClient, if type argument is 'text'. 1003 - ChatPromptClient, if type argument is 'chat'. 1004 1005 Raises: 1006 Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an 1007 expired prompt in the cache, in which case it logs a warning and returns the expired prompt. 1008 """ 1009 if version is not None and label is not None: 1010 raise ValueError("Cannot specify both version and label at the same time.") 1011 1012 if not name: 1013 raise ValueError("Prompt name cannot be empty.") 1014 1015 cache_key = PromptCache.generate_cache_key(name, version=version, label=label) 1016 bounded_max_retries = self._get_bounded_max_retries( 1017 max_retries, default_max_retries=2, max_retries_upper_bound=4 1018 ) 1019 1020 self.log.debug(f"Getting prompt '{cache_key}'") 1021 cached_prompt = self.prompt_cache.get(cache_key) 1022 1023 if cached_prompt is None or cache_ttl_seconds == 0: 1024 self.log.debug( 1025 f"Prompt '{cache_key}' not found in cache or caching disabled." 1026 ) 1027 try: 1028 return self._fetch_prompt_and_update_cache( 1029 name, 1030 version=version, 1031 label=label, 1032 ttl_seconds=cache_ttl_seconds, 1033 max_retries=bounded_max_retries, 1034 fetch_timeout_seconds=fetch_timeout_seconds, 1035 ) 1036 except Exception as e: 1037 if fallback: 1038 self.log.warning( 1039 f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}" 1040 ) 1041 1042 fallback_client_args = { 1043 "name": name, 1044 "prompt": fallback, 1045 "type": type, 1046 "version": version or 0, 1047 "config": {}, 1048 "labels": [label] if label else [], 1049 "tags": [], 1050 } 1051 1052 if type == "text": 1053 return TextPromptClient( 1054 prompt=Prompt_Text(**fallback_client_args), 1055 is_fallback=True, 1056 ) 1057 1058 if type == "chat": 1059 return ChatPromptClient( 1060 prompt=Prompt_Chat(**fallback_client_args), 1061 is_fallback=True, 1062 ) 1063 1064 raise e 1065 1066 if cached_prompt.is_expired(): 1067 self.log.debug(f"Stale prompt '{cache_key}' found in cache.") 1068 try: 1069 # refresh prompt in background thread, refresh_prompt deduplicates tasks 1070 self.log.debug(f"Refreshing prompt '{cache_key}' in background.") 1071 self.prompt_cache.add_refresh_prompt_task( 1072 cache_key, 1073 lambda: self._fetch_prompt_and_update_cache( 1074 name, 1075 version=version, 1076 label=label, 1077 ttl_seconds=cache_ttl_seconds, 1078 max_retries=bounded_max_retries, 1079 fetch_timeout_seconds=fetch_timeout_seconds, 1080 ), 1081 ) 1082 self.log.debug(f"Returning stale prompt '{cache_key}' from cache.") 1083 # return stale prompt 1084 return cached_prompt.value 1085 1086 except Exception as e: 1087 self.log.warning( 1088 f"Error when refreshing cached prompt '{cache_key}', returning cached version. Error: {e}" 1089 ) 1090 # creation of refresh prompt task failed, return stale prompt 1091 return cached_prompt.value 1092 1093 return cached_prompt.value 1094 1095 def _fetch_prompt_and_update_cache( 1096 self, 1097 name: str, 1098 *, 1099 version: Optional[int] = None, 1100 label: Optional[str] = None, 1101 ttl_seconds: Optional[int] = None, 1102 max_retries: int, 1103 fetch_timeout_seconds, 1104 ) -> PromptClient: 1105 try: 1106 cache_key = PromptCache.generate_cache_key( 1107 name, version=version, label=label 1108 ) 1109 1110 self.log.debug(f"Fetching prompt '{cache_key}' from server...") 1111 1112 @backoff.on_exception( 1113 backoff.constant, Exception, max_tries=max_retries, logger=None 1114 ) 1115 def fetch_prompts(): 1116 return self.client.prompts.get( 1117 self._url_encode(name), 1118 version=version, 1119 label=label, 1120 request_options={ 1121 "timeout_in_seconds": fetch_timeout_seconds, 1122 } 1123 if fetch_timeout_seconds is not None 1124 else None, 1125 ) 1126 1127 prompt_response = fetch_prompts() 1128 1129 if prompt_response.type == "chat": 1130 prompt = ChatPromptClient(prompt_response) 1131 else: 1132 prompt = TextPromptClient(prompt_response) 1133 1134 self.prompt_cache.set(cache_key, prompt, ttl_seconds) 1135 1136 return prompt 1137 1138 except Exception as e: 1139 self.log.error(f"Error while fetching prompt '{cache_key}': {str(e)}") 1140 raise e 1141 1142 def _get_bounded_max_retries( 1143 self, 1144 max_retries: Optional[int], 1145 *, 1146 default_max_retries: int = 2, 1147 max_retries_upper_bound: int = 4, 1148 ) -> int: 1149 if max_retries is None: 1150 return default_max_retries 1151 1152 bounded_max_retries = min( 1153 max(max_retries, 0), 1154 max_retries_upper_bound, 1155 ) 1156 1157 return bounded_max_retries 1158 1159 @overload 1160 def create_prompt( 1161 self, 1162 *, 1163 name: str, 1164 prompt: List[ChatMessageDict], 1165 is_active: Optional[bool] = None, # deprecated 1166 labels: List[str] = [], 1167 tags: Optional[List[str]] = None, 1168 type: Optional[Literal["chat"]], 1169 config: Optional[Any] = None, 1170 ) -> ChatPromptClient: ... 1171 1172 @overload 1173 def create_prompt( 1174 self, 1175 *, 1176 name: str, 1177 prompt: str, 1178 is_active: Optional[bool] = None, # deprecated 1179 labels: List[str] = [], 1180 tags: Optional[List[str]] = None, 1181 type: Optional[Literal["text"]] = "text", 1182 config: Optional[Any] = None, 1183 ) -> TextPromptClient: ... 1184 1185 def create_prompt( 1186 self, 1187 *, 1188 name: str, 1189 prompt: Union[str, List[ChatMessageDict]], 1190 is_active: Optional[bool] = None, # deprecated 1191 labels: List[str] = [], 1192 tags: Optional[List[str]] = None, 1193 type: Optional[Literal["chat", "text"]] = "text", 1194 config: Optional[Any] = None, 1195 ) -> PromptClient: 1196 """Create a new prompt in Langfuse. 1197 1198 Keyword Args: 1199 name : The name of the prompt to be created. 1200 prompt : The content of the prompt to be created. 1201 is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead. 1202 labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label. 1203 tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt. 1204 config: Additional structured data to be saved with the prompt. Defaults to None. 1205 type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text". 1206 1207 Returns: 1208 TextPromptClient: The prompt if type argument is 'text'. 1209 ChatPromptClient: The prompt if type argument is 'chat'. 1210 """ 1211 try: 1212 self.log.debug(f"Creating prompt {name=}, {version=}, {labels=}") 1213 1214 # Handle deprecated is_active flag 1215 if is_active: 1216 self.log.warning( 1217 "The 'is_active' flag is deprecated and will be removed in a future release. Please use the 'production' label instead." 1218 ) 1219 1220 labels = labels if "production" in labels else labels + ["production"] 1221 1222 if type == "chat": 1223 if not isinstance(prompt, list): 1224 raise ValueError( 1225 "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes." 1226 ) 1227 request = CreatePromptRequest_Chat( 1228 name=name, 1229 prompt=prompt, 1230 labels=labels, 1231 tags=tags, 1232 config=config or {}, 1233 type="chat", 1234 ) 1235 server_prompt = self.client.prompts.create(request=request) 1236 1237 return ChatPromptClient(prompt=server_prompt) 1238 1239 if not isinstance(prompt, str): 1240 raise ValueError("For 'text' type, 'prompt' must be a string.") 1241 1242 request = CreatePromptRequest_Text( 1243 name=name, 1244 prompt=prompt, 1245 labels=labels, 1246 tags=tags, 1247 config=config or {}, 1248 type="text", 1249 ) 1250 1251 server_prompt = self.client.prompts.create(request=request) 1252 return TextPromptClient(prompt=server_prompt) 1253 1254 except Exception as e: 1255 handle_fern_exception(e) 1256 raise e 1257 1258 def _url_encode(self, url: str) -> str: 1259 return urllib.parse.quote(url) 1260 1261 def trace( 1262 self, 1263 *, 1264 id: typing.Optional[str] = None, 1265 name: typing.Optional[str] = None, 1266 user_id: typing.Optional[str] = None, 1267 session_id: typing.Optional[str] = None, 1268 version: typing.Optional[str] = None, 1269 input: typing.Optional[typing.Any] = None, 1270 output: typing.Optional[typing.Any] = None, 1271 metadata: typing.Optional[typing.Any] = None, 1272 tags: typing.Optional[typing.List[str]] = None, 1273 timestamp: typing.Optional[dt.datetime] = None, 1274 public: typing.Optional[bool] = None, 1275 **kwargs, 1276 ) -> "StatefulTraceClient": 1277 """Create a trace. 1278 1279 Args: 1280 id: The id of the trace can be set, defaults to a random id. Set it to link traces to external systems or when creating a distributed trace. Traces are upserted on id. 1281 name: Identifier of the trace. Useful for sorting/filtering in the UI. 1282 input: The input of the trace. Can be any JSON object. 1283 output: The output of the trace. Can be any JSON object. 1284 metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API. 1285 user_id: The id of the user that triggered the execution. Used to provide user-level analytics. 1286 session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier. 1287 version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging. 1288 release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging. 1289 tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API. 1290 timestamp: The timestamp of the trace. Defaults to the current time if not provided. 1291 public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project. 1292 **kwargs: Additional keyword arguments that can be included in the trace. 1293 1294 Returns: 1295 StatefulTraceClient: The created trace. 1296 1297 Example: 1298 ```python 1299 from langfuse import Langfuse 1300 1301 langfuse = Langfuse() 1302 1303 trace = langfuse.trace( 1304 name="example-application", 1305 user_id="user-1234") 1306 ) 1307 ``` 1308 """ 1309 new_id = id or str(uuid.uuid4()) 1310 self.trace_id = new_id 1311 try: 1312 new_dict = { 1313 "id": new_id, 1314 "name": name, 1315 "userId": user_id, 1316 "sessionId": session_id 1317 or kwargs.get("sessionId", None), # backward compatibility 1318 "release": self.release, 1319 "version": version, 1320 "metadata": metadata, 1321 "input": input, 1322 "output": output, 1323 "tags": tags, 1324 "timestamp": timestamp or _get_timestamp(), 1325 "public": public, 1326 } 1327 if kwargs is not None: 1328 new_dict.update(kwargs) 1329 1330 new_body = TraceBody(**new_dict) 1331 1332 self.log.debug(f"Creating trace {new_body}") 1333 event = { 1334 "id": str(uuid.uuid4()), 1335 "type": "trace-create", 1336 "body": new_body.dict(exclude_none=True), 1337 } 1338 1339 self.task_manager.add_task( 1340 event, 1341 ) 1342 1343 except Exception as e: 1344 self.log.exception(e) 1345 finally: 1346 self._log_memory_usage() 1347 1348 return StatefulTraceClient( 1349 self.client, new_id, StateType.TRACE, new_id, self.task_manager 1350 ) 1351 1352 def _log_memory_usage(self): 1353 try: 1354 is_malloc_tracing_enabled = bool(int(os.getenv("PYTHONTRACEMALLOC", 0))) 1355 report_interval = int(os.getenv("LANGFUSE_DEBUG_MEMORY_REPORT_INTERVAL", 0)) 1356 top_k_items = int(os.getenv("LANGFUSE_DEBUG_MEMORY_TOP_K", 10)) 1357 1358 if ( 1359 not is_malloc_tracing_enabled 1360 or report_interval <= 0 1361 or round(time.monotonic()) % report_interval != 0 1362 ): 1363 return 1364 1365 snapshot = tracemalloc.take_snapshot().statistics("lineno") 1366 1367 total_memory_usage = sum([stat.size for stat in snapshot]) / 1024 / 1024 1368 memory_usage_total_items = [f"{stat}" for stat in snapshot] 1369 memory_usage_langfuse_items = [ 1370 stat for stat in memory_usage_total_items if "/langfuse/" in stat 1371 ] 1372 1373 logged_memory_usage = { 1374 "all_files": [f"{stat}" for stat in memory_usage_total_items][ 1375 :top_k_items 1376 ], 1377 "langfuse_files": [f"{stat}" for stat in memory_usage_langfuse_items][ 1378 :top_k_items 1379 ], 1380 "total_usage": f"{total_memory_usage:.2f} MB", 1381 "langfuse_queue_length": self.task_manager._queue.qsize(), 1382 } 1383 1384 self.log.debug<