langfuse.client
1import datetime as dt 2import logging 3import os 4import time 5import tracemalloc 6import typing 7import urllib.parse 8import uuid 9import warnings 10from contextlib import contextmanager 11from dataclasses import dataclass 12from enum import Enum 13from typing import Any, Dict, List, Literal, Optional, Sequence, Union, overload 14 15import backoff 16import httpx 17 18from langfuse.api.resources.commons.types.dataset_run_with_items import ( 19 DatasetRunWithItems, 20) 21from langfuse.api.resources.commons.types.observations_view import ObservationsView 22from langfuse.api.resources.commons.types.session import Session 23from langfuse.api.resources.commons.types.trace_with_details import TraceWithDetails 24from langfuse.api.resources.datasets.types.paginated_dataset_runs import ( 25 PaginatedDatasetRuns, 26) 27from langfuse.api.resources.ingestion.types.create_event_body import CreateEventBody 28from langfuse.api.resources.ingestion.types.create_generation_body import ( 29 CreateGenerationBody, 30) 31from langfuse.api.resources.ingestion.types.create_span_body import CreateSpanBody 32from langfuse.api.resources.ingestion.types.score_body import ScoreBody 33from langfuse.api.resources.ingestion.types.sdk_log_body import SdkLogBody 34from langfuse.api.resources.ingestion.types.trace_body import TraceBody 35from langfuse.api.resources.ingestion.types.update_generation_body import ( 36 UpdateGenerationBody, 37) 38from langfuse.api.resources.ingestion.types.update_span_body import UpdateSpanBody 39from langfuse.api.resources.observations.types.observations_views import ( 40 ObservationsViews, 41) 42from langfuse.api.resources.prompts.types import ( 43 CreatePromptRequest_Chat, 44 CreatePromptRequest_Text, 45 Prompt_Chat, 46 Prompt_Text, 47) 48from langfuse.api.resources.trace.types.traces import Traces 49from langfuse.api.resources.utils.resources.pagination.types.meta_response import ( 50 MetaResponse, 51) 52from langfuse.model import ( 53 ChatMessageDict, 54 
ChatPromptClient, 55 CreateDatasetItemRequest, 56 CreateDatasetRequest, 57 CreateDatasetRunItemRequest, 58 DatasetItem, 59 DatasetStatus, 60 ModelUsage, 61 PromptClient, 62 TextPromptClient, 63) 64from langfuse.parse_error import handle_fern_exception 65from langfuse.prompt_cache import PromptCache 66 67try: 68 import pydantic.v1 as pydantic # type: ignore 69except ImportError: 70 import pydantic # type: ignore 71 72from langfuse._task_manager.task_manager import TaskManager 73from langfuse.api.client import FernLangfuse 74from langfuse.environment import get_common_release_envs 75from langfuse.logging import clean_logger 76from langfuse.model import Dataset, MapValue, Observation, TraceWithFullDetails 77from langfuse.request import LangfuseClient 78from langfuse.types import MaskFunction, ScoreDataType, SpanLevel 79from langfuse.utils import _convert_usage_input, _create_prompt_context, _get_timestamp 80 81from .version import __version__ as version 82 83 84@dataclass 85class FetchTracesResponse: 86 """Response object for fetch_traces method.""" 87 88 data: typing.List[TraceWithDetails] 89 meta: MetaResponse 90 91 92@dataclass 93class FetchTraceResponse: 94 """Response object for fetch_trace method.""" 95 96 data: TraceWithFullDetails 97 98 99@dataclass 100class FetchObservationsResponse: 101 """Response object for fetch_observations method.""" 102 103 data: typing.List[ObservationsView] 104 meta: MetaResponse 105 106 107@dataclass 108class FetchObservationResponse: 109 """Response object for fetch_observation method.""" 110 111 data: Observation 112 113 114@dataclass 115class FetchSessionsResponse: 116 """Response object for fetch_sessions method.""" 117 118 data: typing.List[Session] 119 meta: MetaResponse 120 121 122class Langfuse(object): 123 """Langfuse Python client. 124 125 Attributes: 126 log (logging.Logger): Logger for the Langfuse client. 127 base_url (str): Base URL of the Langfuse API, serving as the root address for API endpoint construction. 
128 httpx_client (httpx.Client): HTTPX client utilized for executing requests to the Langfuse API. 129 client (FernLangfuse): Core interface for Langfuse API interaction. 130 task_manager (TaskManager): Task Manager dedicated to handling asynchronous tasks. 131 release (str): Identifies the release number or hash of the application. 132 prompt_cache (PromptCache): A cache for efficiently storing and retrieving PromptClient instances. 133 134 Example: 135 Initiating the Langfuse client should always be first step to use Langfuse. 136 ```python 137 import os 138 from langfuse import Langfuse 139 140 # Set the public and secret keys as environment variables 141 os.environ['LANGFUSE_PUBLIC_KEY'] = public_key 142 os.environ['LANGFUSE_SECRET_KEY'] = secret_key 143 144 # Initialize the Langfuse client using the credentials 145 langfuse = Langfuse() 146 ``` 147 """ 148 149 log = logging.getLogger("langfuse") 150 """Logger for the Langfuse client.""" 151 152 host: str 153 """Host of Langfuse API.""" 154 155 def __init__( 156 self, 157 public_key: Optional[str] = None, 158 secret_key: Optional[str] = None, 159 host: Optional[str] = None, 160 release: Optional[str] = None, 161 debug: bool = False, 162 threads: Optional[int] = None, 163 flush_at: Optional[int] = None, 164 flush_interval: Optional[float] = None, 165 max_retries: Optional[int] = None, 166 timeout: Optional[int] = None, # seconds 167 sdk_integration: Optional[str] = "default", 168 httpx_client: Optional[httpx.Client] = None, 169 enabled: Optional[bool] = True, 170 sample_rate: Optional[float] = None, 171 mask: Optional[MaskFunction] = None, 172 ): 173 """Initialize the Langfuse client. 174 175 Args: 176 public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable. 177 secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable. 178 host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. 
Defaults to `https://cloud.langfuse.com`. 179 release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable. 180 debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable. 181 threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues. 182 flush_at: Max batch size that's sent to the API. 183 flush_interval: Max delay until a new batch is sent to the API. 184 max_retries: Max number of retries in case of API/network errors. 185 timeout: Timeout of API requests in seconds. Defaults to 20 seconds. 186 httpx_client: Pass your own httpx client for more customizability of requests. 187 sdk_integration: Used by intgerations that wrap the Langfuse SDK to add context for debugging and support. Not to be used directly. 188 enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops. 189 sample_rate: Sampling rate for tracing. If set to 0.2, only 20% of the data will be sent to the backend. Can be set via `LANGFUSE_SAMPLE_RATE` environment variable. 190 mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data. 191 192 Raises: 193 ValueError: If public_key or secret_key are not set and not found in environment variables. 194 195 Example: 196 Initiating the Langfuse client should always be first step to use Langfuse. 
197 ```python 198 import os 199 from langfuse import Langfuse 200 201 # Set the public and secret keys as environment variables 202 os.environ['LANGFUSE_PUBLIC_KEY'] = public_key 203 os.environ['LANGFUSE_SECRET_KEY'] = secret_key 204 205 # Initialize the Langfuse client using the credentials 206 langfuse = Langfuse() 207 ``` 208 """ 209 self.enabled = enabled 210 public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY") 211 secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY") 212 sample_rate = ( 213 sample_rate 214 if sample_rate 215 is not None # needs explicit None check, as 0 is a valid value 216 else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0)) 217 ) 218 219 if sample_rate is not None and ( 220 sample_rate > 1 or sample_rate < 0 221 ): # default value 1 will be set in the taskmanager 222 self.enabled = False 223 self.log.warning( 224 "Langfuse client is disabled since the sample rate provided is not between 0 and 1." 225 ) 226 227 threads = threads or int(os.environ.get("LANGFUSE_THREADS", 1)) 228 flush_at = flush_at or int(os.environ.get("LANGFUSE_FLUSH_AT", 15)) 229 flush_interval = flush_interval or float( 230 os.environ.get("LANGFUSE_FLUSH_INTERVAL", 0.5) 231 ) 232 233 max_retries = max_retries or int(os.environ.get("LANGFUSE_MAX_RETRIES", 3)) 234 timeout = timeout or int(os.environ.get("LANGFUSE_TIMEOUT", 20)) 235 236 if not self.enabled: 237 self.log.warning( 238 "Langfuse client is disabled. No observability data will be sent." 239 ) 240 241 elif not public_key: 242 self.enabled = False 243 self.log.warning( 244 "Langfuse client is disabled since no public_key was provided as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client" 245 ) 246 247 elif not secret_key: 248 self.enabled = False 249 self.log.warning( 250 "Langfuse client is disabled since no secret_key was provided as a parameter or environment variable 'LANGFUSE_SECRET_KEY'. 
See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client" 251 ) 252 253 set_debug = debug if debug else (os.getenv("LANGFUSE_DEBUG", "False") == "True") 254 255 if set_debug is True: 256 # Ensures that debug level messages are logged when debug mode is on. 257 # Otherwise, defaults to WARNING level. 258 # See https://docs.python.org/3/howto/logging.html#what-happens-if-no-configuration-is-provided 259 logging.basicConfig() 260 # Set level for all loggers under langfuse package 261 logging.getLogger("langfuse").setLevel(logging.DEBUG) 262 263 clean_logger() 264 else: 265 logging.getLogger("langfuse").setLevel(logging.WARNING) 266 clean_logger() 267 268 self.base_url = ( 269 host 270 if host 271 else os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com") 272 ) 273 274 self.httpx_client = httpx_client or httpx.Client(timeout=timeout) 275 276 self.client = FernLangfuse( 277 base_url=self.base_url, 278 username=public_key, 279 password=secret_key, 280 x_langfuse_sdk_name="python", 281 x_langfuse_sdk_version=version, 282 x_langfuse_public_key=public_key, 283 httpx_client=self.httpx_client, 284 ) 285 286 langfuse_client = LangfuseClient( 287 public_key=public_key, 288 secret_key=secret_key, 289 base_url=self.base_url, 290 version=version, 291 timeout=timeout, 292 session=self.httpx_client, 293 ) 294 295 args = { 296 "threads": threads, 297 "flush_at": flush_at, 298 "flush_interval": flush_interval, 299 "max_retries": max_retries, 300 "client": langfuse_client, 301 "api_client": self.client, 302 "public_key": public_key, 303 "sdk_name": "python", 304 "sdk_version": version, 305 "sdk_integration": sdk_integration, 306 "enabled": self.enabled, 307 "sample_rate": sample_rate, 308 "mask": mask, 309 } 310 311 self.task_manager = TaskManager(**args) 312 313 self.trace_id = None 314 315 self.release = self._get_release_value(release) 316 317 self.prompt_cache = PromptCache() 318 319 def _get_release_value(self, release: Optional[str] = None) -> 
Optional[str]: 320 if release: 321 return release 322 elif "LANGFUSE_RELEASE" in os.environ: 323 return os.environ["LANGFUSE_RELEASE"] 324 else: 325 return get_common_release_envs() 326 327 def get_trace_id(self) -> str: 328 """Get the current trace id.""" 329 return self.trace_id 330 331 def get_trace_url(self) -> str: 332 """Get the URL of the current trace to view it in the Langfuse UI.""" 333 return f"{self.base_url}/trace/{self.trace_id}" 334 335 def get_dataset( 336 self, name: str, *, fetch_items_page_size: Optional[int] = 50 337 ) -> "DatasetClient": 338 """Fetch a dataset by its name. 339 340 Args: 341 name (str): The name of the dataset to fetch. 342 fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50. 343 344 Returns: 345 DatasetClient: The dataset with the given name. 346 """ 347 try: 348 self.log.debug(f"Getting datasets {name}") 349 dataset = self.client.datasets.get(dataset_name=name) 350 351 dataset_items = [] 352 page = 1 353 while True: 354 new_items = self.client.dataset_items.list( 355 dataset_name=self._url_encode(name), 356 page=page, 357 limit=fetch_items_page_size, 358 ) 359 dataset_items.extend(new_items.data) 360 if new_items.meta.total_pages <= page: 361 break 362 page += 1 363 364 items = [DatasetItemClient(i, langfuse=self) for i in dataset_items] 365 366 return DatasetClient(dataset, items=items) 367 except Exception as e: 368 handle_fern_exception(e) 369 raise e 370 371 def get_dataset_item(self, id: str) -> "DatasetItemClient": 372 """Get the dataset item with the given id.""" 373 try: 374 self.log.debug(f"Getting dataset item {id}") 375 dataset_item = self.client.dataset_items.get(id=id) 376 return DatasetItemClient(dataset_item, langfuse=self) 377 except Exception as e: 378 handle_fern_exception(e) 379 raise e 380 381 def auth_check(self) -> bool: 382 """Check if the provided credentials (public and secret key) are valid. 
383 384 Raises: 385 Exception: If no projects were found for the provided credentials. 386 387 Note: 388 This method is blocking. It is discouraged to use it in production code. 389 """ 390 try: 391 projects = self.client.projects.get() 392 self.log.debug( 393 f"Auth check successful, found {len(projects.data)} projects" 394 ) 395 if len(projects.data) == 0: 396 raise Exception( 397 "Auth check failed, no project found for the keys provided." 398 ) 399 return True 400 401 except Exception as e: 402 handle_fern_exception(e) 403 raise e 404 405 def get_dataset_runs( 406 self, 407 dataset_name: str, 408 *, 409 page: typing.Optional[int] = None, 410 limit: typing.Optional[int] = None, 411 ) -> PaginatedDatasetRuns: 412 """Get all dataset runs. 413 414 Args: 415 dataset_name (str): Name of the dataset. 416 page (Optional[int]): Page number of the dataset runs to return, starts at 1. Defaults to None. 417 limit (Optional[int]): Maximum number of dataset runs to return. Defaults to 50. 418 419 Returns: 420 PaginatedDatasetRuns: The dataset runs. 421 """ 422 try: 423 self.log.debug("Getting dataset runs") 424 return self.client.datasets.get_runs( 425 dataset_name=self._url_encode(dataset_name), page=page, limit=limit 426 ) 427 except Exception as e: 428 handle_fern_exception(e) 429 raise e 430 431 def get_dataset_run( 432 self, 433 dataset_name: str, 434 dataset_run_name: str, 435 ) -> DatasetRunWithItems: 436 """Get a dataset run. 437 438 Args: 439 dataset_name: Name of the dataset. 440 dataset_run_name: Name of the dataset run. 441 442 Returns: 443 DatasetRunWithItems: The dataset run. 
444 """ 445 try: 446 self.log.debug( 447 f"Getting dataset runs for dataset {dataset_name} and run {dataset_run_name}" 448 ) 449 return self.client.datasets.get_run( 450 dataset_name=self._url_encode(dataset_name), 451 run_name=self._url_encode(dataset_run_name), 452 ) 453 except Exception as e: 454 handle_fern_exception(e) 455 raise e 456 457 def create_dataset( 458 self, 459 name: str, 460 description: Optional[str] = None, 461 metadata: Optional[Any] = None, 462 ) -> Dataset: 463 """Create a dataset with the given name on Langfuse. 464 465 Args: 466 name: Name of the dataset to create. 467 description: Description of the dataset. Defaults to None. 468 metadata: Additional metadata. Defaults to None. 469 470 Returns: 471 Dataset: The created dataset as returned by the Langfuse API. 472 """ 473 try: 474 body = CreateDatasetRequest( 475 name=name, description=description, metadata=metadata 476 ) 477 self.log.debug(f"Creating datasets {body}") 478 return self.client.datasets.create(request=body) 479 except Exception as e: 480 handle_fern_exception(e) 481 raise e 482 483 def create_dataset_item( 484 self, 485 dataset_name: str, 486 input: Optional[Any] = None, 487 expected_output: Optional[Any] = None, 488 metadata: Optional[Any] = None, 489 source_trace_id: Optional[str] = None, 490 source_observation_id: Optional[str] = None, 491 status: Optional[DatasetStatus] = None, 492 id: Optional[str] = None, 493 ) -> DatasetItem: 494 """Create a dataset item. 495 496 Upserts if an item with id already exists. 497 498 Args: 499 dataset_name: Name of the dataset in which the dataset item should be created. 500 input: Input data. Defaults to None. Can contain any dict, list or scalar. 501 expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar. 502 metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar. 503 source_trace_id: Id of the source trace. Defaults to None. 
504 source_observation_id: Id of the source observation. Defaults to None. 505 status: Status of the dataset item. Defaults to ACTIVE for newly created items. 506 id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets. 507 508 Returns: 509 DatasetItem: The created dataset item as returned by the Langfuse API. 510 511 Example: 512 ```python 513 from langfuse import Langfuse 514 515 langfuse = Langfuse() 516 517 # Uploading items to the Langfuse dataset named "capital_cities" 518 langfuse.create_dataset_item( 519 dataset_name="capital_cities", 520 input={"input": {"country": "Italy"}}, 521 expected_output={"expected_output": "Rome"}, 522 metadata={"foo": "bar"} 523 ) 524 ``` 525 """ 526 try: 527 body = CreateDatasetItemRequest( 528 datasetName=dataset_name, 529 input=input, 530 expectedOutput=expected_output, 531 metadata=metadata, 532 sourceTraceId=source_trace_id, 533 sourceObservationId=source_observation_id, 534 status=status, 535 id=id, 536 ) 537 self.log.debug(f"Creating dataset item {body}") 538 return self.client.dataset_items.create(request=body) 539 except Exception as e: 540 handle_fern_exception(e) 541 raise e 542 543 def fetch_trace( 544 self, 545 id: str, 546 ) -> FetchTraceResponse: 547 """Fetch a trace via the Langfuse API by its id. 548 549 Args: 550 id: The id of the trace to fetch. 551 552 Returns: 553 FetchTraceResponse: The trace with full details as returned by the Langfuse API on `data`. 554 555 Raises: 556 Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request. 
557 """ 558 try: 559 self.log.debug(f"Getting trace {id}") 560 trace = self.client.trace.get(id) 561 return FetchTraceResponse(data=trace) 562 except Exception as e: 563 handle_fern_exception(e) 564 raise e 565 566 def get_trace( 567 self, 568 id: str, 569 ) -> TraceWithFullDetails: 570 """Get a trace via the Langfuse API by its id. Deprecated, use fetch_trace instead. 571 572 Args: 573 id: The id of the trace to fetch. 574 575 Returns: 576 TraceWithFullDetails: The trace with full details as returned by the Langfuse API. 577 578 Raises: 579 Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request. 580 """ 581 warnings.warn( 582 "get_trace is deprecated, use fetch_trace instead.", 583 DeprecationWarning, 584 ) 585 586 try: 587 self.log.debug(f"Getting trace {id}") 588 return self.client.trace.get(id) 589 except Exception as e: 590 handle_fern_exception(e) 591 raise e 592 593 def fetch_traces( 594 self, 595 *, 596 page: Optional[int] = None, 597 limit: Optional[int] = None, 598 user_id: Optional[str] = None, 599 name: Optional[str] = None, 600 session_id: Optional[str] = None, 601 from_timestamp: Optional[dt.datetime] = None, 602 to_timestamp: Optional[dt.datetime] = None, 603 order_by: Optional[str] = None, 604 tags: Optional[Union[str, Sequence[str]]] = None, 605 ) -> FetchTracesResponse: 606 """Fetch a list of traces in the current project matching the given parameters. 607 608 Args: 609 page (Optional[int]): Page number, starts at 1. Defaults to None. 610 limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None. 611 name (Optional[str]): Filter by name of traces. Defaults to None. 612 user_id (Optional[str]): Filter by user_id. Defaults to None. 613 session_id (Optional[str]): Filter by session_id. Defaults to None. 
614 from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None. 615 to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None. 616 order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None. 617 tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None. 618 619 Returns: 620 FetchTracesResponse, list of traces on `data` and metadata on `meta`. 621 622 Raises: 623 Exception: If an error occurred during the request. 624 """ 625 try: 626 self.log.debug( 627 f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}" 628 ) 629 res = self.client.trace.list( 630 page=page, 631 limit=limit, 632 name=name, 633 user_id=user_id, 634 session_id=session_id, 635 from_timestamp=from_timestamp, 636 to_timestamp=to_timestamp, 637 order_by=order_by, 638 tags=tags, 639 ) 640 return FetchTracesResponse(data=res.data, meta=res.meta) 641 except Exception as e: 642 handle_fern_exception(e) 643 raise e 644 645 def get_traces( 646 self, 647 *, 648 page: Optional[int] = None, 649 limit: Optional[int] = None, 650 user_id: Optional[str] = None, 651 name: Optional[str] = None, 652 session_id: Optional[str] = None, 653 from_timestamp: Optional[dt.datetime] = None, 654 to_timestamp: Optional[dt.datetime] = None, 655 order_by: Optional[str] = None, 656 tags: Optional[Union[str, Sequence[str]]] = None, 657 ) -> Traces: 658 """Get a list of traces in the current project matching the given parameters. Deprecated, use fetch_traces instead. 659 660 Args: 661 page (Optional[int]): Page number, starts at 1. Defaults to None. 662 limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. 
Defaults to None. 663 name (Optional[str]): Filter by name of traces. Defaults to None. 664 user_id (Optional[str]): Filter by user_id. Defaults to None. 665 session_id (Optional[str]): Filter by session_id. Defaults to None. 666 from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None. 667 to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None. 668 order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None. 669 tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None. 670 671 Returns: 672 List of Traces 673 674 Raises: 675 Exception: If an error occurred during the request. 676 """ 677 warnings.warn( 678 "get_traces is deprecated, use fetch_traces instead.", 679 DeprecationWarning, 680 ) 681 try: 682 self.log.debug( 683 f"Getting traces... 
{page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}" 684 ) 685 return self.client.trace.list( 686 page=page, 687 limit=limit, 688 name=name, 689 user_id=user_id, 690 session_id=session_id, 691 from_timestamp=from_timestamp, 692 to_timestamp=to_timestamp, 693 order_by=order_by, 694 tags=tags, 695 ) 696 except Exception as e: 697 handle_fern_exception(e) 698 raise e 699 700 def fetch_observations( 701 self, 702 *, 703 page: typing.Optional[int] = None, 704 limit: typing.Optional[int] = None, 705 name: typing.Optional[str] = None, 706 user_id: typing.Optional[str] = None, 707 trace_id: typing.Optional[str] = None, 708 parent_observation_id: typing.Optional[str] = None, 709 from_start_time: typing.Optional[dt.datetime] = None, 710 to_start_time: typing.Optional[dt.datetime] = None, 711 type: typing.Optional[str] = None, 712 ) -> FetchObservationsResponse: 713 """Get a list of observations in the current project matching the given parameters. 714 715 Args: 716 page (Optional[int]): Page number of the observations to return. Defaults to None. 717 limit (Optional[int]): Maximum number of observations to return. Defaults to None. 718 name (Optional[str]): Name of the observations to return. Defaults to None. 719 user_id (Optional[str]): User identifier. Defaults to None. 720 trace_id (Optional[str]): Trace identifier. Defaults to None. 721 parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None. 722 from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None. 723 to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None. 724 type (Optional[str]): Type of the observation. Defaults to None. 725 726 Returns: 727 FetchObservationsResponse, list of observations on `data` and metadata on `meta`. 728 729 Raises: 730 Exception: If an error occurred during the request. 
731 """ 732 try: 733 self.log.debug( 734 f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}" 735 ) 736 res = self.client.observations.get_many( 737 page=page, 738 limit=limit, 739 name=name, 740 user_id=user_id, 741 trace_id=trace_id, 742 parent_observation_id=parent_observation_id, 743 from_start_time=from_start_time, 744 to_start_time=to_start_time, 745 type=type, 746 ) 747 return FetchObservationsResponse(data=res.data, meta=res.meta) 748 except Exception as e: 749 self.log.exception(e) 750 raise e 751 752 def get_observations( 753 self, 754 *, 755 page: typing.Optional[int] = None, 756 limit: typing.Optional[int] = None, 757 name: typing.Optional[str] = None, 758 user_id: typing.Optional[str] = None, 759 trace_id: typing.Optional[str] = None, 760 parent_observation_id: typing.Optional[str] = None, 761 from_start_time: typing.Optional[dt.datetime] = None, 762 to_start_time: typing.Optional[dt.datetime] = None, 763 type: typing.Optional[str] = None, 764 ) -> ObservationsViews: 765 """Get a list of observations in the current project matching the given parameters. Deprecated, use fetch_observations instead. 766 767 Args: 768 page (Optional[int]): Page number of the observations to return. Defaults to None. 769 limit (Optional[int]): Maximum number of observations to return. Defaults to None. 770 name (Optional[str]): Name of the observations to return. Defaults to None. 771 user_id (Optional[str]): User identifier. Defaults to None. 772 trace_id (Optional[str]): Trace identifier. Defaults to None. 773 parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None. 774 from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None. 775 to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None. 
776 type (Optional[str]): Type of the observation. Defaults to None. 777 778 Returns: 779 List of ObservationsViews: List of observations in the project matching the given parameters. 780 781 Raises: 782 Exception: If an error occurred during the request. 783 """ 784 warnings.warn( 785 "get_observations is deprecated, use fetch_observations instead.", 786 DeprecationWarning, 787 ) 788 try: 789 self.log.debug( 790 f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}" 791 ) 792 return self.client.observations.get_many( 793 page=page, 794 limit=limit, 795 name=name, 796 user_id=user_id, 797 trace_id=trace_id, 798 parent_observation_id=parent_observation_id, 799 from_start_time=from_start_time, 800 to_start_time=to_start_time, 801 type=type, 802 ) 803 except Exception as e: 804 handle_fern_exception(e) 805 raise e 806 807 def get_generations( 808 self, 809 *, 810 page: typing.Optional[int] = None, 811 limit: typing.Optional[int] = None, 812 name: typing.Optional[str] = None, 813 user_id: typing.Optional[str] = None, 814 trace_id: typing.Optional[str] = None, 815 from_start_time: typing.Optional[dt.datetime] = None, 816 to_start_time: typing.Optional[dt.datetime] = None, 817 parent_observation_id: typing.Optional[str] = None, 818 ) -> ObservationsViews: 819 """Get a list of generations in the current project matching the given parameters. Deprecated, use fetch_observations(type='GENERATION') instead. 820 821 Args: 822 page (Optional[int]): Page number of the generations to return. Defaults to None. 823 limit (Optional[int]): Maximum number of generations to return. Defaults to None. 824 name (Optional[str]): Name of the generations to return. Defaults to None. 825 user_id (Optional[str]): User identifier of the generations to return. Defaults to None. 826 trace_id (Optional[str]): Trace identifier of the generations to return. Defaults to None. 
827 from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None. 828 to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None. 829 parent_observation_id (Optional[str]): Parent observation identifier of the generations to return. Defaults to None. 830 831 Returns: 832 List of ObservationsViews: List of generations in the project matching the given parameters. 833 834 Raises: 835 Exception: If an error occurred during the request. 836 """ 837 warnings.warn( 838 "get_generations is deprecated, use `fetch_observations(type='GENERATION')` instead.", 839 DeprecationWarning, 840 ) 841 return self.get_observations( 842 page=page, 843 limit=limit, 844 name=name, 845 user_id=user_id, 846 trace_id=trace_id, 847 parent_observation_id=parent_observation_id, 848 from_start_time=from_start_time, 849 to_start_time=to_start_time, 850 type="GENERATION", 851 ) 852 853 def fetch_observation( 854 self, 855 id: str, 856 ) -> FetchObservationResponse: 857 """Get an observation in the current project with the given identifier. 858 859 Args: 860 id: The identifier of the observation to fetch. 861 862 Returns: 863 FetchObservationResponse: The observation with the given id on `data`. 864 865 Raises: 866 Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request. 867 """ 868 try: 869 self.log.debug(f"Getting observation {id}") 870 observation = self.client.observations.get(id) 871 return FetchObservationResponse(data=observation) 872 except Exception as e: 873 handle_fern_exception(e) 874 raise e 875 876 def get_observation( 877 self, 878 id: str, 879 ) -> Observation: 880 """Get an observation in the current project with the given identifier. Deprecated, use fetch_observation instead. 881 882 Args: 883 id: The identifier of the observation to fetch. 
884 885 Raises: 886 Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request. 887 """ 888 warnings.warn( 889 "get_observation is deprecated, use fetch_observation instead.", 890 DeprecationWarning, 891 ) 892 try: 893 self.log.debug(f"Getting observation {id}") 894 return self.client.observations.get(id) 895 except Exception as e: 896 handle_fern_exception(e) 897 raise e 898 899 def fetch_sessions( 900 self, 901 *, 902 page: typing.Optional[int] = None, 903 limit: typing.Optional[int] = None, 904 from_timestamp: typing.Optional[dt.datetime] = None, 905 to_timestamp: typing.Optional[dt.datetime] = None, 906 ) -> FetchSessionsResponse: 907 """Get a list of sessions in the current project. 908 909 Args: 910 page (Optional[int]): Page number of the sessions to return. Defaults to None. 911 limit (Optional[int]): Maximum number of sessions to return. Defaults to None. 912 from_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp on or after this datetime. Defaults to None. 913 to_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp before this datetime. Defaults to None. 914 915 Returns: 916 FetchSessionsResponse, list of sessions on `data` and metadata on `meta`. 917 918 Raises: 919 Exception: If an error occurred during the request. 920 """ 921 try: 922 self.log.debug( 923 f"Getting sessions... 
{page}, {limit}, {from_timestamp}, {to_timestamp}" 924 ) 925 res = self.client.sessions.list( 926 page=page, 927 limit=limit, 928 from_timestamp=from_timestamp, 929 to_timestamp=to_timestamp, 930 ) 931 return FetchSessionsResponse(data=res.data, meta=res.meta) 932 except Exception as e: 933 handle_fern_exception(e) 934 raise e 935 936 @overload 937 def get_prompt( 938 self, 939 name: str, 940 version: Optional[int] = None, 941 *, 942 label: Optional[str] = None, 943 type: Literal["chat"], 944 cache_ttl_seconds: Optional[int] = None, 945 fallback: Optional[List[ChatMessageDict]] = None, 946 max_retries: Optional[int] = None, 947 fetch_timeout_seconds: Optional[int] = None, 948 ) -> ChatPromptClient: ... 949 950 @overload 951 def get_prompt( 952 self, 953 name: str, 954 version: Optional[int] = None, 955 *, 956 label: Optional[str] = None, 957 type: Literal["text"] = "text", 958 cache_ttl_seconds: Optional[int] = None, 959 fallback: Optional[str] = None, 960 max_retries: Optional[int] = None, 961 fetch_timeout_seconds: Optional[int] = None, 962 ) -> TextPromptClient: ... 963 964 def get_prompt( 965 self, 966 name: str, 967 version: Optional[int] = None, 968 *, 969 label: Optional[str] = None, 970 type: Literal["chat", "text"] = "text", 971 cache_ttl_seconds: Optional[int] = None, 972 fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None, 973 max_retries: Optional[int] = None, 974 fetch_timeout_seconds: Optional[int] = None, 975 ) -> PromptClient: 976 """Get a prompt. 977 978 This method attempts to fetch the requested prompt from the local cache. If the prompt is not found 979 in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again 980 and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will 981 return the expired prompt as a fallback. 982 983 Args: 984 name (str): The name of the prompt to retrieve. 

        Keyword Args:
            version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
            label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
            cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a
            keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0.
            type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text".
            fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None.
            max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries use a constant backoff between attempts.
            fetch_timeout_seconds: Optional[int]: The timeout in seconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 10 seconds per default.

        Returns:
            The prompt object retrieved from the cache or directly fetched if not cached or expired of type
            - TextPromptClient, if type argument is 'text'.
            - ChatPromptClient, if type argument is 'chat'.

        Raises:
            Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an
            expired prompt in the cache, in which case it logs a warning and returns the expired prompt.
