langfuse.llama_index

from .llama_index import LlamaIndexCallbackHandler
from ._instrumentor import LlamaIndexInstrumentor

__all__ = [
    "LlamaIndexCallbackHandler",
    "LlamaIndexInstrumentor",
]
@auto_decorate_methods_with(catch_and_log_errors, exclude=['__init__'])
class LlamaIndexCallbackHandler(llama_index.core.callbacks.base_handler.BaseCallbackHandler, langfuse.utils.base_callback_handler.LangfuseBaseCallbackHandler):
@auto_decorate_methods_with(catch_and_log_errors, exclude=["__init__"])
class LlamaIndexCallbackHandler(
    LlamaIndexBaseCallbackHandler, LangfuseBaseCallbackHandler
):
    """LlamaIndex callback handler for Langfuse. This version is in alpha and may change in the future."""

    # Module-level logger shared by all handler instances.
    log = logging.getLogger("langfuse")

    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        debug: bool = False,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        trace_name: Optional[str] = None,
        release: Optional[str] = None,
        version: Optional[str] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Any] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        event_starts_to_ignore: Optional[List[CBEventType]] = None,
        event_ends_to_ignore: Optional[List[CBEventType]] = None,
        tokenizer: Optional[Callable[[str], list]] = None,
        enabled: Optional[bool] = None,
        httpx_client: Optional[httpx.Client] = None,
        sdk_integration: Optional[str] = None,
        sample_rate: Optional[float] = None,
    ) -> None:
        """Initialize both parent handlers and the per-run event bookkeeping.

        The event ignore lists go to the LlamaIndex base handler; all other
        keyword args (credentials, trace attributes, batching/flush settings)
        are forwarded to ``LangfuseBaseCallbackHandler``. ``tokenizer`` is
        used by the embedding handler for token counting.
        """
        # LlamaIndex side: which event types to skip at start/end.
        LlamaIndexBaseCallbackHandler.__init__(
            self,
            event_starts_to_ignore=event_starts_to_ignore or [],
            event_ends_to_ignore=event_ends_to_ignore or [],
        )
        # Langfuse side: client configuration and default trace attributes.
        LangfuseBaseCallbackHandler.__init__(
            self,
            public_key=public_key,
            secret_key=secret_key,
            host=host,
            debug=debug,
            session_id=session_id,
            user_id=user_id,
            trace_name=trace_name,
            release=release,
            version=version,
            tags=tags,
            metadata=metadata,
            threads=threads,
            flush_at=flush_at,
            flush_interval=flush_interval,
            max_retries=max_retries,
            timeout=timeout,
            enabled=enabled,
            httpx_client=httpx_client,
            sdk_integration=sdk_integration or "llama-index_callback",
            sample_rate=sample_rate,
        )

        # All start/end events observed per event id, in arrival order.
        self.event_map: Dict[str, List[CallbackEvent]] = defaultdict(list)
        self._llama_index_trace_name: Optional[str] = None
        self._token_counter = TokenCounter(tokenizer)

        # For stream-chat, the last LLM end_event arrives after the trace has ended
        # Keep track of these orphans to upsert them with the correct trace_id after the trace has ended
        self._orphaned_LLM_generations: Dict[
            str, Tuple[StatefulGenerationClient, StatefulTraceClient]
        ] = {}
130
131    def set_root(
132        self,
133        root: Optional[Union[StatefulTraceClient, StatefulSpanClient]],
134        *,
135        update_root: bool = False,
136    ) -> None:
137        """Set the root trace or span for the callback handler.
138
139        Args:
140            root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to
141                be used for all following operations.
142
143        Keyword Args:
144            update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.
145
146        Returns:
147            None
148        """
149        context_root.set(root)
150
151        if root is None:
152            self.trace = None
153            self.root_span = None
154            self._task_manager = self.langfuse.task_manager if self.langfuse else None
155
156            return
157
158        if isinstance(root, StatefulTraceClient):
159            self.trace = root
160
161        elif isinstance(root, StatefulSpanClient):
162            self.root_span = root
163            self.trace = StatefulTraceClient(
164                root.client,
165                root.trace_id,
166                StateType.TRACE,
167                root.trace_id,
168                root.task_manager,
169            )
170
171        self._task_manager = root.task_manager
172        self.update_stateful_client = update_root
173
174    def set_trace_params(
175        self,
176        name: Optional[str] = None,
177        user_id: Optional[str] = None,
178        session_id: Optional[str] = None,
179        version: Optional[str] = None,
180        release: Optional[str] = None,
181        metadata: Optional[Any] = None,
182        tags: Optional[List[str]] = None,
183        public: Optional[bool] = None,
184    ):
185        """Set the trace params that will be used for all following operations.
186
187        Allows setting params of subsequent traces at any point in the code.
188        Overwrites the default params set in the callback constructor.
189
190        Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.
191
192        Attributes:
193            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
194            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
195            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
196            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
197            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
198            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
199            public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
200
201
202        Returns:
203            None
204        """
205        context_trace_metadata.set(
206            {
207                "name": name,
208                "user_id": user_id,
209                "session_id": session_id,
210                "version": version,
211                "release": release,
212                "metadata": metadata,
213                "tags": tags,
214                "public": public,
215            }
216        )
217
218    def start_trace(self, trace_id: Optional[str] = None) -> None:
219        """Run when an overall trace is launched."""
220        self._llama_index_trace_name = trace_id
221
    def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """Run when an overall trace is exited.

        Builds the Langfuse observation tree from ``trace_map`` (parent event
        id -> child event ids) and then updates the trace's input/output.
        """
        if not trace_map:
            self.log.debug("No events in trace map to create the observation tree.")
            return

        # Generate Langfuse observations after trace has ended and full trace_map is available.
        # For long-running traces this leads to events only being sent to Langfuse after the trace has ended.
        # Timestamps remain accurate as they are set at the time of the event.
        self._create_observations_from_trace_map(
            event_id=BASE_TRACE_EVENT, trace_map=trace_map
        )
        self._update_trace_data(trace_map=trace_map)
