langfuse.llama_index

1from .llama_index import LlamaIndexCallbackHandler
2from ._instrumentor import LlamaIndexInstrumentor
3
4__all__ = [
5    "LlamaIndexCallbackHandler",
6    "LlamaIndexInstrumentor",
7]
@auto_decorate_methods_with(catch_and_log_errors, exclude=['__init__'])
class LlamaIndexCallbackHandler(llama_index.core.callbacks.base_handler.BaseCallbackHandler, langfuse.utils.base_callback_handler.LangfuseBaseCallbackHandler):
 57@auto_decorate_methods_with(catch_and_log_errors, exclude=["__init__"])
 58class LlamaIndexCallbackHandler(
 59    LlamaIndexBaseCallbackHandler, LangfuseBaseCallbackHandler
 60):
 61    """[Deprecated] LlamaIndex callback handler for Langfuse. Deprecated, please use the LlamaIndexInstrumentor instead."""
 62
 63    log = logging.getLogger("langfuse")
 64
 65    def __init__(
 66        self,
 67        *,
 68        public_key: Optional[str] = None,
 69        secret_key: Optional[str] = None,
 70        host: Optional[str] = None,
 71        debug: bool = False,
 72        session_id: Optional[str] = None,
 73        user_id: Optional[str] = None,
 74        trace_name: Optional[str] = None,
 75        release: Optional[str] = None,
 76        version: Optional[str] = None,
 77        tags: Optional[List[str]] = None,
 78        metadata: Optional[Any] = None,
 79        threads: Optional[int] = None,
 80        flush_at: Optional[int] = None,
 81        flush_interval: Optional[int] = None,
 82        max_retries: Optional[int] = None,
 83        timeout: Optional[int] = None,
 84        event_starts_to_ignore: Optional[List[CBEventType]] = None,
 85        event_ends_to_ignore: Optional[List[CBEventType]] = None,
 86        tokenizer: Optional[Callable[[str], list]] = None,
 87        enabled: Optional[bool] = None,
 88        httpx_client: Optional[httpx.Client] = None,
 89        sdk_integration: Optional[str] = None,
 90        sample_rate: Optional[float] = None,
 91    ) -> None:
 92        LlamaIndexBaseCallbackHandler.__init__(
 93            self,
 94            event_starts_to_ignore=event_starts_to_ignore or [],
 95            event_ends_to_ignore=event_ends_to_ignore or [],
 96        )
 97        LangfuseBaseCallbackHandler.__init__(
 98            self,
 99            public_key=public_key,
100            secret_key=secret_key,
101            host=host,
102            debug=debug,
103            session_id=session_id,
104            user_id=user_id,
105            trace_name=trace_name,
106            release=release,
107            version=version,
108            tags=tags,
109            metadata=metadata,
110            threads=threads,
111            flush_at=flush_at,
112            flush_interval=flush_interval,
113            max_retries=max_retries,
114            timeout=timeout,
115            enabled=enabled,
116            httpx_client=httpx_client,
117            sdk_integration=sdk_integration or "llama-index_callback",
118            sample_rate=sample_rate,
119        )
120
121        self.event_map: Dict[str, List[CallbackEvent]] = defaultdict(list)
122        self._llama_index_trace_name: Optional[str] = None
123        self._token_counter = TokenCounter(tokenizer)
124
125        # For stream-chat, the last LLM end_event arrives after the trace has ended
126        # Keep track of these orphans to upsert them with the correct trace_id after the trace has ended
127        self._orphaned_LLM_generations: Dict[
128            str, Tuple[StatefulGenerationClient, StatefulTraceClient]
129        ] = {}
130
131    def set_root(
132        self,
133        root: Optional[Union[StatefulTraceClient, StatefulSpanClient]],
134        *,
135        update_root: bool = False,
136    ) -> None:
137        """Set the root trace or span for the callback handler.
138
139        Args:
140            root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to
141                be used for all following operations.
142
143        Keyword Args:
144            update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.
145
146        Returns:
147            None
148        """
149        context_root.set(root)
150
151        if root is None:
152            self.trace = None
153            self.root_span = None
154            self._task_manager = self.langfuse.task_manager if self.langfuse else None
155
156            return
157
158        if isinstance(root, StatefulTraceClient):
159            self.trace = root
160
161        elif isinstance(root, StatefulSpanClient):
162            self.root_span = root
163            self.trace = StatefulTraceClient(
164                root.client,
165                root.trace_id,
166                StateType.TRACE,
167                root.trace_id,
168                root.task_manager,
169            )
170
171        self._task_manager = root.task_manager
172        self.update_stateful_client = update_root
173
174    def set_trace_params(
175        self,
176        name: Optional[str] = None,
177        user_id: Optional[str] = None,
178        session_id: Optional[str] = None,
179        version: Optional[str] = None,
180        release: Optional[str] = None,
181        metadata: Optional[Any] = None,
182        tags: Optional[List[str]] = None,
183        public: Optional[bool] = None,
184    ):
185        """Set the trace params that will be used for all following operations.
186
187        Allows setting params of subsequent traces at any point in the code.
188        Overwrites the default params set in the callback constructor.
189
190        Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.
191
192        Attributes:
193            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
194            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
195            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
196            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
197            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
198            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
199            public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
200
201
202        Returns:
203            None
204        """
205        context_trace_metadata.set(
206            {
207                "name": name,
208                "user_id": user_id,
209                "session_id": session_id,
210                "version": version,
211                "release": release,
212                "metadata": metadata,
213                "tags": tags,
214                "public": public,
215            }
216        )
217
218    def start_trace(self, trace_id: Optional[str] = None) -> None:
219        """Run when an overall trace is launched."""
220        self._llama_index_trace_name = trace_id
221
222    def end_trace(
223        self,
224        trace_id: Optional[str] = None,
225        trace_map: Optional[Dict[str, List[str]]] = None,
226    ) -> None:
227        """Run when an overall trace is exited."""
228        if not trace_map:
229            self.log.debug("No events in trace map to create the observation tree.")
230            return
231
232        # Generate Langfuse observations after trace has ended and full trace_map is available.
233        # For long-running traces this leads to events only being sent to Langfuse after the trace has ended.
234        # Timestamps remain accurate as they are set at the time of the event.
235        self._create_observations_from_trace_map(
236            event_id=BASE_TRACE_EVENT, trace_map=trace_map
237        )
238        self._update_trace_data(trace_map=trace_map)
239
240    def on_event_start(
241        self,
242        event_type: CBEventType,
243        payload: Optional[Dict[str, Any]] = None,
244        event_id: str = "",
245        parent_id: str = "",
246        **kwargs: Any,
247    ) -> str:
248        """Run when an event starts and return id of event."""
249        start_event = CallbackEvent(
250            event_id=event_id, event_type=event_type, payload=payload
251        )
252        self.event_map[event_id].append(start_event)
253
254        return event_id
255
256    def on_event_end(
257        self,
258        event_type: CBEventType,
259        payload: Optional[Dict[str, Any]] = None,
260        event_id: str = "",
261        **kwargs: Any,
262    ) -> None:
263        """Run when an event ends."""
264        end_event = CallbackEvent(
265            event_id=event_id, event_type=event_type, payload=payload
266        )
267        self.event_map[event_id].append(end_event)
268
269        if event_type == CBEventType.LLM and event_id in self._orphaned_LLM_generations:
270            generation, trace = self._orphaned_LLM_generations[event_id]
271            self._handle_orphaned_LLM_end_event(
272                end_event, generation=generation, trace=trace
273            )
274            del self._orphaned_LLM_generations[event_id]
275
276    def _create_observations_from_trace_map(
277        self,
278        event_id: str,
279        trace_map: Dict[str, List[str]],
280        parent: Optional[
281            Union[StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient]
282        ] = None,
283    ) -> None:
284        """Recursively create langfuse observations based on the trace_map."""
285        if event_id != BASE_TRACE_EVENT and not self.event_map.get(event_id):
286            return
287
288        if event_id == BASE_TRACE_EVENT:
289            observation = self._get_root_observation()
290        else:
291            observation = self._create_observation(
292                event_id=event_id, parent=parent, trace_id=self.trace.id
293            )
294
295        for child_event_id in trace_map.get(event_id, []):
296            self._create_observations_from_trace_map(
297                event_id=child_event_id, parent=observation, trace_map=trace_map
298            )
299
300    def _get_root_observation(self) -> Union[StatefulTraceClient, StatefulSpanClient]:
301        user_provided_root = context_root.get()
302
303        # Get trace metadata from contextvars or use default values
304        trace_metadata = context_trace_metadata.get()
305        name = (
306            trace_metadata["name"]
307            or self.trace_name
308            or f"LlamaIndex_{self._llama_index_trace_name}"
309        )
310        version = trace_metadata["version"] or self.version
311        release = trace_metadata["release"] or self.release
312        session_id = trace_metadata["session_id"] or self.session_id
313        user_id = trace_metadata["user_id"] or self.user_id
314        metadata = trace_metadata["metadata"] or self.metadata
315        tags = trace_metadata["tags"] or self.tags
316        public = trace_metadata["public"] or None
317
318        # Make sure that if a user-provided root is set, it has been set in the same trace
319        # and it's not a root from a different trace
320        if (
321            user_provided_root is not None
322            and self.trace
323            and self.trace.id == user_provided_root.trace_id
324        ):
325            if self.update_stateful_client:
326                user_provided_root.update(
327                    name=name,
328                    version=version,
329                    session_id=session_id,
330                    user_id=user_id,
331                    metadata=metadata,
332                    tags=tags,
333                    release=release,
334                    public=public,
335                )
336
337            return user_provided_root
338
339        else:
340            self.trace = self.langfuse.trace(
341                id=str(uuid4()),
342                name=name,
343                version=version,
344                session_id=session_id,
345                user_id=user_id,
346                metadata=metadata,
347                tags=tags,
348                release=release,
349                public=public,
350            )
351
352            return self.trace
353
354    def _create_observation(
355        self,
356        event_id: str,
357        parent: Union[
358            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
359        ],
360        trace_id: str,
361    ) -> Union[StatefulSpanClient, StatefulGenerationClient]:
362        event_type = self.event_map[event_id][0].event_type
363
364        if event_type == CBEventType.LLM:
365            return self._handle_LLM_events(event_id, parent, trace_id)
366        elif event_type == CBEventType.EMBEDDING:
367            return self._handle_embedding_events(event_id, parent, trace_id)
368        else:
369            return self._handle_span_events(event_id, parent, trace_id)
370
371    def _handle_LLM_events(
372        self,
373        event_id: str,
374        parent: Union[
375            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
376        ],
377        trace_id: str,
378    ) -> StatefulGenerationClient:
379        events = self.event_map[event_id]
380        start_event, end_event = events[0], events[-1]
381
382        if start_event.payload and EventPayload.SERIALIZED in start_event.payload:
383            serialized = start_event.payload.get(EventPayload.SERIALIZED, {})
384            name = serialized.get("class_name", "LLM")
385            temperature = serialized.get("temperature", None)
386            max_tokens = serialized.get("max_tokens", None)
387            timeout = serialized.get("timeout", None)
388
389        parsed_end_payload = self._parse_LLM_end_event_payload(end_event)
390        parsed_metadata = self._parse_metadata_from_event(end_event)
391
392        generation = parent.generation(
393            id=event_id,
394            trace_id=trace_id,
395            version=self.version,
396            name=name,
397            start_time=start_event.time,
398            metadata=parsed_metadata,
399            model_parameters={
400                "temperature": temperature,
401                "max_tokens": max_tokens,
402                "request_timeout": timeout,
403            },
404            **parsed_end_payload,
405        )
406
407        # Register orphaned LLM event (only start event, no end event) to be later upserted with the correct trace_id
408        if len(events) == 1:
409            self._orphaned_LLM_generations[event_id] = (generation, self.trace)
410
411        return generation
412
413    def _handle_orphaned_LLM_end_event(
414        self,
415        end_event: CallbackEvent,
416        generation: StatefulGenerationClient,
417        trace: StatefulTraceClient,
418    ) -> None:
419        parsed_end_payload = self._parse_LLM_end_event_payload(end_event)
420
421        generation.update(
422            **parsed_end_payload,
423        )
424
425        if generation.trace_id != trace.id:
426            raise ValueError(
427                f"Generation trace_id {generation.trace_id} does not match trace.id {trace.id}"
428            )
429
430        trace.update(output=parsed_end_payload["output"])
431
432    def _parse_LLM_end_event_payload(
433        self, end_event: CallbackEvent
434    ) -> ParsedLLMEndPayload:
435        result: ParsedLLMEndPayload = {
436            "input": None,
437            "output": None,
438            "usage": None,
439            "model": None,
440            "end_time": end_event.time,
441        }
442
443        if not end_event.payload:
444            return result
445
446        result["input"] = self._parse_input_from_event(end_event)
447        result["output"] = self._parse_output_from_event(end_event)
448        result["model"], result["usage"] = self._parse_usage_from_event_payload(
449            end_event.payload
450        )
451
452        return result
453
454    def _parse_usage_from_event_payload(self, event_payload: Dict):
455        model = usage = None
456
457        if not (
458            EventPayload.MESSAGES in event_payload
459            and EventPayload.RESPONSE in event_payload
460        ):
461            return model, usage
462
463        response = event_payload.get(EventPayload.RESPONSE)
464
465        if response and hasattr(response, "raw") and response.raw is not None:
466            if isinstance(response.raw, dict):
467                raw_dict = response.raw
468            elif isinstance(response.raw, BaseModel):
469                raw_dict = response.raw.model_dump()
470            else:
471                raw_dict = {}
472
473            model = raw_dict.get("model", None)
474            raw_token_usage = raw_dict.get("usage", {})
475
476            if isinstance(raw_token_usage, dict):
477                token_usage = raw_token_usage
478            elif isinstance(raw_token_usage, BaseModel):
479                token_usage = raw_token_usage.model_dump()
480            else:
481                token_usage = {}
482
483            if token_usage:
484                usage = {
485                    "input": token_usage.get("prompt_tokens", None),
486                    "output": token_usage.get("completion_tokens", None),
487                    "total": token_usage.get("total_tokens", None),
488                }
489
490        return model, usage
491
492    def _handle_embedding_events(
493        self,
494        event_id: str,
495        parent: Union[
496            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
497        ],
498        trace_id: str,
499    ) -> StatefulGenerationClient:
500        events = self.event_map[event_id]
501        start_event, end_event = events[0], events[-1]
502
503        if start_event.payload and EventPayload.SERIALIZED in start_event.payload:
504            serialized = start_event.payload.get(EventPayload.SERIALIZED, {})
505            name = serialized.get("class_name", "Embedding")
506            model = serialized.get("model_name", None)
507            timeout = serialized.get("timeout", None)
508
509        if end_event.payload:
510            chunks = end_event.payload.get(EventPayload.CHUNKS, [])
511            token_count = sum(
512                self._token_counter.get_string_tokens(chunk) for chunk in chunks
513            )
514
515            usage = {
516                "input": 0,
517                "output": 0,
518                "total": token_count or None,
519            }
520
521        input = self._parse_input_from_event(end_event)
522        output = self._parse_output_from_event(end_event)
523
524        generation = parent.generation(
525            id=event_id,
526            trace_id=trace_id,
527            name=name,
528            start_time=start_event.time,
529            end_time=end_event.time,
530            version=self.version,
531            model=model,
532            input=input,
533            output=output,
534            usage=usage or None,
535            model_parameters={
536                "request_timeout": timeout,
537            },
538        )
539
540        return generation
541
542    def _handle_span_events(
543        self,
544        event_id: str,
545        parent: Union[
546            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
547        ],
548        trace_id: str,
549    ) -> StatefulSpanClient:
550        events = self.event_map[event_id]
551        start_event, end_event = events[0], events[-1]
552
553        extracted_input = self._parse_input_from_event(start_event)
554        extracted_output = self._parse_output_from_event(end_event)
555        extracted_metadata = self._parse_metadata_from_event(end_event)
556
557        metadata = (
558            extracted_metadata if extracted_output != extracted_metadata else None
559        )
560
561        name = start_event.event_type.value
562
563        # Update name to the actual tool's name used by openai agent if available
564        if (
565            name == "function_call"
566            and start_event.payload
567            and start_event.payload.get("tool", None)
568        ):
569            tool_name = start_event.payload.get("tool", name)
570            name = (
571                tool_name
572                if isinstance(tool_name, str)
573                else (
574                    tool_name.name
575                    if hasattr(tool_name, "name")
576                    else tool_name.__class__.__name__
577                )
578            )
579
580        span = parent.span(
581            id=event_id,
582            trace_id=trace_id,
583            start_time=start_event.time,
584            name=name,
585            version=self.version,
586            session_id=self.session_id,
587            input=extracted_input,
588            output=extracted_output,
589            metadata=metadata,
590        )
591
592        if end_event:
593            span.end(end_time=end_event.time)
594
595        return span
596
597    def _update_trace_data(self, trace_map):
598        context_root_value = context_root.get()
599        if context_root_value and not self.update_stateful_client:
600            return
601
602        child_event_ids = trace_map.get(BASE_TRACE_EVENT, [])
603        if not child_event_ids:
604            return
605
606        event_pair = self.event_map.get(child_event_ids[0])
607        if not event_pair or len(event_pair) < 2:
608            return
609
610        start_event, end_event = event_pair
611        input = self._parse_input_from_event(start_event)
612        output = self._parse_output_from_event(end_event)
613
614        if input or output:
615            if context_root_value and self.update_stateful_client:
616                context_root_value.update(input=input, output=output)
617            else:
618                self.trace.update(input=input, output=output)
619
620    def _parse_input_from_event(self, event: CallbackEvent):
621        if event.payload is None:
622            return
623
624        payload = event.payload.copy()
625
626        if EventPayload.SERIALIZED in payload:
627            # Always pop Serialized from payload as it may contain LLM api keys
628            payload.pop(EventPayload.SERIALIZED)
629
630        if event.event_type == CBEventType.EMBEDDING and EventPayload.CHUNKS in payload:
631            chunks = payload.get(EventPayload.CHUNKS)
632            return {"num_chunks": len(chunks)}
633
634        if (
635            event.event_type == CBEventType.NODE_PARSING
636            and EventPayload.DOCUMENTS in payload
637        ):
638            documents = payload.pop(EventPayload.DOCUMENTS)
639            payload["documents"] = [doc.metadata for doc in documents]
640            return payload
641
642        for key in [EventPayload.MESSAGES, EventPayload.QUERY_STR, EventPayload.PROMPT]:
643            if key in payload:
644                return payload.get(key)
645
646        return payload or None
647
648    def _parse_output_from_event(self, event: CallbackEvent):
649        if event.payload is None:
650            return
651
652        payload = event.payload.copy()
653
654        if EventPayload.SERIALIZED in payload:
655            # Always pop Serialized from payload as it may contain LLM api keys
656            payload.pop(EventPayload.SERIALIZED)
657
658        if (
659            event.event_type == CBEventType.EMBEDDING
660            and EventPayload.EMBEDDINGS in payload
661        ):
662            embeddings = payload.get(EventPayload.EMBEDDINGS)
663            return {"num_embeddings": len(embeddings)}
664
665        if (
666            event.event_type == CBEventType.NODE_PARSING
667            and EventPayload.NODES in payload
668        ):
669            nodes = payload.pop(EventPayload.NODES)
670            payload["num_nodes"] = len(nodes)
671            return payload
672
673        if event.event_type == CBEventType.CHUNKING and EventPayload.CHUNKS in payload:
674            chunks = payload.pop(EventPayload.CHUNKS)
675            payload["num_chunks"] = len(chunks)
676
677        if EventPayload.COMPLETION in payload:
678            return payload.get(EventPayload.COMPLETION)
679
680        if EventPayload.RESPONSE in payload:
681            response = payload.get(EventPayload.RESPONSE)
682
683            # Skip streaming responses as consuming them would block the user's execution path
684            if "Streaming" in type(response).__name__:
685                return None
686
687            if hasattr(response, "response"):
688                return response.response
689
690            if hasattr(response, "message"):
691                output = dict(response.message)
692                if "additional_kwargs" in output:
693                    if "tool_calls" in output["additional_kwargs"]:
694                        output["tool_calls"] = output["additional_kwargs"]["tool_calls"]
695
696                    del output["additional_kwargs"]
697
698                return output
699
700        return payload or None
701
702    def _parse_metadata_from_event(self, event: CallbackEvent):
703        if event.payload is None:
704            return
705
706        metadata = {}
707
708        for key in event.payload.keys():
709            if key not in [
710                EventPayload.MESSAGES,
711                EventPayload.QUERY_STR,
712                EventPayload.PROMPT,
713                EventPayload.COMPLETION,
714                EventPayload.SERIALIZED,
715                "additional_kwargs",
716            ]:
717                if key != EventPayload.RESPONSE:
718                    metadata[key] = event.payload[key]
719                else:
720                    response = event.payload.get(EventPayload.RESPONSE)
721
722                    if "Streaming" in type(response).__name__:
723                        continue
724
725                    for res_key, value in vars(response).items():
726                        if (
727                            not res_key.startswith("_")
728                            and res_key
729                            not in [
730                                "response",
731                                "response_txt",
732                                "message",
733                                "additional_kwargs",
734                                "delta",
735                                "raw",
736                            ]
737                            and not isinstance(value, Generator)
738                        ):
739                            metadata[res_key] = value
740
741        return metadata or None