1004 """ 1005 if version is not None and label is not None: 1006 raise ValueError("Cannot specify both version and label at the same time.") 1007 1008 if not name: 1009 raise ValueError("Prompt name cannot be empty.") 1010 1011 cache_key = PromptCache.generate_cache_key(name, version=version, label=label) 1012 bounded_max_retries = self._get_bounded_max_retries( 1013 max_retries, default_max_retries=2, max_retries_upper_bound=4 1014 ) 1015 1016 self.log.debug(f"Getting prompt '{cache_key}'") 1017 cached_prompt = self.prompt_cache.get(cache_key) 1018 1019 if cached_prompt is None or cache_ttl_seconds == 0: 1020 self.log.debug( 1021 f"Prompt '{cache_key}' not found in cache or caching disabled." 1022 ) 1023 try: 1024 return self._fetch_prompt_and_update_cache( 1025 name, 1026 version=version, 1027 label=label, 1028 ttl_seconds=cache_ttl_seconds, 1029 max_retries=bounded_max_retries, 1030 fetch_timeout_seconds=fetch_timeout_seconds, 1031 ) 1032 except Exception as e: 1033 if fallback: 1034 self.log.warning( 1035 f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}" 1036 ) 1037 1038 fallback_client_args = { 1039 "name": name, 1040 "prompt": fallback, 1041 "type": type, 1042 "version": version or 0, 1043 "config": {}, 1044 "labels": [label] if label else [], 1045 "tags": [], 1046 } 1047 1048 if type == "text": 1049 return TextPromptClient( 1050 prompt=Prompt_Text(**fallback_client_args), 1051 is_fallback=True, 1052 ) 1053 1054 if type == "chat": 1055 return ChatPromptClient( 1056 prompt=Prompt_Chat(**fallback_client_args), 1057 is_fallback=True, 1058 ) 1059 1060 raise e 1061 1062 if cached_prompt.is_expired(): 1063 self.log.debug(f"Stale prompt '{cache_key}' found in cache.") 1064 try: 1065 # refresh prompt in background thread, refresh_prompt deduplicates tasks 1066 self.log.debug(f"Refreshing prompt '{cache_key}' in background.") 1067 self.prompt_cache.add_refresh_prompt_task( 1068 cache_key, 1069 lambda: self._fetch_prompt_and_update_cache( 1070 
name, 1071 version=version, 1072 label=label, 1073 ttl_seconds=cache_ttl_seconds, 1074 max_retries=bounded_max_retries, 1075 fetch_timeout_seconds=fetch_timeout_seconds, 1076 ), 1077 ) 1078 self.log.debug(f"Returning stale prompt '{cache_key}' from cache.") 1079 # return stale prompt 1080 return cached_prompt.value 1081 1082 except Exception as e: 1083 self.log.warning( 1084 f"Error when refreshing cached prompt '{cache_key}', returning cached version. Error: {e}" 1085 ) 1086 # creation of refresh prompt task failed, return stale prompt 1087 return cached_prompt.value 1088 1089 return cached_prompt.value 1090 1091 def _fetch_prompt_and_update_cache( 1092 self, 1093 name: str, 1094 *, 1095 version: Optional[int] = None, 1096 label: Optional[str] = None, 1097 ttl_seconds: Optional[int] = None, 1098 max_retries: int, 1099 fetch_timeout_seconds, 1100 ) -> PromptClient: 1101 try: 1102 cache_key = PromptCache.generate_cache_key( 1103 name, version=version, label=label 1104 ) 1105 1106 self.log.debug(f"Fetching prompt '{cache_key}' from server...") 1107 1108 @backoff.on_exception( 1109 backoff.constant, Exception, max_tries=max_retries, logger=None 1110 ) 1111 def fetch_prompts(): 1112 return self.client.prompts.get( 1113 self._url_encode(name), 1114 version=version, 1115 label=label, 1116 request_options={ 1117 "timeout_in_seconds": fetch_timeout_seconds, 1118 } 1119 if fetch_timeout_seconds is not None 1120 else None, 1121 ) 1122 1123 prompt_response = fetch_prompts() 1124 1125 if prompt_response.type == "chat": 1126 prompt = ChatPromptClient(prompt_response) 1127 else: 1128 prompt = TextPromptClient(prompt_response) 1129 1130 self.prompt_cache.set(cache_key, prompt, ttl_seconds) 1131 1132 return prompt 1133 1134 except Exception as e: 1135 self.log.error(f"Error while fetching prompt '{cache_key}': {str(e)}") 1136 raise e 1137 1138 def _get_bounded_max_retries( 1139 self, 1140 max_retries: Optional[int], 1141 *, 1142 default_max_retries: int = 2, 1143 
max_retries_upper_bound: int = 4, 1144 ) -> int: 1145 if max_retries is None: 1146 return default_max_retries 1147 1148 bounded_max_retries = min( 1149 max(max_retries, 0), 1150 max_retries_upper_bound, 1151 ) 1152 1153 return bounded_max_retries 1154 1155 @overload 1156 def create_prompt( 1157 self, 1158 *, 1159 name: str, 1160 prompt: List[ChatMessageDict], 1161 is_active: Optional[bool] = None, # deprecated 1162 labels: List[str] = [], 1163 tags: Optional[List[str]] = None, 1164 type: Optional[Literal["chat"]], 1165 config: Optional[Any] = None, 1166 ) -> ChatPromptClient: ... 1167 1168 @overload 1169 def create_prompt( 1170 self, 1171 *, 1172 name: str, 1173 prompt: str, 1174 is_active: Optional[bool] = None, # deprecated 1175 labels: List[str] = [], 1176 tags: Optional[List[str]] = None, 1177 type: Optional[Literal["text"]] = "text", 1178 config: Optional[Any] = None, 1179 ) -> TextPromptClient: ... 1180 1181 def create_prompt( 1182 self, 1183 *, 1184 name: str, 1185 prompt: Union[str, List[ChatMessageDict]], 1186 is_active: Optional[bool] = None, # deprecated 1187 labels: List[str] = [], 1188 tags: Optional[List[str]] = None, 1189 type: Optional[Literal["chat", "text"]] = "text", 1190 config: Optional[Any] = None, 1191 ) -> PromptClient: 1192 """Create a new prompt in Langfuse. 1193 1194 Keyword Args: 1195 name : The name of the prompt to be created. 1196 prompt : The content of the prompt to be created. 1197 is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead. 1198 labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label. 1199 tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt. 1200 config: Additional structured data to be saved with the prompt. Defaults to None. 1201 type: The type of the prompt to be created. "chat" vs. "text". 
Defaults to "text". 1202 1203 Returns: 1204 TextPromptClient: The prompt if type argument is 'text'. 1205 ChatPromptClient: The prompt if type argument is 'chat'. 1206 """ 1207 try: 1208 self.log.debug(f"Creating prompt {name=}, {version=}, {labels=}") 1209 1210 # Handle deprecated is_active flag 1211 if is_active: 1212 self.log.warning( 1213 "The 'is_active' flag is deprecated and will be removed in a future release. Please use the 'production' label instead." 1214 ) 1215 1216 labels = labels if "production" in labels else labels + ["production"] 1217 1218 if type == "chat": 1219 if not isinstance(prompt, list): 1220 raise ValueError( 1221 "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes." 1222 ) 1223 request = CreatePromptRequest_Chat( 1224 name=name, 1225 prompt=prompt, 1226 labels=labels, 1227 tags=tags, 1228 config=config or {}, 1229 type="chat", 1230 ) 1231 server_prompt = self.client.prompts.create(request=request) 1232 1233 return ChatPromptClient(prompt=server_prompt) 1234 1235 if not isinstance(prompt, str): 1236 raise ValueError("For 'text' type, 'prompt' must be a string.") 1237 1238 request = CreatePromptRequest_Text( 1239 name=name, 1240 prompt=prompt, 1241 labels=labels, 1242 tags=tags, 1243 config=config or {}, 1244 type="text", 1245 ) 1246 1247 server_prompt = self.client.prompts.create(request=request) 1248 return TextPromptClient(prompt=server_prompt) 1249 1250 except Exception as e: 1251 handle_fern_exception(e) 1252 raise e 1253 1254 def _url_encode(self, url: str) -> str: 1255 return urllib.parse.quote(url) 1256 1257 def trace( 1258 self, 1259 *, 1260 id: typing.Optional[str] = None, 1261 name: typing.Optional[str] = None, 1262 user_id: typing.Optional[str] = None, 1263 session_id: typing.Optional[str] = None, 1264 version: typing.Optional[str] = None, 1265 input: typing.Optional[typing.Any] = None, 1266 output: typing.Optional[typing.Any] = None, 1267 metadata: typing.Optional[typing.Any] = None, 
1268 tags: typing.Optional[typing.List[str]] = None, 1269 timestamp: typing.Optional[dt.datetime] = None, 1270 public: typing.Optional[bool] = None, 1271 **kwargs, 1272 ) -> "StatefulTraceClient": 1273 """Create a trace. 1274 1275 Args: 1276 id: The id of the trace can be set, defaults to a random id. Set it to link traces to external systems or when creating a distributed trace. Traces are upserted on id. 1277 name: Identifier of the trace. Useful for sorting/filtering in the UI. 1278 input: The input of the trace. Can be any JSON object. 1279 output: The output of the trace. Can be any JSON object. 1280 metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API. 1281 user_id: The id of the user that triggered the execution. Used to provide user-level analytics. 1282 session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier. 1283 version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging. 1284 release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging. 1285 tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API. 1286 timestamp: The timestamp of the trace. Defaults to the current time if not provided. 1287 public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project. 1288 **kwargs: Additional keyword arguments that can be included in the trace. 1289 1290 Returns: 1291 StatefulTraceClient: The created trace. 

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            trace = langfuse.trace(
                name="example-application",
                user_id="user-1234"
            )
            ```
        """
        new_id = id or str(uuid.uuid4())
        self.trace_id = new_id
        try:
            new_dict = {
                "id": new_id,
                "name": name,
                "userId": user_id,
                "sessionId": session_id
                or kwargs.get("sessionId", None),  # backward compatibility
                "release": self.release,
                "version": version,
                "metadata": metadata,
                "input": input,
                "output": output,
                "tags": tags,
                "timestamp": timestamp or _get_timestamp(),
                "public": public,
            }
            # NOTE(review): kwargs is always a dict for a **kwargs parameter,
            # so this guard can never be False; kept as-is.
            if kwargs is not None:
                new_dict.update(kwargs)

            new_body = TraceBody(**new_dict)

            self.log.debug(f"Creating trace {_filter_io_from_event_body(new_dict)}")
            event = {
                "id": str(uuid.uuid4()),
                "type": "trace-create",
                "body": new_body,
            }

            self.task_manager.add_task(
                event,
            )

        except Exception as e:
            self.log.exception(e)
        finally:
            self._log_memory_usage()

        return StatefulTraceClient(
            self.client, new_id, StateType.TRACE, new_id, self.task_manager
        )

    def _log_memory_usage(self):
        # Memory reporting is opt-in (PYTHONTRACEMALLOC) and throttled by
        # LANGFUSE_DEBUG_MEMORY_REPORT_INTERVAL; no-op otherwise.
        try:
            is_malloc_tracing_enabled = bool(int(os.getenv("PYTHONTRACEMALLOC", 0)))
            report_interval = int(os.getenv("LANGFUSE_DEBUG_MEMORY_REPORT_INTERVAL", 0))
            top_k_items = int(os.getenv("LANGFUSE_DEBUG_MEMORY_TOP_K", 10))

            if (
                not is_malloc_tracing_enabled
                or report_interval <= 0
                or round(time.monotonic()) % report_interval != 0
            ):
                return

            snapshot = tracemalloc.take_snapshot().statistics("lineno")

            total_memory_usage = sum([stat.size for stat in snapshot]) / 1024 / 1024
            memory_usage_total_items = [f"{stat}" for stat in snapshot]
            memory_usage_langfuse_items = [
                stat for stat in memory_usage_total_items if
"/langfuse/" in stat 1367 ] 1368 1369 logged_memory_usage = { 1370 "all_files": [f"{stat}" for stat in memory_usage_total_items][ 1371 :top_k_items 1372 ], 1373 "langfuse_files": [f"{stat}" for stat in memory_usage_langfuse_items][ 1374 :top_k_items 1375 ], 1376 "total_usage": f"{total_memory_usage:.2f} MB", 1377 "langfuse_queue_length": self.task_manager._ingestion_queue.qsize(), 1378 } 1379 1380 self.log.debug("Memory usage: ", logged_memory_usage) 1381 1382 event = SdkLogBody(log=logged_memory_usage) 1383 self.task_manager.add_task( 1384 { 1385 "id": str(uuid.uuid4()), 1386 "type": "sdk-log", 1387 "timestamp": _get_timestamp(), 1388 "body": event.dict(), 1389 } 1390 ) 1391 1392 except Exception as e: 1393 self.log.exception(e) 1394 1395 @overload 1396 def score( 1397 self, 1398 *, 1399 name: str, 1400 value: float, 1401 data_type: typing.Optional[Literal["NUMERIC", "BOOLEAN"]] = None, 1402 trace_id: typing.Optional[str] = None, 1403 id: typing.Optional[str] = None, 1404 comment: typing.Optional[str] = None, 1405 observation_id: typing.Optional[str] = None, 1406 config_id: typing.Optional[str] = None, 1407 **kwargs, 1408 ) -> "StatefulClient": ... 1409 1410 @overload 1411 def score( 1412 self, 1413 *, 1414 name: str, 1415 value: str, 1416 data_type: typing.Optional[Literal["CATEGORICAL"]] = "CATEGORICAL", 1417 trace_id: typing.Optional[str] = None, 1418 id: typing.Optional[str] = None, 1419 comment: typing.Optional[str] = None, 1420 observation_id: typing.Optional[str] = None, 1421 config_id: typing.Optional[str] = None, 1422 **kwargs, 1423 ) -> "StatefulClient": ... 
1424 1425 def score( 1426 self, 1427 *, 1428 name: str, 1429 value: typing.Union[float, str], 1430 data_type: typing.Optional[ScoreDataType] = None, 1431 trace_id: typing.Optional[str] = None, 1432 id: typing.Optional[str] = None, 1433 comment: typing.Optional[str] = None, 1434 observation_id: typing.Optional[str] = None, 1435 config_id: typing.Optional[str] = None, 1436 **kwargs, 1437 ) -> "StatefulClient": 1438 """Create a score attached to a trace (and optionally an observation). 1439 1440 Args: 1441 name (str): Identifier of the score. 1442 value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores. 1443 data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present. 1444 When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores. 1445 trace_id (str): The id of the trace to which the score should be attached. 1446 id (Optional[str]): The id of the score. If not provided, a new UUID is generated. 1447 comment (Optional[str]): Additional context/explanation of the score. 1448 observation_id (Optional[str]): The id of the observation to which the score should be attached. 1449 config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None. 1450 **kwargs: Additional keyword arguments to include in the score. 1451 1452 Returns: 1453 StatefulClient: Either the associated observation (if observation_id is provided) or the trace (if observation_id is not provided). 
1454 1455 Example: 1456 ```python 1457 from langfuse import Langfuse 1458 1459 langfuse = Langfuse() 1460 1461 # Create a trace 1462 trace = langfuse.trace(name="example-application") 1463 1464 # Get id of created trace 1465 trace_id = trace.id 1466 1467 # Add score to the trace 1468 trace = langfuse.score( 1469 trace_id=trace_id, 1470 name="user-explicit-feedback", 1471 value=0.9, 1472 comment="I like how personalized the response is" 1473 ) 1474 ``` 1475 """ 1476 trace_id = trace_id or self.trace_id or str(uuid.uuid4()) 1477 new_id = id or str(uuid.uuid4()) 1478 try: 1479 new_dict = { 1480 "id": new_id, 1481 "trace_id": trace_id, 1482 "observation_id": observation_id, 1483 "name": name, 1484 "value": value, 1485 "data_type": data_type, 1486 "comment": comment, 1487 "config_id": config_id, 1488 **kwargs, 1489 } 1490 1491 self.log.debug(f"Creating score {new_dict}...") 1492 new_body = ScoreBody(**new_dict) 1493 1494 event = { 1495 "id": str(uuid.uuid4()), 1496 "type": "score-create", 1497 "body": new_body, 1498 } 1499 self.task_manager.add_task(event) 1500 1501 except Exception as e: 1502 self.log.exception(e) 1503 finally: 1504 if observation_id is not None: 1505 return StatefulClient( 1506 self.client, 1507 observation_id, 1508 StateType.OBSERVATION, 1509 trace_id, 1510 self.task_manager, 1511 ) 1512 else: 1513 return StatefulClient( 1514 self.client, new_id, StateType.TRACE, new_id, self.task_manager 1515 ) 1516 1517 def span( 1518 self, 1519 *, 1520 id: typing.Optional[str] = None, 1521 trace_id: typing.Optional[str] = None, 1522 parent_observation_id: typing.Optional[str] = None, 1523 name: typing.Optional[str] = None, 1524 start_time: typing.Optional[dt.datetime] = None, 1525 end_time: typing.Optional[dt.datetime] = None, 1526 metadata: typing.Optional[typing.Any] = None, 1527 level: typing.Optional[SpanLevel] = None, 1528 status_message: typing.Optional[str] = None, 1529 input: typing.Optional[typing.Any] = None, 1530 output: typing.Optional[typing.Any] = 
None, 1531 version: typing.Optional[str] = None, 1532 **kwargs, 1533 ) -> "StatefulSpanClient": 1534 """Create a span. 1535 1536 A span represents durations of units of work in a trace. 1537 Usually, you want to add a span nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id. 1538 1539 If no trace_id is provided, a new trace is created just for this span. 1540 1541 Args: 1542 id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id. 1543 trace_id (Optional[str]): The trace ID associated with this span. If not provided, a new UUID is generated. 1544 parent_observation_id (Optional[str]): The ID of the parent observation, if applicable. 1545 name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI. 1546 start_time (Optional[datetime]): The time at which the span started, defaults to the current time. 1547 end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`. 1548 metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API. 1549 level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 1550 status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event. 1551 input (Optional[dict]): The input to the span. Can be any JSON object. 1552 output (Optional[dict]): The output to the span. Can be any JSON object. 1553 version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging. 1554 **kwargs: Additional keyword arguments to include in the span. 
1555 1556 Returns: 1557 StatefulSpanClient: The created span. 1558 1559 Example: 1560 ```python 1561 from langfuse import Langfuse 1562 1563 langfuse = Langfuse() 1564 1565 trace = langfuse.trace(name = "llm-feature") 1566 1567 # Create a span 1568 retrieval = langfuse.span(name = "retrieval", trace_id = trace.id) 1569 1570 # Create a nested span 1571 nested_span = langfuse.span(name = "retrieval", trace_id = trace.id, parent_observation_id = retrieval.id) 1572 ``` 1573 """ 1574 new_span_id = id or str(uuid.uuid4()) 1575 new_trace_id = trace_id or str(uuid.uuid4()) 1576 self.trace_id = new_trace_id 1577 try: 1578 span_body = { 1579 "id": new_span_id, 1580 "trace_id": new_trace_id, 1581 "name": name, 1582 "start_time": start_time or _get_timestamp(), 1583 "metadata": metadata, 1584 "input": input, 1585 "output": output, 1586 "level": level, 1587 "status_message": status_message, 1588 "parent_observation_id": parent_observation_id, 1589 "version": version, 1590 "end_time": end_time, 1591 "trace": {"release": self.release}, 1592 **kwargs, 1593 } 1594 1595 if trace_id is None: 1596 self._generate_trace(new_trace_id, name or new_trace_id) 1597 1598 self.log.debug(f"Creating span {_filter_io_from_event_body(span_body)}...") 1599 1600 span_body = CreateSpanBody(**span_body) 1601 1602 event = { 1603 "id": str(uuid.uuid4()), 1604 "type": "span-create", 1605 "body": span_body, 1606 } 1607 1608 self.task_manager.add_task(event) 1609 1610 except Exception as e: 1611 self.log.exception(e) 1612 finally: 1613 self._log_memory_usage() 1614 1615 return StatefulSpanClient( 1616 self.client, 1617 new_span_id, 1618 StateType.OBSERVATION, 1619 new_trace_id, 1620 self.task_manager, 1621 ) 1622 1623 def event( 1624 self, 1625 *, 1626 id: typing.Optional[str] = None, 1627 trace_id: typing.Optional[str] = None, 1628 parent_observation_id: typing.Optional[str] = None, 1629 name: typing.Optional[str] = None, 1630 start_time: typing.Optional[dt.datetime] = None, 1631 metadata: 
typing.Optional[typing.Any] = None, 1632 input: typing.Optional[typing.Any] = None, 1633 output: typing.Optional[typing.Any] = None, 1634 level: typing.Optional[SpanLevel] = None, 1635 status_message: typing.Optional[str] = None, 1636 version: typing.Optional[str] = None, 1637 **kwargs, 1638 ) -> "StatefulSpanClient": 1639 """Create an event. 1640 1641 An event represents a discrete event in a trace. 1642 Usually, you want to add a event nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id. 1643 1644 If no trace_id is provided, a new trace is created just for this event. 1645 1646 Args: 1647 id (Optional[str]): The id of the event can be set, otherwise a random id is generated. 1648 trace_id (Optional[str]): The trace ID associated with this event. If not provided, a new trace is created just for this event. 1649 parent_observation_id (Optional[str]): The ID of the parent observation, if applicable. 1650 name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI. 1651 start_time (Optional[datetime]): The time at which the event started, defaults to the current time. 1652 metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API. 1653 input (Optional[Any]): The input to the event. Can be any JSON object. 1654 output (Optional[Any]): The output to the event. Can be any JSON object. 1655 level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 1656 status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event. 1657 version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. 
Useful in debugging. 1658 **kwargs: Additional keyword arguments to include in the event. 1659 1660 Returns: 1661 StatefulSpanClient: The created event. 1662 1663 Example: 1664 ```python 1665 from langfuse import Langfuse 1666 1667 langfuse = Langfuse() 1668 1669 trace = langfuse.trace(name = "llm-feature") 1670 1671 # Create an event 1672 retrieval = langfuse.event(name = "retrieval", trace_id = trace.id) 1673 ``` 1674 """ 1675 event_id = id or str(uuid.uuid4()) 1676 new_trace_id = trace_id or str(uuid.uuid4()) 1677 self.trace_id = new_trace_id 1678 try: 1679 event_body = { 1680 "id": event_id, 1681 "trace_id": new_trace_id, 1682 "name": name, 1683 "start_time": start_time or _get_timestamp(), 1684 "metadata": metadata, 1685 "input": input, 1686 "output": output, 1687 "level": level, 1688 "status_message": status_message, 1689 "parent_observation_id": parent_observation_id, 1690 "version": version, 1691 "trace": {"release": self.release}, 1692 **kwargs, 1693 } 1694 1695 if trace_id is None: 1696 self._generate_trace(new_trace_id, name or new_trace_id) 1697 1698 request = CreateEventBody(**event_body) 1699 1700 event = { 1701 "id": str(uuid.uuid4()), 1702 "type": "event-create", 1703 "body": request, 1704 } 1705 1706 self.log.debug( 1707 f"Creating event {_filter_io_from_event_body(event_body)} ..." 
1708 ) 1709 self.task_manager.add_task(event) 1710 1711 except Exception as e: 1712 self.log.exception(e) 1713 finally: 1714 return StatefulSpanClient( 1715 self.client, 1716 event_id, 1717 StateType.OBSERVATION, 1718 new_trace_id, 1719 self.task_manager, 1720 ) 1721 1722 def generation( 1723 self, 1724 *, 1725 id: typing.Optional[str] = None, 1726 trace_id: typing.Optional[str] = None, 1727 parent_observation_id: typing.Optional[str] = None, 1728 name: typing.Optional[str] = None, 1729 start_time: typing.Optional[dt.datetime] = None, 1730 end_time: typing.Optional[dt.datetime] = None, 1731 completion_start_time: typing.Optional[dt.datetime] = None, 1732 metadata: typing.Optional[typing.Any] = None, 1733 level: typing.Optional[SpanLevel] = None, 1734 status_message: typing.Optional[str] = None, 1735 version: typing.Optional[str] = None, 1736 model: typing.Optional[str] = None, 1737 model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None, 1738 input: typing.Optional[typing.Any] = None, 1739 output: typing.Optional[typing.Any] = None, 1740 usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None, 1741 prompt: typing.Optional[PromptClient] = None, 1742 **kwargs, 1743 ) -> "StatefulGenerationClient": 1744 """Create a generation. 1745 1746 A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI. 1747 1748 Usually, you want to add a generation nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id. 1749 1750 If no trace_id is provided, a new trace is created just for this generation. 1751 1752 Args: 1753 id (Optional[str]): The id of the generation can be set, defaults to random id. 1754 trace_id (Optional[str]): The trace ID associated with this generation. 
If not provided, a new trace is created 1755 parent_observation_id (Optional[str]): The ID of the parent observation, if applicable. 1756 name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI. 1757 start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time. 1758 end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`. 1759 completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration. 1760 metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API. 1761 level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 1762 status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event. 1763 version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging. 1764 model (Optional[str]): The name of the model used for the generation. 1765 model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs. 1766 input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object. 1767 output (Optional[dict]): The completion generated by the model. Can be any string or JSON object. 
            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
            **kwargs: Additional keyword arguments to include in the generation.

        Returns:
            StatefulGenerationClient: The created generation.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a generation in Langfuse
            generation = langfuse.generation(
                name="summary-generation",
                model="gpt-3.5-turbo",
                model_parameters={"maxTokens": "1000", "temperature": "0.9"},
                input=[{"role": "system", "content": "You are a helpful assistant."},
                       {"role": "user", "content": "Please generate a summary of the following documents ..."}],
                metadata={"interface": "whatsapp"}
            )
            ```
        """
        # A missing trace_id means this generation implicitly opens a new trace.
        new_trace_id = trace_id or str(uuid.uuid4())
        new_generation_id = id or str(uuid.uuid4())
        self.trace_id = new_trace_id
        try:
            generation_body = {
                "id": new_generation_id,
                "trace_id": new_trace_id,
                "release": self.release,
                "name": name,
                "start_time": start_time or _get_timestamp(),
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "parent_observation_id": parent_observation_id,
                "version": version,
                "end_time": end_time,
                "completion_start_time": completion_start_time,
                "model": model,
                "model_parameters": model_parameters,
                # Normalize OpenAI-style / generic usage payloads into the ingestion schema.
                "usage": _convert_usage_input(usage) if usage is not None else None,
                "trace": {"release": self.release},
                **_create_prompt_context(prompt),
                **kwargs,
            }

            if trace_id is None:
                # No existing trace was referenced: enqueue a trace-create
                # event so the new generation has a trace to attach to.
                trace = {
                    "id": new_trace_id,
                    "release": self.release,
                    "name": name,
                }
                request = TraceBody(**trace)

                event = {
                    "id": str(uuid.uuid4()),
                    "type": "trace-create",
                    "body": request,
                }

                self.log.debug("Creating trace...")

                self.task_manager.add_task(event)

            # NOTE(review): the word "max" in this log message looks like a
            # leftover typo - confirm and remove in a behavioral change.
            self.log.debug(
                f"Creating generation max {_filter_io_from_event_body(generation_body)}..."
            )
            request = CreateGenerationBody(**generation_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "generation-create",
                "body": request,
            }

            self.task_manager.add_task(event)

        except Exception as e:
            self.log.exception(e)
        finally:
            # Always return a stateful client, even if enqueueing failed,
            # so caller code can keep chaining without crashing.
            return StatefulGenerationClient(
                self.client,
                new_generation_id,
                StateType.OBSERVATION,
                new_trace_id,
                self.task_manager,
            )

    def _generate_trace(self, trace_id: str, name: str) -> None:
        """Enqueue a trace-create event for the given trace id and name."""
        trace_dict = {
            "id": trace_dict_id if False else trace_id,  # noqa: never executed branch removed? keep original token
            "release": self.release,
            "name": name,
        }

        trace_body = TraceBody(**trace_dict)

        event = {
            "id": str(uuid.uuid4()),
            "type": "trace-create",
            "body": trace_body,
        }

        self.log.debug(f"Creating trace {_filter_io_from_event_body(trace_dict)}...")
        self.task_manager.add_task(event)

    def join(self):
        """Blocks until all consumer Threads are terminated. The SDK calls this upon termination of the Python Interpreter.

        If called before flushing, consumers might terminate before sending all events to Langfuse API. This method is called at exit of the SDK, right before the Python interpreter closes.
        To guarantee all messages have been delivered, you still need to call flush().
        """
        try:
            return self.task_manager.join()
        except Exception as e:
            self.log.exception(e)

    def flush(self):
        """Flush the internal event queue to the Langfuse API. It blocks until the queue is empty. It should be called when the application shuts down.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Some operations with Langfuse

            # Flushing all events to end Langfuse cleanly
            langfuse.flush()
            ```
        """
        try:
            return self.task_manager.flush()
        except Exception as e:
            self.log.exception(e)

    def shutdown(self):
        """Initiate a graceful shutdown of the Langfuse SDK, ensuring all events are sent to Langfuse API and all consumer Threads are terminated.

        This function calls flush() and join() consecutively resulting in a complete shutdown of the SDK. On success of this function, no more events will be sent to Langfuse API.
        As the SDK calls join() already on shutdown, refer to flush() to ensure all events arrive at the Langfuse API.
        """
        try:
            return self.task_manager.shutdown()
        except Exception as e:
            self.log.exception(e)