239
240    def on_event_start(
241        self,
242        event_type: CBEventType,
243        payload: Optional[Dict[str, Any]] = None,
244        event_id: str = "",
245        parent_id: str = "",
246        **kwargs: Any,
247    ) -> str:
248        """Run when an event starts and return id of event."""
249        start_event = CallbackEvent(
250            event_id=event_id, event_type=event_type, payload=payload
251        )
252        self.event_map[event_id].append(start_event)
253
254        return event_id
255
256    def on_event_end(
257        self,
258        event_type: CBEventType,
259        payload: Optional[Dict[str, Any]] = None,
260        event_id: str = "",
261        **kwargs: Any,
262    ) -> None:
263        """Run when an event ends."""
264        end_event = CallbackEvent(
265            event_id=event_id, event_type=event_type, payload=payload
266        )
267        self.event_map[event_id].append(end_event)
268
269        if event_type == CBEventType.LLM and event_id in self._orphaned_LLM_generations:
270            generation, trace = self._orphaned_LLM_generations[event_id]
271            self._handle_orphaned_LLM_end_event(
272                end_event, generation=generation, trace=trace
273            )
274            del self._orphaned_LLM_generations[event_id]
275
276    def _create_observations_from_trace_map(
277        self,
278        event_id: str,
279        trace_map: Dict[str, List[str]],
280        parent: Optional[
281            Union[StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient]
282        ] = None,
283    ) -> None:
284        """Recursively create langfuse observations based on the trace_map."""
285        if event_id != BASE_TRACE_EVENT and not self.event_map.get(event_id):
286            return
287
288        if event_id == BASE_TRACE_EVENT:
289            observation = self._get_root_observation()
290        else:
291            observation = self._create_observation(
292                event_id=event_id, parent=parent, trace_id=self.trace.id
293            )
294
295        for child_event_id in trace_map.get(event_id, []):
296            self._create_observations_from_trace_map(
297                event_id=child_event_id, parent=observation, trace_map=trace_map
298            )
299
    def _get_root_observation(self) -> Union[StatefulTraceClient, StatefulSpanClient]:
        """Return the root observation for the current run.

        Reuses a user-provided root (set via ``set_root``) when it belongs to
        the current trace; otherwise creates a fresh Langfuse trace.
        """
        user_provided_root = context_root.get()

        # Get trace metadata from contextvars or use default values
        trace_metadata = context_trace_metadata.get()
        # Fallback chain: per-run params -> constructor defaults -> derived name.
        name = (
            trace_metadata["name"]
            or self.trace_name
            or f"LlamaIndex_{self._llama_index_trace_name}"
        )
        version = trace_metadata["version"] or self.version
        release = trace_metadata["release"] or self.release
        session_id = trace_metadata["session_id"] or self.session_id
        user_id = trace_metadata["user_id"] or self.user_id
        metadata = trace_metadata["metadata"] or self.metadata
        tags = trace_metadata["tags"] or self.tags
        # NOTE(review): `False` collapses to None here — confirm that an
        # explicit public=False is intended to mean "unset".
        public = trace_metadata["public"] or None

        # Make sure that if a user-provided root is set, it has been set in the same trace
        # and it's not a root from a different trace
        if (
            user_provided_root is not None
            and self.trace
            and self.trace.id == user_provided_root.trace_id
        ):
            if self.update_stateful_client:
                user_provided_root.update(
                    name=name,
                    version=version,
                    session_id=session_id,
                    user_id=user_id,
                    metadata=metadata,
                    tags=tags,
                    release=release,
                    public=public,
                )

            return user_provided_root

        else:
            # No usable user-provided root: start a brand-new trace.
            self.trace = self.langfuse.trace(
                id=str(uuid4()),
                name=name,
                version=version,
                session_id=session_id,
                user_id=user_id,
                metadata=metadata,
                tags=tags,
                release=release,
                public=public,
            )

            return self.trace