[Deprecated] LlamaIndex callback handler for Langfuse. Please use the LlamaIndexInstrumentor instead.

LlamaIndexCallbackHandler( *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, debug: bool = False, session_id: Optional[str] = None, user_id: Optional[str] = None, trace_name: Optional[str] = None, release: Optional[str] = None, version: Optional[str] = None, tags: Optional[List[str]] = None, metadata: Optional[Any] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, event_starts_to_ignore: Optional[List[llama_index.core.callbacks.schema.CBEventType]] = None, event_ends_to_ignore: Optional[List[llama_index.core.callbacks.schema.CBEventType]] = None, tokenizer: Optional[Callable[[str], list]] = None, enabled: Optional[bool] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: Optional[str] = None, sample_rate: Optional[float] = None)
 65    def __init__(
 66        self,
 67        *,
 68        public_key: Optional[str] = None,
 69        secret_key: Optional[str] = None,
 70        host: Optional[str] = None,
 71        debug: bool = False,
 72        session_id: Optional[str] = None,
 73        user_id: Optional[str] = None,
 74        trace_name: Optional[str] = None,
 75        release: Optional[str] = None,
 76        version: Optional[str] = None,
 77        tags: Optional[List[str]] = None,
 78        metadata: Optional[Any] = None,
 79        threads: Optional[int] = None,
 80        flush_at: Optional[int] = None,
 81        flush_interval: Optional[int] = None,
 82        max_retries: Optional[int] = None,
 83        timeout: Optional[int] = None,
 84        event_starts_to_ignore: Optional[List[CBEventType]] = None,
 85        event_ends_to_ignore: Optional[List[CBEventType]] = None,
 86        tokenizer: Optional[Callable[[str], list]] = None,
 87        enabled: Optional[bool] = None,
 88        httpx_client: Optional[httpx.Client] = None,
 89        sdk_integration: Optional[str] = None,
 90        sample_rate: Optional[float] = None,
 91    ) -> None:
 92        LlamaIndexBaseCallbackHandler.__init__(
 93            self,
 94            event_starts_to_ignore=event_starts_to_ignore or [],
 95            event_ends_to_ignore=event_ends_to_ignore or [],
 96        )
 97        LangfuseBaseCallbackHandler.__init__(
 98            self,
 99            public_key=public_key,
100            secret_key=secret_key,
101            host=host,
102            debug=debug,
103            session_id=session_id,
104            user_id=user_id,
105            trace_name=trace_name,
106            release=release,
107            version=version,
108            tags=tags,
109            metadata=metadata,
110            threads=threads,
111            flush_at=flush_at,
112            flush_interval=flush_interval,
113            max_retries=max_retries,
114            timeout=timeout,
115            enabled=enabled,
116            httpx_client=httpx_client,
117            sdk_integration=sdk_integration or "llama-index_callback",
118            sample_rate=sample_rate,
119        )
120
121        self.event_map: Dict[str, List[CallbackEvent]] = defaultdict(list)
122        self._llama_index_trace_name: Optional[str] = None
123        self._token_counter = TokenCounter(tokenizer)
124
125        # For stream-chat, the last LLM end_event arrives after the trace has ended
126        # Keep track of these orphans to upsert them with the correct trace_id after the trace has ended
127        self._orphaned_LLM_generations: Dict[
128            str, Tuple[StatefulGenerationClient, StatefulTraceClient]
129        ] = {}