class StateType(Enum):
    """Enum to distinguish observation and trace states.

    Attributes:
        OBSERVATION (int): Observation state.
        TRACE (int): Trace state.
    """

    OBSERVATION = 1
    TRACE = 0


class StatefulClient(object):
    """Base class for handling stateful operations in the Langfuse system.

    This client is capable of creating different nested Langfuse objects like spans, generations, scores, and events,
    associating them with either an observation or a trace based on the specified state type.

    Attributes:
        client (FernLangfuse): Core interface for Langfuse API interactions.
        id (str): Unique identifier of the stateful client (either observation or trace).
        state_type (StateType): Enum indicating whether the client is an observation or a trace.
        trace_id (str): Id of the trace associated with the stateful client.
        task_manager (TaskManager): Manager handling asynchronous tasks for the client.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulClient.

        Args:
            client (FernLangfuse): Core interface for Langfuse API interactions.
            id (str): Unique identifier of the stateful client (either observation or trace).
            state_type (StateType): Enum indicating whether the client is an observation or a trace.
            trace_id (str): Id of the trace associated with the stateful client.
            task_manager (TaskManager): Manager handling asynchronous tasks for the client.
        """
        self.client = client
        self.trace_id = trace_id
        self.id = id
        self.state_type = state_type
        self.task_manager = task_manager

    def _add_state_to_event(self, body: dict) -> dict:
        """Attach trace/observation context to an event body (mutates and returns it)."""
        if self.state_type == StateType.OBSERVATION:
            # Nested under an observation: this client's id becomes the parent.
            body["parent_observation_id"] = self.id
            body["trace_id"] = self.trace_id
        else:
            # Client represents a trace: its own id is the trace id.
            body["trace_id"] = self.id
        return body

    def _add_default_values(self, body: dict) -> dict:
        """Fill in a default start_time if the caller did not provide one."""
        if body.get("start_time") is None:
            body["start_time"] = _get_timestamp()
        return body

    def generation(
        self,
        *,
        id: typing.Optional[str] = None,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        completion_start_time: typing.Optional[dt.datetime] = None,
        model: typing.Optional[str] = None,
        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
        prompt: typing.Optional[PromptClient] = None,
        **kwargs,
    ) -> "StatefulGenerationClient":
        """Create a generation nested within the current observation or trace.

        A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI.

        Args:
            id (Optional[str]): The id of the generation can be set, defaults to random id.
            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
            end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`.
            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            model (Optional[str]): The name of the model used for the generation.
            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
            **kwargs: Additional keyword arguments to include in the generation.

        Returns:
            StatefulGenerationClient: The created generation. Use this client to update the generation or create additional nested observations.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested generation in Langfuse
            generation = trace.generation(
                name="summary-generation",
                model="gpt-3.5-turbo",
                model_parameters={"maxTokens": "1000", "temperature": "0.9"},
                input=[{"role": "system", "content": "You are a helpful assistant."},
                       {"role": "user", "content": "Please generate a summary of the following documents ..."}],
                metadata={"interface": "whatsapp"}
            )
            ```
        """
        generation_id = id or str(uuid.uuid4())
        try:
            generation_body = {
                "id": generation_id,
                "name": name,
                "start_time": start_time or _get_timestamp(),
                "metadata": metadata,
                "level": level,
                "status_message": status_message,
                "version": version,
                "end_time": end_time,
                "completion_start_time": completion_start_time,
                "model": model,
                "model_parameters": model_parameters,
                "input": input,
                "output": output,
                # Normalize OpenAI-style / generic usage payloads into the ingestion schema.
                "usage": _convert_usage_input(usage) if usage is not None else None,
                **_create_prompt_context(prompt),
                **kwargs,
            }

            # Attach parent/trace context and default start_time.
            generation_body = self._add_state_to_event(generation_body)
            new_body = self._add_default_values(generation_body)

            new_body = CreateGenerationBody(**new_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "generation-create",
                "body": new_body.dict(exclude_none=True, exclude_unset=False),
            }

            self.log.debug(
                f"Creating generation {_filter_io_from_event_body(generation_body)}..."
            )
            self.task_manager.add_task(event)

        except Exception as e:
            self.log.exception(e)
        finally:
            # Always return a stateful client, even on failure, so chaining never breaks.
            return StatefulGenerationClient(
                self.client,
                generation_id,
                StateType.OBSERVATION,
                self.trace_id,
                task_manager=self.task_manager,
            )

    def span(
        self,
        *,
        id: typing.Optional[str] = None,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulSpanClient":
        """Create a span nested within the current observation or trace.

        A span represents durations of units of work in a trace.

        Args:
            id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id.
            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
            input (Optional[dict]): The input to the span. Can be any JSON object.
            output (Optional[dict]): The output to the span. Can be any JSON object.
            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            **kwargs: Additional keyword arguments to include in the span.

        Returns:
            StatefulSpanClient: The created span. Use this client to update the span or create additional nested observations.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a span
            retrieval = trace.span(name = "retrieval")
            ```
        """
        span_id = id or str(uuid.uuid4())
        try:
            span_body = {
                "id": span_id,
                "name": name,
                "start_time": start_time or _get_timestamp(),
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "version": version,
                "end_time": end_time,
                **kwargs,
            }

            self.log.debug(f"Creating span {_filter_io_from_event_body(span_body)}...")

            # Attach parent/trace context and default start_time.
            new_dict = self._add_state_to_event(span_body)
            new_body = self._add_default_values(new_dict)

            # NOTE(review): `event` is first the pydantic body and then rebound
            # to the envelope dict - works, but the shadowing is confusing.
            event = CreateSpanBody(**new_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "span-create",
                "body": event,
            }

            self.task_manager.add_task(event)
        except Exception as e:
            self.log.exception(e)
        finally:
            # Always return a stateful client, even on failure, so chaining never breaks.
            return StatefulSpanClient(
                self.client,
                span_id,
                StateType.OBSERVATION,
                self.trace_id,
                task_manager=self.task_manager,
            )

    # Overload: numeric/boolean scores take a float value.
    @overload
    def score(
        self,
        *,
        id: typing.Optional[str] = None,
        name: str,
        value: float,
        data_type: typing.Optional[Literal["NUMERIC", "BOOLEAN"]] = None,
        comment: typing.Optional[str] = None,
        config_id: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient": ...

    # Overload: categorical scores take a string value.
    @overload
    def score(
        self,
        *,
        id: typing.Optional[str] = None,
        name: str,
        value: str,
        data_type: typing.Optional[Literal["CATEGORICAL"]] = "CATEGORICAL",
        comment: typing.Optional[str] = None,
        config_id: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient": ...

    def score(
        self,
        *,
        id: typing.Optional[str] = None,
        name: str,
        value: typing.Union[float, str],
        data_type: typing.Optional[ScoreDataType] = None,
        comment: typing.Optional[str] = None,
        config_id: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient":
        """Create a score attached for the current observation or trace.

        Args:
            name (str): Identifier of the score.
            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores.
            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
            comment (Optional[str]): Additional context/explanation of the score.
            id (Optional[str]): The id of the score. If not provided, a new UUID is generated.
            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
            **kwargs: Additional keyword arguments to include in the score.

        Returns:
            StatefulClient: The current observation or trace for which the score was created. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name="example-application")

            # Add score to the trace
            trace = trace.score(
                name="user-explicit-feedback",
                value=0.8,
                comment="I like how personalized the response is"
            )
            ```
        """
        score_id = id or str(uuid.uuid4())
        try:
            new_score = {
                "id": score_id,
                "trace_id": self.trace_id,
                "name": name,
                "value": value,
                "data_type": data_type,
                "comment": comment,
                "config_id": config_id,
                **kwargs,
            }

            self.log.debug(f"Creating score {new_score}...")

            new_dict = self._add_state_to_event(new_score)

            if self.state_type == StateType.OBSERVATION:
                # Scores on observations also carry the observation id.
                new_dict["observationId"] = self.id

            request = ScoreBody(**new_dict)

            event = {
                "id": str(uuid.uuid4()),
                "type": "score-create",
                "body": request,
            }

            self.task_manager.add_task(event)

        except Exception as e:
            self.log.exception(e)
        finally:
            # Return the SAME observation/trace (not the score) for chaining.
            return StatefulClient(
                self.client,
                self.id,
                self.state_type,
                self.trace_id,
                task_manager=self.task_manager,
            )

    def event(
        self,
        *,
        id: typing.Optional[str] = None,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient":
        """Create an event nested within the current observation or trace.

        An event represents a discrete event in a trace.

        Args:
            id (Optional[str]): The id of the event can be set, otherwise a random id is generated.
            name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime]): The time at which the event started, defaults to the current time.
            metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API.
            input (Optional[Any]): The input to the event. Can be any JSON object.
            output (Optional[Any]): The output to the event. Can be any JSON object.
            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. Useful in debugging.
            **kwargs: Additional keyword arguments to include in the event.

        Returns:
            StatefulClient: The created event. Use this client to update the event or create additional nested observations.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create an event
            retrieval = trace.event(name = "retrieval")
            ```
        """
        event_id = id or str(uuid.uuid4())
        try:
            event_body = {
                "id": event_id,
                "name": name,
                "start_time": start_time or _get_timestamp(),
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "version": version,
                **kwargs,
            }

            # Attach parent/trace context and default start_time.
            new_dict = self._add_state_to_event(event_body)
            new_body = self._add_default_values(new_dict)

            request = CreateEventBody(**new_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "event-create",
                "body": request,
            }

            self.log.debug(
                f"Creating event {_filter_io_from_event_body(event_body)}..."
            )
            self.task_manager.add_task(event)

        except Exception as e:
            self.log.exception(e)
        finally:
            # Always return a stateful client, even on failure, so chaining never breaks.
            return StatefulClient(
                self.client,
                event_id,
                StateType.OBSERVATION,
                self.trace_id,
                self.task_manager,
            )

    def get_trace_url(self):
        """Get the URL to see the current trace in the Langfuse UI."""
        return f"{self.client._client_wrapper._base_url}/trace/{self.trace_id}"


class StatefulGenerationClient(StatefulClient):
    """Class for handling stateful operations of generations in the Langfuse system. Inherits from StatefulClient.

    This client extends the capabilities of the StatefulClient to specifically handle generation,
    allowing for the creation, update, and termination of generation processes in Langfuse.

    Attributes:
        client (FernLangfuse): Core interface for Langfuse API interaction.
        id (str): Unique identifier of the generation.
2400 state_type (StateType): Type of the stateful entity (observation or trace). 2401 trace_id (str): Id of trace associated with the generation. 2402 task_manager (TaskManager): Manager for handling asynchronous tasks. 2403 """ 2404 2405 log = logging.getLogger("langfuse") 2406 2407 def __init__( 2408 self, 2409 client: FernLangfuse, 2410 id: str, 2411 state_type: StateType, 2412 trace_id: str, 2413 task_manager: TaskManager, 2414 ): 2415 """Initialize the StatefulGenerationClient.""" 2416 super().__init__(client, id, state_type, trace_id, task_manager) 2417 2418 # WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY 2419 def update( 2420 self, 2421 *, 2422 name: typing.Optional[str] = None, 2423 start_time: typing.Optional[dt.datetime] = None, 2424 end_time: typing.Optional[dt.datetime] = None, 2425 completion_start_time: typing.Optional[dt.datetime] = None, 2426 metadata: typing.Optional[typing.Any] = None, 2427 level: typing.Optional[SpanLevel] = None, 2428 status_message: typing.Optional[str] = None, 2429 version: typing.Optional[str] = None, 2430 model: typing.Optional[str] = None, 2431 model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None, 2432 input: typing.Optional[typing.Any] = None, 2433 output: typing.Optional[typing.Any] = None, 2434 usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None, 2435 prompt: typing.Optional[PromptClient] = None, 2436 **kwargs, 2437 ) -> "StatefulGenerationClient": 2438 """Update the generation. 2439 2440 Args: 2441 name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI. 2442 start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time. 2443 end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`. 2444 completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). 
Set it to get latency analytics broken down into time until completion started and completion duration. 2445 metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API. 2446 level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 2447 status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event. 2448 version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging. 2449 model (Optional[str]): The name of the model used for the generation. 2450 model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs. 2451 input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object. 2452 output (Optional[dict]): The completion generated by the model. Can be any string or JSON object. 2453 usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse. 2454 prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation. 2455 **kwargs: Additional keyword arguments to include in the generation. 2456 2457 Returns: 2458 StatefulGenerationClient: The updated generation. Passthrough for chaining. 
2459 2460 Example: 2461 ```python 2462 from langfuse import Langfuse 2463 2464 langfuse = Langfuse() 2465 2466 # Create a trace 2467 trace = langfuse.trace(name = "llm-feature") 2468 2469 # Create a nested generation in Langfuse 2470 generation = trace.generation(name="summary-generation") 2471 2472 # Update the generation 2473 generation = generation.update(metadata={"interface": "whatsapp"}) 2474 ``` 2475 """ 2476 try: 2477 generation_body = { 2478 "id": self.id, 2479 "trace_id": self.trace_id, # Included to avoid relying on the order of events sent to the API 2480 "name": name, 2481 "start_time": start_time, 2482 "metadata": metadata, 2483 "level": level, 2484 "status_message": status_message, 2485 "version": version, 2486 "end_time": end_time, 2487 "completion_start_time": completion_start_time, 2488 "model": model, 2489 "model_parameters": model_parameters, 2490 "input": input, 2491 "output": output, 2492 "usage": _convert_usage_input(usage) if usage is not None else None, 2493 **_create_prompt_context(prompt), 2494 **kwargs, 2495 } 2496 2497 self.log.debug( 2498 f"Update generation {_filter_io_from_event_body(generation_body)}..." 2499 ) 2500 2501 request = UpdateGenerationBody(**generation_body) 2502 2503 event = { 2504 "id": str(uuid.uuid4()), 2505 "type": "generation-update", 2506 "body": request.dict(exclude_none=True, exclude_unset=False), 2507 } 2508 2509 self.log.debug( 2510 f"Update generation {_filter_io_from_event_body(generation_body)}..." 
2511 ) 2512 self.task_manager.add_task(event) 2513 2514 except Exception as e: 2515 self.log.exception(e) 2516 finally: 2517 return StatefulGenerationClient( 2518 self.client, 2519 self.id, 2520 StateType.OBSERVATION, 2521 self.trace_id, 2522 task_manager=self.task_manager, 2523 ) 2524 2525 def end( 2526 self, 2527 *, 2528 name: typing.Optional[str] = None, 2529 start_time: typing.Optional[dt.datetime] = None, 2530 end_time: typing.Optional[dt.datetime] = None, 2531 completion_start_time: typing.Optional[dt.datetime] = None, 2532 metadata: typing.Optional[typing.Any] = None, 2533 level: typing.Optional[SpanLevel] = None, 2534 status_message: typing.Optional[str] = None, 2535 version: typing.Optional[str] = None, 2536 model: typing.Optional[str] = None, 2537 model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None, 2538 input: typing.Optional[typing.Any] = None, 2539 output: typing.Optional[typing.Any] = None, 2540 usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None, 2541 prompt: typing.Optional[PromptClient] = None, 2542 **kwargs, 2543 ) -> "StatefulGenerationClient": 2544 """End the generation, optionally updating its properties. 2545 2546 Args: 2547 name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI. 2548 start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time. 2549 end_time (Optional[datetime.datetime]): Automatically set to the current time. Can be overridden to set a custom end time. 2550 completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration. 2551 metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API. 2552 level (Optional[str]): The level of the generation. 
                Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            model (Optional[str]): The name of the model used for the generation.
            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
            **kwargs: Additional keyword arguments to include in the generation.

        Returns:
            StatefulGenerationClient: The ended generation. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested generation in Langfuse
            generation = trace.generation(name="summary-generation")

            # End the generation and update its properties
            generation = generation.end(metadata={"interface": "whatsapp"})
            ```
        """
        # end() is update() plus a guaranteed end_time (defaults to "now").
        return self.update(
            name=name,
            start_time=start_time,
            end_time=end_time or _get_timestamp(),
            metadata=metadata,
            level=level,
            status_message=status_message,
            version=version,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            input=input,
            output=output,
            usage=usage,
            prompt=prompt,
            **kwargs,
        )


class StatefulSpanClient(StatefulClient):
    """Class for handling stateful operations of spans in the Langfuse system. Inherits from StatefulClient.

    Attributes:
        client (FernLangfuse): Core interface for Langfuse API interaction.
        id (str): Unique identifier of the span.
        state_type (StateType): Type of the stateful entity (observation or trace).
        trace_id (str): Id of trace associated with the span.
        task_manager (TaskManager): Manager for handling asynchronous tasks.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulSpanClient."""
        super().__init__(client, id, state_type, trace_id, task_manager)

    # WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY
    def update(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulSpanClient":
        """Update the span.

        Args:
            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
            input (Optional[dict]): The input to the span. Can be any JSON object.
            output (Optional[dict]): The output to the span. Can be any JSON object.
            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            **kwargs: Additional keyword arguments to include in the span.

        Returns:
            StatefulSpanClient: The updated span. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested span in Langfuse
            span = trace.span(name="retrieval")

            # Update the span
            span = span.update(metadata={"interface": "whatsapp"})
            ```
        """
        try:
            span_body = {
                "id": self.id,
                "trace_id": self.trace_id,  # Included to avoid relying on the order of events sent to the API
                "name": name,
                "start_time": start_time,
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "version": version,
                "end_time": end_time,
                **kwargs,
            }
            self.log.debug(f"Update span {_filter_io_from_event_body(span_body)}...")

            request = UpdateSpanBody(**span_body)

            # NOTE(review): unlike generation-update, the raw pydantic model is
            # enqueued here (no .dict(exclude_none=...)) - confirm whether the
            # asymmetry is intentional.
            event = {
                "id": str(uuid.uuid4()),
                "type": "span-update",
                "body": request,
            }

            self.task_manager.add_task(event)
        except Exception as e:
            self.log.exception(e)
        finally:
            # Always return a stateful client, even on failure, so chaining never breaks.
            return StatefulSpanClient(
                self.client,
                self.id,
                StateType.OBSERVATION,
                self.trace_id,
                task_manager=self.task_manager,
            )

    def end(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] =
None, 2721 version: typing.Optional[str] = None, 2722 **kwargs, 2723 ) -> "StatefulSpanClient": 2724 """End the span, optionally updating its properties. 2725 2726 Args: 2727 name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI. 2728 start_time (Optional[datetime]): The time at which the span started, defaults to the current time. 2729 end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`. 2730 metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API. 2731 level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. 2732 status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event. 2733 input (Optional[dict]): The input to the span. Can be any JSON object. 2734 output (Optional[dict]): The output to the span. Can be any JSON object. 2735 version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging. 2736 **kwargs: Additional keyword arguments to include in the span. 2737 2738 Returns: 2739 StatefulSpanClient: The updated span. Passthrough for chaining. 
2740 2741 Example: 2742 ```python 2743 from langfuse import Langfuse 2744 2745 langfuse = Langfuse() 2746 2747 # Create a trace 2748 trace = langfuse.trace(name = "llm-feature") 2749 2750 # Create a nested span in Langfuse 2751 span = trace.span(name="retrieval") 2752 2753 # End the span and update its properties 2754 span = span.end(metadata={"interface": "whatsapp"}) 2755 ``` 2756 """ 2757 try: 2758 span_body = { 2759 "name": name, 2760 "start_time": start_time, 2761 "metadata": metadata, 2762 "input": input, 2763 "output": output, 2764 "level": level, 2765 "status_message": status_message, 2766 "version": version, 2767 "end_time": end_time or _get_timestamp(), 2768 **kwargs, 2769 } 2770 return self.update(**span_body) 2771 2772 except Exception as e: 2773 self.log.warning(e) 2774 finally: 2775 return StatefulSpanClient( 2776 self.client, 2777 self.id, 2778 StateType.OBSERVATION, 2779 self.trace_id, 2780 task_manager=self.task_manager, 2781 ) 2782 2783 def get_langchain_handler(self, update_parent: bool = False): 2784 """Get langchain callback handler associated with the current span. 2785 2786 Args: 2787 update_parent (bool): If set to True, the parent observation will be updated with the outcome of the Langchain run. 2788 2789 Returns: 2790 CallbackHandler: An instance of CallbackHandler linked to this StatefulSpanClient. 2791 """ 2792 from langfuse.callback import CallbackHandler 2793 2794 return CallbackHandler( 2795 stateful_client=self, update_stateful_client=update_parent 2796 ) 2797 2798 2799class StatefulTraceClient(StatefulClient): 2800 """Class for handling stateful operations of traces in the Langfuse system. Inherits from StatefulClient. 2801 2802 Attributes: 2803 client (FernLangfuse): Core interface for Langfuse API interaction. 2804 id (str): Unique identifier of the trace. 2805 state_type (StateType): Type of the stateful entity (observation or trace). 2806 trace_id (str): The trace ID associated with this client. 
2807 task_manager (TaskManager): Manager for handling asynchronous tasks. 2808 """ 2809 2810 log = logging.getLogger("langfuse") 2811 2812 def __init__( 2813 self, 2814 client: FernLangfuse, 2815 id: str, 2816 state_type: StateType, 2817 trace_id: str, 2818 task_manager: TaskManager, 2819 ): 2820 """Initialize the StatefulTraceClient.""" 2821 super().__init__(client, id, state_type, trace_id, task_manager) 2822 self.task_manager = task_manager 2823 2824 def update( 2825 self, 2826 *, 2827 name: typing.Optional[str] = None, 2828 user_id: typing.Optional[str] = None, 2829 session_id: typing.Optional[str] = None, 2830 version: typing.Optional[str] = None, 2831 release: typing.Optional[str] = None, 2832 input: typing.Optional[typing.Any] = None, 2833 output: typing.Optional[typing.Any] = None, 2834 metadata: typing.Optional[typing.Any] = None, 2835 tags: typing.Optional[typing.List[str]] = None, 2836 public: typing.Optional[bool] = None, 2837 **kwargs, 2838 ) -> "StatefulTraceClient": 2839 """Update the trace. 2840 2841 Args: 2842 name: Identifier of the trace. Useful for sorting/filtering in the UI. 2843 input: The input of the trace. Can be any JSON object. 2844 output: The output of the trace. Can be any JSON object. 2845 metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API. 2846 user_id: The id of the user that triggered the execution. Used to provide user-level analytics. 2847 session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier. 2848 version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging. 2849 release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging. 2850 tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. 
Tags can also be changed in the UI. Tags are merged and never deleted via the API. 2851 public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project. 2852 **kwargs: Additional keyword arguments that can be included in the trace. 2853 2854 Returns: 2855 StatefulTraceClient: The updated trace. Passthrough for chaining. 2856 2857 Example: 2858 ```python 2859 from langfuse import Langfuse 2860 2861 langfuse = Langfuse() 2862 2863 # Create a trace 2864 trace = langfuse.trace( 2865 name="example-application", 2866 user_id="user-1234") 2867 ) 2868 2869 # Update the trace 2870 trace = trace.update( 2871 output={"result": "success"}, 2872 metadata={"interface": "whatsapp"} 2873 ) 2874 ``` 2875 """ 2876 try: 2877 trace_body = { 2878 "id": self.id, 2879 "name": name, 2880 "userId": user_id, 2881 "sessionId": session_id 2882 or kwargs.get("sessionId", None), # backward compatibility 2883 "version": version, 2884 "release": release, 2885 "input": input, 2886 "output": output, 2887 "metadata": metadata, 2888 "public": public, 2889 "tags": tags, 2890 **kwargs, 2891 } 2892 self.log.debug(f"Update trace {_filter_io_from_event_body(trace_body)}...") 2893 2894 request = TraceBody(**trace_body) 2895 2896 event = { 2897 "id": str(uuid.uuid4()), 2898 "type": "trace-create", 2899 "body": request, 2900 } 2901 2902 self.task_manager.add_task(event) 2903 2904 except Exception as e: 2905 self.log.exception(e) 2906 finally: 2907 return StatefulTraceClient( 2908 self.client, 2909 self.id, 2910 StateType.TRACE, 2911 self.trace_id, 2912 task_manager=self.task_manager, 2913 ) 2914 2915 def get_langchain_handler(self, update_parent: bool = False): 2916 """Get langchain callback handler associated with the current trace. 2917 2918 This method creates and returns a CallbackHandler instance, linking it with the current 2919 trace. 
Use this if you want to group multiple Langchain runs within a single trace. 2920 2921 Args: 2922 update_parent (bool): If set to True, the parent trace will be updated with the outcome of the Langchain run. 2923 2924 Raises: 2925 ImportError: If the 'langchain' module is not installed, indicating missing functionality. 2926 2927 Returns: 2928 CallbackHandler: Langchain callback handler linked to the current trace. 2929 2930 Example: 2931 ```python 2932 from langfuse import Langfuse 2933 2934 langfuse = Langfuse() 2935 2936 # Create a trace 2937 trace = langfuse.trace(name = "llm-feature") 2938 2939 # Get a langchain callback handler 2940 handler = trace.get_langchain_handler() 2941 ``` 2942 """ 2943 try: 2944 from langfuse.callback import CallbackHandler 2945 2946 self.log.debug(f"Creating new handler for trace {self.id}") 2947 2948 return CallbackHandler( 2949 stateful_client=self, 2950 debug=self.log.level == logging.DEBUG, 2951 update_stateful_client=update_parent, 2952 ) 2953 except Exception as e: 2954 self.log.exception(e) 2955 2956 def getNewHandler(self): 2957 """Alias for the `get_langchain_handler` method. Retrieves a callback handler for the trace. Deprecated.""" 2958 return self.get_langchain_handler() 2959 2960 2961class DatasetItemClient: 2962 """Class for managing dataset items in Langfuse. 2963 2964 Args: 2965 id (str): Unique identifier of the dataset item. 2966 status (DatasetStatus): The status of the dataset item. Can be either 'ACTIVE' or 'ARCHIVED'. 2967 input (Any): Input data of the dataset item. 2968 expected_output (Optional[Any]): Expected output of the dataset item. 2969 metadata (Optional[Any]): Additional metadata of the dataset item. 2970 source_trace_id (Optional[str]): Identifier of the source trace. 2971 source_observation_id (Optional[str]): Identifier of the source observation. 2972 dataset_id (str): Identifier of the dataset to which this item belongs. 2973 dataset_name (str): Name of the dataset to which this item belongs. 
2974 created_at (datetime): Timestamp of dataset item creation. 2975 updated_at (datetime): Timestamp of the last update to the dataset item. 2976 langfuse (Langfuse): Instance of Langfuse client for API interactions. 2977 2978 Example: 2979 ```python 2980 from langfuse import Langfuse 2981 2982 langfuse = Langfuse() 2983 2984 dataset = langfuse.get_dataset("<dataset_name>") 2985 2986 for item in dataset.items: 2987 # Generate a completion using the input of every item 2988 completion, generation = llm_app.run(item.input) 2989 2990 # Evaluate the completion 2991 generation.score( 2992 name="example-score", 2993 value=1 2994 ) 2995 ``` 2996 """ 2997 2998 log = logging.getLogger("langfuse") 2999 3000 id: str 3001 status: DatasetStatus 3002 input: typing.Any 3003 expected_output: typing.Optional[typing.Any] 3004 metadata: Optional[Any] 3005 source_trace_id: typing.Optional[str] 3006 source_observation_id: typing.Optional[str] 3007 dataset_id: str 3008 dataset_name: str 3009 created_at: dt.datetime 3010 updated_at: dt.datetime 3011 3012 langfuse: Langfuse 3013 3014 def __init__(self, dataset_item: DatasetItem, langfuse: Langfuse): 3015 """Initialize the DatasetItemClient.""" 3016 self.id = dataset_item.id 3017 self.status = dataset_item.status 3018 self.input = dataset_item.input 3019 self.expected_output = dataset_item.expected_output 3020 self.metadata = dataset_item.metadata 3021 self.source_trace_id = dataset_item.source_trace_id 3022 self.source_observation_id = dataset_item.source_observation_id 3023 self.dataset_id = dataset_item.dataset_id 3024 self.dataset_name = dataset_item.dataset_name 3025 self.created_at = dataset_item.created_at 3026 self.updated_at = dataset_item.updated_at 3027 3028 self.langfuse = langfuse 3029 3030 def flush(self, observation: StatefulClient, run_name: str): 3031 """Flushes an observations task manager's queue. 3032 3033 Used before creating a dataset run item to ensure all events are persistent. 
3034 3035 Args: 3036 observation (StatefulClient): The observation or trace client associated with the dataset item. 3037 run_name (str): The name of the dataset run. 3038 """ 3039 observation.task_manager.flush() 3040 3041 def link( 3042 self, 3043 trace_or_observation: typing.Union[StatefulClient, str, None], 3044 run_name: str, 3045 run_metadata: Optional[Any] = None, 3046 run_description: Optional[str] = None, 3047 trace_id: Optional[str] = None, 3048 observation_id: Optional[str] = None, 3049 ): 3050 """Link the dataset item to observation within a specific dataset run. Creates a dataset run item. 3051 3052 Args: 3053 trace_or_observation (Union[StatefulClient, str, None]): The trace or observation object to link. Deprecated: can also be an observation ID. 3054 run_name (str): The name of the dataset run. 3055 run_metadata (Optional[Any]): Additional metadata to include in dataset run. 3056 run_description (Optional[str]): Description of the dataset run. 3057 trace_id (Optional[str]): The trace ID to link to the dataset item. Set trace_or_observation to None if trace_id is provided. 3058 observation_id (Optional[str]): The observation ID to link to the dataset item (optional). Set trace_or_observation to None if trace_id is provided. 3059 """ 3060 parsed_trace_id: str = None 3061 parsed_observation_id: str = None 3062 3063 if isinstance(trace_or_observation, StatefulClient): 3064 # flush the queue before creating the dataset run item 3065 # to ensure that all events are persisted. 
3066 if trace_or_observation.state_type == StateType.TRACE: 3067 parsed_trace_id = trace_or_observation.trace_id 3068 elif trace_or_observation.state_type == StateType.OBSERVATION: 3069 parsed_observation_id = trace_or_observation.id 3070 parsed_trace_id = trace_or_observation.trace_id 3071 # legacy support for observation_id 3072 elif isinstance(trace_or_observation, str): 3073 parsed_observation_id = trace_or_observation 3074 elif trace_or_observation is None: 3075 if trace_id is not None: 3076 parsed_trace_id = trace_id 3077 if observation_id is not None: 3078 parsed_observation_id = observation_id 3079 else: 3080 raise ValueError( 3081 "trace_id must be provided if trace_or_observation is None" 3082 ) 3083 else: 3084 raise ValueError( 3085 "trace_or_observation (arg) or trace_id (kwarg) must be provided to link the dataset item" 3086 ) 3087 3088 self.log.debug( 3089 f"Creating dataset run item: {run_name} {self.id} {parsed_trace_id} {parsed_observation_id}" 3090 ) 3091 self.langfuse.client.dataset_run_items.create( 3092 request=CreateDatasetRunItemRequest( 3093 runName=run_name, 3094 datasetItemId=self.id, 3095 traceId=parsed_trace_id, 3096 observationId=parsed_observation_id, 3097 metadata=run_metadata, 3098 runDescription=run_description, 3099 ) 3100 ) 3101 3102 def get_langchain_handler( 3103 self, 3104 *, 3105 run_name: str, 3106 run_description: Optional[str] = None, 3107 run_metadata: Optional[Any] = None, 3108 ): 3109 """Create and get a langchain callback handler linked to this dataset item. 3110 3111 Args: 3112 run_name (str): The name of the dataset run to be used in the callback handler. 3113 run_description (Optional[str]): Description of the dataset run. 3114 run_metadata (Optional[Any]): Additional metadata to include in dataset run. 3115 3116 Returns: 3117 CallbackHandler: An instance of CallbackHandler linked to the dataset item. 
3118 """ 3119 metadata = { 3120 "dataset_item_id": self.id, 3121 "run_name": run_name, 3122 "dataset_id": self.dataset_id, 3123 } 3124 trace = self.langfuse.trace(name="dataset-run", metadata=metadata) 3125 3126 self.link( 3127 trace, run_name, run_metadata=run_metadata, run_description=run_description 3128 ) 3129 3130 return trace.get_langchain_handler(update_parent=True) 3131 3132 @contextmanager 3133 def observe( 3134 self, 3135 *, 3136 run_name: str, 3137 run_description: Optional[str] = None, 3138 run_metadata: Optional[Any] = None, 3139 trace_id: Optional[str] = None, 3140 ): 3141 """Observes a dataset run within the Langfuse client. 3142 3143 Args: 3144 run_name (str): The name of the dataset run. 3145 root_trace (Optional[StatefulTraceClient]): The root trace client to use for the dataset run. If not provided, a new trace client will be created. 3146 run_description (Optional[str]): The description of the dataset run. 3147 run_metadata (Optional[Any]): Additional metadata for the dataset run. 3148 3149 Yields: 3150 StatefulTraceClient: The trace associated with the dataset run. 3151 """ 3152 from langfuse.decorators import langfuse_context 3153 3154 root_trace_id = trace_id or str(uuid.uuid4()) 3155 3156 langfuse_context._set_root_trace_id(root_trace_id) 3157 3158 try: 3159 yield root_trace_id 3160 3161 finally: 3162 self.link( 3163 run_name=run_name, 3164 run_metadata=run_metadata, 3165 run_description=run_description, 3166 trace_or_observation=None, 3167 trace_id=root_trace_id, 3168 ) 3169 3170 @contextmanager 3171 def observe_llama_index( 3172 self, 3173 *, 3174 run_name: str, 3175 run_description: Optional[str] = None, 3176 run_metadata: Optional[Any] = None, 3177 llama_index_integration_constructor_kwargs: Optional[Dict[str, Any]] = {}, 3178 ): 3179 """Context manager for observing LlamaIndex operations linked to this dataset item. 
3180 3181 This method sets up a LlamaIndex callback handler that integrates with Langfuse, allowing detailed logging 3182 and tracing of LlamaIndex operations within the context of a specific dataset run. It ensures that all 3183 operations performed within the context are linked to the appropriate dataset item and run in Langfuse. 3184 3185 Args: 3186 run_name (str): The name of the dataset run. 3187 run_description (Optional[str]): Description of the dataset run. Defaults to None. 3188 run_metadata (Optional[Any]): Additional metadata for the dataset run. Defaults to None. 3189 llama_index_integration_constructor_kwargs (Optional[Dict[str, Any]]): Keyword arguments to pass 3190 to the LlamaIndex integration constructor. Defaults to an empty dictionary. 3191 3192 Yields: 3193 LlamaIndexCallbackHandler: The callback handler for LlamaIndex operations. 3194 3195 Example: 3196 ```python 3197 dataset_item = dataset.items[0] 3198 3199 with dataset_item.observe_llama_index(run_name="example-run", run_description="Example LlamaIndex run") as handler: 3200 # Perform LlamaIndex operations here 3201 some_llama_index_operation() 3202 ``` 3203 3204 Raises: 3205 ImportError: If required modules for LlamaIndex integration are not available. 
3206 """ 3207 metadata = { 3208 "dataset_item_id": self.id, 3209 "run_name": run_name, 3210 "dataset_id": self.dataset_id, 3211 } 3212 trace = self.langfuse.trace(name="dataset-run", metadata=metadata) 3213 self.link( 3214 trace, run_name, run_metadata=run_metadata, run_description=run_description 3215 ) 3216 3217 try: 3218 import llama_index.core 3219 from llama_index.core import Settings 3220 from llama_index.core.callbacks import CallbackManager 3221 3222 from langfuse.llama_index import LlamaIndexCallbackHandler 3223 3224 callback_handler = LlamaIndexCallbackHandler( 3225 **llama_index_integration_constructor_kwargs, 3226 ) 3227 callback_handler.set_root(trace, update_root=True) 3228 3229 # Temporarily set the global handler to the new handler if previous handler is a LlamaIndexCallbackHandler 3230 # LlamaIndex does not adding two errors of same type, so if global handler is already a LlamaIndexCallbackHandler, we need to remove it 3231 prev_global_handler = llama_index.core.global_handler 3232 prev_langfuse_handler = None 3233 3234 if isinstance(prev_global_handler, LlamaIndexCallbackHandler): 3235 llama_index.core.global_handler = None 3236 3237 if Settings.callback_manager is None: 3238 Settings.callback_manager = CallbackManager([callback_handler]) 3239 else: 3240 for handler in Settings.callback_manager.handlers: 3241 if isinstance(handler, LlamaIndexCallbackHandler): 3242 prev_langfuse_handler = handler 3243 Settings.callback_manager.remove_handler(handler) 3244 3245 Settings.callback_manager.add_handler(callback_handler) 3246 3247 except Exception as e: 3248 self.log.exception(e) 3249 3250 try: 3251 yield callback_handler 3252 finally: 3253 # Reset the handlers 3254 Settings.callback_manager.remove_handler(callback_handler) 3255 if prev_langfuse_handler is not None: 3256 Settings.callback_manager.add_handler(prev_langfuse_handler) 3257 3258 llama_index.core.global_handler = prev_global_handler 3259 3260 def get_llama_index_handler( 3261 self, 3262 *, 
3263 run_name: str, 3264 run_description: Optional[str] = None, 3265 run_metadata: Optional[Any] = None, 3266 llama_index_integration_constructor_kwargs: Optional[Dict[str, Any]] = {}, 3267 ): 3268 """Create and get a llama-index callback handler linked to this dataset item. 3269 3270 Args: 3271 run_name (str): The name of the dataset run to be used in the callback handler. 3272 run_description (Optional[str]): Description of the dataset run. 3273 run_metadata (Optional[Any]): Additional metadata to include in dataset run. 3274 llama_index_integration_constructor_kwargs (Optional[Dict[str, Any]]): Additional keyword arguments to pass to the LlamaIndex integration constructor. 3275 3276 Returns: 3277 LlamaIndexCallbackHandler: An instance of LlamaIndexCallbackHandler linked to the dataset item. 3278 """ 3279 metadata = { 3280 "dataset_item_id": self.id, 3281 "run_name": run_name, 3282 "dataset_id": self.dataset_id, 3283 } 3284 trace = self.langfuse.trace(name="dataset-run", metadata=metadata) 3285 3286 self.link( 3287 trace, run_name, run_metadata=run_metadata, run_description=run_description 3288 ) 3289 3290 try: 3291 from langfuse.llama_index.llama_index import LlamaIndexCallbackHandler 3292 3293 callback_handler = LlamaIndexCallbackHandler( 3294 **llama_index_integration_constructor_kwargs, 3295 ) 3296 callback_handler.set_root(trace, update_root=True) 3297 3298 return callback_handler 3299 except Exception as e: 3300 self.log.exception(e) 3301 3302 3303class DatasetClient: 3304 """Class for managing datasets in Langfuse. 3305 3306 Attributes: 3307 id (str): Unique identifier of the dataset. 3308 name (str): Name of the dataset. 3309 description (Optional[str]): Description of the dataset. 3310 metadata (Optional[typing.Any]): Additional metadata of the dataset. 3311 project_id (str): Identifier of the project to which the dataset belongs. 3312 dataset_name (str): Name of the dataset. 3313 created_at (datetime): Timestamp of dataset creation. 
3314 updated_at (datetime): Timestamp of the last update to the dataset. 3315 items (List[DatasetItemClient]): List of dataset items associated with the dataset. 3316 runs (List[str]): List of dataset runs associated with the dataset. Deprecated. 3317 3318 Example: 3319 Print the input of each dataset item in a dataset. 3320 ```python 3321 from langfuse import Langfuse 3322 3323 langfuse = Langfuse() 3324 3325 dataset = langfuse.get_dataset("<dataset_name>") 3326 3327 for item in dataset.items: 3328 print(item.input) 3329 ``` 3330 """ 3331 3332 id: str 3333 name: str 3334 description: Optional[str] 3335 project_id: str 3336 dataset_name: str # for backward compatibility, to be deprecated 3337 metadata: Optional[Any] 3338 created_at: dt.datetime 3339 updated_at: dt.datetime 3340 items: typing.List[DatasetItemClient] 3341 runs: typing.List[str] = [] # deprecated 3342 3343 def __init__(self, dataset: Dataset, items: typing.List[DatasetItemClient]): 3344 """Initialize the DatasetClient.""" 3345 self.id = dataset.id 3346 self.name = dataset.name 3347 self.description = dataset.description 3348 self.project_id = dataset.project_id 3349 self.metadata = dataset.metadata 3350 self.dataset_name = dataset.name # for backward compatibility, to be deprecated 3351 self.created_at = dataset.created_at 3352 self.updated_at = dataset.updated_at 3353 self.items = items 3354 3355 3356def _filter_io_from_event_body(event_body: Dict[str, Any]): 3357 return { 3358 k: v for k, v in event_body.items() if k not in ("input", "output", "metadata") 3359 }
@dataclass
class
FetchTracesResponse:
@dataclass
class FetchTracesResponse:
    """Response object for the `fetch_traces` method.

    Attributes:
        data: List of traces with details for the fetched page.
        meta: Pagination metadata of the response.
    """

    data: List[TraceWithDetails]
    meta: MetaResponse
Response object for fetch_traces method.
FetchTracesResponse( data: List[langfuse.api.TraceWithDetails], meta: langfuse.api.resources.utils.resources.pagination.types.meta_response.MetaResponse)
data: List[langfuse.api.TraceWithDetails]
@dataclass
class
FetchTraceResponse:
@dataclass
class FetchTraceResponse:
    """Response object for the `fetch_trace` method.

    Attributes:
        data: The fetched trace with full details.
    """

    data: TraceWithFullDetails
Response object for fetch_trace method.
FetchTraceResponse( data: langfuse.api.TraceWithFullDetails)
@dataclass
class
FetchObservationsResponse:
@dataclass
class FetchObservationsResponse:
    """Response object for the `fetch_observations` method.

    Attributes:
        data: List of observation views for the fetched page.
        meta: Pagination metadata of the response.
    """

    data: List[ObservationsView]
    meta: MetaResponse
Response object for fetch_observations method.
FetchObservationsResponse( data: List[langfuse.api.ObservationsView], meta: langfuse.api.resources.utils.resources.pagination.types.meta_response.MetaResponse)
data: List[langfuse.api.ObservationsView]
@dataclass
class
FetchObservationResponse:
@dataclass
class FetchObservationResponse:
    """Response object for the `fetch_observation` method.

    Attributes:
        data: The fetched observation.
    """

    data: Observation
Response object for fetch_observation method.
FetchObservationResponse(data: langfuse.api.Observation)
data: langfuse.api.Observation
@dataclass
class
FetchSessionsResponse:
@dataclass
class FetchSessionsResponse:
    """Response object for the `fetch_sessions` method.

    Attributes:
        data: List of sessions for the fetched page.
        meta: Pagination metadata of the response.
    """

    data: List[Session]
    meta: MetaResponse
Response object for fetch_sessions method.
FetchSessionsResponse( data: List[langfuse.api.Session], meta: langfuse.api.resources.utils.resources.pagination.types.meta_response.MetaResponse)
data: List[langfuse.api.Session]
class
Langfuse:
123class Langfuse(object): 124 """Langfuse Python client. 125 126 Attributes: 127 log (logging.Logger): Logger for the Langfuse client. 128 base_url (str): Base URL of the Langfuse API, serving as the root address for API endpoint construction. 129 httpx_client (httpx.Client): HTTPX client utilized for executing requests to the Langfuse API. 130 client (FernLangfuse): Core interface for Langfuse API interaction. 131 task_manager (TaskManager): Task Manager dedicated to handling asynchronous tasks. 132 release (str): Identifies the release number or hash of the application. 133 prompt_cache (PromptCache): A cache for efficiently storing and retrieving PromptClient instances. 134 135 Example: 136 Initiating the Langfuse client should always be first step to use Langfuse. 137 ```python 138 import os 139 from langfuse import Langfuse 140 141 # Set the public and secret keys as environment variables 142 os.environ['LANGFUSE_PUBLIC_KEY'] = public_key 143 os.environ['LANGFUSE_SECRET_KEY'] = secret_key 144 145 # Initialize the Langfuse client using the credentials 146 langfuse = Langfuse() 147 ``` 148 """ 149 150 log = logging.getLogger("langfuse") 151 """Logger for the Langfuse client.""" 152 153 host: str 154 """Host of Langfuse API.""" 155 156 def __init__( 157 self, 158 public_key: Optional[str] = None, 159 secret_key: Optional[str] = None, 160 host: Optional[str] = None, 161 release: Optional[str] = None, 162 debug: bool = False, 163 threads: Optional[int] = None, 164 flush_at: Optional[int] = None, 165 flush_interval: Optional[float] = None, 166 max_retries: Optional[int] = None, 167 timeout: Optional[int] = None, # seconds 168 sdk_integration: Optional[str] = "default", 169 httpx_client: Optional[httpx.Client] = None, 170 enabled: Optional[bool] = True, 171 sample_rate: Optional[float] = None, 172 mask: Optional[MaskFunction] = None, 173 ): 174 """Initialize the Langfuse client. 175 176 Args: 177 public_key: Public API key of Langfuse project. 
Can be set via `LANGFUSE_PUBLIC_KEY` environment variable. 178 secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable. 179 host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`. 180 release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable. 181 debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable. 182 threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues. 183 flush_at: Max batch size that's sent to the API. 184 flush_interval: Max delay until a new batch is sent to the API. 185 max_retries: Max number of retries in case of API/network errors. 186 timeout: Timeout of API requests in seconds. Defaults to 20 seconds. 187 httpx_client: Pass your own httpx client for more customizability of requests. 188 sdk_integration: Used by integrations that wrap the Langfuse SDK to add context for debugging and support. Not to be used directly. 189 enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops. 190 sample_rate: Sampling rate for tracing. If set to 0.2, only 20% of the data will be sent to the backend. Can be set via `LANGFUSE_SAMPLE_RATE` environment variable. 191 mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data. 192 193 Raises: 194 ValueError: If public_key or secret_key are not set and not found in environment variables. 195 196 Example: 197 Initiating the Langfuse client should always be the first step to use Langfuse.
198 ```python 199 import os 200 from langfuse import Langfuse 201 202 # Set the public and secret keys as environment variables 203 os.environ['LANGFUSE_PUBLIC_KEY'] = public_key 204 os.environ['LANGFUSE_SECRET_KEY'] = secret_key 205 206 # Initialize the Langfuse client using the credentials 207 langfuse = Langfuse() 208 ``` 209 """ 210 self.enabled = enabled 211 public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY") 212 secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY") 213 sample_rate = ( 214 sample_rate 215 if sample_rate 216 is not None # needs explicit None check, as 0 is a valid value 217 else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0)) 218 ) 219 220 if sample_rate is not None and ( 221 sample_rate > 1 or sample_rate < 0 222 ): # default value 1 will be set in the taskmanager 223 self.enabled = False 224 self.log.warning( 225 "Langfuse client is disabled since the sample rate provided is not between 0 and 1." 226 ) 227 228 threads = threads or int(os.environ.get("LANGFUSE_THREADS", 1)) 229 flush_at = flush_at or int(os.environ.get("LANGFUSE_FLUSH_AT", 15)) 230 flush_interval = flush_interval or float( 231 os.environ.get("LANGFUSE_FLUSH_INTERVAL", 0.5) 232 ) 233 234 max_retries = max_retries or int(os.environ.get("LANGFUSE_MAX_RETRIES", 3)) 235 timeout = timeout or int(os.environ.get("LANGFUSE_TIMEOUT", 20)) 236 237 if not self.enabled: 238 self.log.warning( 239 "Langfuse client is disabled. No observability data will be sent." 240 ) 241 242 elif not public_key: 243 self.enabled = False 244 self.log.warning( 245 "Langfuse client is disabled since no public_key was provided as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client" 246 ) 247 248 elif not secret_key: 249 self.enabled = False 250 self.log.warning( 251 "Langfuse client is disabled since no secret_key was provided as a parameter or environment variable 'LANGFUSE_SECRET_KEY'. 
See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client" 252 ) 253 254 set_debug = debug if debug else (os.getenv("LANGFUSE_DEBUG", "False") == "True") 255 256 if set_debug is True: 257 # Ensures that debug level messages are logged when debug mode is on. 258 # Otherwise, defaults to WARNING level. 259 # See https://docs.python.org/3/howto/logging.html#what-happens-if-no-configuration-is-provided 260 logging.basicConfig() 261 # Set level for all loggers under langfuse package 262 logging.getLogger("langfuse").setLevel(logging.DEBUG) 263 264 clean_logger() 265 else: 266 logging.getLogger("langfuse").setLevel(logging.WARNING) 267 clean_logger() 268 269 self.base_url = ( 270 host 271 if host 272 else os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com") 273 ) 274 275 self.httpx_client = httpx_client or httpx.Client(timeout=timeout) 276 277 self.client = FernLangfuse( 278 base_url=self.base_url, 279 username=public_key, 280 password=secret_key, 281 x_langfuse_sdk_name="python", 282 x_langfuse_sdk_version=version, 283 x_langfuse_public_key=public_key, 284 httpx_client=self.httpx_client, 285 ) 286 287 langfuse_client = LangfuseClient( 288 public_key=public_key, 289 secret_key=secret_key, 290 base_url=self.base_url, 291 version=version, 292 timeout=timeout, 293 session=self.httpx_client, 294 ) 295 296 args = { 297 "threads": threads, 298 "flush_at": flush_at, 299 "flush_interval": flush_interval, 300 "max_retries": max_retries, 301 "client": langfuse_client, 302 "api_client": self.client, 303 "public_key": public_key, 304 "sdk_name": "python", 305 "sdk_version": version, 306 "sdk_integration": sdk_integration, 307 "enabled": self.enabled, 308 "sample_rate": sample_rate, 309 "mask": mask, 310 } 311 312 self.task_manager = TaskManager(**args) 313 314 self.trace_id = None 315 316 self.release = self._get_release_value(release) 317 318 self.prompt_cache = PromptCache() 319 320 def _get_release_value(self, release: Optional[str] = None) -> 
def get_trace_id(self) -> Optional[str]:
    """Get the current trace id.

    Returns:
        The id of the most recently created trace, or None if this client has
        not created a trace yet (``self.trace_id`` starts out as None).
    """
    return self.trace_id

def get_trace_url(self) -> str:
    """Get the URL of the current trace to view it in the Langfuse UI."""
    return f"{self.base_url}/trace/{self.trace_id}"

def get_dataset(
    self, name: str, *, fetch_items_page_size: Optional[int] = 50
) -> "DatasetClient":
    """Fetch a dataset by its name.

    Args:
        name (str): The name of the dataset to fetch.
        fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.

    Returns:
        DatasetClient: The dataset with the given name.
    """
    try:
        self.log.debug(f"Getting datasets {name}")
        # URL-encode the name for the path segment, consistent with
        # get_dataset_runs / get_dataset_run and the item listing below.
        dataset = self.client.datasets.get(dataset_name=self._url_encode(name))

        # Page through all dataset items; the API reports total_pages on each response.
        dataset_items = []
        page = 1
        while True:
            new_items = self.client.dataset_items.list(
                dataset_name=self._url_encode(name),
                page=page,
                limit=fetch_items_page_size,
            )
            dataset_items.extend(new_items.data)
            if new_items.meta.total_pages <= page:
                break
            page += 1

        items = [DatasetItemClient(i, langfuse=self) for i in dataset_items]

        return DatasetClient(dataset, items=items)
    except Exception as e:
        handle_fern_exception(e)
        raise e

def get_dataset_item(self, id: str) -> "DatasetItemClient":
    """Get the dataset item with the given id."""
    try:
        self.log.debug(f"Getting dataset item {id}")
        dataset_item = self.client.dataset_items.get(id=id)
        return DatasetItemClient(dataset_item, langfuse=self)
    except Exception as e:
        handle_fern_exception(e)
        raise e

def auth_check(self) -> bool:
    """Check if the provided credentials (public and secret key) are valid.

    Raises:
        Exception: If no projects were found for the provided credentials.

    Note:
        This method is blocking. It is discouraged to use it in production code.
    """
    try:
        projects = self.client.projects.get()
        self.log.debug(
            f"Auth check successful, found {len(projects.data)} projects"
        )
        if len(projects.data) == 0:
            raise Exception(
                "Auth check failed, no project found for the keys provided."
            )
        return True

    except Exception as e:
        handle_fern_exception(e)
        raise e
def get_dataset_runs(
    self,
    dataset_name: str,
    *,
    page: typing.Optional[int] = None,
    limit: typing.Optional[int] = None,
) -> PaginatedDatasetRuns:
    """Return all runs of a dataset, paginated.

    Args:
        dataset_name (str): Name of the dataset.
        page (Optional[int]): Page number of the dataset runs to return, starts at 1. Defaults to None.
        limit (Optional[int]): Maximum number of dataset runs to return. Defaults to 50.

    Returns:
        PaginatedDatasetRuns: The dataset runs.
    """
    try:
        self.log.debug("Getting dataset runs")
        encoded_name = self._url_encode(dataset_name)
        return self.client.datasets.get_runs(
            dataset_name=encoded_name, page=page, limit=limit
        )
    except Exception as err:
        handle_fern_exception(err)
        raise err

def get_dataset_run(
    self,
    dataset_name: str,
    dataset_run_name: str,
) -> DatasetRunWithItems:
    """Fetch a single dataset run by dataset name and run name.

    Args:
        dataset_name: Name of the dataset.
        dataset_run_name: Name of the dataset run.

    Returns:
        DatasetRunWithItems: The dataset run.
    """
    try:
        self.log.debug(
            f"Getting dataset runs for dataset {dataset_name} and run {dataset_run_name}"
        )
        encoded_dataset = self._url_encode(dataset_name)
        encoded_run = self._url_encode(dataset_run_name)
        return self.client.datasets.get_run(
            dataset_name=encoded_dataset,
            run_name=encoded_run,
        )
    except Exception as err:
        handle_fern_exception(err)
        raise err
445 """ 446 try: 447 self.log.debug( 448 f"Getting dataset runs for dataset {dataset_name} and run {dataset_run_name}" 449 ) 450 return self.client.datasets.get_run( 451 dataset_name=self._url_encode(dataset_name), 452 run_name=self._url_encode(dataset_run_name), 453 ) 454 except Exception as e: 455 handle_fern_exception(e) 456 raise e 457 458 def create_dataset( 459 self, 460 name: str, 461 description: Optional[str] = None, 462 metadata: Optional[Any] = None, 463 ) -> Dataset: 464 """Create a dataset with the given name on Langfuse. 465 466 Args: 467 name: Name of the dataset to create. 468 description: Description of the dataset. Defaults to None. 469 metadata: Additional metadata. Defaults to None. 470 471 Returns: 472 Dataset: The created dataset as returned by the Langfuse API. 473 """ 474 try: 475 body = CreateDatasetRequest( 476 name=name, description=description, metadata=metadata 477 ) 478 self.log.debug(f"Creating datasets {body}") 479 return self.client.datasets.create(request=body) 480 except Exception as e: 481 handle_fern_exception(e) 482 raise e 483 484 def create_dataset_item( 485 self, 486 dataset_name: str, 487 input: Optional[Any] = None, 488 expected_output: Optional[Any] = None, 489 metadata: Optional[Any] = None, 490 source_trace_id: Optional[str] = None, 491 source_observation_id: Optional[str] = None, 492 status: Optional[DatasetStatus] = None, 493 id: Optional[str] = None, 494 ) -> DatasetItem: 495 """Create a dataset item. 496 497 Upserts if an item with id already exists. 498 499 Args: 500 dataset_name: Name of the dataset in which the dataset item should be created. 501 input: Input data. Defaults to None. Can contain any dict, list or scalar. 502 expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar. 503 metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar. 504 source_trace_id: Id of the source trace. Defaults to None. 
def fetch_trace(
    self,
    id: str,
) -> FetchTraceResponse:
    """Fetch a single trace by id via the Langfuse API.

    Args:
        id: The id of the trace to fetch.

    Returns:
        FetchTraceResponse: The trace with full details as returned by the Langfuse API on `data`.

    Raises:
        Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
    """
    try:
        self.log.debug(f"Getting trace {id}")
        return FetchTraceResponse(data=self.client.trace.get(id))
    except Exception as err:
        handle_fern_exception(err)
        raise err

def get_trace(
    self,
    id: str,
) -> TraceWithFullDetails:
    """Get a trace by id. Deprecated, use fetch_trace instead.

    Args:
        id: The id of the trace to fetch.

    Returns:
        TraceWithFullDetails: The trace with full details as returned by the Langfuse API.

    Raises:
        Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
    """
    warnings.warn(
        "get_trace is deprecated, use fetch_trace instead.",
        DeprecationWarning,
    )

    try:
        self.log.debug(f"Getting trace {id}")
        return self.client.trace.get(id)
    except Exception as err:
        handle_fern_exception(err)
        raise err

def fetch_traces(
    self,
    *,
    page: Optional[int] = None,
    limit: Optional[int] = None,
    user_id: Optional[str] = None,
    name: Optional[str] = None,
    session_id: Optional[str] = None,
    from_timestamp: Optional[dt.datetime] = None,
    to_timestamp: Optional[dt.datetime] = None,
    order_by: Optional[str] = None,
    tags: Optional[Union[str, Sequence[str]]] = None,
) -> FetchTracesResponse:
    """Fetch traces in the current project matching the given filters.

    Args:
        page (Optional[int]): Page number, starts at 1. Defaults to None.
        limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None.
        name (Optional[str]): Filter by name of traces. Defaults to None.
        user_id (Optional[str]): Filter by user_id. Defaults to None.
        session_id (Optional[str]): Filter by session_id. Defaults to None.
        from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None.
        to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None.
        order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None.
        tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None.

    Returns:
        FetchTracesResponse, list of traces on `data` and metadata on `meta`.

    Raises:
        Exception: If an error occurred during the request.
    """
    try:
        self.log.debug(
            f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}"
        )
        response = self.client.trace.list(
            page=page,
            limit=limit,
            name=name,
            user_id=user_id,
            session_id=session_id,
            from_timestamp=from_timestamp,
            to_timestamp=to_timestamp,
            order_by=order_by,
            tags=tags,
        )
        return FetchTracesResponse(data=response.data, meta=response.meta)
    except Exception as err:
        handle_fern_exception(err)
        raise err
def get_traces(
    self,
    *,
    page: Optional[int] = None,
    limit: Optional[int] = None,
    user_id: Optional[str] = None,
    name: Optional[str] = None,
    session_id: Optional[str] = None,
    from_timestamp: Optional[dt.datetime] = None,
    to_timestamp: Optional[dt.datetime] = None,
    order_by: Optional[str] = None,
    tags: Optional[Union[str, Sequence[str]]] = None,
) -> Traces:
    """Get a list of traces in the current project matching the given parameters. Deprecated, use fetch_traces instead.

    Args:
        page (Optional[int]): Page number, starts at 1. Defaults to None.
        limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None.
        name (Optional[str]): Filter by name of traces. Defaults to None.
        user_id (Optional[str]): Filter by user_id. Defaults to None.
        session_id (Optional[str]): Filter by session_id. Defaults to None.
        from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None.
        to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None.
        order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None.
        tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None.

    Returns:
        List of Traces

    Raises:
        Exception: If an error occurred during the request.
    """
    warnings.warn(
        "get_traces is deprecated, use fetch_traces instead.",
        DeprecationWarning,
    )
    try:
        self.log.debug(
            f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}"
        )
        return self.client.trace.list(
            page=page,
            limit=limit,
            name=name,
            user_id=user_id,
            session_id=session_id,
            from_timestamp=from_timestamp,
            to_timestamp=to_timestamp,
            order_by=order_by,
            tags=tags,
        )
    except Exception as e:
        handle_fern_exception(e)
        raise e

def fetch_observations(
    self,
    *,
    page: typing.Optional[int] = None,
    limit: typing.Optional[int] = None,
    name: typing.Optional[str] = None,
    user_id: typing.Optional[str] = None,
    trace_id: typing.Optional[str] = None,
    parent_observation_id: typing.Optional[str] = None,
    from_start_time: typing.Optional[dt.datetime] = None,
    to_start_time: typing.Optional[dt.datetime] = None,
    type: typing.Optional[str] = None,
) -> FetchObservationsResponse:
    """Get a list of observations in the current project matching the given parameters.

    Args:
        page (Optional[int]): Page number of the observations to return. Defaults to None.
        limit (Optional[int]): Maximum number of observations to return. Defaults to None.
        name (Optional[str]): Name of the observations to return. Defaults to None.
        user_id (Optional[str]): User identifier. Defaults to None.
        trace_id (Optional[str]): Trace identifier. Defaults to None.
        parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
        from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
        to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
        type (Optional[str]): Type of the observation. Defaults to None.

    Returns:
        FetchObservationsResponse, list of observations on `data` and metadata on `meta`.

    Raises:
        Exception: If an error occurred during the request.
    """
    try:
        self.log.debug(
            f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}"
        )
        res = self.client.observations.get_many(
            page=page,
            limit=limit,
            name=name,
            user_id=user_id,
            trace_id=trace_id,
            parent_observation_id=parent_observation_id,
            from_start_time=from_start_time,
            to_start_time=to_start_time,
            type=type,
        )
        return FetchObservationsResponse(data=res.data, meta=res.meta)
    except Exception as e:
        # Translate Fern API errors consistently with the other fetch_* methods
        # (previously this branch only logged via self.log.exception).
        handle_fern_exception(e)
        raise e