353
354    def _create_observation(
355        self,
356        event_id: str,
357        parent: Union[
358            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
359        ],
360        trace_id: str,
361    ) -> Union[StatefulSpanClient, StatefulGenerationClient]:
362        event_type = self.event_map[event_id][0].event_type
363
364        if event_type == CBEventType.LLM:
365            return self._handle_LLM_events(event_id, parent, trace_id)
366        elif event_type == CBEventType.EMBEDDING:
367            return self._handle_embedding_events(event_id, parent, trace_id)
368        else:
369            return self._handle_span_events(event_id, parent, trace_id)
370
371    def _handle_LLM_events(
372        self,
373        event_id: str,
374        parent: Union[
375            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
376        ],
377        trace_id: str,
378    ) -> StatefulGenerationClient:
379        events = self.event_map[event_id]
380        start_event, end_event = events[0], events[-1]
381
382        if start_event.payload and EventPayload.SERIALIZED in start_event.payload:
383            serialized = start_event.payload.get(EventPayload.SERIALIZED, {})
384            name = serialized.get("class_name", "LLM")
385            temperature = serialized.get("temperature", None)
386            max_tokens = serialized.get("max_tokens", None)
387            timeout = serialized.get("timeout", None)
388
389        parsed_end_payload = self._parse_LLM_end_event_payload(end_event)
390        parsed_metadata = self._parse_metadata_from_event(end_event)
391
392        generation = parent.generation(
393            id=event_id,
394            trace_id=trace_id,
395            version=self.version,
396            name=name,
397            start_time=start_event.time,
398            metadata=parsed_metadata,
399            model_parameters={
400                "temperature": temperature,
401                "max_tokens": max_tokens,
402                "request_timeout": timeout,
403            },
404            **parsed_end_payload,
405        )
406
407        # Register orphaned LLM event (only start event, no end event) to be later upserted with the correct trace_id
408        if len(events) == 1:
409            self._orphaned_LLM_generations[event_id] = (generation, self.trace)
410
411        return generation
412
413    def _handle_orphaned_LLM_end_event(
414        self,
415        end_event: CallbackEvent,
416        generation: StatefulGenerationClient,
417        trace: StatefulTraceClient,
418    ) -> None:
419        parsed_end_payload = self._parse_LLM_end_event_payload(end_event)
420
421        generation.update(
422            **parsed_end_payload,
423        )
424
425        if generation.trace_id != trace.id:
426            raise ValueError(
427                f"Generation trace_id {generation.trace_id} does not match trace.id {trace.id}"
428            )
429
430        trace.update(output=parsed_end_payload["output"])
431
432    def _parse_LLM_end_event_payload(
433        self, end_event: CallbackEvent
434    ) -> ParsedLLMEndPayload:
435        result: ParsedLLMEndPayload = {
436            "input": None,
437            "output": None,
438            "usage": None,
439            "model": None,
440            "end_time": end_event.time,
441        }
442
443        if not end_event.payload:
444            return result
445
446        result["input"] = self._parse_input_from_event(end_event)
447        result["output"] = self._parse_output_from_event(end_event)
448        result["model"], result["usage"] = self._parse_usage_from_event_payload(
449            end_event.payload
450        )
451
452        return result
453
454    def _parse_usage_from_event_payload(self, event_payload: Dict):
455        model = usage = None
456
457        if not (
458            EventPayload.MESSAGES in event_payload
459            and EventPayload.RESPONSE in event_payload
460        ):
461            return model, usage
462
463        response = event_payload.get(EventPayload.RESPONSE)
464
465        if response and hasattr(response, "raw") and response.raw is not None:
466            if isinstance(response.raw, dict):
467                raw_dict = response.raw
468            elif isinstance(response.raw, BaseModel):
469                raw_dict = response.raw.model_dump()
470            else:
471                raw_dict = {}
472
473            model = raw_dict.get("model", None)
474            raw_token_usage = raw_dict.get("usage", {})
475
476            if isinstance(raw_token_usage, dict):
477                token_usage = raw_token_usage
478            elif isinstance(raw_token_usage, BaseModel):
479                token_usage = raw_token_usage.model_dump()
480            else:
481                token_usage = {}
482
483            if token_usage:
484                usage = {
485                    "input": token_usage.get("prompt_tokens", None),
486                    "output": token_usage.get("completion_tokens", None),
487                    "total": token_usage.get("total_tokens", None),
488                }
489
490        return model, usage
491
492    def _handle_embedding_events(
493        self,
494        event_id: str,
495        parent: Union[
496            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
497        ],
498        trace_id: str,
499    ) -> StatefulGenerationClient:
500        events = self.event_map[event_id]
501        start_event, end_event = events[0], events[-1]
502
503        if start_event.payload and EventPayload.SERIALIZED in start_event.payload:
504            serialized = start_event.payload.get(EventPayload.SERIALIZED, {})
505            name = serialized.get("class_name", "Embedding")
506            model = serialized.get("model_name", None)
507            timeout = serialized.get("timeout", None)
508
509        if end_event.payload:
510            chunks = end_event.payload.get(EventPayload.CHUNKS, [])
511            token_count = sum(
512                self._token_counter.get_string_tokens(chunk) for chunk in chunks
513            )
514
515            usage = {
516                "input": 0,
517                "output": 0,
518                "total": token_count or None,
519            }
520
521        input = self._parse_input_from_event(end_event)
522        output = self._parse_output_from_event(end_event)
523
524        generation = parent.generation(
525            id=event_id,
526            trace_id=trace_id,
527            name=name,
528            start_time=start_event.time,
529            end_time=end_event.time,
530            version=self.version,
531            model=model,
532            input=input,
533            output=output,
534            usage=usage or None,
535            model_parameters={
536                "request_timeout": timeout,
537            },
538        )
539
540        return generation
541
542    def _handle_span_events(
543        self,
544        event_id: str,
545        parent: Union[
546            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
547        ],
548        trace_id: str,
549    ) -> StatefulSpanClient:
550        events = self.event_map[event_id]
551        start_event, end_event = events[0], events[-1]
552
553        extracted_input = self._parse_input_from_event(start_event)
554        extracted_output = self._parse_output_from_event(end_event)
555        extracted_metadata = self._parse_metadata_from_event(end_event)
556
557        metadata = (
558            extracted_metadata if extracted_output != extracted_metadata else None
559        )
560
561        name = start_event.event_type.value
562
563        # Update name to the actual tool's name used by openai agent if available
564        if (
565            name == "function_call"
566            and start_event.payload
567            and start_event.payload.get("tool", None)
568        ):
569            tool_name = start_event.payload.get("tool", name)
570            name = (
571                tool_name
572                if isinstance(tool_name, str)
573                else (
574                    tool_name.name
575                    if hasattr(tool_name, "name")
576                    else tool_name.__class__.__name__
577                )
578            )
579
580        span = parent.span(
581            id=event_id,
582            trace_id=trace_id,
583            start_time=start_event.time,
584            name=name,
585            version=self.version,
586            session_id=self.session_id,
587            input=extracted_input,
588            output=extracted_output,
589            metadata=metadata,
590        )
591
592        if end_event:
593            span.end(end_time=end_event.time)
594
595        return span
596
597    def _update_trace_data(self, trace_map):
598        context_root_value = context_root.get()
599        if context_root_value and not self.update_stateful_client:
600            return
601
602        child_event_ids = trace_map.get(BASE_TRACE_EVENT, [])
603        if not child_event_ids:
604            return
605
606        event_pair = self.event_map.get(child_event_ids[0])
607        if not event_pair or len(event_pair) < 2:
608            return
609
610        start_event, end_event = event_pair
611        input = self._parse_input_from_event(start_event)
612        output = self._parse_output_from_event(end_event)
613
614        if input or output:
615            if context_root_value and self.update_stateful_client:
616                context_root_value.update(input=input, output=output)
617            else:
618                self.trace.update(input=input, output=output)
619
620    def _parse_input_from_event(self, event: CallbackEvent):
621        if event.payload is None:
622            return
623
624        payload = event.payload.copy()
625
626        if EventPayload.SERIALIZED in payload:
627            # Always pop Serialized from payload as it may contain LLM api keys
628            payload.pop(EventPayload.SERIALIZED)
629
630        if event.event_type == CBEventType.EMBEDDING and EventPayload.CHUNKS in payload:
631            chunks = payload.get(EventPayload.CHUNKS)
632            return {"num_chunks": len(chunks)}
633
634        if (
635            event.event_type == CBEventType.NODE_PARSING
636            and EventPayload.DOCUMENTS in payload
637        ):
638            documents = payload.pop(EventPayload.DOCUMENTS)
639            payload["documents"] = [doc.metadata for doc in documents]
640            return payload
641
642        for key in [EventPayload.MESSAGES, EventPayload.QUERY_STR, EventPayload.PROMPT]:
643            if key in payload:
644                return payload.get(key)
645
646        return payload or None
647
648    def _parse_output_from_event(self, event: CallbackEvent):
649        if event.payload is None:
650            return
651
652        payload = event.payload.copy()
653
654        if EventPayload.SERIALIZED in payload:
655            # Always pop Serialized from payload as it may contain LLM api keys
656            payload.pop(EventPayload.SERIALIZED)
657
658        if (
659            event.event_type == CBEventType.EMBEDDING
660            and EventPayload.EMBEDDINGS in payload
661        ):
662            embeddings = payload.get(EventPayload.EMBEDDINGS)
663            return {"num_embeddings": len(embeddings)}
664
665        if (
666            event.event_type == CBEventType.NODE_PARSING
667            and EventPayload.NODES in payload
668        ):
669            nodes = payload.pop(EventPayload.NODES)
670            payload["num_nodes"] = len(nodes)
671            return payload
672
673        if event.event_type == CBEventType.CHUNKING and EventPayload.CHUNKS in payload:
674            chunks = payload.pop(EventPayload.CHUNKS)
675            payload["num_chunks"] = len(chunks)
676
677        if EventPayload.COMPLETION in payload:
678            return payload.get(EventPayload.COMPLETION)
679
680        if EventPayload.RESPONSE in payload:
681            response = payload.get(EventPayload.RESPONSE)
682
683            # Skip streaming responses as consuming them would block the user's execution path
684            if "Streaming" in type(response).__name__:
685                return None
686
687            if hasattr(response, "response"):
688                return response.response
689
690            if hasattr(response, "message"):
691                output = dict(response.message)
692                if "additional_kwargs" in output:
693                    if "tool_calls" in output["additional_kwargs"]:
694                        output["tool_calls"] = output["additional_kwargs"]["tool_calls"]
695
696                    del output["additional_kwargs"]
697
698                return output
699
700        return payload or None
701
702    def _parse_metadata_from_event(self, event: CallbackEvent):
703        if event.payload is None:
704            return
705
706        metadata = {}
707
708        for key in event.payload.keys():
709            if key not in [
710                EventPayload.MESSAGES,
711                EventPayload.QUERY_STR,
712                EventPayload.PROMPT,
713                EventPayload.COMPLETION,
714                EventPayload.SERIALIZED,
715                "additional_kwargs",
716            ]:
717                if key != EventPayload.RESPONSE:
718                    metadata[key] = event.payload[key]
719                else:
720                    response = event.payload.get(EventPayload.RESPONSE)
721
722                    if "Streaming" in type(response).__name__:
723                        continue
724
725                    for res_key, value in vars(response).items():
726                        if (
727                            not res_key.startswith("_")
728                            and res_key
729                            not in [
730                                "response",
731                                "response_txt",
732                                "message",
733                                "additional_kwargs",
734                                "delta",
735                                "raw",
736                            ]
737                            and not isinstance(value, Generator)
738                        ):
739                            metadata[res_key] = value
740
741        return metadata or None