Initialize the LlamaIndex and Langfuse base callback handlers.

log = <Logger langfuse (WARNING)>
event_map: Dict[str, List[langfuse.llama_index.utils.CallbackEvent]]
def set_root( self, root: Union[langfuse.client.StatefulTraceClient, langfuse.client.StatefulSpanClient, NoneType], *, update_root: bool = False) -> None:
131    def set_root(
132        self,
133        root: Optional[Union[StatefulTraceClient, StatefulSpanClient]],
134        *,
135        update_root: bool = False,
136    ) -> None:
137        """Set the root trace or span for the callback handler.
138
139        Args:
140            root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to
141                be used for all following operations.
142
143        Keyword Args:
144            update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.
145
146        Returns:
147            None
148        """
149        context_root.set(root)
150
151        if root is None:
152            self.trace = None
153            self.root_span = None
154            self._task_manager = self.langfuse.task_manager if self.langfuse else None
155
156            return
157
158        if isinstance(root, StatefulTraceClient):
159            self.trace = root
160
161        elif isinstance(root, StatefulSpanClient):
162            self.root_span = root
163            self.trace = StatefulTraceClient(
164                root.client,
165                root.trace_id,
166                StateType.TRACE,
167                root.trace_id,
168                root.task_manager,
169            )
170
171        self._task_manager = root.task_manager
172        self.update_stateful_client = update_root