732 """ 733 try: 734 self.log.debug( 735 f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}" 736 ) 737 res = self.client.observations.get_many( 738 page=page, 739 limit=limit, 740 name=name, 741 user_id=user_id, 742 trace_id=trace_id, 743 parent_observation_id=parent_observation_id, 744 from_start_time=from_start_time, 745 to_start_time=to_start_time, 746 type=type, 747 ) 748 return FetchObservationsResponse(data=res.data, meta=res.meta) 749 except Exception as e: 750 self.log.exception(e) 751 raise e 752 753 def get_observations( 754 self, 755 *, 756 page: typing.Optional[int] = None, 757 limit: typing.Optional[int] = None, 758 name: typing.Optional[str] = None, 759 user_id: typing.Optional[str] = None, 760 trace_id: typing.Optional[str] = None, 761 parent_observation_id: typing.Optional[str] = None, 762 from_start_time: typing.Optional[dt.datetime] = None, 763 to_start_time: typing.Optional[dt.datetime] = None, 764 type: typing.Optional[str] = None, 765 ) -> ObservationsViews: 766 """Get a list of observations in the current project matching the given parameters. Deprecated, use fetch_observations instead. 767 768 Args: 769 page (Optional[int]): Page number of the observations to return. Defaults to None. 770 limit (Optional[int]): Maximum number of observations to return. Defaults to None. 771 name (Optional[str]): Name of the observations to return. Defaults to None. 772 user_id (Optional[str]): User identifier. Defaults to None. 773 trace_id (Optional[str]): Trace identifier. Defaults to None. 774 parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None. 775 from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None. 776 to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None. 
777 type (Optional[str]): Type of the observation. Defaults to None. 778 779 Returns: 780 List of ObservationsViews: List of observations in the project matching the given parameters. 781 782 Raises: 783 Exception: If an error occurred during the request. 784 """ 785 warnings.warn( 786 "get_observations is deprecated, use fetch_observations instead.", 787 DeprecationWarning, 788 ) 789 try: 790 self.log.debug( 791 f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}" 792 ) 793 return self.client.observations.get_many( 794 page=page, 795 limit=limit, 796 name=name, 797 user_id=user_id, 798 trace_id=trace_id, 799 parent_observation_id=parent_observation_id, 800 from_start_time=from_start_time, 801 to_start_time=to_start_time, 802 type=type, 803 ) 804 except Exception as e: 805 handle_fern_exception(e) 806 raise e 807 808 def get_generations( 809 self, 810 *, 811 page: typing.Optional[int] = None, 812 limit: typing.Optional[int] = None, 813 name: typing.Optional[str] = None, 814 user_id: typing.Optional[str] = None, 815 trace_id: typing.Optional[str] = None, 816 from_start_time: typing.Optional[dt.datetime] = None, 817 to_start_time: typing.Optional[dt.datetime] = None, 818 parent_observation_id: typing.Optional[str] = None, 819 ) -> ObservationsViews: 820 """Get a list of generations in the current project matching the given parameters. Deprecated, use fetch_observations(type='GENERATION') instead. 821 822 Args: 823 page (Optional[int]): Page number of the generations to return. Defaults to None. 824 limit (Optional[int]): Maximum number of generations to return. Defaults to None. 825 name (Optional[str]): Name of the generations to return. Defaults to None. 826 user_id (Optional[str]): User identifier of the generations to return. Defaults to None. 827 trace_id (Optional[str]): Trace identifier of the generations to return. Defaults to None. 
def fetch_observation(
    self,
    id: str,
) -> FetchObservationResponse:
    """Fetch a single observation by its identifier.

    Args:
        id: The identifier of the observation to fetch.

    Returns:
        FetchObservationResponse: The observation with the given id on `data`.

    Raises:
        Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request.
    """
    try:
        self.log.debug(f"Getting observation {id}")
        return FetchObservationResponse(data=self.client.observations.get(id))
    except Exception as err:
        handle_fern_exception(err)
        raise err