LlamaIndex callback handler for Langfuse. This version is in alpha and may change in the future.

LlamaIndexCallbackHandler( *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, debug: bool = False, session_id: Optional[str] = None, user_id: Optional[str] = None, trace_name: Optional[str] = None, release: Optional[str] = None, version: Optional[str] = None, tags: Optional[List[str]] = None, metadata: Optional[Any] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, event_starts_to_ignore: Optional[List[llama_index.core.callbacks.schema.CBEventType]] = None, event_ends_to_ignore: Optional[List[llama_index.core.callbacks.schema.CBEventType]] = None, tokenizer: Optional[Callable[[str], list]] = None, enabled: Optional[bool] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: Optional[str] = None, sample_rate: Optional[float] = None)
 65    def __init__(
 66        self,
 67        *,
 68        public_key: Optional[str] = None,
 69        secret_key: Optional[str] = None,
 70        host: Optional[str] = None,
 71        debug: bool = False,
 72        session_id: Optional[str] = None,
 73        user_id: Optional[str] = None,
 74        trace_name: Optional[str] = None,
 75        release: Optional[str] = None,
 76        version: Optional[str] = None,
 77        tags: Optional[List[str]] = None,
 78        metadata: Optional[Any] = None,
 79        threads: Optional[int] = None,
 80        flush_at: Optional[int] = None,
 81        flush_interval: Optional[int] = None,
 82        max_retries: Optional[int] = None,
 83        timeout: Optional[int] = None,
 84        event_starts_to_ignore: Optional[List[CBEventType]] = None,
 85        event_ends_to_ignore: Optional[List[CBEventType]] = None,
 86        tokenizer: Optional[Callable[[str], list]] = None,
 87        enabled: Optional[bool] = None,
 88        httpx_client: Optional[httpx.Client] = None,
 89        sdk_integration: Optional[str] = None,
 90        sample_rate: Optional[float] = None,
 91    ) -> None:
 92        LlamaIndexBaseCallbackHandler.__init__(
 93            self,
 94            event_starts_to_ignore=event_starts_to_ignore or [],
 95            event_ends_to_ignore=event_ends_to_ignore or [],
 96        )
 97        LangfuseBaseCallbackHandler.__init__(
 98            self,
 99            public_key=public_key,
100            secret_key=secret_key,
101            host=host,
102            debug=debug,
103            session_id=session_id,
104            user_id=user_id,
105            trace_name=trace_name,
106            release=release,
107            version=version,
108            tags=tags,
109            metadata=metadata,
110            threads=threads,
111            flush_at=flush_at,
112            flush_interval=flush_interval,
113            max_retries=max_retries,
114            timeout=timeout,
115            enabled=enabled,
116            httpx_client=httpx_client,
117            sdk_integration=sdk_integration or "llama-index_callback",
118            sample_rate=sample_rate,
119        )
120
121        self.event_map: Dict[str, List[CallbackEvent]] = defaultdict(list)
122        self._llama_index_trace_name: Optional[str] = None
123        self._token_counter = TokenCounter(tokenizer)
124
125        # For stream-chat, the last LLM end_event arrives after the trace has ended
126        # Keep track of these orphans to upsert them with the correct trace_id after the trace has ended
127        self._orphaned_LLM_generations: Dict[
128            str, Tuple[StatefulGenerationClient, StatefulTraceClient]
129        ] = {}

Initialize the base callback handler.

log = <Logger langfuse (WARNING)>
event_map: Dict[str, List[langfuse.llama_index.utils.CallbackEvent]]
def set_root( self, root: Union[langfuse.client.StatefulTraceClient, langfuse.client.StatefulSpanClient, NoneType], *, update_root: bool = False) -> None:
131    def set_root(
132        self,
133        root: Optional[Union[StatefulTraceClient, StatefulSpanClient]],
134        *,
135        update_root: bool = False,
136    ) -> None:
137        """Set the root trace or span for the callback handler.
138
139        Args:
140            root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to
141                be used for all following operations.
142
143        Keyword Args:
144            update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.
145
146        Returns:
147            None
148        """
149        context_root.set(root)
150
151        if root is None:
152            self.trace = None
153            self.root_span = None
154            self._task_manager = self.langfuse.task_manager if self.langfuse else None
155
156            return
157
158        if isinstance(root, StatefulTraceClient):
159            self.trace = root
160
161        elif isinstance(root, StatefulSpanClient):
162            self.root_span = root
163            self.trace = StatefulTraceClient(
164                root.client,
165                root.trace_id,
166                StateType.TRACE,
167                root.trace_id,
168                root.task_manager,
169            )
170
171        self._task_manager = root.task_manager
172        self.update_stateful_client = update_root

Set the root trace or span for the callback handler.

Arguments:
  • root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to be used for all following operations.
Keyword Args:
  • update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.

Returns:
  None