Set the root trace or span for the callback handler.

Arguments:
  • root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to be used for all following operations.
Keyword Args:

update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.

Returns:

None

def set_trace_params( self, name: Optional[str] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, version: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, public: Optional[bool] = None):
174    def set_trace_params(
175        self,
176        name: Optional[str] = None,
177        user_id: Optional[str] = None,
178        session_id: Optional[str] = None,
179        version: Optional[str] = None,
180        release: Optional[str] = None,
181        metadata: Optional[Any] = None,
182        tags: Optional[List[str]] = None,
183        public: Optional[bool] = None,
184    ):
185        """Set the trace params that will be used for all following operations.
186
187        Allows setting params of subsequent traces at any point in the code.
188        Overwrites the default params set in the callback constructor.
189
190        Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.
191
192        Attributes:
193            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
194            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
195            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
196            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
197            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
198            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
199            public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
200
201
202        Returns:
203            None
204        """
205        context_trace_metadata.set(
206            {
207                "name": name,
208                "user_id": user_id,
209                "session_id": session_id,
210                "version": version,
211                "release": release,
212                "metadata": metadata,
213                "tags": tags,
214                "public": public,
215            }
216        )

Set the trace params that will be used for all following operations.

Allows setting params of subsequent traces at any point in the code. Overwrites the default params set in the callback constructor.

Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.

