langfuse.llama_index
```python
@auto_decorate_methods_with(catch_and_log_errors, exclude=["__init__"])
class LlamaIndexCallbackHandler(
    LlamaIndexBaseCallbackHandler, LangfuseBaseCallbackHandler
):
    """[Deprecated] LlamaIndex callback handler for Langfuse. Deprecated, please use the LlamaIndexInstrumentor instead."""

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        debug: bool = False,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        trace_name: Optional[str] = None,
        release: Optional[str] = None,
        version: Optional[str] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Any] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        event_starts_to_ignore: Optional[List[CBEventType]] = None,
        event_ends_to_ignore: Optional[List[CBEventType]] = None,
        tokenizer: Optional[Callable[[str], list]] = None,
        enabled: Optional[bool] = None,
        httpx_client: Optional[httpx.Client] = None,
        sdk_integration: Optional[str] = None,
        sample_rate: Optional[float] = None,
    ) -> None:
        LlamaIndexBaseCallbackHandler.__init__(
            self,
            event_starts_to_ignore=event_starts_to_ignore or [],
            event_ends_to_ignore=event_ends_to_ignore or [],
        )
        LangfuseBaseCallbackHandler.__init__(
            self,
            public_key=public_key,
            secret_key=secret_key,
            host=host,
            debug=debug,
            session_id=session_id,
            user_id=user_id,
            trace_name=trace_name,
            release=release,
            version=version,
            tags=tags,
            metadata=metadata,
            threads=threads,
            flush_at=flush_at,
            flush_interval=flush_interval,
            max_retries=max_retries,
            timeout=timeout,
            enabled=enabled,
            httpx_client=httpx_client,
            sdk_integration=sdk_integration or "llama-index_callback",
            sample_rate=sample_rate,
        )

        self.event_map: Dict[str, List[CallbackEvent]] = defaultdict(list)
        self._llama_index_trace_name: Optional[str] = None
        self._token_counter = TokenCounter(tokenizer)

        # For stream-chat, the last LLM end_event arrives after the trace has ended
        # Keep track of these orphans to upsert them with the correct trace_id after the trace has ended
        self._orphaned_LLM_generations: Dict[
            str, Tuple[StatefulGenerationClient, StatefulTraceClient]
        ] = {}

    def set_root(
        self,
        root: Optional[Union[StatefulTraceClient, StatefulSpanClient]],
        *,
        update_root: bool = False,
    ) -> None:
        """Set the root trace or span for the callback handler.

        Args:
            root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to
                be used for all following operations.

        Keyword Args:
            update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.

        Returns:
            None
        """
        context_root.set(root)

        if root is None:
            self.trace = None
            self.root_span = None
            self._task_manager = self.langfuse.task_manager if self.langfuse else None

            return

        if isinstance(root, StatefulTraceClient):
            self.trace = root

        elif isinstance(root, StatefulSpanClient):
            self.root_span = root
            self.trace = StatefulTraceClient(
                root.client,
                root.trace_id,
                StateType.TRACE,
                root.trace_id,
                root.task_manager,
            )

        self._task_manager = root.task_manager
        self.update_stateful_client = update_root

    def set_trace_params(
        self,
        name: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Any] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ):
        """Set the trace params that will be used for all following operations.

        Allows setting params of subsequent traces at any point in the code.
        Overwrites the default params set in the callback constructor.

        Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.

        Attributes:
            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
            public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

        Returns:
            None
        """
        context_trace_metadata.set(
            {
                "name": name,
                "user_id": user_id,
                "session_id": session_id,
                "version": version,
                "release": release,
                "metadata": metadata,
                "tags": tags,
                "public": public,
            }
        )

    def start_trace(self, trace_id: Optional[str] = None) -> None:
        """Run when an overall trace is launched."""
        self._llama_index_trace_name = trace_id

    def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """Run when an overall trace is exited."""
        if not trace_map:
            self.log.debug("No events in trace map to create the observation tree.")
            return

        # Generate Langfuse observations after trace has ended and full trace_map is available.
        # For long-running traces this leads to events only being sent to Langfuse after the trace has ended.
        # Timestamps remain accurate as they are set at the time of the event.
        self._create_observations_from_trace_map(
            event_id=BASE_TRACE_EVENT, trace_map=trace_map
        )
        self._update_trace_data(trace_map=trace_map)

    def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        parent_id: str = "",
        **kwargs: Any,
    ) -> str:
        """Run when an event starts and return id of event."""
        start_event = CallbackEvent(
            event_id=event_id, event_type=event_type, payload=payload
        )
        self.event_map[event_id].append(start_event)

        return event_id

    def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        """Run when an event ends."""
        end_event = CallbackEvent(
            event_id=event_id, event_type=event_type, payload=payload
        )
        self.event_map[event_id].append(end_event)

        if event_type == CBEventType.LLM and event_id in self._orphaned_LLM_generations:
            generation, trace = self._orphaned_LLM_generations[event_id]
            self._handle_orphaned_LLM_end_event(
                end_event, generation=generation, trace=trace
            )
            del self._orphaned_LLM_generations[event_id]

    def _create_observations_from_trace_map(
        self,
        event_id: str,
        trace_map: Dict[str, List[str]],
        parent: Optional[
            Union[StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient]
        ] = None,
    ) -> None:
        """Recursively create langfuse observations based on the trace_map."""
        if event_id != BASE_TRACE_EVENT and not self.event_map.get(event_id):
            return

        if event_id == BASE_TRACE_EVENT:
            observation = self._get_root_observation()
        else:
            observation = self._create_observation(
                event_id=event_id, parent=parent, trace_id=self.trace.id
            )

        for child_event_id in trace_map.get(event_id, []):
            self._create_observations_from_trace_map(
                event_id=child_event_id, parent=observation, trace_map=trace_map
            )

    def _get_root_observation(self) -> Union[StatefulTraceClient, StatefulSpanClient]:
        user_provided_root = context_root.get()

        # Get trace metadata from contextvars or use default values
        trace_metadata = context_trace_metadata.get()
        name = (
            trace_metadata["name"]
            or self.trace_name
            or f"LlamaIndex_{self._llama_index_trace_name}"
        )
        version = trace_metadata["version"] or self.version
        release = trace_metadata["release"] or self.release
        session_id = trace_metadata["session_id"] or self.session_id
        user_id = trace_metadata["user_id"] or self.user_id
        metadata = trace_metadata["metadata"] or self.metadata
        tags = trace_metadata["tags"] or self.tags
        public = trace_metadata["public"] or None

        # Make sure that if a user-provided root is set, it has been set in the same trace
        # and it's not a root from a different trace
        if (
            user_provided_root is not None
            and self.trace
            and self.trace.id == user_provided_root.trace_id
        ):
            if self.update_stateful_client:
                user_provided_root.update(
                    name=name,
                    version=version,
                    session_id=session_id,
                    user_id=user_id,
                    metadata=metadata,
                    tags=tags,
                    release=release,
                    public=public,
                )

            return user_provided_root

        else:
            self.trace = self.langfuse.trace(
                id=str(uuid4()),
                name=name,
                version=version,
                session_id=session_id,
                user_id=user_id,
                metadata=metadata,
                tags=tags,
                release=release,
                public=public,
            )

            return self.trace

    def _create_observation(
        self,
        event_id: str,
        parent: Union[
            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
        ],
        trace_id: str,
    ) -> Union[StatefulSpanClient, StatefulGenerationClient]:
        event_type = self.event_map[event_id][0].event_type

        if event_type == CBEventType.LLM:
            return self._handle_LLM_events(event_id, parent, trace_id)
        elif event_type == CBEventType.EMBEDDING:
            return self._handle_embedding_events(event_id, parent, trace_id)
        else:
            return self._handle_span_events(event_id, parent, trace_id)

    def _handle_LLM_events(
        self,
        event_id: str,
        parent: Union[
            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
        ],
        trace_id: str,
    ) -> StatefulGenerationClient:
        events = self.event_map[event_id]
        start_event, end_event = events[0], events[-1]

        if start_event.payload and EventPayload.SERIALIZED in start_event.payload:
            serialized = start_event.payload.get(EventPayload.SERIALIZED, {})
            name = serialized.get("class_name", "LLM")
            temperature = serialized.get("temperature", None)
            max_tokens = serialized.get("max_tokens", None)
            timeout = serialized.get("timeout", None)

        parsed_end_payload = self._parse_LLM_end_event_payload(end_event)
        parsed_metadata = self._parse_metadata_from_event(end_event)

        generation = parent.generation(
            id=event_id,
            trace_id=trace_id,
            version=self.version,
            name=name,
            start_time=start_event.time,
            metadata=parsed_metadata,
            model_parameters={
                "temperature": temperature,
                "max_tokens": max_tokens,
                "request_timeout": timeout,
            },
            **parsed_end_payload,
        )

        # Register orphaned LLM event (only start event, no end event) to be later upserted with the correct trace_id
        if len(events) == 1:
            self._orphaned_LLM_generations[event_id] = (generation, self.trace)

        return generation

    def _handle_orphaned_LLM_end_event(
        self,
        end_event: CallbackEvent,
        generation: StatefulGenerationClient,
        trace: StatefulTraceClient,
    ) -> None:
        parsed_end_payload = self._parse_LLM_end_event_payload(end_event)

        generation.update(
            **parsed_end_payload,
        )

        if generation.trace_id != trace.id:
            raise ValueError(
                f"Generation trace_id {generation.trace_id} does not match trace.id {trace.id}"
            )

        trace.update(output=parsed_end_payload["output"])

    def _parse_LLM_end_event_payload(
        self, end_event: CallbackEvent
    ) -> ParsedLLMEndPayload:
        result: ParsedLLMEndPayload = {
            "input": None,
            "output": None,
            "usage": None,
            "model": None,
            "end_time": end_event.time,
        }

        if not end_event.payload:
            return result

        result["input"] = self._parse_input_from_event(end_event)
        result["output"] = self._parse_output_from_event(end_event)
        result["model"], result["usage"] = self._parse_usage_from_event_payload(
            end_event.payload
        )

        return result

    def _parse_usage_from_event_payload(self, event_payload: Dict):
        model = usage = None

        if not (
            EventPayload.MESSAGES in event_payload
            and EventPayload.RESPONSE in event_payload
        ):
            return model, usage

        response = event_payload.get(EventPayload.RESPONSE)

        if response and hasattr(response, "raw") and response.raw is not None:
            if isinstance(response.raw, dict):
                raw_dict = response.raw
            elif isinstance(response.raw, BaseModel):
                raw_dict = response.raw.model_dump()
            else:
                raw_dict = {}

            model = raw_dict.get("model", None)
            raw_token_usage = raw_dict.get("usage", {})

            if isinstance(raw_token_usage, dict):
                token_usage = raw_token_usage
            elif isinstance(raw_token_usage, BaseModel):
                token_usage = raw_token_usage.model_dump()
            else:
                token_usage = {}

            if token_usage:
                usage = {
                    "input": token_usage.get("prompt_tokens", None),
                    "output": token_usage.get("completion_tokens", None),
                    "total": token_usage.get("total_tokens", None),
                }

        return model, usage

    def _handle_embedding_events(
        self,
        event_id: str,
        parent: Union[
            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
        ],
        trace_id: str,
    ) -> StatefulGenerationClient:
        events = self.event_map[event_id]
        start_event, end_event = events[0], events[-1]

        if start_event.payload and EventPayload.SERIALIZED in start_event.payload:
            serialized = start_event.payload.get(EventPayload.SERIALIZED, {})
            name = serialized.get("class_name", "Embedding")
            model = serialized.get("model_name", None)
            timeout = serialized.get("timeout", None)

        if end_event.payload:
            chunks = end_event.payload.get(EventPayload.CHUNKS, [])
            token_count = sum(
                self._token_counter.get_string_tokens(chunk) for chunk in chunks
            )

            usage = {
                "input": 0,
                "output": 0,
                "total": token_count or None,
            }

        input = self._parse_input_from_event(end_event)
        output = self._parse_output_from_event(end_event)

        generation = parent.generation(
            id=event_id,
            trace_id=trace_id,
            name=name,
            start_time=start_event.time,
            end_time=end_event.time,
            version=self.version,
            model=model,
            input=input,
            output=output,
            usage=usage or None,
            model_parameters={
                "request_timeout": timeout,
            },
        )

        return generation

    def _handle_span_events(
        self,
        event_id: str,
        parent: Union[
            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
        ],
        trace_id: str,
    ) -> StatefulSpanClient:
        events = self.event_map[event_id]
        start_event, end_event = events[0], events[-1]

        extracted_input = self._parse_input_from_event(start_event)
        extracted_output = self._parse_output_from_event(end_event)
        extracted_metadata = self._parse_metadata_from_event(end_event)

        metadata = (
            extracted_metadata if extracted_output != extracted_metadata else None
        )

        name = start_event.event_type.value

        # Update name to the actual tool's name used by openai agent if available
        if (
            name == "function_call"
            and start_event.payload
            and start_event.payload.get("tool", None)
        ):
            tool_name = start_event.payload.get("tool", name)
            name = (
                tool_name
                if isinstance(tool_name, str)
                else (
                    tool_name.name
                    if hasattr(tool_name, "name")
                    else tool_name.__class__.__name__
                )
            )

        span = parent.span(
            id=event_id,
            trace_id=trace_id,
            start_time=start_event.time,
            name=name,
            version=self.version,
            session_id=self.session_id,
            input=extracted_input,
            output=extracted_output,
            metadata=metadata,
        )

        if end_event:
            span.end(end_time=end_event.time)

        return span

    def _update_trace_data(self, trace_map):
        context_root_value = context_root.get()
        if context_root_value and not self.update_stateful_client:
            return

        child_event_ids = trace_map.get(BASE_TRACE_EVENT, [])
        if not child_event_ids:
            return

        event_pair = self.event_map.get(child_event_ids[0])
        if not event_pair or len(event_pair) < 2:
            return

        start_event, end_event = event_pair
        input = self._parse_input_from_event(start_event)
        output = self._parse_output_from_event(end_event)

        if input or output:
            if context_root_value and self.update_stateful_client:
                context_root_value.update(input=input, output=output)
            else:
                self.trace.update(input=input, output=output)

    def _parse_input_from_event(self, event: CallbackEvent):
        if event.payload is None:
            return

        payload = event.payload.copy()

        if EventPayload.SERIALIZED in payload:
            # Always pop Serialized from payload as it may contain LLM api keys
            payload.pop(EventPayload.SERIALIZED)

        if event.event_type == CBEventType.EMBEDDING and EventPayload.CHUNKS in payload:
            chunks = payload.get(EventPayload.CHUNKS)
            return {"num_chunks": len(chunks)}

        if (
            event.event_type == CBEventType.NODE_PARSING
            and EventPayload.DOCUMENTS in payload
        ):
            documents = payload.pop(EventPayload.DOCUMENTS)
            payload["documents"] = [doc.metadata for doc in documents]
            return payload

        for key in [EventPayload.MESSAGES, EventPayload.QUERY_STR, EventPayload.PROMPT]:
            if key in payload:
                return payload.get(key)

        return payload or None

    def _parse_output_from_event(self, event: CallbackEvent):
        if event.payload is None:
            return

        payload = event.payload.copy()

        if EventPayload.SERIALIZED in payload:
            # Always pop Serialized from payload as it may contain LLM api keys
            payload.pop(EventPayload.SERIALIZED)

        if (
            event.event_type == CBEventType.EMBEDDING
            and EventPayload.EMBEDDINGS in payload
        ):
            embeddings = payload.get(EventPayload.EMBEDDINGS)
            return {"num_embeddings": len(embeddings)}

        if (
            event.event_type == CBEventType.NODE_PARSING
            and EventPayload.NODES in payload
        ):
            nodes = payload.pop(EventPayload.NODES)
            payload["num_nodes"] = len(nodes)
            return payload

        if event.event_type == CBEventType.CHUNKING and EventPayload.CHUNKS in payload:
            chunks = payload.pop(EventPayload.CHUNKS)
            payload["num_chunks"] = len(chunks)

        if EventPayload.COMPLETION in payload:
            return payload.get(EventPayload.COMPLETION)

        if EventPayload.RESPONSE in payload:
            response = payload.get(EventPayload.RESPONSE)

            # Skip streaming responses as consuming them would block the user's execution path
            if "Streaming" in type(response).__name__:
                return None

            if hasattr(response, "response"):
                return response.response

            if hasattr(response, "message"):
                output = dict(response.message)
                if "additional_kwargs" in output:
                    if "tool_calls" in output["additional_kwargs"]:
                        output["tool_calls"] = output["additional_kwargs"]["tool_calls"]

                    del output["additional_kwargs"]

                return output

        return payload or None

    def _parse_metadata_from_event(self, event: CallbackEvent):
        if event.payload is None:
            return

        metadata = {}

        for key in event.payload.keys():
            if key not in [
                EventPayload.MESSAGES,
                EventPayload.QUERY_STR,
                EventPayload.PROMPT,
                EventPayload.COMPLETION,
                EventPayload.SERIALIZED,
                "additional_kwargs",
            ]:
                if key != EventPayload.RESPONSE:
                    metadata[key] = event.payload[key]
                else:
                    response = event.payload.get(EventPayload.RESPONSE)

                    if "Streaming" in type(response).__name__:
                        continue

                    for res_key, value in vars(response).items():
                        if (
                            not res_key.startswith("_")
                            and res_key
                            not in [
                                "response",
                                "response_txt",
                                "message",
                                "additional_kwargs",
                                "delta",
                                "raw",
                            ]
                            and not isinstance(value, Generator)
                        ):
                            metadata[res_key] = value

        return metadata or None
```
[Deprecated] LlamaIndex callback handler for Langfuse. Please use the LlamaIndexInstrumentor instead.
LlamaIndexCallbackHandler.__init__()
Initialize the base callback handler.
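To put the handler to work, register it with LlamaIndex's callback manager. A minimal sketch, assuming the llama-index core package is installed and Langfuse credentials are set via environment variables:

```python
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager

from langfuse.llama_index import LlamaIndexCallbackHandler

# Credentials are read from LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST
langfuse_handler = LlamaIndexCallbackHandler()

# Route all LlamaIndex callback events through the Langfuse handler
Settings.callback_manager = CallbackManager([langfuse_handler])
```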
LlamaIndexCallbackHandler.set_root()
Set the root trace or span for the callback handler.
Arguments:
- root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to be used for all following operations.
Keyword Args:
- update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.
Returns:
None
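For example, to nest a LlamaIndex run under an existing Langfuse trace — a sketch, assuming `handler` is the registered LlamaIndexCallbackHandler and `query_engine` is an already-built LlamaIndex query engine:

```python
from langfuse import Langfuse

langfuse = Langfuse()

# Create the trace that should act as the root of the LlamaIndex run
trace = langfuse.trace(name="rag-pipeline")

# update_root=True writes the run's input/output back onto the root trace
handler.set_root(trace, update_root=True)
response = query_engine.query("What did the author work on?")

# Detach the root so subsequent runs create their own traces again
handler.set_root(None)
```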
LlamaIndexCallbackHandler.set_trace_params()
Set the trace params that will be used for all following operations.
Allows setting params of subsequent traces at any point in the code. Overwrites the default params set in the callback constructor.
Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.
Arguments:
- name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
- user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
- session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
- version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
- release (Optional[str]): The release identifier of the current deployment. Used to understand how changes across deployments affect metrics.
- metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
- tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
- public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
Returns:
None
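For example, to attribute all subsequent traces to a user and session — a sketch, assuming `handler` is a registered LlamaIndexCallbackHandler:

```python
handler.set_trace_params(
    name="qa-query",
    user_id="user-123",
    session_id="chat-session-42",
    tags=["experiment-a"],
)

# Every trace created by the handler from here on carries these params
```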
LlamaIndexCallbackHandler.start_trace()
Run when an overall trace is launched.
LlamaIndexCallbackHandler.end_trace()
Run when an overall trace is exited.
LlamaIndexCallbackHandler.on_event_start()
Run when an event starts and return the id of the event.
LlamaIndexCallbackHandler.on_event_end()
Run when an event ends.
Inherited Members
- llama_index.core.callbacks.base_handler.BaseCallbackHandler
  - event_starts_to_ignore
  - event_ends_to_ignore
````python
class LlamaIndexInstrumentor:
    """Instrumentor for exporting LlamaIndex instrumentation module spans to Langfuse.

    This beta integration is currently under active development and subject to change.
    Please provide feedback to the Langfuse team: https://github.com/langfuse/langfuse/issues/1931

    For production setups, please use the existing callback-based integration (LlamaIndexCallbackHandler).

    Usage:
        instrumentor = LlamaIndexInstrumentor()
        instrumentor.start()

        # After calling start(), all LlamaIndex executions will be automatically traced

        # To trace a specific execution or set custom trace ID/params, use the context manager:
        with instrumentor.observe(trace_id="unique_trace_id", user_id="user123"):
            # Your LlamaIndex code here
            index = get_llama_index_index()
            response = index.as_query_engine().query("Your query here")

        instrumentor.flush()

    The instrumentor will automatically capture and log events and spans from LlamaIndex
    to Langfuse, providing detailed observability into your LLM application.

    Args:
        public_key (Optional[str]): Langfuse public key
        secret_key (Optional[str]): Langfuse secret key
        host (Optional[str]): Langfuse API host
        debug (Optional[bool]): Enable debug logging
        threads (Optional[int]): Number of threads for async operations
        flush_at (Optional[int]): Number of items to flush at
        flush_interval (Optional[int]): Flush interval in seconds
        max_retries (Optional[int]): Maximum number of retries for failed requests
        timeout (Optional[int]): Timeout for requests in seconds
        httpx_client (Optional[httpx.Client]): Custom HTTPX client
        enabled (Optional[bool]): Enable/disable the instrumentor
        sample_rate (Optional[float]): Sample rate for logging (0.0 to 1.0)
        mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data.
    """

    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        debug: Optional[bool] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        enabled: Optional[bool] = None,
        sample_rate: Optional[float] = None,
        mask: Optional[MaskFunction] = None,
    ):
        self._langfuse = LangfuseSingleton().get(
            public_key=public_key,
            secret_key=secret_key,
            host=host,
            debug=debug,
            threads=threads,
            flush_at=flush_at,
            flush_interval=flush_interval,
            max_retries=max_retries,
            timeout=timeout,
            httpx_client=httpx_client,
            enabled=enabled,
            sample_rate=sample_rate,
            mask=mask,
            sdk_integration="llama-index_instrumentation",
        )
        self._span_handler = LlamaIndexSpanHandler(langfuse_client=self._langfuse)
        self._event_handler = LlamaIndexEventHandler(langfuse_client=self._langfuse)
        self._context = InstrumentorContext()

    def start(self):
        """Start the automatic tracing of LlamaIndex operations.

        Once called, all subsequent LlamaIndex executions will be automatically traced
        and logged to Langfuse without any additional code changes required.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()
            instrumentor.start()

            # From this point, all LlamaIndex operations are automatically traced
            index = VectorStoreIndex.from_documents(documents)
            query_engine = index.as_query_engine()
            response = query_engine.query("What is the capital of France?")

            # The above operations will be automatically logged to Langfuse
            instrumentor.flush()
            ```
        """
        self._context.reset()
        dispatcher = get_dispatcher()

        # Span Handler
        if not any(
            isinstance(handler, type(self._span_handler))
            for handler in dispatcher.span_handlers
        ):
            dispatcher.add_span_handler(self._span_handler)

        # Event Handler
        if not any(
            isinstance(handler, type(self._event_handler))
            for handler in dispatcher.event_handlers
        ):
            dispatcher.add_event_handler(self._event_handler)

    def stop(self):
        """Stop the automatic tracing of LlamaIndex operations.

        This method removes the span and event handlers from the LlamaIndex dispatcher,
        effectively stopping the automatic tracing and logging to Langfuse.

        After calling this method, LlamaIndex operations will no longer be automatically
        traced unless `start()` is called again.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()
            instrumentor.start()

            # LlamaIndex operations are automatically traced here

            instrumentor.stop()

            # LlamaIndex operations are no longer automatically traced
            ```
        """
        self._context.reset()
        dispatcher = get_dispatcher()

        # Span Handler, in-place filter
        dispatcher.span_handlers[:] = filter(
            lambda h: not isinstance(h, type(self._span_handler)),
            dispatcher.span_handlers,
        )

        # Event Handler, in-place filter
        dispatcher.event_handlers[:] = filter(
            lambda h: not isinstance(h, type(self._event_handler)),
            dispatcher.event_handlers,
        )

    @contextmanager
    def observe(
        self,
        *,
        trace_id: Optional[str] = None,
        parent_observation_id: Optional[str] = None,
        update_parent: Optional[bool] = None,
        trace_name: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ):
        """Access context manager for observing and tracing LlamaIndex operations.

        This method allows you to wrap LlamaIndex operations in a context that
        automatically traces and logs them to Langfuse. It provides fine-grained
        control over the trace properties and ensures proper instrumentation.

        Args:
            trace_id (Optional[str]): Unique identifier for the trace. If not provided, a UUID will be generated.
            parent_observation_id (Optional[str]): ID of the parent observation, if any.
            update_parent (Optional[bool]): Whether to update the parent trace.
            trace_name (Optional[str]): Name of the trace.
            user_id (Optional[str]): ID of the user associated with this trace.
            session_id (Optional[str]): ID of the session associated with this trace.
            version (Optional[str]): Version information for this trace.
            release (Optional[str]): Release information for this trace.
            metadata (Optional[Dict[str, Any]]): Additional metadata for the trace.
            tags (Optional[List[str]]): Tags associated with this trace.
            public (Optional[bool]): Whether this trace should be public.

        Yields:
            StatefulTraceClient: A client for interacting with the current trace.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()

            with instrumentor.observe(trace_id="unique_id", user_id="user123"):
                # LlamaIndex operations here will be traced
                index.as_query_engine().query("What is the capital of France?")

            # Tracing stops after the context manager exits

            instrumentor.flush()
            ```

        Note:
            If the instrumentor is not already started, this method will start it
            for the duration of the context and stop it afterwards.
        """
        was_instrumented = self._is_instrumented

        if not was_instrumented:
            self.start()

        if parent_observation_id is not None and trace_id is None:
            logger.warning(
                "trace_id must be provided if parent_observation_id is provided. Ignoring parent_observation_id."
            )
            parent_observation_id = None

        final_trace_id = trace_id or str(uuid.uuid4())

        self._context.update(
            is_user_managed_trace=True,
            trace_id=final_trace_id,
            parent_observation_id=parent_observation_id,
            update_parent=update_parent,
            trace_name=trace_name,
            user_id=user_id,
            session_id=session_id,
            version=version,
            release=release,
            metadata=metadata,
            tags=tags,
            public=public,
        )

        yield self._get_trace_client(final_trace_id)

        self._context.reset()

        if not was_instrumented:
            self.stop()

    @property
    def _is_instrumented(self) -> bool:
        """Check if the dispatcher is instrumented."""
        dispatcher = get_dispatcher()

        return any(
            isinstance(handler, type(self._span_handler))
            for handler in dispatcher.span_handlers
        ) and any(
            isinstance(handler, type(self._event_handler))
            for handler in dispatcher.event_handlers
        )

    def _get_trace_client(self, trace_id: str) -> StatefulTraceClient:
        return StatefulTraceClient(
            client=self._langfuse.client,
            id=trace_id,
            trace_id=trace_id,
            task_manager=self._langfuse.task_manager,
            state_type=StateType.TRACE,
        )

    @property
    def client_instance(self) -> Langfuse:
        """Return the Langfuse client instance associated with this instrumentor.

        This property provides access to the underlying Langfuse client, allowing
        direct interaction with Langfuse functionality if needed.

        Returns:
            Langfuse: The Langfuse client instance.
        """
        return self._langfuse

    def flush(self) -> None:
        """Flush any pending tasks in the task manager.

        This method ensures that all queued tasks are sent to Langfuse immediately.
        It's useful for scenarios where you want to guarantee that all instrumentation
        data has been transmitted before your application terminates or moves on to
        a different phase.

        Note:
            This method is a wrapper around the `flush()` method of the underlying
            Langfuse client instance. It's provided here for convenience and to maintain
            a consistent interface within the instrumentor.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor(langfuse_client)
            # ... perform some operations ...
            instrumentor.flush()  # Ensure all data is sent to Langfuse
            ```
        """
        self.client_instance.flush()
````
Instrumentor for exporting LlamaIndex instrumentation module spans to Langfuse.
This beta integration is currently under active development and subject to change. Please provide feedback to the Langfuse team: https://github.com/langfuse/langfuse/issues/1931
For production setups, please use the existing callback-based integration (LlamaIndexCallbackHandler).
Usage:
```python
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# After calling start(), all LlamaIndex executions will be automatically traced

# To trace a specific execution or set custom trace ID/params, use the context manager:
with instrumentor.observe(trace_id="unique_trace_id", user_id="user123"):
    # Your LlamaIndex code here
    index = get_llama_index_index()
    response = index.as_query_engine().query("Your query here")

instrumentor.flush()
```
The instrumentor will automatically capture and log events and spans from LlamaIndex to Langfuse, providing detailed observability into your LLM application.
Arguments:
- public_key (Optional[str]): Langfuse public key
- secret_key (Optional[str]): Langfuse secret key
- host (Optional[str]): Langfuse API host
- debug (Optional[bool]): Enable debug logging
- threads (Optional[int]): Number of threads for async operations
- flush_at (Optional[int]): Number of queued items that triggers a flush
- flush_interval (Optional[int]): Flush interval in seconds
- max_retries (Optional[int]): Maximum number of retries for failed requests
- timeout (Optional[int]): Timeout for requests in seconds
- httpx_client (Optional[httpx.Client]): Custom HTTPX client
- enabled (Optional[bool]): Enable/disable the instrumentor
- sample_rate (Optional[float]): Sample rate for logging (0.0 to 1.0)
- mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data.
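A minimal sketch of a masking function that satisfies the MaskFunction contract described above (a single keyword argument `data`); the redaction rule itself is purely illustrative:

```python
def redact(*, data):
    # Illustrative rule: hide anything that looks like a secret key
    if isinstance(data, str) and data.startswith("sk-"):
        return "***REDACTED***"
    return data

instrumentor = LlamaIndexInstrumentor(mask=redact)
```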
LlamaIndexInstrumentor.start()
Start the automatic tracing of LlamaIndex operations.
Once called, all subsequent LlamaIndex executions will be automatically traced and logged to Langfuse without any additional code changes required.
Example:
```python
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# From this point, all LlamaIndex operations are automatically traced
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()
response = query_engine.query("What is the capital of France?")

# The above operations will be automatically logged to Langfuse
instrumentor.flush()
```
LlamaIndexInstrumentor.stop()
Stop the automatic tracing of LlamaIndex operations.
This method removes the span and event handlers from the LlamaIndex dispatcher, effectively stopping the automatic tracing and logging to Langfuse.
After calling this method, LlamaIndex operations will no longer be automatically traced unless `start()` is called again.
Example:
```python
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# LlamaIndex operations are automatically traced here

instrumentor.stop()

# LlamaIndex operations are no longer automatically traced
```
LlamaIndexInstrumentor.observe()
Context manager for observing and tracing LlamaIndex operations.
This method allows you to wrap LlamaIndex operations in a context that automatically traces and logs them to Langfuse. It provides fine-grained control over the trace properties and ensures proper instrumentation.
Arguments:
- trace_id (Optional[str]): Unique identifier for the trace. If not provided, a UUID will be generated.
- parent_observation_id (Optional[str]): ID of the parent observation, if any.
- update_parent (Optional[bool]): Whether to update the parent trace.
- trace_name (Optional[str]): Name of the trace.
- user_id (Optional[str]): ID of the user associated with this trace.
- session_id (Optional[str]): ID of the session associated with this trace.
- version (Optional[str]): Version information for this trace.
- release (Optional[str]): Release information for this trace.
- metadata (Optional[Dict[str, Any]]): Additional metadata for the trace.
- tags (Optional[List[str]]): Tags associated with this trace.
- public (Optional[bool]): Whether this trace should be public.
Yields:
StatefulTraceClient: A client for interacting with the current trace.
Example:
```python
instrumentor = LlamaIndexInstrumentor()

with instrumentor.observe(trace_id="unique_id", user_id="user123"):
    # LlamaIndex operations here will be traced
    index.as_query_engine().query("What is the capital of France?")

# Tracing stops after the context manager exits

instrumentor.flush()
```
Note:
If the instrumentor is not already started, this method will start it for the duration of the context and stop it afterwards.
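Because the context manager yields a StatefulTraceClient, the current trace can also be used directly, e.g. to attach a score after the run. A sketch, assuming `index` is an existing LlamaIndex index and the stateful client's standard `score` method:

```python
with instrumentor.observe(session_id="chat-session-42") as trace:
    index.as_query_engine().query("What is the capital of France?")

# Attach a score to the same trace after the run has completed
trace.score(name="user-feedback", value=1.0)

instrumentor.flush()
```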
LlamaIndexInstrumentor.client_instance
Return the Langfuse client instance associated with this instrumentor.
This property provides access to the underlying Langfuse client, allowing direct interaction with Langfuse functionality if needed.
Returns:
Langfuse: The Langfuse client instance.
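This is useful for one-off interactions with Langfuse, for example verifying credentials at startup — a sketch using the v2 client's `auth_check()`:

```python
langfuse = instrumentor.client_instance

# Verify that the configured credentials are accepted by the Langfuse API
if not langfuse.auth_check():
    raise RuntimeError("Langfuse credentials could not be verified")
```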
LlamaIndexInstrumentor.flush()
Flush any pending tasks in the task manager.
This method ensures that all queued tasks are sent to Langfuse immediately. It's useful for scenarios where you want to guarantee that all instrumentation data has been transmitted before your application terminates or moves on to a different phase.
Note:
This method is a wrapper around the `flush()` method of the underlying Langfuse client instance. It's provided here for convenience and to maintain a consistent interface within the instrumentor.
Example:
```python
instrumentor = LlamaIndexInstrumentor()
# ... perform some operations ...
instrumentor.flush()  # Ensure all data is sent to Langfuse
```