def set_trace_params( self, name: Optional[str] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, version: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, public: Optional[bool] = None):
174    def set_trace_params(
175        self,
176        name: Optional[str] = None,
177        user_id: Optional[str] = None,
178        session_id: Optional[str] = None,
179        version: Optional[str] = None,
180        release: Optional[str] = None,
181        metadata: Optional[Any] = None,
182        tags: Optional[List[str]] = None,
183        public: Optional[bool] = None,
184    ):
185        """Set the trace params that will be used for all following operations.
186
187        Allows setting params of subsequent traces at any point in the code.
188        Overwrites the default params set in the callback constructor.
189
190        Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.
191
192        Attributes:
193            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
194            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
195            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
196            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
197            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
198            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
199            public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
200
201
202        Returns:
203            None
204        """
205        context_trace_metadata.set(
206            {
207                "name": name,
208                "user_id": user_id,
209                "session_id": session_id,
210                "version": version,
211                "release": release,
212                "metadata": metadata,
213                "tags": tags,
214                "public": public,
215            }
216        )

Set the trace params that will be used for all following operations.

Allows setting params of subsequent traces at any point in the code. Overwrites the default params set in the callback constructor.

Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.

Attributes:
  • name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
  • release (Optional[str]): The release identifier of the current deployment. Used to understand how changes across deployments affect metrics.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
  • public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

Returns:
  None

def start_trace(self, trace_id: Optional[str] = None) -> None:
218    def start_trace(self, trace_id: Optional[str] = None) -> None:
219        """Run when an overall trace is launched."""
220        self._llama_index_trace_name = trace_id

Run when an overall trace is launched.

def end_trace( self, trace_id: Optional[str] = None, trace_map: Optional[Dict[str, List[str]]] = None) -> None:
222    def end_trace(
223        self,
224        trace_id: Optional[str] = None,
225        trace_map: Optional[Dict[str, List[str]]] = None,
226    ) -> None:
227        """Run when an overall trace is exited."""
228        if not trace_map:
229            self.log.debug("No events in trace map to create the observation tree.")
230            return
231
232        # Generate Langfuse observations after trace has ended and full trace_map is available.
233        # For long-running traces this leads to events only being sent to Langfuse after the trace has ended.
234        # Timestamps remain accurate as they are set at the time of the event.
235        self._create_observations_from_trace_map(
236            event_id=BASE_TRACE_EVENT, trace_map=trace_map
237        )
238        self._update_trace_data(trace_map=trace_map)

Run when an overall trace is exited.

def on_event_start( self, event_type: llama_index.core.callbacks.schema.CBEventType, payload: Optional[Dict[str, Any]] = None, event_id: str = '', parent_id: str = '', **kwargs: Any) -> str:
240    def on_event_start(
241        self,
242        event_type: CBEventType,
243        payload: Optional[Dict[str, Any]] = None,
244        event_id: str = "",
245        parent_id: str = "",
246        **kwargs: Any,
247    ) -> str:
248        """Run when an event starts and return id of event."""
249        start_event = CallbackEvent(
250            event_id=event_id, event_type=event_type, payload=payload
251        )
252        self.event_map[event_id].append(start_event)
253
254        return event_id

Run when an event starts and return id of event.

def on_event_end( self, event_type: llama_index.core.callbacks.schema.CBEventType, payload: Optional[Dict[str, Any]] = None, event_id: str = '', **kwargs: Any) -> None:
256    def on_event_end(
257        self,
258        event_type: CBEventType,
259        payload: Optional[Dict[str, Any]] = None,
260        event_id: str = "",
261        **kwargs: Any,
262    ) -> None:
263        """Run when an event ends."""
264        end_event = CallbackEvent(
265            event_id=event_id, event_type=event_type, payload=payload
266        )
267        self.event_map[event_id].append(end_event)
268
269        if event_type == CBEventType.LLM and event_id in self._orphaned_LLM_generations:
270            generation, trace = self._orphaned_LLM_generations[event_id]
271            self._handle_orphaned_LLM_end_event(
272                end_event, generation=generation, trace=trace
273            )
274            del self._orphaned_LLM_generations[event_id]

Run when an event ends.