Arguments:
  • name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
  • public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
Returns:

None

def start_trace(self, trace_id: Optional[str] = None) -> None:
218    def start_trace(self, trace_id: Optional[str] = None) -> None:
219        """Run when an overall trace is launched."""
220        self._llama_index_trace_name = trace_id

Run when an overall trace is launched.

def end_trace( self, trace_id: Optional[str] = None, trace_map: Optional[Dict[str, List[str]]] = None) -> None:
222    def end_trace(
223        self,
224        trace_id: Optional[str] = None,
225        trace_map: Optional[Dict[str, List[str]]] = None,
226    ) -> None:
227        """Run when an overall trace is exited."""
228        if not trace_map:
229            self.log.debug("No events in trace map to create the observation tree.")
230            return
231
232        # Generate Langfuse observations after trace has ended and full trace_map is available.
233        # For long-running traces this leads to events only being sent to Langfuse after the trace has ended.
234        # Timestamps remain accurate as they are set at the time of the event.
235        self._create_observations_from_trace_map(
236            event_id=BASE_TRACE_EVENT, trace_map=trace_map
237        )
238        self._update_trace_data(trace_map=trace_map)

Run when an overall trace is exited.

def on_event_start( self, event_type: llama_index.core.callbacks.schema.CBEventType, payload: Optional[Dict[str, Any]] = None, event_id: str = '', parent_id: str = '', **kwargs: Any) -> str:
240    def on_event_start(
241        self,
242        event_type: CBEventType,
243        payload: Optional[Dict[str, Any]] = None,
244        event_id: str = "",
245        parent_id: str = "",
246        **kwargs: Any,
247    ) -> str:
248        """Run when an event starts and return id of event."""
249        start_event = CallbackEvent(
250            event_id=event_id, event_type=event_type, payload=payload
251        )
252        self.event_map[event_id].append(start_event)
253
254        return event_id

Run when an event starts and return id of event.

def on_event_end( self, event_type: llama_index.core.callbacks.schema.CBEventType, payload: Optional[Dict[str, Any]] = None, event_id: str = '', **kwargs: Any) -> None:
256    def on_event_end(
257        self,
258        event_type: CBEventType,
259        payload: Optional[Dict[str, Any]] = None,
260        event_id: str = "",
261        **kwargs: Any,
262    ) -> None:
263        """Run when an event ends."""
264        end_event = CallbackEvent(
265            event_id=event_id, event_type=event_type, payload=payload
266        )
267        self.event_map[event_id].append(end_event)
268
269        if event_type == CBEventType.LLM and event_id in self._orphaned_LLM_generations:
270            generation, trace = self._orphaned_LLM_generations[event_id]
271            self._handle_orphaned_LLM_end_event(
272                end_event, generation=generation, trace=trace
273            )
274            del self._orphaned_LLM_generations[event_id]

Run when an event ends.