def get_observation(
    self,
    id: str,
) -> Observation:
    """Get a single observation by id. Deprecated, use fetch_observation instead.

    Args:
        id: The identifier of the observation to fetch.

    Raises:
        Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request.
    """
    warnings.warn(
        "get_observation is deprecated, use fetch_observation instead.",
        DeprecationWarning,
    )
    try:
        self.log.debug(f"Getting observation {id}")
        return self.client.observations.get(id)
    except Exception as err:
        handle_fern_exception(err)
        raise err

def fetch_sessions(
    self,
    *,
    page: typing.Optional[int] = None,
    limit: typing.Optional[int] = None,
    from_timestamp: typing.Optional[dt.datetime] = None,
    to_timestamp: typing.Optional[dt.datetime] = None,
) -> FetchSessionsResponse:
    """List sessions in the current project.

    Args:
        page (Optional[int]): Page number of the sessions to return. Defaults to None.
        limit (Optional[int]): Maximum number of sessions to return. Defaults to None.
        from_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp on or after this datetime. Defaults to None.
        to_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp before this datetime. Defaults to None.

    Returns:
        FetchSessionsResponse, list of sessions on `data` and metadata on `meta`.

    Raises:
        Exception: If an error occurred during the request.
    """
    try:
        self.log.debug(
            f"Getting sessions... {page}, {limit}, {from_timestamp}, {to_timestamp}"
        )
        response = self.client.sessions.list(
            page=page,
            limit=limit,
            from_timestamp=from_timestamp,
            to_timestamp=to_timestamp,
        )
        return FetchSessionsResponse(data=response.data, meta=response.meta)
    except Exception as err:
        handle_fern_exception(err)
        raise err
