langfuse.llama_index
```python
@auto_decorate_methods_with(catch_and_log_errors, exclude=["__init__"])
class LlamaIndexCallbackHandler(
    LlamaIndexBaseCallbackHandler, LangfuseBaseCallbackHandler
):
    """[Deprecated] LlamaIndex callback handler for Langfuse. Deprecated, please use the LlamaIndexInstrumentor instead."""

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        debug: bool = False,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        trace_name: Optional[str] = None,
        release: Optional[str] = None,
        version: Optional[str] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Any] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        event_starts_to_ignore: Optional[List[CBEventType]] = None,
        event_ends_to_ignore: Optional[List[CBEventType]] = None,
        tokenizer: Optional[Callable[[str], list]] = None,
        enabled: Optional[bool] = None,
        httpx_client: Optional[httpx.Client] = None,
        sdk_integration: Optional[str] = None,
        sample_rate: Optional[float] = None,
    ) -> None:
        LlamaIndexBaseCallbackHandler.__init__(
            self,
            event_starts_to_ignore=event_starts_to_ignore or [],
            event_ends_to_ignore=event_ends_to_ignore or [],
        )
        LangfuseBaseCallbackHandler.__init__(
            self,
            public_key=public_key,
            secret_key=secret_key,
            host=host,
            debug=debug,
            session_id=session_id,
            user_id=user_id,
            trace_name=trace_name,
            release=release,
            version=version,
            tags=tags,
            metadata=metadata,
            threads=threads,
            flush_at=flush_at,
            flush_interval=flush_interval,
            max_retries=max_retries,
            timeout=timeout,
            enabled=enabled,
            httpx_client=httpx_client,
            sdk_integration=sdk_integration or "llama-index_callback",
            sample_rate=sample_rate,
        )

        self.event_map: Dict[str, List[CallbackEvent]] = defaultdict(list)
        self._llama_index_trace_name: Optional[str] = None
        self._token_counter = TokenCounter(tokenizer)

        # For stream-chat, the last LLM end_event arrives after the trace has ended
        # Keep track of these orphans to upsert them with the correct trace_id after the trace has ended
        self._orphaned_LLM_generations: Dict[
            str, Tuple[StatefulGenerationClient, StatefulTraceClient]
        ] = {}

    def set_root(
        self,
        root: Optional[Union[StatefulTraceClient, StatefulSpanClient]],
        *,
        update_root: bool = False,
    ) -> None:
        """Set the root trace or span for the callback handler.

        Args:
            root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to
                be used for all following operations.

        Keyword Args:
            update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.

        Returns:
            None
        """
        context_root.set(root)

        if root is None:
            self.trace = None
            self.root_span = None
            self._task_manager = self.langfuse.task_manager if self.langfuse else None

            return

        if isinstance(root, StatefulTraceClient):
            self.trace = root

        elif isinstance(root, StatefulSpanClient):
            self.root_span = root
            self.trace = StatefulTraceClient(
                root.client,
                root.trace_id,
                StateType.TRACE,
                root.trace_id,
                root.task_manager,
                root.environment,
            )

        self._task_manager = root.task_manager
        self.update_stateful_client = update_root

    def set_trace_params(
        self,
        name: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Any] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ):
        """Set the trace params that will be used for all following operations.

        Allows setting params of subsequent traces at any point in the code.
        Overwrites the default params set in the callback constructor.

        Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.

        Attributes:
            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
            public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

        Returns:
            None
        """
        context_trace_metadata.set(
            {
                "name": name,
                "user_id": user_id,
                "session_id": session_id,
                "version": version,
                "release": release,
                "metadata": metadata,
                "tags": tags,
                "public": public,
            }
        )

    def start_trace(self, trace_id: Optional[str] = None) -> None:
        """Run when an overall trace is launched."""
        self._llama_index_trace_name = trace_id

    def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """Run when an overall trace is exited."""
        if not trace_map:
            self.log.debug("No events in trace map to create the observation tree.")
            return

        # Generate Langfuse observations after trace has ended and full trace_map is available.
        # For long-running traces this leads to events only being sent to Langfuse after the trace has ended.
        # Timestamps remain accurate as they are set at the time of the event.
        self._create_observations_from_trace_map(
            event_id=BASE_TRACE_EVENT, trace_map=trace_map
        )
        self._update_trace_data(trace_map=trace_map)

    def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        parent_id: str = "",
        **kwargs: Any,
    ) -> str:
        """Run when an event starts and return id of event."""
        start_event = CallbackEvent(
            event_id=event_id, event_type=event_type, payload=payload
        )
        self.event_map[event_id].append(start_event)

        return event_id

    def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        """Run when an event ends."""
        end_event = CallbackEvent(
            event_id=event_id, event_type=event_type, payload=payload
        )
        self.event_map[event_id].append(end_event)

        if event_type == CBEventType.LLM and event_id in self._orphaned_LLM_generations:
            generation, trace = self._orphaned_LLM_generations[event_id]
            self._handle_orphaned_LLM_end_event(
                end_event, generation=generation, trace=trace
            )
            del self._orphaned_LLM_generations[event_id]

    def _create_observations_from_trace_map(
        self,
        event_id: str,
        trace_map: Dict[str, List[str]],
        parent: Optional[
            Union[StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient]
        ] = None,
    ) -> None:
        """Recursively create langfuse observations based on the trace_map."""
        if event_id != BASE_TRACE_EVENT and not self.event_map.get(event_id):
            return

        if event_id == BASE_TRACE_EVENT:
            observation = self._get_root_observation()
        else:
            observation = self._create_observation(
                event_id=event_id, parent=parent, trace_id=self.trace.id
            )

        for child_event_id in trace_map.get(event_id, []):
            self._create_observations_from_trace_map(
                event_id=child_event_id, parent=observation, trace_map=trace_map
            )

    def _get_root_observation(self) -> Union[StatefulTraceClient, StatefulSpanClient]:
        user_provided_root = context_root.get()

        # Get trace metadata from contextvars or use default values
        trace_metadata = context_trace_metadata.get()
        name = (
            trace_metadata["name"]
            or self.trace_name
            or f"LlamaIndex_{self._llama_index_trace_name}"
        )
        version = trace_metadata["version"] or self.version
        release = trace_metadata["release"] or self.release
        session_id = trace_metadata["session_id"] or self.session_id
        user_id = trace_metadata["user_id"] or self.user_id
        metadata = trace_metadata["metadata"] or self.metadata
        tags = trace_metadata["tags"] or self.tags
        public = trace_metadata["public"] or None

        # Make sure that if a user-provided root is set, it has been set in the same trace
        # and it's not a root from a different trace
        if (
            user_provided_root is not None
            and self.trace
            and self.trace.id == user_provided_root.trace_id
        ):
            if self.update_stateful_client:
                user_provided_root.update(
                    name=name,
                    version=version,
                    session_id=session_id,
                    user_id=user_id,
                    metadata=metadata,
                    tags=tags,
                    release=release,
                    public=public,
                )

            return user_provided_root

        else:
            self.trace = self.langfuse.trace(
                id=str(uuid4()),
                name=name,
                version=version,
                session_id=session_id,
                user_id=user_id,
                metadata=metadata,
                tags=tags,
                release=release,
                public=public,
            )

            return self.trace

    def _create_observation(
        self,
        event_id: str,
        parent: Union[
            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
        ],
        trace_id: str,
    ) -> Union[StatefulSpanClient, StatefulGenerationClient]:
        event_type = self.event_map[event_id][0].event_type

        if event_type == CBEventType.LLM:
            return self._handle_LLM_events(event_id, parent, trace_id)
        elif event_type == CBEventType.EMBEDDING:
            return self._handle_embedding_events(event_id, parent, trace_id)
        else:
            return self._handle_span_events(event_id, parent, trace_id)

    def _handle_LLM_events(
        self,
        event_id: str,
        parent: Union[
            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
        ],
        trace_id: str,
    ) -> StatefulGenerationClient:
        events = self.event_map[event_id]
        start_event, end_event = events[0], events[-1]

        if start_event.payload and EventPayload.SERIALIZED in start_event.payload:
            serialized = start_event.payload.get(EventPayload.SERIALIZED, {})
            name = serialized.get("class_name", "LLM")
            temperature = serialized.get("temperature", None)
            max_tokens = serialized.get("max_tokens", None)
            timeout = serialized.get("timeout", None)

        parsed_end_payload = self._parse_LLM_end_event_payload(end_event)
        parsed_metadata = self._parse_metadata_from_event(end_event)

        generation = parent.generation(
            id=event_id,
            trace_id=trace_id,
            version=self.version,
            name=name,
            start_time=start_event.time,
            metadata=parsed_metadata,
            model_parameters={
                "temperature": temperature,
                "max_tokens": max_tokens,
                "request_timeout": timeout,
            },
            **parsed_end_payload,
        )

        # Register orphaned LLM event (only start event, no end event) to be later upserted with the correct trace_id
        if len(events) == 1:
            self._orphaned_LLM_generations[event_id] = (generation, self.trace)

        return generation

    def _handle_orphaned_LLM_end_event(
        self,
        end_event: CallbackEvent,
        generation: StatefulGenerationClient,
        trace: StatefulTraceClient,
    ) -> None:
        parsed_end_payload = self._parse_LLM_end_event_payload(end_event)

        generation.update(
            **parsed_end_payload,
        )

        if generation.trace_id != trace.id:
            raise ValueError(
                f"Generation trace_id {generation.trace_id} does not match trace.id {trace.id}"
            )

        trace.update(output=parsed_end_payload["output"])

    def _parse_LLM_end_event_payload(
        self, end_event: CallbackEvent
    ) -> ParsedLLMEndPayload:
        result: ParsedLLMEndPayload = {
            "input": None,
            "output": None,
            "usage": None,
            "model": None,
            "end_time": end_event.time,
        }

        if not end_event.payload:
            return result

        result["input"] = self._parse_input_from_event(end_event)
        result["output"] = self._parse_output_from_event(end_event)
        result["model"], result["usage"] = self._parse_usage_from_event_payload(
            end_event.payload
        )

        return result

    def _parse_usage_from_event_payload(self, event_payload: Dict):
        model = usage = None

        if not (
            EventPayload.MESSAGES in event_payload
            and EventPayload.RESPONSE in event_payload
        ):
            return model, usage

        response = event_payload.get(EventPayload.RESPONSE)

        if response and hasattr(response, "raw") and response.raw is not None:
            if isinstance(response.raw, dict):
                raw_dict = response.raw
            elif isinstance(response.raw, BaseModel):
                raw_dict = response.raw.model_dump()
            else:
                raw_dict = {}

            model = raw_dict.get("model", None)
            raw_token_usage = raw_dict.get("usage", {})

            if isinstance(raw_token_usage, dict):
                token_usage = raw_token_usage
            elif isinstance(raw_token_usage, BaseModel):
                token_usage = raw_token_usage.model_dump()
            else:
                token_usage = {}

            if token_usage:
                usage = {
                    "input": token_usage.get("prompt_tokens", None),
                    "output": token_usage.get("completion_tokens", None),
                    "total": token_usage.get("total_tokens", None),
                }

        return model, usage

    def _handle_embedding_events(
        self,
        event_id: str,
        parent: Union[
            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
        ],
        trace_id: str,
    ) -> StatefulGenerationClient:
        events = self.event_map[event_id]
        start_event, end_event = events[0], events[-1]

        if start_event.payload and EventPayload.SERIALIZED in start_event.payload:
            serialized = start_event.payload.get(EventPayload.SERIALIZED, {})
            name = serialized.get("class_name", "Embedding")
            model = serialized.get("model_name", None)
            timeout = serialized.get("timeout", None)

        if end_event.payload:
            chunks = end_event.payload.get(EventPayload.CHUNKS, [])
            token_count = sum(
                self._token_counter.get_string_tokens(chunk) for chunk in chunks
            )

            usage = {
                "input": 0,
                "output": 0,
                "total": token_count or None,
            }

        input = self._parse_input_from_event(end_event)
        output = self._parse_output_from_event(end_event)

        generation = parent.generation(
            id=event_id,
            trace_id=trace_id,
            name=name,
            start_time=start_event.time,
            end_time=end_event.time,
            version=self.version,
            model=model,
            input=input,
            output=output,
            usage=usage or None,
            model_parameters={
                "request_timeout": timeout,
            },
        )

        return generation

    def _handle_span_events(
        self,
        event_id: str,
        parent: Union[
            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
        ],
        trace_id: str,
    ) -> StatefulSpanClient:
        events = self.event_map[event_id]
        start_event, end_event = events[0], events[-1]

        extracted_input = self._parse_input_from_event(start_event)
        extracted_output = self._parse_output_from_event(end_event)
        extracted_metadata = self._parse_metadata_from_event(end_event)

        metadata = (
            extracted_metadata if extracted_output != extracted_metadata else None
        )

        name = start_event.event_type.value

        # Update name to the actual tool's name used by openai agent if available
        if (
            name == "function_call"
            and start_event.payload
            and start_event.payload.get("tool", None)
        ):
            tool_name = start_event.payload.get("tool", name)
            name = (
                tool_name
                if isinstance(tool_name, str)
                else (
                    tool_name.name
                    if hasattr(tool_name, "name")
                    else tool_name.__class__.__name__
                )
            )

        span = parent.span(
            id=event_id,
            trace_id=trace_id,
            start_time=start_event.time,
            name=name,
            version=self.version,
            session_id=self.session_id,
            input=extracted_input,
            output=extracted_output,
            metadata=metadata,
        )

        if end_event:
            span.end(end_time=end_event.time)

        return span

    def _update_trace_data(self, trace_map):
        context_root_value = context_root.get()
        if context_root_value and not self.update_stateful_client:
            return

        child_event_ids = trace_map.get(BASE_TRACE_EVENT, [])
        if not child_event_ids:
            return

        event_pair = self.event_map.get(child_event_ids[0])
        if not event_pair or len(event_pair) < 2:
            return

        start_event, end_event = event_pair
        input = self._parse_input_from_event(start_event)
        output = self._parse_output_from_event(end_event)

        if input or output:
            if context_root_value and self.update_stateful_client:
                context_root_value.update(input=input, output=output)
            else:
                self.trace.update(input=input, output=output)

    def _parse_input_from_event(self, event: CallbackEvent):
        if event.payload is None:
            return

        payload = event.payload.copy()

        if EventPayload.SERIALIZED in payload:
            # Always pop Serialized from payload as it may contain LLM api keys
            payload.pop(EventPayload.SERIALIZED)

        if event.event_type == CBEventType.EMBEDDING and EventPayload.CHUNKS in payload:
            chunks = payload.get(EventPayload.CHUNKS)
            return {"num_chunks": len(chunks)}

        if (
            event.event_type == CBEventType.NODE_PARSING
            and EventPayload.DOCUMENTS in payload
        ):
            documents = payload.pop(EventPayload.DOCUMENTS)
            payload["documents"] = [doc.metadata for doc in documents]
            return payload

        for key in [EventPayload.MESSAGES, EventPayload.QUERY_STR, EventPayload.PROMPT]:
            if key in payload:
                return payload.get(key)

        return payload or None

    def _parse_output_from_event(self, event: CallbackEvent):
        if event.payload is None:
            return

        payload = event.payload.copy()

        if EventPayload.SERIALIZED in payload:
            # Always pop Serialized from payload as it may contain LLM api keys
            payload.pop(EventPayload.SERIALIZED)

        if (
            event.event_type == CBEventType.EMBEDDING
            and EventPayload.EMBEDDINGS in payload
        ):
            embeddings = payload.get(EventPayload.EMBEDDINGS)
            return {"num_embeddings": len(embeddings)}

        if (
            event.event_type == CBEventType.NODE_PARSING
            and EventPayload.NODES in payload
        ):
            nodes = payload.pop(EventPayload.NODES)
            payload["num_nodes"] = len(nodes)
            return payload

        if event.event_type == CBEventType.CHUNKING and EventPayload.CHUNKS in payload:
            chunks = payload.pop(EventPayload.CHUNKS)
            payload["num_chunks"] = len(chunks)

        if EventPayload.COMPLETION in payload:
            return payload.get(EventPayload.COMPLETION)

        if EventPayload.RESPONSE in payload:
            response = payload.get(EventPayload.RESPONSE)

            # Skip streaming responses as consuming them would block the user's execution path
            if "Streaming" in type(response).__name__:
                return None

            if hasattr(response, "response"):
                return response.response

            if hasattr(response, "message"):
                output = dict(response.message)
                if "additional_kwargs" in output:
                    if "tool_calls" in output["additional_kwargs"]:
                        output["tool_calls"] = output["additional_kwargs"]["tool_calls"]

                    del output["additional_kwargs"]

                return output

        return payload or None

    def _parse_metadata_from_event(self, event: CallbackEvent):
        if event.payload is None:
            return

        metadata = {}

        for key in event.payload.keys():
            if key not in [
                EventPayload.MESSAGES,
                EventPayload.QUERY_STR,
                EventPayload.PROMPT,
                EventPayload.COMPLETION,
                EventPayload.SERIALIZED,
                "additional_kwargs",
            ]:
                if key != EventPayload.RESPONSE:
                    metadata[key] = event.payload[key]
                else:
                    response = event.payload.get(EventPayload.RESPONSE)

                    if "Streaming" in type(response).__name__:
                        continue

                    for res_key, value in vars(response).items():
                        if (
                            not res_key.startswith("_")
                            and res_key
                            not in [
                                "response",
                                "response_txt",
                                "message",
                                "additional_kwargs",
                                "delta",
                                "raw",
                            ]
                            and not isinstance(value, Generator)
                        ):
                            metadata[res_key] = value

        return metadata or None
```
[Deprecated] LlamaIndex callback handler for Langfuse. Please use the LlamaIndexInstrumentor instead.
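Despite the deprecation, existing code typically registers this handler through LlamaIndex's global callback manager. A minimal sketch of that wiring (using the standard `llama_index.core.Settings` API; credentials may also come from environment variables):

```python
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager

from langfuse.llama_index import LlamaIndexCallbackHandler

# Reads LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY / LANGFUSE_HOST if not passed explicitly
langfuse_handler = LlamaIndexCallbackHandler()

# Route all LlamaIndex callback events through Langfuse
Settings.callback_manager = CallbackManager([langfuse_handler])
```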
```python
    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        debug: bool = False,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        trace_name: Optional[str] = None,
        release: Optional[str] = None,
        version: Optional[str] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Any] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        event_starts_to_ignore: Optional[List[CBEventType]] = None,
        event_ends_to_ignore: Optional[List[CBEventType]] = None,
        tokenizer: Optional[Callable[[str], list]] = None,
        enabled: Optional[bool] = None,
        httpx_client: Optional[httpx.Client] = None,
        sdk_integration: Optional[str] = None,
        sample_rate: Optional[float] = None,
    ) -> None:
        LlamaIndexBaseCallbackHandler.__init__(
            self,
            event_starts_to_ignore=event_starts_to_ignore or [],
            event_ends_to_ignore=event_ends_to_ignore or [],
        )
        LangfuseBaseCallbackHandler.__init__(
            self,
            public_key=public_key,
            secret_key=secret_key,
            host=host,
            debug=debug,
            session_id=session_id,
            user_id=user_id,
            trace_name=trace_name,
            release=release,
            version=version,
            tags=tags,
            metadata=metadata,
            threads=threads,
            flush_at=flush_at,
            flush_interval=flush_interval,
            max_retries=max_retries,
            timeout=timeout,
            enabled=enabled,
            httpx_client=httpx_client,
            sdk_integration=sdk_integration or "llama-index_callback",
            sample_rate=sample_rate,
        )

        self.event_map: Dict[str, List[CallbackEvent]] = defaultdict(list)
        self._llama_index_trace_name: Optional[str] = None
        self._token_counter = TokenCounter(tokenizer)

        # For stream-chat, the last LLM end_event arrives after the trace has ended
        # Keep track of these orphans to upsert them with the correct trace_id after the trace has ended
        self._orphaned_LLM_generations: Dict[
            str, Tuple[StatefulGenerationClient, StatefulTraceClient]
        ] = {}
```
Initialize the base callback handler.
```python
    def set_root(
        self,
        root: Optional[Union[StatefulTraceClient, StatefulSpanClient]],
        *,
        update_root: bool = False,
    ) -> None:
        """Set the root trace or span for the callback handler.

        Args:
            root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to
                be used for all following operations.

        Keyword Args:
            update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.

        Returns:
            None
        """
        context_root.set(root)

        if root is None:
            self.trace = None
            self.root_span = None
            self._task_manager = self.langfuse.task_manager if self.langfuse else None

            return

        if isinstance(root, StatefulTraceClient):
            self.trace = root

        elif isinstance(root, StatefulSpanClient):
            self.root_span = root
            self.trace = StatefulTraceClient(
                root.client,
                root.trace_id,
                StateType.TRACE,
                root.trace_id,
                root.task_manager,
                root.environment,
            )

        self._task_manager = root.task_manager
        self.update_stateful_client = update_root
```
Set the root trace or span for the callback handler.
Arguments:
- root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to be used for all following operations.
Keyword Args:
- update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.
Returns:
None
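A sketch of nesting a LlamaIndex run under a manually created Langfuse trace via `set_root` (assumes `langfuse_handler` is registered as shown above and `index` is an existing LlamaIndex index):

```python
from langfuse import Langfuse

langfuse = Langfuse()

# Create a trace manually and make it the root of the next LlamaIndex run
trace = langfuse.trace(name="rag-pipeline")
langfuse_handler.set_root(trace, update_root=True)

response = index.as_query_engine().query("What did the author do growing up?")

# Detach the root so subsequent runs create their own traces again
langfuse_handler.set_root(None)
langfuse.flush()
```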
```python
    def set_trace_params(
        self,
        name: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Any] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ):
        """Set the trace params that will be used for all following operations.

        Allows setting params of subsequent traces at any point in the code.
        Overwrites the default params set in the callback constructor.

        Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.

        Attributes:
            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
            public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

        Returns:
            None
        """
        context_trace_metadata.set(
            {
                "name": name,
                "user_id": user_id,
                "session_id": session_id,
                "version": version,
                "release": release,
                "metadata": metadata,
                "tags": tags,
                "public": public,
            }
        )
```
Set the trace params that will be used for all following operations.
Allows setting params of subsequent traces at any point in the code. Overwrites the default params set in the callback constructor.
Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.
Arguments:
- name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
- user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
- session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
- version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
- release (Optional[str]): The release identifier of the current deployment. Used to understand how changes across deployments affect metrics. Useful in debugging.
- metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
- tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
- public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
Returns:
None
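For example, to attach user and session context to subsequent runs (a sketch; the identifiers are placeholders):

```python
langfuse_handler.set_trace_params(
    name="qa-query",
    user_id="user-12345",
    session_id="session-abc",
    tags=["production"],
)

# All LlamaIndex executions from here on inherit these trace params
response = index.as_query_engine().query("How do I reset my password?")
```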
```python
    def start_trace(self, trace_id: Optional[str] = None) -> None:
        """Run when an overall trace is launched."""
        self._llama_index_trace_name = trace_id
```
Run when an overall trace is launched.
```python
    def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """Run when an overall trace is exited."""
        if not trace_map:
            self.log.debug("No events in trace map to create the observation tree.")
            return

        # Generate Langfuse observations after trace has ended and full trace_map is available.
        # For long-running traces this leads to events only being sent to Langfuse after the trace has ended.
        # Timestamps remain accurate as they are set at the time of the event.
        self._create_observations_from_trace_map(
            event_id=BASE_TRACE_EVENT, trace_map=trace_map
        )
        self._update_trace_data(trace_map=trace_map)
```
Run when an overall trace is exited.
```python
    def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        parent_id: str = "",
        **kwargs: Any,
    ) -> str:
        """Run when an event starts and return id of event."""
        start_event = CallbackEvent(
            event_id=event_id, event_type=event_type, payload=payload
        )
        self.event_map[event_id].append(start_event)

        return event_id
```
Run when an event starts and return id of event.
```python
    def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        """Run when an event ends."""
        end_event = CallbackEvent(
            event_id=event_id, event_type=event_type, payload=payload
        )
        self.event_map[event_id].append(end_event)

        if event_type == CBEventType.LLM and event_id in self._orphaned_LLM_generations:
            generation, trace = self._orphaned_LLM_generations[event_id]
            self._handle_orphaned_LLM_end_event(
                end_event, generation=generation, trace=trace
            )
            del self._orphaned_LLM_generations[event_id]
```
Run when an event ends.
Inherited Members
- llama_index.core.callbacks.base_handler.BaseCallbackHandler
- event_starts_to_ignore
- event_ends_to_ignore
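Note that the handler queues events in a background task manager rather than sending them synchronously, so short-lived scripts should flush before exiting. A minimal sketch (assuming the `flush()` method inherited from `LangfuseBaseCallbackHandler`):

```python
# Send any buffered events to Langfuse before the process exits
langfuse_handler.flush()
```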
````python
class LlamaIndexInstrumentor:
    """Instrumentor for exporting LlamaIndex instrumentation module spans to Langfuse.

    This beta integration is currently under active development and subject to change.
    Please provide feedback to the Langfuse team: https://github.com/langfuse/langfuse/issues/1931

    For production setups, please use the existing callback-based integration (LlamaIndexCallbackHandler).

    Usage:
        instrumentor = LlamaIndexInstrumentor()
        instrumentor.start()

        # After calling start(), all LlamaIndex executions will be automatically traced

        # To trace a specific execution or set custom trace ID/params, use the context manager:
        with instrumentor.observe(trace_id="unique_trace_id", user_id="user123"):
            # Your LlamaIndex code here
            index = get_llama_index_index()
            response = index.as_query_engine().query("Your query here")

        instrumentor.flush()

    The instrumentor will automatically capture and log events and spans from LlamaIndex
    to Langfuse, providing detailed observability into your LLM application.

    Args:
        public_key (Optional[str]): Langfuse public key
        secret_key (Optional[str]): Langfuse secret key
        host (Optional[str]): Langfuse API host
        debug (Optional[bool]): Enable debug logging
        threads (Optional[int]): Number of threads for async operations
        flush_at (Optional[int]): Number of items to flush at
        flush_interval (Optional[int]): Flush interval in seconds
        max_retries (Optional[int]): Maximum number of retries for failed requests
        timeout (Optional[int]): Timeout for requests in seconds
        httpx_client (Optional[httpx.Client]): Custom HTTPX client
        enabled (Optional[bool]): Enable/disable the instrumentor
        sample_rate (Optional[float]): Sample rate for logging (0.0 to 1.0)
        mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data.
        environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can be set via `LANGFUSE_TRACING_ENVIRONMENT` environment variable.
    """

    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        debug: Optional[bool] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        enabled: Optional[bool] = None,
        sample_rate: Optional[float] = None,
        mask: Optional[MaskFunction] = None,
        environment: Optional[str] = None,
    ):
        self._langfuse = LangfuseSingleton().get(
            public_key=public_key,
            secret_key=secret_key,
            host=host,
            debug=debug,
            threads=threads,
            flush_at=flush_at,
            flush_interval=flush_interval,
            max_retries=max_retries,
            timeout=timeout,
            httpx_client=httpx_client,
            enabled=enabled,
            sample_rate=sample_rate,
            mask=mask,
            sdk_integration="llama-index_instrumentation",
            environment=environment,
        )
        self._span_handler = LlamaIndexSpanHandler(langfuse_client=self._langfuse)
        self._event_handler = LlamaIndexEventHandler(langfuse_client=self._langfuse)
        self._context = InstrumentorContext()

    def start(self):
        """Start the automatic tracing of LlamaIndex operations.

        Once called, all subsequent LlamaIndex executions will be automatically traced
        and logged to Langfuse without any additional code changes required.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()
            instrumentor.start()

            # From this point, all LlamaIndex operations are automatically traced
            index = VectorStoreIndex.from_documents(documents)
            query_engine = index.as_query_engine()
            response = query_engine.query("What is the capital of France?")

            # The above operations will be automatically logged to Langfuse
            instrumentor.flush()
            ```
        """
        self._context.reset()
        dispatcher = get_dispatcher()

        # Span Handler
        if not any(
            isinstance(handler, type(self._span_handler))
            for handler in dispatcher.span_handlers
        ):
            dispatcher.add_span_handler(self._span_handler)

        # Event Handler
        if not any(
            isinstance(handler, type(self._event_handler))
            for handler in dispatcher.event_handlers
        ):
            dispatcher.add_event_handler(self._event_handler)

    def stop(self):
        """Stop the automatic tracing of LlamaIndex operations.

        This method removes the span and event handlers from the LlamaIndex dispatcher,
        effectively stopping the automatic tracing and logging to Langfuse.

        After calling this method, LlamaIndex operations will no longer be automatically
        traced unless `start()` is called again.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()
            instrumentor.start()

            # LlamaIndex operations are automatically traced here

            instrumentor.stop()

            # LlamaIndex operations are no longer automatically traced
            ```
        """
        self._context.reset()
        dispatcher = get_dispatcher()

        # Span Handler, in-place filter
        dispatcher.span_handlers[:] = filter(
            lambda h: not isinstance(h, type(self._span_handler)),
            dispatcher.span_handlers,
        )

        # Event Handler, in-place filter
        dispatcher.event_handlers[:] = filter(
            lambda h: not isinstance(h, type(self._event_handler)),
            dispatcher.event_handlers,
        )

    @contextmanager
    def observe(
        self,
        *,
        trace_id: Optional[str] = None,
        parent_observation_id: Optional[str] = None,
        update_parent: Optional[bool] = None,
        trace_name: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ):
        """Context manager for observing and tracing LlamaIndex operations.

        This method allows you to wrap LlamaIndex operations in a context that
        automatically traces and logs them to Langfuse. It provides fine-grained
        control over the trace properties and ensures proper instrumentation.

        Args:
            trace_id (Optional[str]): Unique identifier for the trace. If not provided, a UUID will be generated.
            parent_observation_id (Optional[str]): ID of the parent observation, if any.
            update_parent (Optional[bool]): Whether to update the parent trace.
            trace_name (Optional[str]): Name of the trace.
            user_id (Optional[str]): ID of the user associated with this trace.
            session_id (Optional[str]): ID of the session associated with this trace.
            version (Optional[str]): Version information for this trace.
            release (Optional[str]): Release information for this trace.
            metadata (Optional[Dict[str, Any]]): Additional metadata for the trace.
            tags (Optional[List[str]]): Tags associated with this trace.
            public (Optional[bool]): Whether this trace should be public.

        Yields:
            StatefulTraceClient: A client for interacting with the current trace.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()

            with instrumentor.observe(trace_id="unique_id", user_id="user123"):
                # LlamaIndex operations here will be traced
                index.as_query_engine().query("What is the capital of France?")

            # Tracing stops after the context manager exits

            instrumentor.flush()
            ```

        Note:
            If the instrumentor is not already started, this method will start it
            for the duration of the context and stop it afterwards.
        """
        was_instrumented = self._is_instrumented

        if not was_instrumented:
            self.start()

        if parent_observation_id is not None and trace_id is None:
            logger.warning(
                "trace_id must be provided if parent_observation_id is provided. Ignoring parent_observation_id."
            )
            parent_observation_id = None

        final_trace_id = trace_id or str(uuid.uuid4())

        self._context.update(
            is_user_managed_trace=True,
            trace_id=final_trace_id,
            parent_observation_id=parent_observation_id,
            update_parent=update_parent,
            trace_name=trace_name,
            user_id=user_id,
            session_id=session_id,
            version=version,
            release=release,
            metadata=metadata,
            tags=tags,
            public=public,
        )

        yield self._get_trace_client(final_trace_id)

        self._context.reset()

        if not was_instrumented:
            self.stop()

    @property
    def _is_instrumented(self) -> bool:
        """Check if the dispatcher is instrumented."""
        dispatcher = get_dispatcher()

        return any(
            isinstance(handler, type(self._span_handler))
            for handler in dispatcher.span_handlers
        ) and any(
            isinstance(handler, type(self._event_handler))
            for handler in dispatcher.event_handlers
        )

    def _get_trace_client(self, trace_id: str) -> StatefulTraceClient:
        return StatefulTraceClient(
            client=self._langfuse.client,
            id=trace_id,
            trace_id=trace_id,
            task_manager=self._langfuse.task_manager,
            state_type=StateType.TRACE,
            environment=self._langfuse.environment,
        )

    @property
    def client_instance(self) -> Langfuse:
        """Return the Langfuse client instance associated with this instrumentor.

        This property provides access to the underlying Langfuse client, allowing
        direct interaction with Langfuse functionality if needed.

        Returns:
            Langfuse: The Langfuse client instance.
        """
        return self._langfuse

    def flush(self) -> None:
        """Flush any pending tasks in the task manager.

        This method ensures that all queued tasks are sent to Langfuse immediately.
        It's useful for scenarios where you want to guarantee that all instrumentation
        data has been transmitted before your application terminates or moves on to
        a different phase.

        Note:
            This method is a wrapper around the `flush()` method of the underlying
            Langfuse client instance. It's provided here for convenience and to maintain
            a consistent interface within the instrumentor.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()
            # ... perform some operations ...
            instrumentor.flush()  # Ensure all data is sent to Langfuse
            ```
        """
        self.client_instance.flush()
````
Instrumentor for exporting LlamaIndex instrumentation module spans to Langfuse.
This beta integration is currently under active development and subject to change. Please provide feedback to the Langfuse team: https://github.com/langfuse/langfuse/issues/1931
For production setups, please use the existing callback-based integration (LlamaIndexCallbackHandler).
Usage:
```python
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# After calling start(), all LlamaIndex executions will be automatically traced

# To trace a specific execution or set custom trace ID/params, use the context manager:
with instrumentor.observe(trace_id="unique_trace_id", user_id="user123"):
    # Your LlamaIndex code here
    index = get_llama_index_index()
    response = index.as_query_engine().query("Your query here")

instrumentor.flush()
```
The instrumentor will automatically capture and log events and spans from LlamaIndex to Langfuse, providing detailed observability into your LLM application.
Arguments:
- public_key (Optional[str]): Langfuse public key
- secret_key (Optional[str]): Langfuse secret key
- host (Optional[str]): Langfuse API host
- debug (Optional[bool]): Enable debug logging
- threads (Optional[int]): Number of threads for async operations
- flush_at (Optional[int]): Maximum number of queued items before a flush is triggered
- flush_interval (Optional[int]): Flush interval in seconds
- max_retries (Optional[int]): Maximum number of retries for failed requests
- timeout (Optional[int]): Timeout for requests in seconds
- httpx_client (Optional[httpx.Client]): Custom HTTPX client
- enabled (Optional[bool]): Enable/disable the instrumentor
- sample_rate (Optional[float]): Sample rate for logging (0.0 to 1.0)
- mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data.
- environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can also be set via the `LANGFUSE_TRACING_ENVIRONMENT` environment variable.
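All constructor arguments fall back to the standard Langfuse environment variables, so a minimal configuration might look like this (a sketch; the key values are placeholders):

```python
import os

os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"

from langfuse.llama_index import LlamaIndexInstrumentor

instrumentor = LlamaIndexInstrumentor(sample_rate=0.5)  # trace roughly half of all executions
instrumentor.start()
```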
```python
    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        debug: Optional[bool] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        enabled: Optional[bool] = None,
        sample_rate: Optional[float] = None,
        mask: Optional[MaskFunction] = None,
        environment: Optional[str] = None,
    ):
        self._langfuse = LangfuseSingleton().get(
            public_key=public_key,
            secret_key=secret_key,
            host=host,
            debug=debug,
            threads=threads,
            flush_at=flush_at,
            flush_interval=flush_interval,
            max_retries=max_retries,
            timeout=timeout,
            httpx_client=httpx_client,
            enabled=enabled,
            sample_rate=sample_rate,
            mask=mask,
            sdk_integration="llama-index_instrumentation",
            environment=environment,
        )
        self._span_handler = LlamaIndexSpanHandler(langfuse_client=self._langfuse)
        self._event_handler = LlamaIndexEventHandler(langfuse_client=self._langfuse)
        self._context = InstrumentorContext()
```
````python
    def start(self):
        """Start the automatic tracing of LlamaIndex operations.

        Once called, all subsequent LlamaIndex executions will be automatically traced
        and logged to Langfuse without any additional code changes required.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()
            instrumentor.start()

            # From this point, all LlamaIndex operations are automatically traced
            index = VectorStoreIndex.from_documents(documents)
            query_engine = index.as_query_engine()
            response = query_engine.query("What is the capital of France?")

            # The above operations will be automatically logged to Langfuse
            instrumentor.flush()
            ```
        """
        self._context.reset()
        dispatcher = get_dispatcher()

        # Span Handler
        if not any(
            isinstance(handler, type(self._span_handler))
            for handler in dispatcher.span_handlers
        ):
            dispatcher.add_span_handler(self._span_handler)

        # Event Handler
        if not any(
            isinstance(handler, type(self._event_handler))
            for handler in dispatcher.event_handlers
        ):
            dispatcher.add_event_handler(self._event_handler)
````
Start the automatic tracing of LlamaIndex operations.
Once called, all subsequent LlamaIndex executions will be automatically traced and logged to Langfuse without any additional code changes required.
Example:
```python
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# From this point, all LlamaIndex operations are automatically traced
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()
response = query_engine.query("What is the capital of France?")

# The above operations will be automatically logged to Langfuse
instrumentor.flush()
```
````python
    def stop(self):
        """Stop the automatic tracing of LlamaIndex operations.

        This method removes the span and event handlers from the LlamaIndex dispatcher,
        effectively stopping the automatic tracing and logging to Langfuse.

        After calling this method, LlamaIndex operations will no longer be automatically
        traced unless `start()` is called again.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()
            instrumentor.start()

            # LlamaIndex operations are automatically traced here

            instrumentor.stop()

            # LlamaIndex operations are no longer automatically traced
            ```
        """
        self._context.reset()
        dispatcher = get_dispatcher()

        # Span Handler, in-place filter
        dispatcher.span_handlers[:] = filter(
            lambda h: not isinstance(h, type(self._span_handler)),
            dispatcher.span_handlers,
        )

        # Event Handler, in-place filter
        dispatcher.event_handlers[:] = filter(
            lambda h: not isinstance(h, type(self._event_handler)),
            dispatcher.event_handlers,
        )
````
Stop the automatic tracing of LlamaIndex operations.
This method removes the span and event handlers from the LlamaIndex dispatcher, effectively stopping the automatic tracing and logging to Langfuse.
After calling this method, LlamaIndex operations will no longer be automatically traced unless `start()` is called again.
Example:
```python
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# LlamaIndex operations are automatically traced here

instrumentor.stop()

# LlamaIndex operations are no longer automatically traced
```
````python
    @contextmanager
    def observe(
        self,
        *,
        trace_id: Optional[str] = None,
        parent_observation_id: Optional[str] = None,
        update_parent: Optional[bool] = None,
        trace_name: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ):
        """Context manager for observing and tracing LlamaIndex operations.

        This method allows you to wrap LlamaIndex operations in a context that
        automatically traces and logs them to Langfuse. It provides fine-grained
        control over the trace properties and ensures proper instrumentation.

        Args:
            trace_id (Optional[str]): Unique identifier for the trace. If not provided, a UUID will be generated.
            parent_observation_id (Optional[str]): ID of the parent observation, if any.
            update_parent (Optional[bool]): Whether to update the parent trace.
            trace_name (Optional[str]): Name of the trace.
            user_id (Optional[str]): ID of the user associated with this trace.
            session_id (Optional[str]): ID of the session associated with this trace.
            version (Optional[str]): Version information for this trace.
            release (Optional[str]): Release information for this trace.
            metadata (Optional[Dict[str, Any]]): Additional metadata for the trace.
            tags (Optional[List[str]]): Tags associated with this trace.
            public (Optional[bool]): Whether this trace should be public.

        Yields:
            StatefulTraceClient: A client for interacting with the current trace.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()

            with instrumentor.observe(trace_id="unique_id", user_id="user123"):
                # LlamaIndex operations here will be traced
                index.as_query_engine().query("What is the capital of France?")

            # Tracing stops after the context manager exits

            instrumentor.flush()
            ```

        Note:
            If the instrumentor is not already started, this method will start it
            for the duration of the context and stop it afterwards.
        """
        was_instrumented = self._is_instrumented

        if not was_instrumented:
            self.start()

        if parent_observation_id is not None and trace_id is None:
            logger.warning(
                "trace_id must be provided if parent_observation_id is provided. Ignoring parent_observation_id."
            )
            parent_observation_id = None

        final_trace_id = trace_id or str(uuid.uuid4())

        self._context.update(
            is_user_managed_trace=True,
            trace_id=final_trace_id,
            parent_observation_id=parent_observation_id,
            update_parent=update_parent,
            trace_name=trace_name,
            user_id=user_id,
            session_id=session_id,
            version=version,
            release=release,
            metadata=metadata,
            tags=tags,
            public=public,
        )

        yield self._get_trace_client(final_trace_id)

        self._context.reset()

        if not was_instrumented:
            self.stop()
````
Context manager for observing and tracing LlamaIndex operations.
This method allows you to wrap LlamaIndex operations in a context that automatically traces and logs them to Langfuse. It provides fine-grained control over the trace properties and ensures proper instrumentation.
Arguments:
- trace_id (Optional[str]): Unique identifier for the trace. If not provided, a UUID will be generated.
- parent_observation_id (Optional[str]): ID of the parent observation, if any.
- update_parent (Optional[bool]): Whether to update the parent trace.
- trace_name (Optional[str]): Name of the trace.
- user_id (Optional[str]): ID of the user associated with this trace.
- session_id (Optional[str]): ID of the session associated with this trace.
- version (Optional[str]): Version information for this trace.
- release (Optional[str]): Release information for this trace.
- metadata (Optional[Dict[str, Any]]): Additional metadata for the trace.
- tags (Optional[List[str]]): Tags associated with this trace.
- public (Optional[bool]): Whether this trace should be public.
Yields:
StatefulTraceClient: A client for interacting with the current trace.
Example:
```python
instrumentor = LlamaIndexInstrumentor()

with instrumentor.observe(trace_id="unique_id", user_id="user123"):
    # LlamaIndex operations here will be traced
    index.as_query_engine().query("What is the capital of France?")

# Tracing stops after the context manager exits

instrumentor.flush()
```
Note:
If the instrumentor is not already started, this method will start it for the duration of the context and stop it afterwards.
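Because `observe` yields the trace client, scores and other trace-level data can be attached inside the context. A sketch (assuming `index` exists and that `StatefulTraceClient.score` is available, as in the v2 SDK):

```python
with instrumentor.observe(
    trace_id="my-trace-id",
    session_id="session-abc",
    trace_name="rag-query",
) as trace:
    response = index.as_query_engine().query("What is Langfuse?")

    # The yielded StatefulTraceClient can be used directly, e.g. to attach a score
    trace.score(name="relevance", value=1.0)

instrumentor.flush()
```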
```python
    @property
    def client_instance(self) -> Langfuse:
        """Return the Langfuse client instance associated with this instrumentor.

        This property provides access to the underlying Langfuse client, allowing
        direct interaction with Langfuse functionality if needed.

        Returns:
            Langfuse: The Langfuse client instance.
        """
        return self._langfuse
```
Return the Langfuse client instance associated with this instrumentor.
This property provides access to the underlying Langfuse client, allowing direct interaction with Langfuse functionality if needed.
Returns:
Langfuse: The Langfuse client instance.
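For instance, the underlying client can be used to verify credentials before running a workload (a sketch; `auth_check()` is part of the v2 Langfuse client):

```python
langfuse = instrumentor.client_instance

# Verify that the configured credentials and host are valid
assert langfuse.auth_check()
```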
````python
    def flush(self) -> None:
        """Flush any pending tasks in the task manager.

        This method ensures that all queued tasks are sent to Langfuse immediately.
        It's useful for scenarios where you want to guarantee that all instrumentation
        data has been transmitted before your application terminates or moves on to
        a different phase.

        Note:
            This method is a wrapper around the `flush()` method of the underlying
            Langfuse client instance. It's provided here for convenience and to maintain
            a consistent interface within the instrumentor.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()
            # ... perform some operations ...
            instrumentor.flush()  # Ensure all data is sent to Langfuse
            ```
        """
        self.client_instance.flush()
````
Flush any pending tasks in the task manager.
This method ensures that all queued tasks are sent to Langfuse immediately. It's useful for scenarios where you want to guarantee that all instrumentation data has been transmitted before your application terminates or moves on to a different phase.
Note:
This method is a wrapper around the `flush()` method of the underlying Langfuse client instance. It's provided here for convenience and to maintain a consistent interface within the instrumentor.
Example:
```python
instrumentor = LlamaIndexInstrumentor()
# ... perform some operations ...
instrumentor.flush()  # Ensure all data is sent to Langfuse
```
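In short-lived scripts, registering the flush at interpreter exit avoids losing buffered events. A sketch using the standard library:

```python
import atexit

instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# Guarantee buffered events are sent even if the script exits early
atexit.register(instrumentor.flush)
```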