Inherited Members
llama_index.core.callbacks.base_handler.BaseCallbackHandler
event_starts_to_ignore
event_ends_to_ignore
langfuse.utils.base_callback_handler.LangfuseBaseCallbackHandler
version
session_id
user_id
trace_name
release
metadata
tags
root_span
update_stateful_client
langfuse
trace
get_trace_id
get_trace_url
flush
auth_check
class LlamaIndexInstrumentor:
 28class LlamaIndexInstrumentor:
 29    """Instrumentor for exporting LlamaIndex instrumentation module spans to Langfuse.
 30
 31    This beta integration is currently under active development and subject to change.
 32    Please provide feedback to the Langfuse team: https://github.com/langfuse/langfuse/issues/1931
 33
 34    For production setups, please use the existing callback-based integration (LlamaIndexCallbackHandler).
 35
 36    Usage:
 37        instrumentor = LlamaIndexInstrumentor()
 38        instrumentor.start()
 39
 40        # After calling start(), all LlamaIndex executions will be automatically traced
 41
 42        # To trace a specific execution or set custom trace ID/params, use the context manager:
 43        with instrumentor.observe(trace_id="unique_trace_id", user_id="user123"):
 44            # Your LlamaIndex code here
 45            index = get_llama_index_index()
 46            response = index.as_query_engine().query("Your query here")
 47
 48        instrumentor.flush()
 49
 50    The instrumentor will automatically capture and log events and spans from LlamaIndex
 51    to Langfuse, providing detailed observability into your LLM application.
 52
 53    Args:
 54        public_key (Optional[str]): Langfuse public key
 55        secret_key (Optional[str]): Langfuse secret key
 56        host (Optional[str]): Langfuse API host
 57        debug (Optional[bool]): Enable debug logging
 58        threads (Optional[int]): Number of threads for async operations
 59        flush_at (Optional[int]): Number of items to flush at
 60        flush_interval (Optional[int]): Flush interval in seconds
 61        max_retries (Optional[int]): Maximum number of retries for failed requests
 62        timeout (Optional[int]): Timeout for requests in seconds
 63        httpx_client (Optional[httpx.Client]): Custom HTTPX client
 64        enabled (Optional[bool]): Enable/disable the instrumentor
 65        sample_rate (Optional[float]): Sample rate for logging (0.0 to 1.0)
 66        mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data.
 67    """
 68
 69    def __init__(
 70        self,
 71        *,
 72        public_key: Optional[str] = None,
 73        secret_key: Optional[str] = None,
 74        host: Optional[str] = None,
 75        debug: Optional[bool] = None,
 76        threads: Optional[int] = None,
 77        flush_at: Optional[int] = None,
 78        flush_interval: Optional[int] = None,
 79        max_retries: Optional[int] = None,
 80        timeout: Optional[int] = None,
 81        httpx_client: Optional[httpx.Client] = None,
 82        enabled: Optional[bool] = None,
 83        sample_rate: Optional[float] = None,
 84        mask: Optional[MaskFunction] = None,
 85    ):
 86        self._langfuse = LangfuseSingleton().get(
 87            public_key=public_key,
 88            secret_key=secret_key,
 89            host=host,
 90            debug=debug,
 91            threads=threads,
 92            flush_at=flush_at,
 93            flush_interval=flush_interval,
 94            max_retries=max_retries,
 95            timeout=timeout,
 96            httpx_client=httpx_client,
 97            enabled=enabled,
 98            sample_rate=sample_rate,
 99            mask=mask,
100            sdk_integration="llama-index_instrumentation",
101        )
102        self._span_handler = LlamaIndexSpanHandler(langfuse_client=self._langfuse)
103        self._event_handler = LlamaIndexEventHandler(langfuse_client=self._langfuse)
104        self._context = InstrumentorContext()
105
106    def start(self):
107        """Start the automatic tracing of LlamaIndex operations.
108
109        Once called, all subsequent LlamaIndex executions will be automatically traced
110        and logged to Langfuse without any additional code changes required.
111
112        Example:
113            ```python
114            instrumentor = LlamaIndexInstrumentor()
115            instrumentor.start()
116
117            # From this point, all LlamaIndex operations are automatically traced
118            index = VectorStoreIndex.from_documents(documents)
119            query_engine = index.as_query_engine()
120            response = query_engine.query("What is the capital of France?")
121
122            # The above operations will be automatically logged to Langfuse
123            instrumentor.flush()
124            ```
125        """
126        self._context.reset()
127        dispatcher = get_dispatcher()
128
129        # Span Handler
130        if not any(
131            isinstance(handler, type(self._span_handler))
132            for handler in dispatcher.span_handlers
133        ):
134            dispatcher.add_span_handler(self._span_handler)
135
136        # Event Handler
137        if not any(
138            isinstance(handler, type(self._event_handler))
139            for handler in dispatcher.event_handlers
140        ):
141            dispatcher.add_event_handler(self._event_handler)
142
143    def stop(self):
144        """Stop the automatic tracing of LlamaIndex operations.
145
146        This method removes the span and event handlers from the LlamaIndex dispatcher,
147        effectively stopping the automatic tracing and logging to Langfuse.
148
149        After calling this method, LlamaIndex operations will no longer be automatically
150        traced unless `start()` is called again.
151
152        Example:
153            ```python
154            instrumentor = LlamaIndexInstrumentor()
155            instrumentor.start()
156
157            # LlamaIndex operations are automatically traced here
158
159            instrumentor.stop()
160
161            # LlamaIndex operations are no longer automatically traced
162            ```
163        """
164        self._context.reset()
165        dispatcher = get_dispatcher()
166
167        # Span Handler, in-place filter
168        dispatcher.span_handlers[:] = filter(
169            lambda h: not isinstance(h, type(self._span_handler)),
170            dispatcher.span_handlers,
171        )
172
173        # Event Handler, in-place filter
174        dispatcher.event_handlers[:] = filter(
175            lambda h: not isinstance(h, type(self._event_handler)),
176            dispatcher.event_handlers,
177        )
178
179    @contextmanager
180    def observe(
181        self,
182        *,
183        trace_id: Optional[str] = None,
184        parent_observation_id: Optional[str] = None,
185        update_parent: Optional[bool] = None,
186        trace_name: Optional[str] = None,
187        user_id: Optional[str] = None,
188        session_id: Optional[str] = None,
189        version: Optional[str] = None,
190        release: Optional[str] = None,
191        metadata: Optional[Dict[str, Any]] = None,
192        tags: Optional[List[str]] = None,
193        public: Optional[bool] = None,
194    ):
195        """Access context manager for observing and tracing LlamaIndex operations.
196
197        This method allows you to wrap LlamaIndex operations in a context that
198        automatically traces and logs them to Langfuse. It provides fine-grained
199        control over the trace properties and ensures proper instrumentation.
200
201        Args:
202            trace_id (Optional[str]): Unique identifier for the trace. If not provided, a UUID will be generated.
203            parent_observation_id (Optional[str]): ID of the parent observation, if any.
204            update_parent (Optional[bool]): Whether to update the parent trace.
205            trace_name (Optional[str]): Name of the trace.
206            user_id (Optional[str]): ID of the user associated with this trace.
207            session_id (Optional[str]): ID of the session associated with this trace.
208            version (Optional[str]): Version information for this trace.
209            release (Optional[str]): Release information for this trace.
210            metadata (Optional[Dict[str, Any]]): Additional metadata for the trace.
211            tags (Optional[List[str]]): Tags associated with this trace.
212            public (Optional[bool]): Whether this trace should be public.
213
214        Yields:
215            StatefulTraceClient: A client for interacting with the current trace.
216
217        Example:
218            ```python
219            instrumentor = LlamaIndexInstrumentor()
220
221            with instrumentor.observe(trace_id="unique_id", user_id="user123"):
222                # LlamaIndex operations here will be traced
223                index.as_query_engine().query("What is the capital of France?")
224
225            # Tracing stops after the context manager exits
226
227            instrumentor.flush()
228            ```
229
230        Note:
231            If the instrumentor is not already started, this method will start it
232            for the duration of the context and stop it afterwards.
233        """
234        was_instrumented = self._is_instrumented
235
236        if not was_instrumented:
237            self.start()
238
239        if parent_observation_id is not None and trace_id is None:
240            logger.warning(
241                "trace_id must be provided if parent_observation_id is provided. Ignoring parent_observation_id."
242            )
243            parent_observation_id = None
244
245        final_trace_id = trace_id or str(uuid.uuid4())
246
247        self._context.update(
248            is_user_managed_trace=True,
249            trace_id=final_trace_id,
250            parent_observation_id=parent_observation_id,
251            update_parent=update_parent,
252            trace_name=trace_name,
253            user_id=user_id,
254            session_id=session_id,
255            version=version,
256            release=release,
257            metadata=metadata,
258            tags=tags,
259            public=public,
260        )
261
262        yield self._get_trace_client(final_trace_id)
263
264        self._context.reset()
265
266        if not was_instrumented:
267            self.stop()
268
269    @property
270    def _is_instrumented(self) -> bool:
271        """Check if the dispatcher is instrumented."""
272        dispatcher = get_dispatcher()
273
274        return any(
275            isinstance(handler, type(self._span_handler))
276            for handler in dispatcher.span_handlers
277        ) and any(
278            isinstance(handler, type(self._event_handler))
279            for handler in dispatcher.event_handlers
280        )
281
282    def _get_trace_client(self, trace_id: str) -> StatefulTraceClient:
283        return StatefulTraceClient(
284            client=self._langfuse.client,
285            id=trace_id,
286            trace_id=trace_id,
287            task_manager=self._langfuse.task_manager,
288            state_type=StateType.TRACE,
289        )
290
291    @property
292    def client_instance(self) -> Langfuse:
293        """Return the Langfuse client instance associated with this instrumentor.
294
295        This property provides access to the underlying Langfuse client, allowing
296        direct interaction with Langfuse functionality if needed.
297
298        Returns:
299            Langfuse: The Langfuse client instance.
300        """
301        return self._langfuse
302
303    def flush(self) -> None:
304        """Flush any pending tasks in the task manager.
305
306        This method ensures that all queued tasks are sent to Langfuse immediately.
307        It's useful for scenarios where you want to guarantee that all instrumentation
308        data has been transmitted before your application terminates or moves on to
309        a different phase.
310
311        Note:
312            This method is a wrapper around the `flush()` method of the underlying
313            Langfuse client instance. It's provided here for convenience and to maintain
314            a consistent interface within the instrumentor.
315
316        Example:
317            ```python
318            instrumentor = LlamaIndexInstrumentor(langfuse_client)
319            # ... perform some operations ...
320            instrumentor.flush()  # Ensure all data is sent to Langfuse
321            ```
322        """
323        self.client_instance.flush()