@overload
def get_prompt(
    self,
    name: str,
    version: Optional[int] = None,
    *,
    label: Optional[str] = None,
    type: Literal["chat"],
    cache_ttl_seconds: Optional[int] = None,
    fallback: Optional[List[ChatMessageDict]] = None,
    max_retries: Optional[int] = None,
    fetch_timeout_seconds: Optional[int] = None,
) -> ChatPromptClient: ...

@overload
def get_prompt(
    self,
    name: str,
    version: Optional[int] = None,
    *,
    label: Optional[str] = None,
    type: Literal["text"] = "text",
    cache_ttl_seconds: Optional[int] = None,
    fallback: Optional[str] = None,
    max_retries: Optional[int] = None,
    fetch_timeout_seconds: Optional[int] = None,
) -> TextPromptClient: ...

def get_prompt(
    self,
    name: str,
    version: Optional[int] = None,
    *,
    label: Optional[str] = None,
    type: Literal["chat", "text"] = "text",
    cache_ttl_seconds: Optional[int] = None,
    fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None,
    max_retries: Optional[int] = None,
    fetch_timeout_seconds: Optional[int] = None,
) -> PromptClient:
    """Get a prompt.

    This method attempts to fetch the requested prompt from the local cache. If the prompt is not found
    in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again
    and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will
    return the expired prompt as a fallback.

    Args:
        name (str): The name of the prompt to retrieve.

    Keyword Args:
        version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
        label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
        cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a
        keyword argument. If not set, defaults to 60 seconds. Disables caching if set to 0.
        type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text".
        fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None.
        max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries have an exponential backoff with a maximum delay of 10 seconds.
        fetch_timeout_seconds: Optional[int]: The timeout in seconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 10 seconds per default.

    Returns:
        The prompt object retrieved from the cache or directly fetched if not cached or expired of type
        - TextPromptClient, if type argument is 'text'.
        - ChatPromptClient, if type argument is 'chat'.

    Raises:
        Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an
        expired prompt in the cache, in which case it logs a warning and returns the expired prompt.
    """
    if version is not None and label is not None:
        raise ValueError("Cannot specify both version and label at the same time.")

    if not name:
        raise ValueError("Prompt name cannot be empty.")

    cache_key = PromptCache.generate_cache_key(name, version=version, label=label)
    bounded_max_retries = self._get_bounded_max_retries(
        max_retries, default_max_retries=2, max_retries_upper_bound=4
    )

    self.log.debug(f"Getting prompt '{cache_key}'")
    cached_prompt = self.prompt_cache.get(cache_key)

    if cached_prompt is None or cache_ttl_seconds == 0:
        self.log.debug(
            f"Prompt '{cache_key}' not found in cache or caching disabled."
        )
        try:
            return self._fetch_prompt_and_update_cache(
                name,
                version=version,
                label=label,
                ttl_seconds=cache_ttl_seconds,
                max_retries=bounded_max_retries,
                fetch_timeout_seconds=fetch_timeout_seconds,
            )
        except Exception as e:
            if fallback:
                self.log.warning(
                    f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}"
                )

                fallback_client_args = {
                    "name": name,
                    "prompt": fallback,
                    "type": type,
                    "version": version or 0,
                    "config": {},
                    "labels": [label] if label else [],
                    "tags": [],
                }

                if type == "text":
                    return TextPromptClient(
                        prompt=Prompt_Text(**fallback_client_args),
                        is_fallback=True,
                    )

                if type == "chat":
                    return ChatPromptClient(
                        prompt=Prompt_Chat(**fallback_client_args),
                        is_fallback=True,
                    )

            raise e

    if cached_prompt.is_expired():
        self.log.debug(f"Stale prompt '{cache_key}' found in cache.")
        try:
            # refresh prompt in background thread, refresh_prompt deduplicates tasks
            self.log.debug(f"Refreshing prompt '{cache_key}' in background.")
            self.prompt_cache.add_refresh_prompt_task(
                cache_key,
                lambda: self._fetch_prompt_and_update_cache(
                    name,
                    version=version,
                    label=label,
                    ttl_seconds=cache_ttl_seconds,
                    max_retries=bounded_max_retries,
                    fetch_timeout_seconds=fetch_timeout_seconds,
                ),
            )
            self.log.debug(f"Returning stale prompt '{cache_key}' from cache.")
            # return stale prompt
            return cached_prompt.value

        except Exception as e:
            self.log.warning(
                f"Error when refreshing cached prompt '{cache_key}', returning cached version. Error: {e}"
            )
            # creation of refresh prompt task failed, return stale prompt
            return cached_prompt.value

    return cached_prompt.value

