langfuse.llama_index
56@auto_decorate_methods_with(catch_and_log_errors, exclude=["__init__"]) 57class LlamaIndexCallbackHandler( 58 LlamaIndexBaseCallbackHandler, LangfuseBaseCallbackHandler 59): 60 """LlamaIndex callback handler for Langfuse. This version is in alpha and may change in the future.""" 61 62 log = logging.getLogger("langfuse") 63 64 def __init__( 65 self, 66 *, 67 public_key: Optional[str] = None, 68 secret_key: Optional[str] = None, 69 host: Optional[str] = None, 70 debug: bool = False, 71 session_id: Optional[str] = None, 72 user_id: Optional[str] = None, 73 trace_name: Optional[str] = None, 74 release: Optional[str] = None, 75 version: Optional[str] = None, 76 tags: Optional[List[str]] = None, 77 metadata: Optional[Any] = None, 78 threads: Optional[int] = None, 79 flush_at: Optional[int] = None, 80 flush_interval: Optional[int] = None, 81 max_retries: Optional[int] = None, 82 timeout: Optional[int] = None, 83 event_starts_to_ignore: Optional[List[CBEventType]] = None, 84 event_ends_to_ignore: Optional[List[CBEventType]] = None, 85 tokenizer: Optional[Callable[[str], list]] = None, 86 enabled: Optional[bool] = None, 87 httpx_client: Optional[httpx.Client] = None, 88 sdk_integration: Optional[str] = None, 89 ) -> None: 90 LlamaIndexBaseCallbackHandler.__init__( 91 self, 92 event_starts_to_ignore=event_starts_to_ignore or [], 93 event_ends_to_ignore=event_ends_to_ignore or [], 94 ) 95 LangfuseBaseCallbackHandler.__init__( 96 self, 97 public_key=public_key, 98 secret_key=secret_key, 99 host=host, 100 debug=debug, 101 session_id=session_id, 102 user_id=user_id, 103 trace_name=trace_name, 104 release=release, 105 version=version, 106 tags=tags, 107 metadata=metadata, 108 threads=threads, 109 flush_at=flush_at, 110 flush_interval=flush_interval, 111 max_retries=max_retries, 112 timeout=timeout, 113 enabled=enabled, 114 httpx_client=httpx_client, 115 sdk_integration=sdk_integration or "llama-index_callback", 116 ) 117 118 self.event_map: Dict[str, List[CallbackEvent]] = defaultdict(list) 119 self._llama_index_trace_name: Optional[str] = None 120 self._token_counter = TokenCounter(tokenizer) 121 122 # For stream-chat, the last LLM end_event arrives after the trace has ended 123 # Keep track of these orphans to upsert them with the correct trace_id after the trace has ended 124 self._orphaned_LLM_generations: Dict[ 125 str, Tuple[StatefulGenerationClient, StatefulTraceClient] 126 ] = {} 127 128 def set_root( 129 self, 130 root: Optional[Union[StatefulTraceClient, StatefulSpanClient]], 131 *, 132 update_root: bool = False, 133 ) -> None: 134 """Set the root trace or span for the callback handler. 135 136 Args: 137 root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to 138 be used for all following operations. 139 140 Keyword Args: 141 update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run. 142 143 Returns: 144 None 145 """ 146 context_root.set(root) 147 148 if root is None: 149 self.trace = None 150 self.root_span = None 151 self._task_manager = self.langfuse.task_manager if self.langfuse else None 152 153 return 154 155 if isinstance(root, StatefulTraceClient): 156 self.trace = root 157 158 elif isinstance(root, StatefulSpanClient): 159 self.root_span = root 160 self.trace = StatefulTraceClient( 161 root.client, 162 root.trace_id, 163 StateType.TRACE, 164 root.trace_id, 165 root.task_manager, 166 ) 167 168 self._task_manager = root.task_manager 169 self.update_stateful_client = update_root 170 171 def set_trace_params( 172 self, 173 name: Optional[str] = None, 174 user_id: Optional[str] = None, 175 session_id: Optional[str] = None, 176 version: Optional[str] = None, 177 release: Optional[str] = None, 178 metadata: Optional[Any] = None, 179 tags: Optional[List[str]] = None, 180 public: Optional[bool] = None, 181 ): 182 """Set the trace params that will be used for all following operations. 183 184 Allows setting params of subsequent traces at any point in the code. 185 Overwrites the default params set in the callback constructor. 186 187 Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method. 188 189 Attributes: 190 name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI. 191 user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics. 192 session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier. 193 version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging. 194 metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API. 195 tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API. 196 public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project. 197 198 199 Returns: 200 None 201 """ 202 context_trace_metadata.set( 203 { 204 "name": name, 205 "user_id": user_id, 206 "session_id": session_id, 207 "version": version, 208 "release": release, 209 "metadata": metadata, 210 "tags": tags, 211 "public": public, 212 } 213 ) 214 215 def start_trace(self, trace_id: Optional[str] = None) -> None: 216 """Run when an overall trace is launched.""" 217 self._llama_index_trace_name = trace_id 218 219 def end_trace( 220 self, 221 trace_id: Optional[str] = None, 222 trace_map: Optional[Dict[str, List[str]]] = None, 223 ) -> None: 224 """Run when an overall trace is exited.""" 225 if not trace_map: 226 self.log.debug("No events in trace map to create the observation tree.") 227 return 228 229 # Generate Langfuse observations after trace has ended and full trace_map is available. 230 # For long-running traces this leads to events only being sent to Langfuse after the trace has ended. 231 # Timestamps remain accurate as they are set at the time of the event. 232 self._create_observations_from_trace_map( 233 event_id=BASE_TRACE_EVENT, trace_map=trace_map 234 ) 235 self._update_trace_data(trace_map=trace_map) 236 237 def on_event_start( 238 self, 239 event_type: CBEventType, 240 payload: Optional[Dict[str, Any]] = None, 241 event_id: str = "", 242 parent_id: str = "", 243 **kwargs: Any, 244 ) -> str: 245 """Run when an event starts and return id of event.""" 246 start_event = CallbackEvent( 247 event_id=event_id, event_type=event_type, payload=payload 248 ) 249 self.event_map[event_id].append(start_event) 250 251 return event_id 252 253 def on_event_end( 254 self, 255 event_type: CBEventType, 256 payload: Optional[Dict[str, Any]] = None, 257 event_id: str = "", 258 **kwargs: Any, 259 ) -> None: 260 """Run when an event ends.""" 261 end_event = CallbackEvent( 262 event_id=event_id, event_type=event_type, payload=payload 263 ) 264 self.event_map[event_id].append(end_event) 265 266 if event_type == CBEventType.LLM and event_id in self._orphaned_LLM_generations: 267 generation, trace = self._orphaned_LLM_generations[event_id] 268 self._handle_orphaned_LLM_end_event( 269 end_event, generation=generation, trace=trace 270 ) 271 del self._orphaned_LLM_generations[event_id] 272 273 def _create_observations_from_trace_map( 274 self, 275 event_id: str, 276 trace_map: Dict[str, List[str]], 277 parent: Optional[ 278 Union[StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient] 279 ] = None, 280 ) -> None: 281 """Recursively create langfuse observations based on the trace_map.""" 282 if event_id != BASE_TRACE_EVENT and not self.event_map.get(event_id): 283 return 284 285 if event_id == BASE_TRACE_EVENT: 286 observation = self._get_root_observation() 287 else: 288 observation = self._create_observation( 289 event_id=event_id, parent=parent, trace_id=self.trace.id 290 ) 291 292 for child_event_id in trace_map.get(event_id, []): 293 self._create_observations_from_trace_map( 294 event_id=child_event_id, parent=observation, trace_map=trace_map 295 ) 296 297 def _get_root_observation(self) -> Union[StatefulTraceClient, StatefulSpanClient]: 298 user_provided_root = context_root.get() 299 300 # Get trace metadata from contextvars or use default values 301 trace_metadata = context_trace_metadata.get() 302 name = ( 303 trace_metadata["name"] 304 or self.trace_name 305 or f"LlamaIndex_{self._llama_index_trace_name}" 306 ) 307 version = trace_metadata["version"] or self.version 308 release = trace_metadata["release"] or self.release 309 session_id = trace_metadata["session_id"] or self.session_id 310 user_id = trace_metadata["user_id"] or self.user_id 311 metadata = trace_metadata["metadata"] or self.metadata 312 tags = trace_metadata["tags"] or self.tags 313 public = trace_metadata["public"] or None 314 315 # Make sure that if a user-provided root is set, it has been set in the same trace 316 # and it's not a root from a different trace 317 if ( 318 user_provided_root is not None 319 and self.trace 320 and self.trace.id == user_provided_root.trace_id 321 ): 322 if self.update_stateful_client: 323 user_provided_root.update( 324 name=name, 325 version=version, 326 session_id=session_id, 327 user_id=user_id, 328 metadata=metadata, 329 tags=tags, 330 release=release, 331 public=public, 332 ) 333 334 return user_provided_root 335 336 else: 337 self.trace = self.langfuse.trace( 338 id=str(uuid4()), 339 name=name, 340 version=version, 341 session_id=session_id, 342 user_id=user_id, 343 metadata=metadata, 344 tags=tags, 345 release=release, 346 public=public, 347 ) 348 349 return self.trace 350 351 def _create_observation( 352 self, 353 event_id: str, 354 parent: Union[ 355 StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient 356 ], 357 trace_id: str, 358 ) -> Union[StatefulSpanClient, StatefulGenerationClient]: 359 event_type = self.event_map[event_id][0].event_type 360 361 if event_type == CBEventType.LLM: 362 return self._handle_LLM_events(event_id, parent, trace_id) 363 elif event_type == CBEventType.EMBEDDING: 364 return self._handle_embedding_events(event_id, parent, trace_id) 365 else: 366 return self._handle_span_events(event_id, parent, trace_id) 367 368 def _handle_LLM_events( 369 self, 370 event_id: str, 371 parent: Union[ 372 StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient 373 ], 374 trace_id: str, 375 ) -> StatefulGenerationClient: 376 events = self.event_map[event_id] 377 start_event, end_event = events[0], events[-1] 378 379 if start_event.payload and EventPayload.SERIALIZED in start_event.payload: 380 serialized = start_event.payload.get(EventPayload.SERIALIZED, {}) 381 name = serialized.get("class_name", "LLM") 382 temperature = serialized.get("temperature", None) 383 max_tokens = serialized.get("max_tokens", None) 384 timeout = serialized.get("timeout", None) 385 386 parsed_end_payload = self._parse_LLM_end_event_payload(end_event) 387 parsed_metadata = self._parse_metadata_from_event(end_event) 388 389 generation = parent.generation( 390 id=event_id, 391 trace_id=trace_id, 392 version=self.version, 393 name=name, 394 start_time=start_event.time, 395 metadata=parsed_metadata, 396 model_parameters={ 397 "temperature": temperature, 398 "max_tokens": max_tokens, 399 "request_timeout": timeout, 400 }, 401 **parsed_end_payload, 402 ) 403 404 # Register orphaned LLM event (only start event, no end event) to be later upserted with the correct trace_id 405 if len(events) == 1: 406 self._orphaned_LLM_generations[event_id] = (generation, self.trace) 407 408 return generation 409 410 def _handle_orphaned_LLM_end_event( 411 self, 412 end_event: CallbackEvent, 413 generation: StatefulGenerationClient, 414 trace: StatefulTraceClient, 415 ) -> None: 416 parsed_end_payload = self._parse_LLM_end_event_payload(end_event) 417 418 generation.update( 419 **parsed_end_payload, 420 ) 421 422 if generation.trace_id != trace.id: 423 raise ValueError( 424 f"Generation trace_id {generation.trace_id} does not match trace.id {trace.id}" 425 ) 426 427 trace.update(output=parsed_end_payload["output"]) 428 429 def _parse_LLM_end_event_payload( 430 self, end_event: CallbackEvent 431 ) -> ParsedLLMEndPayload: 432 result: ParsedLLMEndPayload = { 433 "input": None, 434 "output": None, 435 "usage": None, 436 "model": None, 437 "end_time": end_event.time, 438 } 439 440 if not end_event.payload: 441 return result 442 443 result["input"] = self._parse_input_from_event(end_event) 444 result["output"] = self._parse_output_from_event(end_event) 445 result["model"], result["usage"] = self._parse_usage_from_event_payload( 446 end_event.payload 447 ) 448 449 return result 450 451 def _parse_usage_from_event_payload(self, event_payload: Dict): 452 model = usage = None 453 454 if not ( 455 EventPayload.MESSAGES in event_payload 456 and EventPayload.RESPONSE in event_payload 457 ): 458 return model, usage 459 460 response = event_payload.get(EventPayload.RESPONSE) 461 462 if hasattr(response, "raw") and response.raw is not None: 463 model = response.raw.get("model", None) 464 token_usage = response.raw.get("usage", {}) 465 466 if token_usage: 467 usage = { 468 "input": getattr(token_usage, "prompt_tokens", None), 469 "output": getattr(token_usage, "completion_tokens", None), 470 "total": getattr(token_usage, "total_tokens", None), 471 } 472 473 return model, usage 474 475 def _handle_embedding_events( 476 self, 477 event_id: str, 478 parent: Union[ 479 StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient 480 ], 481 trace_id: str, 482 ) -> StatefulGenerationClient: 483 events = self.event_map[event_id] 484 start_event, end_event = events[0], events[-1] 485 486 if start_event.payload and EventPayload.SERIALIZED in start_event.payload: 487 serialized = start_event.payload.get(EventPayload.SERIALIZED, {}) 488 name = serialized.get("class_name", "Embedding") 489 model = serialized.get("model_name", None) 490 timeout = serialized.get("timeout", None) 491 492 if end_event.payload: 493 chunks = end_event.payload.get(EventPayload.CHUNKS, []) 494 token_count = sum( 495 self._token_counter.get_string_tokens(chunk) for chunk in chunks 496 ) 497 498 usage = { 499 "input": 0, 500 "output": 0, 501 "total": token_count or None, 502 } 503 504 input = self._parse_input_from_event(end_event) 505 output = self._parse_output_from_event(end_event) 506 507 generation = parent.generation( 508 id=event_id, 509 trace_id=trace_id, 510 name=name, 511 start_time=start_event.time, 512 end_time=end_event.time, 513 version=self.version, 514 model=model, 515 input=input, 516 output=output, 517 usage=usage or None, 518 model_parameters={ 519 "request_timeout": timeout, 520 }, 521 ) 522 523 return generation 524 525 def _handle_span_events( 526 self, 527 event_id: str, 528 parent: Union[ 529 StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient 530 ], 531 trace_id: str, 532 ) -> StatefulSpanClient: 533 start_event, end_event = self.event_map[event_id] 534 535 extracted_input = self._parse_input_from_event(start_event) 536 extracted_output = self._parse_output_from_event(end_event) 537 extracted_metadata = self._parse_metadata_from_event(end_event) 538 539 metadata = ( 540 extracted_metadata if extracted_output != extracted_metadata else None 541 ) 542 543 span = parent.span( 544 id=event_id, 545 trace_id=trace_id, 546 start_time=start_event.time, 547 name=start_event.event_type.value, 548 version=self.version, 549 session_id=self.session_id, 550 input=extracted_input, 551 output=extracted_output, 552 metadata=metadata, 553 ) 554 555 if end_event: 556 span.end(end_time=end_event.time) 557 558 return span 559 560 def _update_trace_data(self, trace_map): 561 context_root_value = context_root.get() 562 if context_root_value and not self.update_stateful_client: 563 return 564 565 child_event_ids = trace_map.get(BASE_TRACE_EVENT, []) 566 if not child_event_ids: 567 return 568 569 event_pair = self.event_map.get(child_event_ids[0]) 570 if not event_pair or len(event_pair) < 2: 571 return 572 573 start_event, end_event = event_pair 574 input = self._parse_input_from_event(start_event) 575 output = self._parse_output_from_event(end_event) 576 577 if input or output: 578 if context_root_value and self.update_stateful_client: 579 context_root_value.update(input=input, output=output) 580 else: 581 self.trace.update(input=input, output=output) 582 583 def _parse_input_from_event(self, event: CallbackEvent): 584 if event.payload is None: 585 return 586 587 payload = event.payload.copy() 588 589 if EventPayload.SERIALIZED in payload: 590 # Always pop Serialized from payload as it may contain LLM api keys 591 payload.pop(EventPayload.SERIALIZED) 592 593 if event.event_type == CBEventType.EMBEDDING and EventPayload.CHUNKS in payload: 594 chunks = payload.get(EventPayload.CHUNKS) 595 return {"num_chunks": len(chunks)} 596 597 if ( 598 event.event_type == CBEventType.NODE_PARSING 599 and EventPayload.DOCUMENTS in payload 600 ): 601 documents = payload.pop(EventPayload.DOCUMENTS) 602 payload["documents"] = [doc.metadata for doc in documents] 603 return payload 604 605 for key in [EventPayload.MESSAGES, EventPayload.QUERY_STR, EventPayload.PROMPT]: 606 if key in payload: 607 return payload.get(key) 608 609 return payload or None 610 611 def _parse_output_from_event(self, event: CallbackEvent): 612 if event.payload is None: 613 return 614 615 payload = event.payload.copy() 616 617 if EventPayload.SERIALIZED in payload: 618 # Always pop Serialized from payload as it may contain LLM api keys 619 payload.pop(EventPayload.SERIALIZED) 620 621 if ( 622 event.event_type == CBEventType.EMBEDDING 623 and EventPayload.EMBEDDINGS in payload 624 ): 625 embeddings = payload.get(EventPayload.EMBEDDINGS) 626 return {"num_embeddings": len(embeddings)} 627 628 if ( 629 event.event_type == CBEventType.NODE_PARSING 630 and EventPayload.NODES in payload 631 ): 632 nodes = payload.pop(EventPayload.NODES) 633 payload["num_nodes"] = len(nodes) 634 return payload 635 636 if event.event_type == CBEventType.CHUNKING and EventPayload.CHUNKS in payload: 637 chunks = payload.pop(EventPayload.CHUNKS) 638 payload["num_chunks"] = len(chunks) 639 640 if EventPayload.COMPLETION in payload: 641 return payload.get(EventPayload.COMPLETION) 642 643 if EventPayload.RESPONSE in payload: 644 response = payload.get(EventPayload.RESPONSE) 645 646 # Skip streaming responses as consuming them would block the user's execution path 647 if "Streaming" in type(response).__name__: 648 return None 649 650 if hasattr(response, "response"): 651 return response.response 652 653 if hasattr(response, "message"): 654 output = dict(response.message) 655 if "additional_kwargs" in output: 656 if "tool_calls" in output["additional_kwargs"]: 657 output["tool_calls"] = output["additional_kwargs"]["tool_calls"] 658 659 del output["additional_kwargs"] 660 661 return output 662 663 return payload or None 664 665 def _parse_metadata_from_event(self, event: CallbackEvent): 666 if event.payload is None: 667 return 668 669 metadata = {} 670 671 for key in event.payload.keys(): 672 if key not in [ 673 EventPayload.MESSAGES, 674 EventPayload.QUERY_STR, 675 EventPayload.PROMPT, 676 EventPayload.COMPLETION, 677 EventPayload.SERIALIZED, 678 "additional_kwargs", 679 ]: 680 if key != EventPayload.RESPONSE: 681 metadata[key] = event.payload[key] 682 else: 683 response = event.payload.get(EventPayload.RESPONSE) 684 685 if "Streaming" in type(response).__name__: 686 continue 687 688 for res_key, value in vars(response).items(): 689 if ( 690 not res_key.startswith("_") 691 and res_key 692 not in [ 693 "response", 694 "response_txt", 695 "message", 696 "additional_kwargs", 697 "delta", 698 "raw", 699 ] 700 and not isinstance(value, Generator) 701 ): 702 metadata[res_key] = value 703 704 return metadata or None
LlamaIndex callback handler for Langfuse. This version is in alpha and may change in the future.
64 def __init__( 65 self, 66 *, 67 public_key: Optional[str] = None, 68 secret_key: Optional[str] = None, 69 host: Optional[str] = None, 70 debug: bool = False, 71 session_id: Optional[str] = None, 72 user_id: Optional[str] = None, 73 trace_name: Optional[str] = None, 74 release: Optional[str] = None, 75 version: Optional[str] = None, 76 tags: Optional[List[str]] = None, 77 metadata: Optional[Any] = None, 78 threads: Optional[int] = None, 79 flush_at: Optional[int] = None, 80 flush_interval: Optional[int] = None, 81 max_retries: Optional[int] = None, 82 timeout: Optional[int] = None, 83 event_starts_to_ignore: Optional[List[CBEventType]] = None, 84 event_ends_to_ignore: Optional[List[CBEventType]] = None, 85 tokenizer: Optional[Callable[[str], list]] = None, 86 enabled: Optional[bool] = None, 87 httpx_client: Optional[httpx.Client] = None, 88 sdk_integration: Optional[str] = None, 89 ) -> None: 90 LlamaIndexBaseCallbackHandler.__init__( 91 self, 92 event_starts_to_ignore=event_starts_to_ignore or [], 93 event_ends_to_ignore=event_ends_to_ignore or [], 94 ) 95 LangfuseBaseCallbackHandler.__init__( 96 self, 97 public_key=public_key, 98 secret_key=secret_key, 99 host=host, 100 debug=debug, 101 session_id=session_id, 102 user_id=user_id, 103 trace_name=trace_name, 104 release=release, 105 version=version, 106 tags=tags, 107 metadata=metadata, 108 threads=threads, 109 flush_at=flush_at, 110 flush_interval=flush_interval, 111 max_retries=max_retries, 112 timeout=timeout, 113 enabled=enabled, 114 httpx_client=httpx_client, 115 sdk_integration=sdk_integration or "llama-index_callback", 116 ) 117 118 self.event_map: Dict[str, List[CallbackEvent]] = defaultdict(list) 119 self._llama_index_trace_name: Optional[str] = None 120 self._token_counter = TokenCounter(tokenizer) 121 122 # For stream-chat, the last LLM end_event arrives after the trace has ended 123 # Keep track of these orphans to upsert them with the correct trace_id after the trace has ended 124 self._orphaned_LLM_generations: Dict[ 125 str, Tuple[StatefulGenerationClient, StatefulTraceClient] 126 ] = {}
Initialize the base callback handler.
128 def set_root( 129 self, 130 root: Optional[Union[StatefulTraceClient, StatefulSpanClient]], 131 *, 132 update_root: bool = False, 133 ) -> None: 134 """Set the root trace or span for the callback handler. 135 136 Args: 137 root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to 138 be used for all following operations. 139 140 Keyword Args: 141 update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run. 142 143 Returns: 144 None 145 """ 146 context_root.set(root) 147 148 if root is None: 149 self.trace = None 150 self.root_span = None 151 self._task_manager = self.langfuse.task_manager if self.langfuse else None 152 153 return 154 155 if isinstance(root, StatefulTraceClient): 156 self.trace = root 157 158 elif isinstance(root, StatefulSpanClient): 159 self.root_span = root 160 self.trace = StatefulTraceClient( 161 root.client, 162 root.trace_id, 163 StateType.TRACE, 164 root.trace_id, 165 root.task_manager, 166 ) 167 168 self._task_manager = root.task_manager 169 self.update_stateful_client = update_root
Set the root trace or span for the callback handler.
Arguments:
- root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to be used for all following operations.
Keyword Args:
update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.
Returns:
None
171 def set_trace_params( 172 self, 173 name: Optional[str] = None, 174 user_id: Optional[str] = None, 175 session_id: Optional[str] = None, 176 version: Optional[str] = None, 177 release: Optional[str] = None, 178 metadata: Optional[Any] = None, 179 tags: Optional[List[str]] = None, 180 public: Optional[bool] = None, 181 ): 182 """Set the trace params that will be used for all following operations. 183 184 Allows setting params of subsequent traces at any point in the code. 185 Overwrites the default params set in the callback constructor. 186 187 Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method. 188 189 Attributes: 190 name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI. 191 user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics. 192 session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier. 193 version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging. 194 metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API. 195 tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API. 196 public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project. 197 198 199 Returns: 200 None 201 """ 202 context_trace_metadata.set( 203 { 204 "name": name, 205 "user_id": user_id, 206 "session_id": session_id, 207 "version": version, 208 "release": release, 209 "metadata": metadata, 210 "tags": tags, 211 "public": public, 212 } 213 )
Set the trace params that will be used for all following operations.
Allows setting params of subsequent traces at any point in the code. Overwrites the default params set in the callback constructor.
Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.
Attributes:
- name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
- user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
- session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
- version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
- metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
- tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
- public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
Returns:
None
215 def start_trace(self, trace_id: Optional[str] = None) -> None: 216 """Run when an overall trace is launched.""" 217 self._llama_index_trace_name = trace_id
Run when an overall trace is launched.
219 def end_trace( 220 self, 221 trace_id: Optional[str] = None, 222 trace_map: Optional[Dict[str, List[str]]] = None, 223 ) -> None: 224 """Run when an overall trace is exited.""" 225 if not trace_map: 226 self.log.debug("No events in trace map to create the observation tree.") 227 return 228 229 # Generate Langfuse observations after trace has ended and full trace_map is available. 230 # For long-running traces this leads to events only being sent to Langfuse after the trace has ended. 231 # Timestamps remain accurate as they are set at the time of the event. 232 self._create_observations_from_trace_map( 233 event_id=BASE_TRACE_EVENT, trace_map=trace_map 234 ) 235 self._update_trace_data(trace_map=trace_map)
Run when an overall trace is exited.
237 def on_event_start( 238 self, 239 event_type: CBEventType, 240 payload: Optional[Dict[str, Any]] = None, 241 event_id: str = "", 242 parent_id: str = "", 243 **kwargs: Any, 244 ) -> str: 245 """Run when an event starts and return id of event.""" 246 start_event = CallbackEvent( 247 event_id=event_id, event_type=event_type, payload=payload 248 ) 249 self.event_map[event_id].append(start_event) 250 251 return event_id
Run when an event starts and return id of event.
253 def on_event_end( 254 self, 255 event_type: CBEventType, 256 payload: Optional[Dict[str, Any]] = None, 257 event_id: str = "", 258 **kwargs: Any, 259 ) -> None: 260 """Run when an event ends.""" 261 end_event = CallbackEvent( 262 event_id=event_id, event_type=event_type, payload=payload 263 ) 264 self.event_map[event_id].append(end_event) 265 266 if event_type == CBEventType.LLM and event_id in self._orphaned_LLM_generations: 267 generation, trace = self._orphaned_LLM_generations[event_id] 268 self._handle_orphaned_LLM_end_event( 269 end_event, generation=generation, trace=trace 270 ) 271 del self._orphaned_LLM_generations[event_id]
Run when an event ends.
Inherited Members
- llama_index.core.callbacks.base_handler.BaseCallbackHandler
- event_starts_to_ignore
- event_ends_to_ignore