Instrumentor for exporting LlamaIndex instrumentation module spans to Langfuse.

This beta integration is currently under active development and subject to change. Please provide feedback to the Langfuse team: https://github.com/langfuse/langfuse/issues/1931

For production setups, please use the existing callback-based integration (LlamaIndexCallbackHandler).

Usage:

instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

After calling start(), all LlamaIndex executions will be automatically traced

To trace a specific execution or set custom trace ID/params, use the context manager:

with instrumentor.observe(trace_id="unique_trace_id", user_id="user123"):
    # Your LlamaIndex code here
    index = get_llama_index_index()
    response = index.as_query_engine().query("Your query here")

instrumentor.flush()

The instrumentor will automatically capture and log events and spans from LlamaIndex to Langfuse, providing detailed observability into your LLM application.

Arguments:
  • public_key (Optional[str]): Langfuse public key
  • secret_key (Optional[str]): Langfuse secret key
  • host (Optional[str]): Langfuse API host
  • debug (Optional[bool]): Enable debug logging
  • threads (Optional[int]): Number of threads for async operations
  • flush_at (Optional[int]): Number of items to flush at
  • flush_interval (Optional[int]): Flush interval in seconds
  • max_retries (Optional[int]): Maximum number of retries for failed requests
  • timeout (Optional[int]): Timeout for requests in seconds
  • httpx_client (Optional[httpx.Client]): Custom HTTPX client
  • enabled (Optional[bool]): Enable/disable the instrumentor
  • sample_rate (Optional[float]): Sample rate for logging (0.0 to 1.0)
  • mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument data and return a serializable, masked version of the data.
LlamaIndexInstrumentor( *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, enabled: Optional[bool] = None, sample_rate: Optional[float] = None, mask: Optional[langfuse.types.MaskFunction] = None)
 69    def __init__(
 70        self,
 71        *,
 72        public_key: Optional[str] = None,
 73        secret_key: Optional[str] = None,
 74        host: Optional[str] = None,
 75        debug: Optional[bool] = None,
 76        threads: Optional[int] = None,
 77        flush_at: Optional[int] = None,
 78        flush_interval: Optional[int] = None,
 79        max_retries: Optional[int] = None,
 80        timeout: Optional[int] = None,
 81        httpx_client: Optional[httpx.Client] = None,
 82        enabled: Optional[bool] = None,
 83        sample_rate: Optional[float] = None,
 84        mask: Optional[MaskFunction] = None,
 85    ):
 86        self._langfuse = LangfuseSingleton().get(
 87            public_key=public_key,
 88            secret_key=secret_key,
 89            host=host,
 90            debug=debug,
 91            threads=threads,
 92            flush_at=flush_at,
 93            flush_interval=flush_interval,
 94            max_retries=max_retries,
 95            timeout=timeout,
 96            httpx_client=httpx_client,
 97            enabled=enabled,
 98            sample_rate=sample_rate,
 99            mask=mask,
100            sdk_integration="llama-index_instrumentation",
101        )
102        self._span_handler = LlamaIndexSpanHandler(langfuse_client=self._langfuse)
103        self._event_handler = LlamaIndexEventHandler(langfuse_client=self._langfuse)
104        self._context = InstrumentorContext()
def start(self):
106    def start(self):
107        """Start the automatic tracing of LlamaIndex operations.
108
109        Once called, all subsequent LlamaIndex executions will be automatically traced
110        and logged to Langfuse without any additional code changes required.
111
112        Example:
113            ```python
114            instrumentor = LlamaIndexInstrumentor()
115            instrumentor.start()
116
117            # From this point, all LlamaIndex operations are automatically traced
118            index = VectorStoreIndex.from_documents(documents)
119            query_engine = index.as_query_engine()
120            response = query_engine.query("What is the capital of France?")
121
122            # The above operations will be automatically logged to Langfuse
123            instrumentor.flush()
124            ```
125        """
126        self._context.reset()
127        dispatcher = get_dispatcher()
128
129        # Span Handler
130        if not any(
131            isinstance(handler, type(self._span_handler))
132            for handler in dispatcher.span_handlers
133        ):
134            dispatcher.add_span_handler(self._span_handler)
135
136        # Event Handler
137        if not any(
138            isinstance(handler, type(self._event_handler))
139            for handler in dispatcher.event_handlers
140        ):
141            dispatcher.add_event_handler(self._event_handler)

Start the automatic tracing of LlamaIndex operations.

Once called, all subsequent LlamaIndex executions will be automatically traced and logged to Langfuse without any additional code changes required.

Example:
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# From this point, all LlamaIndex operations are automatically traced
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()
response = query_engine.query("What is the capital of France?")

# The above operations will be automatically logged to Langfuse
instrumentor.flush()
def stop(self):
143    def stop(self):
144        """Stop the automatic tracing of LlamaIndex operations.
145
146        This method removes the span and event handlers from the LlamaIndex dispatcher,
147        effectively stopping the automatic tracing and logging to Langfuse.
148
149        After calling this method, LlamaIndex operations will no longer be automatically
150        traced unless `start()` is called again.
151
152        Example:
153            ```python
154            instrumentor = LlamaIndexInstrumentor()
155            instrumentor.start()
156
157            # LlamaIndex operations are automatically traced here
158
159            instrumentor.stop()
160
161            # LlamaIndex operations are no longer automatically traced
162            ```
163        """
164        self._context.reset()
165        dispatcher = get_dispatcher()
166
167        # Span Handler, in-place filter
168        dispatcher.span_handlers[:] = filter(
169            lambda h: not isinstance(h, type(self._span_handler)),
170            dispatcher.span_handlers,
171        )
172
173        # Event Handler, in-place filter
174        dispatcher.event_handlers[:] = filter(
175            lambda h: not isinstance(h, type(self._event_handler)),
176            dispatcher.event_handlers,
177        )

Stop the automatic tracing of LlamaIndex operations.

This method removes the span and event handlers from the LlamaIndex dispatcher, effectively stopping the automatic tracing and logging to Langfuse.