def _fetch_prompt_and_update_cache(
    self,
    name: str,
    *,
    version: Optional[int] = None,
    label: Optional[str] = None,
    ttl_seconds: Optional[int] = None,
    max_retries: int,
    fetch_timeout_seconds: Optional[int],
) -> PromptClient:
    """Fetch a prompt from the server with retries and store it in the cache.

    Raises:
        Exception: Propagates any error from the API call after retries are exhausted.
    """
    # Compute the cache key before the try block so it is always bound when
    # the error log in the except branch runs (previously a failure inside
    # generate_cache_key would have raised a NameError in the handler).
    cache_key = PromptCache.generate_cache_key(name, version=version, label=label)

    try:
        self.log.debug(f"Fetching prompt '{cache_key}' from server...")

        @backoff.on_exception(
            backoff.constant, Exception, max_tries=max_retries, logger=None
        )
        def fetch_prompts():
            return self.client.prompts.get(
                self._url_encode(name),
                version=version,
                label=label,
                request_options={
                    "timeout_in_seconds": fetch_timeout_seconds,
                }
                if fetch_timeout_seconds is not None
                else None,
            )

        prompt_response = fetch_prompts()

        # The API reports the prompt type; wrap accordingly.
        if prompt_response.type == "chat":
            prompt = ChatPromptClient(prompt_response)
        else:
            prompt = TextPromptClient(prompt_response)

        self.prompt_cache.set(cache_key, prompt, ttl_seconds)

        return prompt

    except Exception as e:
        self.log.error(f"Error while fetching prompt '{cache_key}': {str(e)}")
        raise e