Inherited Members
llama_index.core.callbacks.base_handler.BaseCallbackHandler
event_starts_to_ignore
event_ends_to_ignore
langfuse.utils.base_callback_handler.LangfuseBaseCallbackHandler
version
session_id
user_id
trace_name
release
metadata
tags
root_span
update_stateful_client
langfuse
trace
get_trace_id
get_trace_url
flush
auth_check
class LlamaIndexInstrumentor:
 27class LlamaIndexInstrumentor:
 28    """[BETA] Instrumentor for exporting LlamaIndex instrumentation module spans to Langfuse.
 29
 30    This beta integration is currently under active development and subject to change.
 31    Please provide feedback to the Langfuse team: https://github.com/langfuse/langfuse/issues/1931
 32
 33    For production setups, please use the existing callback-based integration (LlamaIndexCallbackHandler).
 34
 35    Usage:
 36        instrumentor = LlamaIndexInstrumentor()
 37        instrumentor.start()
 38
 39        # After calling start(), all LlamaIndex executions will be automatically traced
 40
 41        # To trace a specific execution or set custom trace ID/params, use the context manager:
 42        with instrumentor.observe(trace_id="unique_trace_id", user_id="user123"):
 43            # Your LlamaIndex code here
 44            index = get_llama_index_index()
 45            response = index.as_query_engine().query("Your query here")
 46
 47        instrumentor.flush()
 48
 49    The instrumentor will automatically capture and log events and spans from LlamaIndex
 50    to Langfuse, providing detailed observability into your LLM application.
 51
 52    Args:
 53        public_key (Optional[str]): Langfuse public key
 54        secret_key (Optional[str]): Langfuse secret key
 55        host (Optional[str]): Langfuse API host
 56        debug (Optional[bool]): Enable debug logging
 57        threads (Optional[int]): Number of threads for async operations
 58        flush_at (Optional[int]): Number of items to flush at
 59        flush_interval (Optional[int]): Flush interval in seconds
 60        max_retries (Optional[int]): Maximum number of retries for failed requests
 61        timeout (Optional[int]): Timeout for requests in seconds
 62        httpx_client (Optional[httpx.Client]): Custom HTTPX client
 63        enabled (Optional[bool]): Enable/disable the instrumentor
 64        sample_rate (Optional[float]): Sample rate for logging (0.0 to 1.0)
 65    """
 66
 67    def __init__(
 68        self,
 69        *,
 70        public_key: Optional[str] = None,
 71        secret_key: Optional[str] = None,
 72        host: Optional[str] = None,
 73        debug: Optional[bool] = None,
 74        threads: Optional[int] = None,
 75        flush_at: Optional[int] = None,
 76        flush_interval: Optional[int] = None,
 77        max_retries: Optional[int] = None,
 78        timeout: Optional[int] = None,
 79        httpx_client: Optional[httpx.Client] = None,
 80        enabled: Optional[bool] = None,
 81        sample_rate: Optional[float] = None,
 82    ):
 83        self._langfuse = LangfuseSingleton().get(
 84            public_key=public_key,
 85            secret_key=secret_key,
 86            host=host,
 87            debug=debug,
 88            threads=threads,
 89            flush_at=flush_at,
 90            flush_interval=flush_interval,
 91            max_retries=max_retries,
 92            timeout=timeout,
 93            httpx_client=httpx_client,
 94            enabled=enabled,
 95            sample_rate=sample_rate,
 96            sdk_integration="llama-index_instrumentation",
 97        )
 98        self._observation_updates = {}
 99        self._span_handler = LlamaIndexSpanHandler(
100            langfuse_client=self._langfuse,
101            observation_updates=self._observation_updates,
102        )
103        self._event_handler = LlamaIndexEventHandler(
104            langfuse_client=self._langfuse,
105            observation_updates=self._observation_updates,
106        )
107        self._context = InstrumentorContext()
108
109    def start(self):
110        """Start the automatic tracing of LlamaIndex operations.
111
112        Once called, all subsequent LlamaIndex executions will be automatically traced
113        and logged to Langfuse without any additional code changes required.
114
115        Example:
116            ```python
117            instrumentor = LlamaIndexInstrumentor()
118            instrumentor.start()
119
120            # From this point, all LlamaIndex operations are automatically traced
121            index = VectorStoreIndex.from_documents(documents)
122            query_engine = index.as_query_engine()
123            response = query_engine.query("What is the capital of France?")
124
125            # The above operations will be automatically logged to Langfuse
126            instrumentor.flush()
127            ```
128        """
129        self._context.reset()
130        dispatcher = get_dispatcher()
131
132        # Span Handler
133        if not any(
134            isinstance(handler, type(self._span_handler))
135            for handler in dispatcher.span_handlers
136        ):
137            dispatcher.add_span_handler(self._span_handler)
138
139        # Event Handler
140        if not any(
141            isinstance(handler, type(self._event_handler))
142            for handler in dispatcher.event_handlers
143        ):
144            dispatcher.add_event_handler(self._event_handler)
145
146    def stop(self):
147        """Stop the automatic tracing of LlamaIndex operations.
148
149        This method removes the span and event handlers from the LlamaIndex dispatcher,
150        effectively stopping the automatic tracing and logging to Langfuse.
151
152        After calling this method, LlamaIndex operations will no longer be automatically
153        traced unless `start()` is called again.
154
155        Example:
156            ```python
157            instrumentor = LlamaIndexInstrumentor()
158            instrumentor.start()
159
160            # LlamaIndex operations are automatically traced here
161
162            instrumentor.stop()
163
164            # LlamaIndex operations are no longer automatically traced
165            ```
166        """
167        self._context.reset()
168        dispatcher = get_dispatcher()
169
170        # Span Handler, in-place filter
171        dispatcher.span_handlers[:] = filter(
172            lambda h: not isinstance(h, type(self._span_handler)),
173            dispatcher.span_handlers,
174        )
175
176        # Event Handler, in-place filter
177        dispatcher.event_handlers[:] = filter(
178            lambda h: not isinstance(h, type(self._event_handler)),
179            dispatcher.event_handlers,
180        )
181
182    @contextmanager
183    def observe(
184        self,
185        *,
186        trace_id: Optional[str] = None,
187        parent_observation_id: Optional[str] = None,
188        update_parent: Optional[bool] = None,
189        trace_name: Optional[str] = None,
190        user_id: Optional[str] = None,
191        session_id: Optional[str] = None,
192        version: Optional[str] = None,
193        release: Optional[str] = None,
194        metadata: Optional[Dict[str, Any]] = None,
195        tags: Optional[List[str]] = None,
196        public: Optional[bool] = None,
197    ):
198        """Access context manager for observing and tracing LlamaIndex operations.
199
200        This method allows you to wrap LlamaIndex operations in a context that
201        automatically traces and logs them to Langfuse. It provides fine-grained
202        control over the trace properties and ensures proper instrumentation.
203
204        Args:
205            trace_id (Optional[str]): Unique identifier for the trace. If not provided, a UUID will be generated.
206            parent_observation_id (Optional[str]): ID of the parent observation, if any.
207            update_parent (Optional[bool]): Whether to update the parent trace.
208            trace_name (Optional[str]): Name of the trace.
209            user_id (Optional[str]): ID of the user associated with this trace.
210            session_id (Optional[str]): ID of the session associated with this trace.
211            version (Optional[str]): Version information for this trace.
212            release (Optional[str]): Release information for this trace.
213            metadata (Optional[Dict[str, Any]]): Additional metadata for the trace.
214            tags (Optional[List[str]]): Tags associated with this trace.
215            public (Optional[bool]): Whether this trace should be public.
216
217        Yields:
218            StatefulTraceClient: A client for interacting with the current trace.
219
220        Example:
221            ```python
222            instrumentor = LlamaIndexInstrumentor()
223
224            with instrumentor.observe(trace_id="unique_id", user_id="user123"):
225                # LlamaIndex operations here will be traced
226                index.as_query_engine().query("What is the capital of France?")
227
228            # Tracing stops after the context manager exits
229
230            instrumentor.flush()
231            ```
232
233        Note:
234            If the instrumentor is not already started, this method will start it
235            for the duration of the context and stop it afterwards.
236        """
237        was_instrumented = self._is_instrumented
238
239        if not was_instrumented:
240            self.start()
241
242        if parent_observation_id is not None and trace_id is None:
243            logger.warning(
244                "trace_id must be provided if parent_observation_id is provided. Ignoring parent_observation_id."
245            )
246            parent_observation_id = None
247
248        final_trace_id = trace_id or str(uuid.uuid4())
249
250        self._context.update(
251            is_user_managed_trace=True,
252            trace_id=final_trace_id,
253            parent_observation_id=parent_observation_id,
254            update_parent=update_parent,
255            trace_name=trace_name,
256            user_id=user_id,
257            session_id=session_id,
258            version=version,
259            release=release,
260            metadata=metadata,
261            tags=tags,
262            public=public,
263        )
264
265        yield self._get_trace_client(final_trace_id)
266
267        self._context.reset()
268
269        if not was_instrumented:
270            self.stop()
271
272    @property
273    def _is_instrumented(self) -> bool:
274        """Check if the dispatcher is instrumented."""
275        dispatcher = get_dispatcher()
276
277        return any(
278            isinstance(handler, type(self._span_handler))
279            for handler in dispatcher.span_handlers
280        ) and any(
281            isinstance(handler, type(self._event_handler))
282            for handler in dispatcher.event_handlers
283        )
284
285    def _get_trace_client(self, trace_id: str) -> StatefulTraceClient:
286        return StatefulTraceClient(
287            client=self._langfuse.client,
288            id=trace_id,
289            trace_id=trace_id,
290            task_manager=self._langfuse.task_manager,
291            state_type=StateType.TRACE,
292        )
293
294    @property
295    def client_instance(self) -> Langfuse:
296        """Return the Langfuse client instance associated with this instrumentor.
297
298        This property provides access to the underlying Langfuse client, allowing
299        direct interaction with Langfuse functionality if needed.
300
301        Returns:
302            Langfuse: The Langfuse client instance.
303        """
304        return self._langfuse
305
306    def flush(self) -> None:
307        """Flush any pending tasks in the task manager.
308
309        This method ensures that all queued tasks are sent to Langfuse immediately.
310        It's useful for scenarios where you want to guarantee that all instrumentation
311        data has been transmitted before your application terminates or moves on to
312        a different phase.
313
314        Note:
315            This method is a wrapper around the `flush()` method of the underlying
316            Langfuse client instance. It's provided here for convenience and to maintain
317            a consistent interface within the instrumentor.
318
319        Example:
320            ```python
321            instrumentor = LlamaIndexInstrumentor(langfuse_client)
322            # ... perform some operations ...
323            instrumentor.flush()  # Ensure all data is sent to Langfuse
324            ```
325        """
326        self.client_instance.flush()

