langfuse.llama_index

from .llama_index import LlamaIndexCallbackHandler
from ._instrumentor import LlamaIndexInstrumentor

__all__ = [
    "LlamaIndexCallbackHandler",
    "LlamaIndexInstrumentor",
]
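
A minimal quickstart sketch using the exported instrumentation-based integration (assumes Langfuse credentials are provided via the `LANGFUSE_PUBLIC_KEY`, `LANGFUSE_SECRET_KEY`, and `LANGFUSE_HOST` environment variables; the instrumentor is the current recommendation, per the deprecation note below):

```python
from langfuse.llama_index import LlamaIndexInstrumentor

instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# ... run LlamaIndex code; spans and events are exported to Langfuse ...

instrumentor.flush()  # send any buffered events before exiting
```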
@auto_decorate_methods_with(catch_and_log_errors, exclude=['__init__'])
class LlamaIndexCallbackHandler(llama_index.core.callbacks.base_handler.BaseCallbackHandler, langfuse.utils.base_callback_handler.LangfuseBaseCallbackHandler):
@auto_decorate_methods_with(catch_and_log_errors, exclude=["__init__"])
class LlamaIndexCallbackHandler(
    LlamaIndexBaseCallbackHandler, LangfuseBaseCallbackHandler
):
    """[Deprecated] LlamaIndex callback handler for Langfuse. Please use the LlamaIndexInstrumentor instead."""

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        debug: bool = False,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        trace_name: Optional[str] = None,
        release: Optional[str] = None,
        version: Optional[str] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Any] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        event_starts_to_ignore: Optional[List[CBEventType]] = None,
        event_ends_to_ignore: Optional[List[CBEventType]] = None,
        tokenizer: Optional[Callable[[str], list]] = None,
        enabled: Optional[bool] = None,
        httpx_client: Optional[httpx.Client] = None,
        sdk_integration: Optional[str] = None,
        sample_rate: Optional[float] = None,
    ) -> None:
        LlamaIndexBaseCallbackHandler.__init__(
            self,
            event_starts_to_ignore=event_starts_to_ignore or [],
            event_ends_to_ignore=event_ends_to_ignore or [],
        )
        LangfuseBaseCallbackHandler.__init__(
            self,
            public_key=public_key,
            secret_key=secret_key,
            host=host,
            debug=debug,
            session_id=session_id,
            user_id=user_id,
            trace_name=trace_name,
            release=release,
            version=version,
            tags=tags,
            metadata=metadata,
            threads=threads,
            flush_at=flush_at,
            flush_interval=flush_interval,
            max_retries=max_retries,
            timeout=timeout,
            enabled=enabled,
            httpx_client=httpx_client,
            sdk_integration=sdk_integration or "llama-index_callback",
            sample_rate=sample_rate,
        )

        self.event_map: Dict[str, List[CallbackEvent]] = defaultdict(list)
        self._llama_index_trace_name: Optional[str] = None
        self._token_counter = TokenCounter(tokenizer)

        # For stream-chat, the last LLM end_event arrives after the trace has ended
        # Keep track of these orphans to upsert them with the correct trace_id after the trace has ended
        self._orphaned_LLM_generations: Dict[
            str, Tuple[StatefulGenerationClient, StatefulTraceClient]
        ] = {}

    def set_root(
        self,
        root: Optional[Union[StatefulTraceClient, StatefulSpanClient]],
        *,
        update_root: bool = False,
    ) -> None:
        """Set the root trace or span for the callback handler.

        Args:
            root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to
                be used for all following operations.

        Keyword Args:
            update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.

        Returns:
            None
        """
        context_root.set(root)

        if root is None:
            self.trace = None
            self.root_span = None
            self._task_manager = self.langfuse.task_manager if self.langfuse else None

            return

        if isinstance(root, StatefulTraceClient):
            self.trace = root

        elif isinstance(root, StatefulSpanClient):
            self.root_span = root
            self.trace = StatefulTraceClient(
                root.client,
                root.trace_id,
                StateType.TRACE,
                root.trace_id,
                root.task_manager,
                root.environment,
            )

        self._task_manager = root.task_manager
        self.update_stateful_client = update_root

    def set_trace_params(
        self,
        name: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Any] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ):
        """Set the trace params that will be used for all following operations.

        Allows setting params of subsequent traces at any point in the code.
        Overwrites the default params set in the callback constructor.

        Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.

        Attributes:
            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
            public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

        Returns:
            None
        """
        context_trace_metadata.set(
            {
                "name": name,
                "user_id": user_id,
                "session_id": session_id,
                "version": version,
                "release": release,
                "metadata": metadata,
                "tags": tags,
                "public": public,
            }
        )

    def start_trace(self, trace_id: Optional[str] = None) -> None:
        """Run when an overall trace is launched."""
        self._llama_index_trace_name = trace_id

    def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """Run when an overall trace is exited."""
        if not trace_map:
            self.log.debug("No events in trace map to create the observation tree.")
            return

        # Generate Langfuse observations after trace has ended and full trace_map is available.
        # For long-running traces this leads to events only being sent to Langfuse after the trace has ended.
        # Timestamps remain accurate as they are set at the time of the event.
        self._create_observations_from_trace_map(
            event_id=BASE_TRACE_EVENT, trace_map=trace_map
        )
        self._update_trace_data(trace_map=trace_map)

    def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        parent_id: str = "",
        **kwargs: Any,
    ) -> str:
        """Run when an event starts and return id of event."""
        start_event = CallbackEvent(
            event_id=event_id, event_type=event_type, payload=payload
        )
        self.event_map[event_id].append(start_event)

        return event_id

    def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        """Run when an event ends."""
        end_event = CallbackEvent(
            event_id=event_id, event_type=event_type, payload=payload
        )
        self.event_map[event_id].append(end_event)

        if event_type == CBEventType.LLM and event_id in self._orphaned_LLM_generations:
            generation, trace = self._orphaned_LLM_generations[event_id]
            self._handle_orphaned_LLM_end_event(
                end_event, generation=generation, trace=trace
            )
            del self._orphaned_LLM_generations[event_id]

    def _create_observations_from_trace_map(
        self,
        event_id: str,
        trace_map: Dict[str, List[str]],
        parent: Optional[
            Union[StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient]
        ] = None,
    ) -> None:
        """Recursively create langfuse observations based on the trace_map."""
        if event_id != BASE_TRACE_EVENT and not self.event_map.get(event_id):
            return

        if event_id == BASE_TRACE_EVENT:
            observation = self._get_root_observation()
        else:
            observation = self._create_observation(
                event_id=event_id, parent=parent, trace_id=self.trace.id
            )

        for child_event_id in trace_map.get(event_id, []):
            self._create_observations_from_trace_map(
                event_id=child_event_id, parent=observation, trace_map=trace_map
            )

    def _get_root_observation(self) -> Union[StatefulTraceClient, StatefulSpanClient]:
        user_provided_root = context_root.get()

        # Get trace metadata from contextvars or use default values
        trace_metadata = context_trace_metadata.get()
        name = (
            trace_metadata["name"]
            or self.trace_name
            or f"LlamaIndex_{self._llama_index_trace_name}"
        )
        version = trace_metadata["version"] or self.version
        release = trace_metadata["release"] or self.release
        session_id = trace_metadata["session_id"] or self.session_id
        user_id = trace_metadata["user_id"] or self.user_id
        metadata = trace_metadata["metadata"] or self.metadata
        tags = trace_metadata["tags"] or self.tags
        public = trace_metadata["public"] or None

        # Make sure that if a user-provided root is set, it has been set in the same trace
        # and it's not a root from a different trace
        if (
            user_provided_root is not None
            and self.trace
            and self.trace.id == user_provided_root.trace_id
        ):
            if self.update_stateful_client:
                user_provided_root.update(
                    name=name,
                    version=version,
                    session_id=session_id,
                    user_id=user_id,
                    metadata=metadata,
                    tags=tags,
                    release=release,
                    public=public,
                )

            return user_provided_root

        else:
            self.trace = self.langfuse.trace(
                id=str(uuid4()),
                name=name,
                version=version,
                session_id=session_id,
                user_id=user_id,
                metadata=metadata,
                tags=tags,
                release=release,
                public=public,
            )

            return self.trace

    def _create_observation(
        self,
        event_id: str,
        parent: Union[
            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
        ],
        trace_id: str,
    ) -> Union[StatefulSpanClient, StatefulGenerationClient]:
        event_type = self.event_map[event_id][0].event_type

        if event_type == CBEventType.LLM:
            return self._handle_LLM_events(event_id, parent, trace_id)
        elif event_type == CBEventType.EMBEDDING:
            return self._handle_embedding_events(event_id, parent, trace_id)
        else:
            return self._handle_span_events(event_id, parent, trace_id)

    def _handle_LLM_events(
        self,
        event_id: str,
        parent: Union[
            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
        ],
        trace_id: str,
    ) -> StatefulGenerationClient:
        events = self.event_map[event_id]
        start_event, end_event = events[0], events[-1]

        if start_event.payload and EventPayload.SERIALIZED in start_event.payload:
            serialized = start_event.payload.get(EventPayload.SERIALIZED, {})
            name = serialized.get("class_name", "LLM")
            temperature = serialized.get("temperature", None)
            max_tokens = serialized.get("max_tokens", None)
            timeout = serialized.get("timeout", None)

        parsed_end_payload = self._parse_LLM_end_event_payload(end_event)
        parsed_metadata = self._parse_metadata_from_event(end_event)

        generation = parent.generation(
            id=event_id,
            trace_id=trace_id,
            version=self.version,
            name=name,
            start_time=start_event.time,
            metadata=parsed_metadata,
            model_parameters={
                "temperature": temperature,
                "max_tokens": max_tokens,
                "request_timeout": timeout,
            },
            **parsed_end_payload,
        )

        # Register orphaned LLM event (only start event, no end event) to be later upserted with the correct trace_id
        if len(events) == 1:
            self._orphaned_LLM_generations[event_id] = (generation, self.trace)

        return generation

    def _handle_orphaned_LLM_end_event(
        self,
        end_event: CallbackEvent,
        generation: StatefulGenerationClient,
        trace: StatefulTraceClient,
    ) -> None:
        parsed_end_payload = self._parse_LLM_end_event_payload(end_event)

        generation.update(
            **parsed_end_payload,
        )

        if generation.trace_id != trace.id:
            raise ValueError(
                f"Generation trace_id {generation.trace_id} does not match trace.id {trace.id}"
            )

        trace.update(output=parsed_end_payload["output"])

    def _parse_LLM_end_event_payload(
        self, end_event: CallbackEvent
    ) -> ParsedLLMEndPayload:
        result: ParsedLLMEndPayload = {
            "input": None,
            "output": None,
            "usage": None,
            "model": None,
            "end_time": end_event.time,
        }

        if not end_event.payload:
            return result

        result["input"] = self._parse_input_from_event(end_event)
        result["output"] = self._parse_output_from_event(end_event)
        result["model"], result["usage"] = self._parse_usage_from_event_payload(
            end_event.payload
        )

        return result

    def _parse_usage_from_event_payload(self, event_payload: Dict):
        model = usage = None

        if not (
            EventPayload.MESSAGES in event_payload
            and EventPayload.RESPONSE in event_payload
        ):
            return model, usage

        response = event_payload.get(EventPayload.RESPONSE)

        if response and hasattr(response, "raw") and response.raw is not None:
            if isinstance(response.raw, dict):
                raw_dict = response.raw
            elif isinstance(response.raw, BaseModel):
                raw_dict = response.raw.model_dump()
            else:
                raw_dict = {}

            model = raw_dict.get("model", None)
            raw_token_usage = raw_dict.get("usage", {})

            if isinstance(raw_token_usage, dict):
                token_usage = raw_token_usage
            elif isinstance(raw_token_usage, BaseModel):
                token_usage = raw_token_usage.model_dump()
            else:
                token_usage = {}

            if token_usage:
                usage = {
                    "input": token_usage.get("prompt_tokens", None),
                    "output": token_usage.get("completion_tokens", None),
                    "total": token_usage.get("total_tokens", None),
                }

        return model, usage

    def _handle_embedding_events(
        self,
        event_id: str,
        parent: Union[
            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
        ],
        trace_id: str,
    ) -> StatefulGenerationClient:
        events = self.event_map[event_id]
        start_event, end_event = events[0], events[-1]

        if start_event.payload and EventPayload.SERIALIZED in start_event.payload:
            serialized = start_event.payload.get(EventPayload.SERIALIZED, {})
            name = serialized.get("class_name", "Embedding")
            model = serialized.get("model_name", None)
            timeout = serialized.get("timeout", None)

        if end_event.payload:
            chunks = end_event.payload.get(EventPayload.CHUNKS, [])
            token_count = sum(
                self._token_counter.get_string_tokens(chunk) for chunk in chunks
            )

            usage = {
                "input": 0,
                "output": 0,
                "total": token_count or None,
            }

        input = self._parse_input_from_event(end_event)
        output = self._parse_output_from_event(end_event)

        generation = parent.generation(
            id=event_id,
            trace_id=trace_id,
            name=name,
            start_time=start_event.time,
            end_time=end_event.time,
            version=self.version,
            model=model,
            input=input,
            output=output,
            usage=usage or None,
            model_parameters={
                "request_timeout": timeout,
            },
        )

        return generation

    def _handle_span_events(
        self,
        event_id: str,
        parent: Union[
            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
        ],
        trace_id: str,
    ) -> StatefulSpanClient:
        events = self.event_map[event_id]
        start_event, end_event = events[0], events[-1]

        extracted_input = self._parse_input_from_event(start_event)
        extracted_output = self._parse_output_from_event(end_event)
        extracted_metadata = self._parse_metadata_from_event(end_event)

        metadata = (
            extracted_metadata if extracted_output != extracted_metadata else None
        )

        name = start_event.event_type.value

        # Update name to the actual tool's name used by openai agent if available
        if (
            name == "function_call"
            and start_event.payload
            and start_event.payload.get("tool", None)
        ):
            tool_name = start_event.payload.get("tool", name)
            name = (
                tool_name
                if isinstance(tool_name, str)
                else (
                    tool_name.name
                    if hasattr(tool_name, "name")
                    else tool_name.__class__.__name__
                )
            )

        span = parent.span(
            id=event_id,
            trace_id=trace_id,
            start_time=start_event.time,
            name=name,
            version=self.version,
            session_id=self.session_id,
            input=extracted_input,
            output=extracted_output,
            metadata=metadata,
        )

        if end_event:
            span.end(end_time=end_event.time)

        return span

    def _update_trace_data(self, trace_map):
        context_root_value = context_root.get()
        if context_root_value and not self.update_stateful_client:
            return

        child_event_ids = trace_map.get(BASE_TRACE_EVENT, [])
        if not child_event_ids:
            return

        event_pair = self.event_map.get(child_event_ids[0])
        if not event_pair or len(event_pair) < 2:
            return

        start_event, end_event = event_pair
        input = self._parse_input_from_event(start_event)
        output = self._parse_output_from_event(end_event)

        if input or output:
            if context_root_value and self.update_stateful_client:
                context_root_value.update(input=input, output=output)
            else:
                self.trace.update(input=input, output=output)

    def _parse_input_from_event(self, event: CallbackEvent):
        if event.payload is None:
            return

        payload = event.payload.copy()

        if EventPayload.SERIALIZED in payload:
            # Always pop Serialized from payload as it may contain LLM api keys
            payload.pop(EventPayload.SERIALIZED)

        if event.event_type == CBEventType.EMBEDDING and EventPayload.CHUNKS in payload:
            chunks = payload.get(EventPayload.CHUNKS)
            return {"num_chunks": len(chunks)}

        if (
            event.event_type == CBEventType.NODE_PARSING
            and EventPayload.DOCUMENTS in payload
        ):
            documents = payload.pop(EventPayload.DOCUMENTS)
            payload["documents"] = [doc.metadata for doc in documents]
            return payload

        for key in [EventPayload.MESSAGES, EventPayload.QUERY_STR, EventPayload.PROMPT]:
            if key in payload:
                return payload.get(key)

        return payload or None

    def _parse_output_from_event(self, event: CallbackEvent):
        if event.payload is None:
            return

        payload = event.payload.copy()

        if EventPayload.SERIALIZED in payload:
            # Always pop Serialized from payload as it may contain LLM api keys
            payload.pop(EventPayload.SERIALIZED)

        if (
            event.event_type == CBEventType.EMBEDDING
            and EventPayload.EMBEDDINGS in payload
        ):
            embeddings = payload.get(EventPayload.EMBEDDINGS)
            return {"num_embeddings": len(embeddings)}

        if (
            event.event_type == CBEventType.NODE_PARSING
            and EventPayload.NODES in payload
        ):
            nodes = payload.pop(EventPayload.NODES)
            payload["num_nodes"] = len(nodes)
            return payload

        if event.event_type == CBEventType.CHUNKING and EventPayload.CHUNKS in payload:
            chunks = payload.pop(EventPayload.CHUNKS)
            payload["num_chunks"] = len(chunks)

        if EventPayload.COMPLETION in payload:
            return payload.get(EventPayload.COMPLETION)

        if EventPayload.RESPONSE in payload:
            response = payload.get(EventPayload.RESPONSE)

            # Skip streaming responses as consuming them would block the user's execution path
            if "Streaming" in type(response).__name__:
                return None

            if hasattr(response, "response"):
                return response.response

            if hasattr(response, "message"):
                output = dict(response.message)
                if "additional_kwargs" in output:
                    if "tool_calls" in output["additional_kwargs"]:
                        output["tool_calls"] = output["additional_kwargs"]["tool_calls"]

                    del output["additional_kwargs"]

                return output

        return payload or None

    def _parse_metadata_from_event(self, event: CallbackEvent):
        if event.payload is None:
            return

        metadata = {}

        for key in event.payload.keys():
            if key not in [
                EventPayload.MESSAGES,
                EventPayload.QUERY_STR,
                EventPayload.PROMPT,
                EventPayload.COMPLETION,
                EventPayload.SERIALIZED,
                "additional_kwargs",
            ]:
                if key != EventPayload.RESPONSE:
                    metadata[key] = event.payload[key]
                else:
                    response = event.payload.get(EventPayload.RESPONSE)

                    if "Streaming" in type(response).__name__:
                        continue

                    for res_key, value in vars(response).items():
                        if (
                            not res_key.startswith("_")
                            and res_key
                            not in [
                                "response",
                                "response_txt",
                                "message",
                                "additional_kwargs",
                                "delta",
                                "raw",
                            ]
                            and not isinstance(value, Generator)
                        ):
                            metadata[res_key] = value

        return metadata or None

[Deprecated] LlamaIndex callback handler for Langfuse. Please use the LlamaIndexInstrumentor instead.
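Although deprecated, the handler is still registered the usual way via LlamaIndex's callback manager; a sketch (assumes `llama_index` ≥ 0.10 with the global `Settings` object):

```python
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager

from langfuse.llama_index import LlamaIndexCallbackHandler

langfuse_handler = LlamaIndexCallbackHandler()
# Route all LlamaIndex callback events through the Langfuse handler
Settings.callback_manager = CallbackManager([langfuse_handler])
```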

LlamaIndexCallbackHandler( *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, debug: bool = False, session_id: Optional[str] = None, user_id: Optional[str] = None, trace_name: Optional[str] = None, release: Optional[str] = None, version: Optional[str] = None, tags: Optional[List[str]] = None, metadata: Optional[Any] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, event_starts_to_ignore: Optional[List[llama_index.core.callbacks.schema.CBEventType]] = None, event_ends_to_ignore: Optional[List[llama_index.core.callbacks.schema.CBEventType]] = None, tokenizer: Optional[Callable[[str], list]] = None, enabled: Optional[bool] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: Optional[str] = None, sample_rate: Optional[float] = None)

Initialize the base callback handler.
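
A construction sketch; all arguments are optional and fall back to the `LANGFUSE_*` environment variables (the key values here are placeholders):

```python
handler = LlamaIndexCallbackHandler(
    public_key="pk-lf-...",   # or set LANGFUSE_PUBLIC_KEY
    secret_key="sk-lf-...",   # or set LANGFUSE_SECRET_KEY
    host="https://cloud.langfuse.com",
    session_id="session-abcd",
    user_id="user-1234",
    trace_name="rag-pipeline",
)
```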

log = <Logger langfuse (WARNING)>
event_map: Dict[str, List[langfuse.llama_index.utils.CallbackEvent]]
def set_root( self, root: Union[langfuse.client.StatefulTraceClient, langfuse.client.StatefulSpanClient, NoneType], *, update_root: bool = False) -> None:

Set the root trace or span for the callback handler.

Arguments:
  • root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to be used for all following operations.

Keyword Args:
  • update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.

Returns:
  None
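
A usage sketch: create a trace with the Langfuse client and pin it as the root so that all subsequent LlamaIndex observations attach to it (`Langfuse()` here assumes credentials from the environment):

```python
from langfuse import Langfuse
from langfuse.llama_index import LlamaIndexCallbackHandler

langfuse = Langfuse()
trace = langfuse.trace(name="my-llama-index-run")

handler = LlamaIndexCallbackHandler()
handler.set_root(trace, update_root=True)
# ... run LlamaIndex operations; observations are nested under `trace` ...
handler.set_root(None)  # detach the root again
```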

def set_trace_params( self, name: Optional[str] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, version: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, public: Optional[bool] = None):

Set the trace params that will be used for all following operations.

Allows setting params of subsequent traces at any point in the code. Overwrites the default params set in the callback constructor.

Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.

Attributes:
  • name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
  • public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
Returns:
  None
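
A short sketch of overriding trace params at runtime, e.g. once the current user is known:

```python
from langfuse.llama_index import LlamaIndexCallbackHandler

handler = LlamaIndexCallbackHandler()
handler.set_trace_params(
    name="rag-query",
    user_id="user-1234",
    session_id="session-abcd",
    tags=["production", "rag"],
)
# Traces created by subsequent LlamaIndex runs carry these params
```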

def start_trace(self, trace_id: Optional[str] = None) -> None:

Run when an overall trace is launched.
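
`start_trace` and the `end_trace`/`on_event_*` hooks below are invoked by LlamaIndex's `CallbackManager`, not called directly by users; a sketch of the driving side (assumes `CallbackManager.as_trace` from `llama_index.core.callbacks`):

```python
from llama_index.core.callbacks import CallbackManager
from langfuse.llama_index import LlamaIndexCallbackHandler

handler = LlamaIndexCallbackHandler()
callback_manager = CallbackManager([handler])

with callback_manager.as_trace("query"):
    ...  # nested operations fire on_event_start/on_event_end on the handler
```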

def end_trace( self, trace_id: Optional[str] = None, trace_map: Optional[Dict[str, List[str]]] = None) -> None:

Run when an overall trace is exited.

def on_event_start( self, event_type: llama_index.core.callbacks.schema.CBEventType, payload: Optional[Dict[str, Any]] = None, event_id: str = '', parent_id: str = '', **kwargs: Any) -> str:

Run when an event starts and return id of event.

def on_event_end( self, event_type: llama_index.core.callbacks.schema.CBEventType, payload: Optional[Dict[str, Any]] = None, event_id: str = '', **kwargs: Any) -> None:

Run when an event ends.

Inherited Members
  llama_index.core.callbacks.base_handler.BaseCallbackHandler
    • event_starts_to_ignore
    • event_ends_to_ignore

  langfuse.utils.base_callback_handler.LangfuseBaseCallbackHandler
    • version
    • session_id
    • user_id
    • trace_name
    • release
    • metadata
    • tags
    • root_span
    • update_stateful_client
    • langfuse
    • trace
    • get_trace_id
    • get_trace_url
    • flush
    • auth_check
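
Continuing from the construction example above, the inherited `LangfuseBaseCallbackHandler` members cover the usual client chores; a brief sketch:

```python
handler.auth_check()           # verify the configured credentials
url = handler.get_trace_url()  # link to the current trace in the Langfuse UI
handler.flush()                # send buffered events, e.g. before shutdown
```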
class LlamaIndexInstrumentor:
class LlamaIndexInstrumentor:
    """Instrumentor for exporting LlamaIndex instrumentation module spans to Langfuse.

    This beta integration is currently under active development and subject to change.
    Please provide feedback to the Langfuse team: https://github.com/langfuse/langfuse/issues/1931

    For production setups, please use the existing callback-based integration (LlamaIndexCallbackHandler).

    Usage:
        instrumentor = LlamaIndexInstrumentor()
        instrumentor.start()

        # After calling start(), all LlamaIndex executions will be automatically traced

        # To trace a specific execution or set custom trace ID/params, use the context manager:
        with instrumentor.observe(trace_id="unique_trace_id", user_id="user123"):
            # Your LlamaIndex code here
            index = get_llama_index_index()
            response = index.as_query_engine().query("Your query here")

        instrumentor.flush()

    The instrumentor will automatically capture and log events and spans from LlamaIndex
    to Langfuse, providing detailed observability into your LLM application.

    Args:
        public_key (Optional[str]): Langfuse public key
        secret_key (Optional[str]): Langfuse secret key
        host (Optional[str]): Langfuse API host
        debug (Optional[bool]): Enable debug logging
        threads (Optional[int]): Number of threads for async operations
        flush_at (Optional[int]): Number of items to flush at
        flush_interval (Optional[int]): Flush interval in seconds
        max_retries (Optional[int]): Maximum number of retries for failed requests
        timeout (Optional[int]): Timeout for requests in seconds
        httpx_client (Optional[httpx.Client]): Custom HTTPX client
        enabled (Optional[bool]): Enable/disable the instrumentor
        sample_rate (Optional[float]): Sample rate for logging (0.0 to 1.0)
        mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data.
        environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can be set via `LANGFUSE_TRACING_ENVIRONMENT` environment variable.
    """

    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        debug: Optional[bool] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        enabled: Optional[bool] = None,
        sample_rate: Optional[float] = None,
        mask: Optional[MaskFunction] = None,
        environment: Optional[str] = None,
    ):
        self._langfuse = LangfuseSingleton().get(
            public_key=public_key,
            secret_key=secret_key,
            host=host,
            debug=debug,
            threads=threads,
            flush_at=flush_at,
            flush_interval=flush_interval,
            max_retries=max_retries,
            timeout=timeout,
            httpx_client=httpx_client,
            enabled=enabled,
            sample_rate=sample_rate,
            mask=mask,
            sdk_integration="llama-index_instrumentation",
            environment=environment,
        )
        self._span_handler = LlamaIndexSpanHandler(langfuse_client=self._langfuse)
        self._event_handler = LlamaIndexEventHandler(langfuse_client=self._langfuse)
        self._context = InstrumentorContext()

    def start(self):
        """Start the automatic tracing of LlamaIndex operations.

        Once called, all subsequent LlamaIndex executions will be automatically traced
        and logged to Langfuse without any additional code changes required.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()
            instrumentor.start()

            # From this point, all LlamaIndex operations are automatically traced
            index = VectorStoreIndex.from_documents(documents)
            query_engine = index.as_query_engine()
            response = query_engine.query("What is the capital of France?")

            # The above operations will be automatically logged to Langfuse
            instrumentor.flush()
            ```
        """
        self._context.reset()
        dispatcher = get_dispatcher()

        # Span Handler
        if not any(
            isinstance(handler, type(self._span_handler))
            for handler in dispatcher.span_handlers
        ):
            dispatcher.add_span_handler(self._span_handler)

        # Event Handler
        if not any(
            isinstance(handler, type(self._event_handler))
            for handler in dispatcher.event_handlers
        ):
            dispatcher.add_event_handler(self._event_handler)

    def stop(self):
        """Stop the automatic tracing of LlamaIndex operations.

        This method removes the span and event handlers from the LlamaIndex dispatcher,
        effectively stopping the automatic tracing and logging to Langfuse.

        After calling this method, LlamaIndex operations will no longer be automatically
        traced unless `start()` is called again.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()
            instrumentor.start()

            # LlamaIndex operations are automatically traced here

            instrumentor.stop()

            # LlamaIndex operations are no longer automatically traced
            ```
        """
        self._context.reset()
        dispatcher = get_dispatcher()

        # Span Handler, in-place filter
        dispatcher.span_handlers[:] = filter(
            lambda h: not isinstance(h, type(self._span_handler)),
            dispatcher.span_handlers,
        )

        # Event Handler, in-place filter
        dispatcher.event_handlers[:] = filter(
            lambda h: not isinstance(h, type(self._event_handler)),
            dispatcher.event_handlers,
        )

    @contextmanager
    def observe(
        self,
        *,
        trace_id: Optional[str] = None,
        parent_observation_id: Optional[str] = None,
        update_parent: Optional[bool] = None,
        trace_name: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        version: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        public: Optional[bool] = None,
    ):
        """Access context manager for observing and tracing LlamaIndex operations.

        This method allows you to wrap LlamaIndex operations in a context that
        automatically traces and logs them to Langfuse. It provides fine-grained
        control over the trace properties and ensures proper instrumentation.

        Args:
            trace_id (Optional[str]): Unique identifier for the trace. If not provided, a UUID will be generated.
            parent_observation_id (Optional[str]): ID of the parent observation, if any.
            update_parent (Optional[bool]): Whether to update the parent trace.
            trace_name (Optional[str]): Name of the trace.
            user_id (Optional[str]): ID of the user associated with this trace.
            session_id (Optional[str]): ID of the session associated with this trace.
            version (Optional[str]): Version information for this trace.
            release (Optional[str]): Release information for this trace.
            metadata (Optional[Dict[str, Any]]): Additional metadata for the trace.
            tags (Optional[List[str]]): Tags associated with this trace.
            public (Optional[bool]): Whether this trace should be public.

        Yields:
            StatefulTraceClient: A client for interacting with the current trace.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()

            with instrumentor.observe(trace_id="unique_id", user_id="user123"):
                # LlamaIndex operations here will be traced
                index.as_query_engine().query("What is the capital of France?")

            # Tracing stops after the context manager exits

            instrumentor.flush()
            ```

        Note:
            If the instrumentor is not already started, this method will start it
            for the duration of the context and stop it afterwards.
        """
        was_instrumented = self._is_instrumented

        if not was_instrumented:
            self.start()

        if parent_observation_id is not None and trace_id is None:
            logger.warning(
                "trace_id must be provided if parent_observation_id is provided. Ignoring parent_observation_id."
            )
            parent_observation_id = None

        final_trace_id = trace_id or str(uuid.uuid4())

        self._context.update(
            is_user_managed_trace=True,
            trace_id=final_trace_id,
            parent_observation_id=parent_observation_id,
            update_parent=update_parent,
            trace_name=trace_name,
            user_id=user_id,
            session_id=session_id,
            version=version,
            release=release,
            metadata=metadata,
            tags=tags,
            public=public,
        )

        yield self._get_trace_client(final_trace_id)

        self._context.reset()

        if not was_instrumented:
            self.stop()

    @property
    def _is_instrumented(self) -> bool:
        """Check if the dispatcher is instrumented."""
        dispatcher = get_dispatcher()

        return any(
            isinstance(handler, type(self._span_handler))
            for handler in dispatcher.span_handlers
        ) and any(
            isinstance(handler, type(self._event_handler))
            for handler in dispatcher.event_handlers
        )

    def _get_trace_client(self, trace_id: str) -> StatefulTraceClient:
        return StatefulTraceClient(
            client=self._langfuse.client,
            id=trace_id,
            trace_id=trace_id,
            task_manager=self._langfuse.task_manager,
            state_type=StateType.TRACE,
            environment=self._langfuse.environment,
        )

    @property
    def client_instance(self) -> Langfuse:
        """Return the Langfuse client instance associated with this instrumentor.

        This property provides access to the underlying Langfuse client, allowing
        direct interaction with Langfuse functionality if needed.

        Returns:
            Langfuse: The Langfuse client instance.
        """
        return self._langfuse

    def flush(self) -> None:
        """Flush any pending tasks in the task manager.

        This method ensures that all queued tasks are sent to Langfuse immediately.
        It's useful for scenarios where you want to guarantee that all instrumentation
        data has been transmitted before your application terminates or moves on to
        a different phase.

        Note:
            This method is a wrapper around the `flush()` method of the underlying
            Langfuse client instance. It's provided here for convenience and to maintain
            a consistent interface within the instrumentor.

        Example:
            ```python
            instrumentor = LlamaIndexInstrumentor()
            # ... perform some operations ...
            instrumentor.flush()  # Ensure all data is sent to Langfuse
            ```
        """
        self.client_instance.flush()

Instrumentor for exporting LlamaIndex instrumentation module spans to Langfuse.

This beta integration is currently under active development and subject to change. Please provide feedback to the Langfuse team: https://github.com/langfuse/langfuse/issues/1931

For production setups, please use the existing callback-based integration (LlamaIndexCallbackHandler).

Usage:

```python
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# After calling start(), all LlamaIndex executions will be automatically traced

# To trace a specific execution or set custom trace ID/params, use the context manager:
with instrumentor.observe(trace_id="unique_trace_id", user_id="user123"):
    # Your LlamaIndex code here
    index = get_llama_index_index()
    response = index.as_query_engine().query("Your query here")

instrumentor.flush()
```
The instrumentor will automatically capture and log events and spans from LlamaIndex to Langfuse, providing detailed observability into your LLM application.

Arguments:
  • public_key (Optional[str]): Langfuse public key
  • secret_key (Optional[str]): Langfuse secret key
  • host (Optional[str]): Langfuse API host
  • debug (Optional[bool]): Enable debug logging
  • threads (Optional[int]): Number of threads for async operations
  • flush_at (Optional[int]): Number of items to flush at
  • flush_interval (Optional[int]): Flush interval in seconds
  • max_retries (Optional[int]): Maximum number of retries for failed requests
  • timeout (Optional[int]): Timeout for requests in seconds
  • httpx_client (Optional[httpx.Client]): Custom HTTPX client
  • enabled (Optional[bool]): Enable/disable the instrumentor
  • sample_rate (Optional[float]): Sample rate for logging (0.0 to 1.0)
  • mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument data and return a serializable, masked version of the data.
  • environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can be set via the LANGFUSE_TRACING_ENVIRONMENT environment variable.
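
A construction sketch with explicit credentials and a masking function (the `mask` callable must accept a single keyword argument `data`, as noted above; the key values and redaction logic here are hypothetical examples):

```python
from langfuse.llama_index import LlamaIndexInstrumentor

def mask(*, data):
    # Hypothetical masking: redact all string inputs/outputs, keep structure
    return "REDACTED" if isinstance(data, str) else data

instrumentor = LlamaIndexInstrumentor(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="https://cloud.langfuse.com",
    sample_rate=1.0,
    mask=mask,
)
instrumentor.start()
```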
LlamaIndexInstrumentor( *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, enabled: Optional[bool] = None, sample_rate: Optional[float] = None, mask: Optional[langfuse.types.MaskFunction] = None, environment: Optional[str] = None)
def start(self):
109    def start(self):
110        """Start the automatic tracing of LlamaIndex operations.
111
112        Once called, all subsequent LlamaIndex executions will be automatically traced
113        and logged to Langfuse without any additional code changes required.
114
115        Example:
116            ```python
117            instrumentor = LlamaIndexInstrumentor()
118            instrumentor.start()
119
120            # From this point, all LlamaIndex operations are automatically traced
121            index = VectorStoreIndex.from_documents(documents)
122            query_engine = index.as_query_engine()
123            response = query_engine.query("What is the capital of France?")
124
125            # The above operations will be automatically logged to Langfuse
126            instrumentor.flush()
127            ```
128        """
129        self._context.reset()
130        dispatcher = get_dispatcher()
131
132        # Span Handler
133        if not any(
134            isinstance(handler, type(self._span_handler))
135            for handler in dispatcher.span_handlers
136        ):
137            dispatcher.add_span_handler(self._span_handler)
138
139        # Event Handler
140        if not any(
141            isinstance(handler, type(self._event_handler))
142            for handler in dispatcher.event_handlers
143        ):
144            dispatcher.add_event_handler(self._event_handler)

Start the automatic tracing of LlamaIndex operations.

Once called, all subsequent LlamaIndex executions will be automatically traced and logged to Langfuse without any additional code changes required.

Example:
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# From this point, all LlamaIndex operations are automatically traced
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()
response = query_engine.query("What is the capital of France?")

# The above operations will be automatically logged to Langfuse
instrumentor.flush()
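
As the source above shows, start() adds its handlers to LlamaIndex's root dispatcher only if no handler of the same type is already registered, so calling start() repeatedly is safe. A quick way to verify this, reusing the same get_dispatcher entry point as the source (a sketch, not part of the instrumentor's API):

```python
from llama_index.core.instrumentation import get_dispatcher

dispatcher = get_dispatcher()

# Even after calling instrumentor.start() twice, each Langfuse handler type
# should appear at most once in these lists.
print([type(h).__name__ for h in dispatcher.span_handlers])
print([type(h).__name__ for h in dispatcher.event_handlers])
```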
def stop(self):
146    def stop(self):
147        """Stop the automatic tracing of LlamaIndex operations.
148
149        This method removes the span and event handlers from the LlamaIndex dispatcher,
150        effectively stopping the automatic tracing and logging to Langfuse.
151
152        After calling this method, LlamaIndex operations will no longer be automatically
153        traced unless `start()` is called again.
154
155        Example:
156            ```python
157            instrumentor = LlamaIndexInstrumentor()
158            instrumentor.start()
159
160            # LlamaIndex operations are automatically traced here
161
162            instrumentor.stop()
163
164            # LlamaIndex operations are no longer automatically traced
165            ```
166        """
167        self._context.reset()
168        dispatcher = get_dispatcher()
169
170        # Span Handler, in-place filter
171        dispatcher.span_handlers[:] = filter(
172            lambda h: not isinstance(h, type(self._span_handler)),
173            dispatcher.span_handlers,
174        )
175
176        # Event Handler, in-place filter
177        dispatcher.event_handlers[:] = filter(
178            lambda h: not isinstance(h, type(self._event_handler)),
179            dispatcher.event_handlers,
180        )

Stop the automatic tracing of LlamaIndex operations.

This method removes the span and event handlers from the LlamaIndex dispatcher, effectively stopping the automatic tracing and logging to Langfuse.

After calling this method, LlamaIndex operations will no longer be automatically traced unless start() is called again.

Example:
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# LlamaIndex operations are automatically traced here

instrumentor.stop()

# LlamaIndex operations are no longer automatically traced
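
Since stop() only detaches the handlers and does not flush buffered events, one common pattern (a sketch, not from the source; query_engine is assumed to be set up elsewhere) is to pair the two in a try/finally block:

```python
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()
try:
    # Traced LlamaIndex work; query_engine is an assumed, pre-built engine.
    response = query_engine.query("What is the capital of France?")
finally:
    instrumentor.stop()   # detach handlers even if the query raised
    instrumentor.flush()  # send anything still buffered
```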
@contextmanager
def observe( self, *, trace_id: Optional[str] = None, parent_observation_id: Optional[str] = None, update_parent: Optional[bool] = None, trace_name: Optional[str] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, version: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None, public: Optional[bool] = None):
182    @contextmanager
183    def observe(
184        self,
185        *,
186        trace_id: Optional[str] = None,
187        parent_observation_id: Optional[str] = None,
188        update_parent: Optional[bool] = None,
189        trace_name: Optional[str] = None,
190        user_id: Optional[str] = None,
191        session_id: Optional[str] = None,
192        version: Optional[str] = None,
193        release: Optional[str] = None,
194        metadata: Optional[Dict[str, Any]] = None,
195        tags: Optional[List[str]] = None,
196        public: Optional[bool] = None,
197    ):
198        """Context manager for observing and tracing LlamaIndex operations.
199
200        This method allows you to wrap LlamaIndex operations in a context that
201        automatically traces and logs them to Langfuse. It provides fine-grained
202        control over the trace properties and ensures proper instrumentation.
203
204        Args:
205            trace_id (Optional[str]): Unique identifier for the trace. If not provided, a UUID will be generated.
206            parent_observation_id (Optional[str]): ID of the parent observation, if any.
207            update_parent (Optional[bool]): Whether to update the parent trace.
208            trace_name (Optional[str]): Name of the trace.
209            user_id (Optional[str]): ID of the user associated with this trace.
210            session_id (Optional[str]): ID of the session associated with this trace.
211            version (Optional[str]): Version information for this trace.
212            release (Optional[str]): Release information for this trace.
213            metadata (Optional[Dict[str, Any]]): Additional metadata for the trace.
214            tags (Optional[List[str]]): Tags associated with this trace.
215            public (Optional[bool]): Whether this trace should be public.
216
217        Yields:
218            StatefulTraceClient: A client for interacting with the current trace.
219
220        Example:
221            ```python
222            instrumentor = LlamaIndexInstrumentor()
223
224            with instrumentor.observe(trace_id="unique_id", user_id="user123"):
225                # LlamaIndex operations here will be traced
226                index.as_query_engine().query("What is the capital of France?")
227
228            # Tracing stops after the context manager exits
229
230            instrumentor.flush()
231            ```
232
233        Note:
234            If the instrumentor is not already started, this method will start it
235            for the duration of the context and stop it afterwards.
236        """
237        was_instrumented = self._is_instrumented
238
239        if not was_instrumented:
240            self.start()
241
242        if parent_observation_id is not None and trace_id is None:
243            logger.warning(
244                "trace_id must be provided if parent_observation_id is provided. Ignoring parent_observation_id."
245            )
246            parent_observation_id = None
247
248        final_trace_id = trace_id or str(uuid.uuid4())
249
250        self._context.update(
251            is_user_managed_trace=True,
252            trace_id=final_trace_id,
253            parent_observation_id=parent_observation_id,
254            update_parent=update_parent,
255            trace_name=trace_name,
256            user_id=user_id,
257            session_id=session_id,
258            version=version,
259            release=release,
260            metadata=metadata,
261            tags=tags,
262            public=public,
263        )
264
265        yield self._get_trace_client(final_trace_id)
266
267        self._context.reset()
268
269        if not was_instrumented:
270            self.stop()

Context manager for observing and tracing LlamaIndex operations.

This method allows you to wrap LlamaIndex operations in a context that automatically traces and logs them to Langfuse. It provides fine-grained control over the trace properties and ensures proper instrumentation.

Arguments:
  • trace_id (Optional[str]): Unique identifier for the trace. If not provided, a UUID will be generated.
  • parent_observation_id (Optional[str]): ID of the parent observation, if any.
  • update_parent (Optional[bool]): Whether to update the parent trace.
  • trace_name (Optional[str]): Name of the trace.
  • user_id (Optional[str]): ID of the user associated with this trace.
  • session_id (Optional[str]): ID of the session associated with this trace.
  • version (Optional[str]): Version information for this trace.
  • release (Optional[str]): Release information for this trace.
  • metadata (Optional[Dict[str, Any]]): Additional metadata for the trace.
  • tags (Optional[List[str]]): Tags associated with this trace.
  • public (Optional[bool]): Whether this trace should be public.
Yields:

StatefulTraceClient: A client for interacting with the current trace.

Example:
instrumentor = LlamaIndexInstrumentor()

with instrumentor.observe(trace_id="unique_id", user_id="user123"):
    # LlamaIndex operations here will be traced
    index.as_query_engine().query("What is the capital of France?")

# Tracing stops after the context manager exits

instrumentor.flush()
Note:

If the instrumentor is not already started, this method will start it for the duration of the context and stop it afterwards.
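
Because observe() yields a StatefulTraceClient, the trace can be enriched from inside the block. The score() call below is an assumed usage of the Langfuse v2 StatefulTraceClient API, not something documented above; index is presumed to be an existing LlamaIndex index:

```python
instrumentor = LlamaIndexInstrumentor()

with instrumentor.observe(trace_id="unique_id", user_id="user123") as trace:
    response = index.as_query_engine().query("What is the capital of France?")

    # Attach a score to the current trace (Langfuse v2 StatefulTraceClient API).
    trace.score(name="relevance", value=1.0)

instrumentor.flush()
```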

client_instance: langfuse.client.Langfuse
295    @property
296    def client_instance(self) -> Langfuse:
297        """Return the Langfuse client instance associated with this instrumentor.
298
299        This property provides access to the underlying Langfuse client, allowing
300        direct interaction with Langfuse functionality if needed.
301
302        Returns:
303            Langfuse: The Langfuse client instance.
304        """
305        return self._langfuse

Return the Langfuse client instance associated with this instrumentor.

This property provides access to the underlying Langfuse client, allowing direct interaction with Langfuse functionality if needed.

Returns:

Langfuse: The Langfuse client instance.
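
For example, the underlying client can be used to verify credentials before any tracing happens. auth_check() is part of the Langfuse v2 client API (an assumed usage, not documented above) and is not recommended for hot paths in production:

```python
instrumentor = LlamaIndexInstrumentor()

# Verify the configured credentials and host before attaching handlers.
instrumentor.client_instance.auth_check()

instrumentor.start()
```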

def flush(self) -> None:
307    def flush(self) -> None:
308        """Flush any pending tasks in the task manager.
309
310        This method ensures that all queued tasks are sent to Langfuse immediately.
311        It's useful for scenarios where you want to guarantee that all instrumentation
312        data has been transmitted before your application terminates or moves on to
313        a different phase.
314
315        Note:
316            This method is a wrapper around the `flush()` method of the underlying
317            Langfuse client instance. It's provided here for convenience and to maintain
318            a consistent interface within the instrumentor.
319
320        Example:
321            ```python
322            instrumentor = LlamaIndexInstrumentor()
323            # ... perform some operations ...
324            instrumentor.flush()  # Ensure all data is sent to Langfuse
325            ```
326        """
327        self.client_instance.flush()

Flush any pending tasks in the task manager.

This method ensures that all queued tasks are sent to Langfuse immediately. It's useful for scenarios where you want to guarantee that all instrumentation data has been transmitted before your application terminates or moves on to a different phase.

Note:

This method is a wrapper around the flush() method of the underlying Langfuse client instance. It's provided here for convenience and to maintain a consistent interface within the instrumentor.

Example:
instrumentor = LlamaIndexInstrumentor()
# ... perform some operations ...
instrumentor.flush()  # Ensure all data is sent to Langfuse
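
Because events are sent from background threads, long-running applications may also want to register a final flush at interpreter shutdown. Using atexit here is an illustrative pattern, not part of the instrumentor's API:

```python
import atexit

from langfuse.llama_index import LlamaIndexInstrumentor

instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# Ensure buffered events are sent even if the program exits without an
# explicit flush() call.
atexit.register(instrumentor.flush)
```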