After calling this method, LlamaIndex operations will no longer be automatically traced unless start() is called again.

Example:
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# LlamaIndex operations are automatically traced here

instrumentor.stop()

# LlamaIndex operations are no longer automatically traced
@contextmanager
def observe( self, *, trace_id: Optional[str] = None, parent_observation_id: Optional[str] = None, update_parent: Optional[bool] = None, trace_name: Optional[str] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, version: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None, public: Optional[bool] = None):
179    @contextmanager
180    def observe(
181        self,
182        *,
183        trace_id: Optional[str] = None,
184        parent_observation_id: Optional[str] = None,
185        update_parent: Optional[bool] = None,
186        trace_name: Optional[str] = None,
187        user_id: Optional[str] = None,
188        session_id: Optional[str] = None,
189        version: Optional[str] = None,
190        release: Optional[str] = None,
191        metadata: Optional[Dict[str, Any]] = None,
192        tags: Optional[List[str]] = None,
193        public: Optional[bool] = None,
194    ):
195        """Access context manager for observing and tracing LlamaIndex operations.
196
197        This method allows you to wrap LlamaIndex operations in a context that
198        automatically traces and logs them to Langfuse. It provides fine-grained
199        control over the trace properties and ensures proper instrumentation.
200
201        Args:
202            trace_id (Optional[str]): Unique identifier for the trace. If not provided, a UUID will be generated.
203            parent_observation_id (Optional[str]): ID of the parent observation, if any.
204            update_parent (Optional[bool]): Whether to update the parent trace.
205            trace_name (Optional[str]): Name of the trace.
206            user_id (Optional[str]): ID of the user associated with this trace.
207            session_id (Optional[str]): ID of the session associated with this trace.
208            version (Optional[str]): Version information for this trace.
209            release (Optional[str]): Release information for this trace.
210            metadata (Optional[Dict[str, Any]]): Additional metadata for the trace.
211            tags (Optional[List[str]]): Tags associated with this trace.
212            public (Optional[bool]): Whether this trace should be public.
213
214        Yields:
215            StatefulTraceClient: A client for interacting with the current trace.
216
217        Example:
218            ```python
219            instrumentor = LlamaIndexInstrumentor()
220
221            with instrumentor.observe(trace_id="unique_id", user_id="user123"):
222                # LlamaIndex operations here will be traced
223                index.as_query_engine().query("What is the capital of France?")
224
225            # Tracing stops after the context manager exits
226
227            instrumentor.flush()
228            ```
229
230        Note:
231            If the instrumentor is not already started, this method will start it
232            for the duration of the context and stop it afterwards.
233        """
234        was_instrumented = self._is_instrumented
235
236        if not was_instrumented:
237            self.start()
238
239        if parent_observation_id is not None and trace_id is None:
240            logger.warning(
241                "trace_id must be provided if parent_observation_id is provided. Ignoring parent_observation_id."
242            )
243            parent_observation_id = None
244
245        final_trace_id = trace_id or str(uuid.uuid4())
246
247        self._context.update(
248            is_user_managed_trace=True,
249            trace_id=final_trace_id,
250            parent_observation_id=parent_observation_id,
251            update_parent=update_parent,
252            trace_name=trace_name,
253            user_id=user_id,
254            session_id=session_id,
255            version=version,
256            release=release,
257            metadata=metadata,
258            tags=tags,
259            public=public,
260        )
261
262        yield self._get_trace_client(final_trace_id)
263
264        self._context.reset()
265
266        if not was_instrumented:
267            self.stop()

Context manager for observing and tracing LlamaIndex operations.

This method allows you to wrap LlamaIndex operations in a context that automatically traces and logs them to Langfuse. It provides fine-grained control over the trace properties and ensures proper instrumentation.

Arguments:
  • trace_id (Optional[str]): Unique identifier for the trace. If not provided, a UUID will be generated.
  • parent_observation_id (Optional[str]): ID of the parent observation, if any.
  • update_parent (Optional[bool]): Whether to update the parent trace.
  • trace_name (Optional[str]): Name of the trace.
  • user_id (Optional[str]): ID of the user associated with this trace.
  • session_id (Optional[str]): ID of the session associated with this trace.
  • version (Optional[str]): Version information for this trace.
  • release (Optional[str]): Release information for this trace.
  • metadata (Optional[Dict[str, Any]]): Additional metadata for the trace.
  • tags (Optional[List[str]]): Tags associated with this trace.
  • public (Optional[bool]): Whether this trace should be public.
Yields:

StatefulTraceClient: A client for interacting with the current trace.

Example:
instrumentor = LlamaIndexInstrumentor()

with instrumentor.observe(trace_id="unique_id", user_id="user123"):
    # LlamaIndex operations here will be traced
    index.as_query_engine().query("What is the capital of France?")

# Tracing stops after the context manager exits

instrumentor.flush()
Note:

If the instrumentor is not already started, this method will start it for the duration of the context and stop it afterwards.

client_instance: langfuse.client.Langfuse
291    @property
292    def client_instance(self) -> Langfuse:
293        """Return the Langfuse client instance associated with this instrumentor.
294
295        This property provides access to the underlying Langfuse client, allowing
296        direct interaction with Langfuse functionality if needed.
297
298        Returns:
299            Langfuse: The Langfuse client instance.
300        """
301        return self._langfuse

Return the Langfuse client instance associated with this instrumentor.

This property provides access to the underlying Langfuse client, allowing direct interaction with Langfuse functionality if needed.

Returns:

Langfuse: The Langfuse client instance.

def flush(self) -> None:
303    def flush(self) -> None:
304        """Flush any pending tasks in the task manager.
305
306        This method ensures that all queued tasks are sent to Langfuse immediately.
307        It's useful for scenarios where you want to guarantee that all instrumentation
308        data has been transmitted before your application terminates or moves on to
309        a different phase.
310
311        Note:
312            This method is a wrapper around the `flush()` method of the underlying
313            Langfuse client instance. It's provided here for convenience and to maintain
314            a consistent interface within the instrumentor.
315
316        Example:
317            ```python
318            instrumentor = LlamaIndexInstrumentor(langfuse_client)
319            # ... perform some operations ...
320            instrumentor.flush()  # Ensure all data is sent to Langfuse
321            ```
322        """
323        self.client_instance.flush()

Flush any pending tasks in the task manager.

This method ensures that all queued tasks are sent to Langfuse immediately. It's useful for scenarios where you want to guarantee that all instrumentation data has been transmitted before your application terminates or moves on to a different phase.

Note:

This method is a wrapper around the flush() method of the underlying Langfuse client instance. It's provided here for convenience and to maintain a consistent interface within the instrumentor.

Example:
instrumentor = LlamaIndexInstrumentor()
# ... perform some operations ...
instrumentor.flush()  # Ensure all data is sent to Langfuse