[BETA] Instrumentor for exporting LlamaIndex instrumentation module spans to Langfuse.

This beta integration is currently under active development and subject to change. Please provide feedback to the Langfuse team: https://github.com/langfuse/langfuse/issues/1931

For production setups, please use the existing callback-based integration (LlamaIndexCallbackHandler).

Usage:

    instrumentor = LlamaIndexInstrumentor()
    instrumentor.start()

    # After calling start(), all LlamaIndex executions will be automatically traced

    # To trace a specific execution or set custom trace ID/params, use the context manager:
    with instrumentor.observe(trace_id="unique_trace_id", user_id="user123"):
        # Your LlamaIndex code here
        index = get_llama_index_index()
        response = index.as_query_engine().query("Your query here")

    instrumentor.flush()

The instrumentor will automatically capture and log events and spans from LlamaIndex to Langfuse, providing detailed observability into your LLM application.

Arguments:
  • public_key (Optional[str]): Langfuse public key
  • secret_key (Optional[str]): Langfuse secret key
  • host (Optional[str]): Langfuse API host
  • debug (Optional[bool]): Enable debug logging
  • threads (Optional[int]): Number of threads for async operations
  • flush_at (Optional[int]): Number of items to flush at
  • flush_interval (Optional[int]): Flush interval in seconds
  • max_retries (Optional[int]): Maximum number of retries for failed requests
  • timeout (Optional[int]): Timeout for requests in seconds
  • httpx_client (Optional[httpx.Client]): Custom HTTPX client
  • enabled (Optional[bool]): Enable/disable the instrumentor
  • sample_rate (Optional[float]): Sample rate for logging (0.0 to 1.0)
LlamaIndexInstrumentor( *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, enabled: Optional[bool] = None, sample_rate: Optional[float] = None)
 67    def __init__(
 68        self,
 69        *,
 70        public_key: Optional[str] = None,
 71        secret_key: Optional[str] = None,
 72        host: Optional[str] = None,
 73        debug: Optional[bool] = None,
 74        threads: Optional[int] = None,
 75        flush_at: Optional[int] = None,
 76        flush_interval: Optional[int] = None,
 77        max_retries: Optional[int] = None,
 78        timeout: Optional[int] = None,
 79        httpx_client: Optional[httpx.Client] = None,
 80        enabled: Optional[bool] = None,
 81        sample_rate: Optional[float] = None,
 82    ):
 83        self._langfuse = LangfuseSingleton().get(
 84            public_key=public_key,
 85            secret_key=secret_key,
 86            host=host,
 87            debug=debug,
 88            threads=threads,
 89            flush_at=flush_at,
 90            flush_interval=flush_interval,
 91            max_retries=max_retries,
 92            timeout=timeout,
 93            httpx_client=httpx_client,
 94            enabled=enabled,
 95            sample_rate=sample_rate,
 96            sdk_integration="llama-index_instrumentation",
 97        )
 98        self._observation_updates = {}
 99        self._span_handler = LlamaIndexSpanHandler(
100            langfuse_client=self._langfuse,
101            observation_updates=self._observation_updates,
102        )
103        self._event_handler = LlamaIndexEventHandler(
104            langfuse_client=self._langfuse,
105            observation_updates=self._observation_updates,
106        )
107        self._context = InstrumentorContext()
def start(self):
109    def start(self):
110        """Start the automatic tracing of LlamaIndex operations.
111
112        Once called, all subsequent LlamaIndex executions will be automatically traced
113        and logged to Langfuse without any additional code changes required.
114
115        Example:
116            ```python
117            instrumentor = LlamaIndexInstrumentor()
118            instrumentor.start()
119
120            # From this point, all LlamaIndex operations are automatically traced
121            index = VectorStoreIndex.from_documents(documents)
122            query_engine = index.as_query_engine()
123            response = query_engine.query("What is the capital of France?")
124
125            # The above operations will be automatically logged to Langfuse
126            instrumentor.flush()
127            ```
128        """
129        self._context.reset()
130        dispatcher = get_dispatcher()
131
132        # Span Handler
133        if not any(
134            isinstance(handler, type(self._span_handler))
135            for handler in dispatcher.span_handlers
136        ):
137            dispatcher.add_span_handler(self._span_handler)
138
139        # Event Handler
140        if not any(
141            isinstance(handler, type(self._event_handler))
142            for handler in dispatcher.event_handlers
143        ):
144            dispatcher.add_event_handler(self._event_handler)

Start the automatic tracing of LlamaIndex operations.

Once called, all subsequent LlamaIndex executions will be automatically traced and logged to Langfuse without any additional code changes required.

Example:
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# From this point, all LlamaIndex operations are automatically traced
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()
response = query_engine.query("What is the capital of France?")

# The above operations will be automatically logged to Langfuse
instrumentor.flush()
def stop(self):
146    def stop(self):
147        """Stop the automatic tracing of LlamaIndex operations.
148
149        This method removes the span and event handlers from the LlamaIndex dispatcher,
150        effectively stopping the automatic tracing and logging to Langfuse.
151
152        After calling this method, LlamaIndex operations will no longer be automatically
153        traced unless `start()` is called again.
154
155        Example:
156            ```python
157            instrumentor = LlamaIndexInstrumentor()
158            instrumentor.start()
159
160            # LlamaIndex operations are automatically traced here
161
162            instrumentor.stop()
163
164            # LlamaIndex operations are no longer automatically traced
165            ```
166        """
167        self._context.reset()
168        dispatcher = get_dispatcher()
169
170        # Span Handler, in-place filter
171        dispatcher.span_handlers[:] = filter(
172            lambda h: not isinstance(h, type(self._span_handler)),
173            dispatcher.span_handlers,
174        )
175
176        # Event Handler, in-place filter
177        dispatcher.event_handlers[:] = filter(
178            lambda h: not isinstance(h, type(self._event_handler)),
179            dispatcher.event_handlers,
180        )