name, 1072 version=version, 1073 label=label, 1074 ttl_seconds=cache_ttl_seconds, 1075 max_retries=bounded_max_retries, 1076 fetch_timeout_seconds=fetch_timeout_seconds, 1077 ), 1078 ) 1079 self.log.debug(f"Returning stale prompt '{cache_key}' from cache.") 1080 # return stale prompt 1081 return cached_prompt.value 1082 1083 except Exception as e: 1084 self.log.warning( 1085 f"Error when refreshing cached prompt '{cache_key}', returning cached version. Error: {e}" 1086 ) 1087 # creation of refresh prompt task failed, return stale prompt 1088 return cached_prompt.value 1089 1090 return cached_prompt.value 1091 1092 def _fetch_prompt_and_update_cache( 1093 self, 1094 name: str, 1095 *, 1096 version: Optional[int] = None, 1097 label: Optional[str] = None, 1098 ttl_seconds: Optional[int] = None, 1099 max_retries: int, 1100 fetch_timeout_seconds, 1101 ) -> PromptClient: 1102 try: 1103 cache_key = PromptCache.generate_cache_key( 1104 name, version=version, label=label 1105 ) 1106 1107 self.log.debug(f"Fetching prompt '{cache_key}' from server...") 1108 1109 @backoff.on_exception( 1110 backoff.constant, Exception, max_tries=max_retries, logger=None 1111 ) 1112 def fetch_prompts(): 1113 return self.client.prompts.get( 1114 self._url_encode(name), 1115 version=version, 1116 label=label, 1117 request_options={ 1118 "timeout_in_seconds": fetch_timeout_seconds, 1119 } 1120 if fetch_timeout_seconds is not None 1121 else None, 1122 ) 1123 1124 prompt_response = fetch_prompts() 1125 1126 if prompt_response.type == "chat": 1127 prompt = ChatPromptClient(prompt_response) 1128 else: 1129 prompt = TextPromptClient(prompt_response) 1130 1131 self.prompt_cache.set(cache_key, prompt, ttl_seconds) 1132 1133 return prompt 1134 1135 except Exception as e: 1136 self.log.error(f"Error while fetching prompt '{cache_key}': {str(e)}") 1137 raise e 1138 1139 def _get_bounded_max_retries( 1140 self, 1141 max_retries: Optional[int], 1142 *, 1143 default_max_retries: int = 2, 1144 
max_retries_upper_bound: int = 4, 1145 ) -> int: 1146 if max_retries is None: 1147 return default_max_retries 1148 1149 bounded_max_retries = min( 1150 max(max_retries, 0), 1151 max_retries_upper_bound, 1152 ) 1153 1154 return bounded_max_retries 1155 1156 @overload 1157 def create_prompt( 1158 self, 1159 *, 1160 name: str, 1161 prompt: List[ChatMessageDict], 1162 is_active: Optional[bool] = None, # deprecated 1163 labels: List[str] = [], 1164 tags: Optional[List[str]] = None, 1165 type: Optional[Literal["chat"]], 1166 config: Optional[Any] = None, 1167 ) -> ChatPromptClient: ... 1168 1169 @overload 1170 def create_prompt( 1171 self, 1172 *, 1173 name: str, 1174 prompt: str, 1175 is_active: Optional[bool] = None, # deprecated 1176 labels: List[str] = [], 1177 tags: Optional[List[str]] = None, 1178 type: Optional[Literal["text"]] = "text", 1179 config: Optional[Any] = None, 1180 ) -> TextPromptClient: ... 1181 1182 def create_prompt( 1183 self, 1184 *, 1185 name: str, 1186 prompt: Union[str, List[ChatMessageDict]], 1187 is_active: Optional[bool] = None, # deprecated 1188 labels: List[str] = [], 1189 tags: Optional[List[str]] = None, 1190 type: Optional[Literal["chat", "text"]] = "text", 1191 config: Optional[Any] = None, 1192 ) -> PromptClient: 1193 """Create a new prompt in Langfuse. 1194 1195 Keyword Args: 1196 name : The name of the prompt to be created. 1197 prompt : The content of the prompt to be created. 1198 is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead. 1199 labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label. 1200 tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt. 1201 config: Additional structured data to be saved with the prompt. Defaults to None. 1202 type: The type of the prompt to be created. "chat" vs. "text". 
Defaults to "text". 1203 1204 Returns: 1205 TextPromptClient: The prompt if type argument is 'text'. 1206 ChatPromptClient: The prompt if type argument is 'chat'. 1207 """ 1208 try: 1209 self.log.debug(f"Creating prompt {name=}, {version=}, {labels=}") 1210 1211 # Handle deprecated is_active flag 1212 if is_active: 1213 self.log.warning( 1214 "The 'is_active' flag is deprecated and will be removed in a future release. Please use the 'production' label instead." 1215 ) 1216 1217 labels = labels if "production" in labels else labels + ["production"] 1218 1219 if type == "chat": 1220 if not isinstance(prompt, list): 1221 raise ValueError( 1222 "For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes." 1223 ) 1224 request = CreatePromptRequest_Chat( 1225 name=name, 1226 prompt=prompt, 1227 labels=labels, 1228 tags=tags, 1229 config=config or {}, 1230 type="chat", 1231 ) 1232 server_prompt = self.client.prompts.create(request=request) 1233 1234 return ChatPromptClient(prompt=server_prompt) 1235 1236 if not isinstance(prompt, str): 1237 raise ValueError("For 'text' type, 'prompt' must be a string.") 1238 1239 request = CreatePromptRequest_Text( 1240 name=name, 1241 prompt=prompt, 1242 labels=labels, 1243 tags=tags, 1244 config=config or {}, 1245 type="text", 1246 ) 1247 1248 server_prompt = self.client.prompts.create(request=request) 1249 return TextPromptClient(prompt=server_prompt) 1250 1251 except Exception as e: 1252 handle_fern_exception(e) 1253 raise e 1254 1255 def _url_encode(self, url: str) -> str: 1256 return urllib.parse.quote(url) 1257 1258 def trace( 1259 self, 1260 *, 1261 id: typing.Optional[str] = None, 1262 name: typing.Optional[str] = None, 1263 user_id: typing.Optional[str] = None, 1264 session_id: typing.Optional[str] = None, 1265 version: typing.Optional[str] = None, 1266 input: typing.Optional[typing.Any] = None, 1267 output: typing.Optional[typing.Any] = None, 1268 metadata: typing.Optional[typing.Any] = None, 
1269 tags: typing.Optional[typing.List[str]] = None, 1270 timestamp: typing.Optional[dt.datetime] = None, 1271 public: typing.Optional[bool] = None, 1272 **kwargs, 1273 ) -> "StatefulTraceClient": 1274 """Create a trace. 1275 1276 Args: 1277 id: The id of the trace can be set, defaults to a random id. Set it to link traces to external systems or when creating a distributed trace. Traces are upserted on id. 1278 name: Identifier of the trace. Useful for sorting/filtering in the UI. 1279 input: The input of the trace. Can be any JSON object. 1280 output: The output of the trace. Can be any JSON object. 1281 metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API. 1282 user_id: The id of the user that triggered the execution. Used to provide user-level analytics. 1283 session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier. 1284 version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging. 1285 release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging. 1286 tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API. 1287 timestamp: The timestamp of the trace. Defaults to the current time if not provided. 1288 public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project. 1289 **kwargs: Additional keyword arguments that can be included in the trace. 1290 1291 Returns: 1292 StatefulTraceClient: The created trace. 
1293 1294 Example: 1295 ```python 1296 from langfuse import Langfuse 1297 1298 langfuse = Langfuse() 1299 1300 trace = langfuse.trace( 1301 name="example-application", 1302 user_id="user-1234") 1303 ) 1304 ``` 1305 """ 1306 new_id = id or str(uuid.uuid4()) 1307 self.trace_id = new_id 1308 try: 1309 new_dict = { 1310 "id": new_id, 1311 "name": name, 1312 "userId": user_id, 1313 "sessionId": session_id 1314 or kwargs.get("sessionId", None), # backward compatibility 1315 "release": self.release, 1316 "version": version, 1317 "metadata": metadata, 1318 "input": input, 1319 "output": output, 1320 "tags": tags, 1321 "timestamp": timestamp or _get_timestamp(), 1322 "public": public, 1323 } 1324 if kwargs is not None: 1325 new_dict.update(kwargs) 1326 1327 new_body = TraceBody(**new_dict) 1328 1329 self.log.debug(f"Creating trace {_filter_io_from_event_body(new_dict)}") 1330 event = { 1331 "id": str(uuid.uuid4()), 1332 "type": "trace-create", 1333 "body": new_body, 1334 } 1335 1336 self.task_manager.add_task( 1337 event, 1338 ) 1339 1340 except Exception as e: 1341 self.log.exception(e) 1342 finally: 1343 self._log_memory_usage() 1344 1345 return StatefulTraceClient( 1346 self.client, new_id, StateType.TRACE, new_id, self.task_manager 1347 ) 1348 1349 def _log_memory_usage(self): 1350 try: 1351 is_malloc_tracing_enabled = bool(int(os.getenv("PYTHONTRACEMALLOC", 0))) 1352 report_interval = int(os.getenv("LANGFUSE_DEBUG_MEMORY_REPORT_INTERVAL", 0)) 1353 top_k_items = int(os.getenv("LANGFUSE_DEBUG_MEMORY_TOP_K", 10)) 1354 1355 if ( 1356 not is_malloc_tracing_enabled 1357 or report_interval <= 0 1358 or round(time.monotonic()) % report_interval != 0 1359 ): 1360 return 1361 1362 snapshot = tracemalloc.take_snapshot().statistics("lineno") 1363 1364 total_memory_usage = sum([stat.size for stat in snapshot]) / 1024 / 1024 1365 me