Stop the automatic tracing of LlamaIndex operations.

This method removes the span and event handlers from the LlamaIndex dispatcher, effectively stopping the automatic tracing and logging to Langfuse.

After calling this method, LlamaIndex operations will no longer be automatically traced unless start() is called again.

Example:
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()

# LlamaIndex operations are automatically traced here

instrumentor.stop()

# LlamaIndex operations are no longer automatically traced
@contextmanager
def observe(
    self,
    *,
    trace_id: Optional[str] = None,
    parent_observation_id: Optional[str] = None,
    update_parent: Optional[bool] = None,
    trace_name: Optional[str] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    version: Optional[str] = None,
    release: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    tags: Optional[List[str]] = None,
    public: Optional[bool] = None,
):
    """Access context manager for observing and tracing LlamaIndex operations.

    This method allows you to wrap LlamaIndex operations in a context that
    automatically traces and logs them to Langfuse. It provides fine-grained
    control over the trace properties and ensures proper instrumentation.

    Args:
        trace_id (Optional[str]): Unique identifier for the trace. If not provided, a UUID will be generated.
        parent_observation_id (Optional[str]): ID of the parent observation, if any. Requires trace_id.
        update_parent (Optional[bool]): Whether to update the parent trace.
        trace_name (Optional[str]): Name of the trace.
        user_id (Optional[str]): ID of the user associated with this trace.
        session_id (Optional[str]): ID of the session associated with this trace.
        version (Optional[str]): Version information for this trace.
        release (Optional[str]): Release information for this trace.
        metadata (Optional[Dict[str, Any]]): Additional metadata for the trace.
        tags (Optional[List[str]]): Tags associated with this trace.
        public (Optional[bool]): Whether this trace should be public.

    Yields:
        StatefulTraceClient: A client for interacting with the current trace.

    Example:
        ```python
        instrumentor = LlamaIndexInstrumentor()

        with instrumentor.observe(trace_id="unique_id", user_id="user123"):
            # LlamaIndex operations here will be traced
            index.as_query_engine().query("What is the capital of France?")

        # Tracing stops after the context manager exits

        instrumentor.flush()
        ```

    Note:
        If the instrumentor is not already started, this method will start it
        for the duration of the context and stop it afterwards.
    """
    was_instrumented = self._is_instrumented

    if not was_instrumented:
        self.start()

    # A parent observation only makes sense within an explicit trace.
    if parent_observation_id is not None and trace_id is None:
        logger.warning(
            "trace_id must be provided if parent_observation_id is provided. Ignoring parent_observation_id."
        )
        parent_observation_id = None

    final_trace_id = trace_id or str(uuid.uuid4())

    self._context.update(
        is_user_managed_trace=True,
        trace_id=final_trace_id,
        parent_observation_id=parent_observation_id,
        update_parent=update_parent,
        trace_name=trace_name,
        user_id=user_id,
        session_id=session_id,
        version=version,
        release=release,
        metadata=metadata,
        tags=tags,
        public=public,
    )

    try:
        yield self._get_trace_client(final_trace_id)
    finally:
        # Run cleanup even if the caller's block raises: otherwise the
        # user-managed trace context would leak into subsequent operations
        # and the instrumentor would stay started despite `was_instrumented`
        # being False.
        self._context.reset()

        if not was_instrumented:
            self.stop()

Access context manager for observing and tracing LlamaIndex operations.

This method allows you to wrap LlamaIndex operations in a context that automatically traces and logs them to Langfuse. It provides fine-grained control over the trace properties and ensures proper instrumentation.

Arguments:
  • trace_id (Optional[str]): Unique identifier for the trace. If not provided, a UUID will be generated.
  • parent_observation_id (Optional[str]): ID of the parent observation, if any.
  • update_parent (Optional[bool]): Whether to update the parent trace.
  • trace_name (Optional[str]): Name of the trace.
  • user_id (Optional[str]): ID of the user associated with this trace.
  • session_id (Optional[str]): ID of the session associated with this trace.
  • version (Optional[str]): Version information for this trace.
  • release (Optional[str]): Release information for this trace.
  • metadata (Optional[Dict[str, Any]]): Additional metadata for the trace.
  • tags (Optional[List[str]]): Tags associated with this trace.
  • public (Optional[bool]): Whether this trace should be public.
Yields:
  • StatefulTraceClient: A client for interacting with the current trace.

Example:
instrumentor = LlamaIndexInstrumentor()

with instrumentor.observe(trace_id="unique_id", user_id="user123"):
    # LlamaIndex operations here will be traced
    index.as_query_engine().query("What is the capital of France?")

# Tracing stops after the context manager exits

instrumentor.flush()
Note:
  If the instrumentor is not already started, this method will start it for the duration of the context and stop it afterwards.

@property
def client_instance(self) -> Langfuse:
    """Return the Langfuse client instance associated with this instrumentor.

    Exposes the underlying Langfuse client so callers can interact with
    Langfuse functionality directly when needed.

    Returns:
        Langfuse: The Langfuse client instance.
    """
    client = self._langfuse
    return client

Return the Langfuse client instance associated with this instrumentor.

This property provides access to the underlying Langfuse client, allowing direct interaction with Langfuse functionality if needed.

Returns:
  • Langfuse: The Langfuse client instance.

def flush(self) -> None:
    """Flush any pending tasks in the task manager.

    Ensures all queued tasks are sent to Langfuse immediately — useful
    when you must guarantee that all instrumentation data has been
    transmitted before the application terminates or moves to a
    different phase.

    Note:
        This is a convenience wrapper around the `flush()` method of the
        underlying Langfuse client instance, provided to keep a
        consistent interface within the instrumentor.

    Example:
        ```python
        instrumentor = LlamaIndexInstrumentor(langfuse_client)
        # ... perform some operations ...
        instrumentor.flush()  # Ensure all data is sent to Langfuse
        ```
    """
    # Delegate to the Langfuse client held by this instrumentor.
    self.client_instance.flush()

Flush any pending tasks in the task manager.

This method ensures that all queued tasks are sent to Langfuse immediately. It's useful for scenarios where you want to guarantee that all instrumentation data has been transmitted before your application terminates or moves on to a different phase.

Note:
  This method is a wrapper around the flush() method of the underlying Langfuse client instance. It's provided here for convenience and to maintain a consistent interface within the instrumentor.

Example:
instrumentor = LlamaIndexInstrumentor(langfuse_client)
# ... perform some operations ...
instrumentor.flush()  # Ensure all data is sent to Langfuse