langfuse.llama_index

1from .llama_index import LlamaIndexCallbackHandler
2
3__all__ = ["LlamaIndexCallbackHandler"]
@auto_decorate_methods_with(catch_and_log_errors, exclude=['__init__'])
class LlamaIndexCallbackHandler(llama_index.core.callbacks.base_handler.BaseCallbackHandler, langfuse.utils.base_callback_handler.LangfuseBaseCallbackHandler):
@auto_decorate_methods_with(catch_and_log_errors, exclude=["__init__"])
class LlamaIndexCallbackHandler(
    LlamaIndexBaseCallbackHandler, LangfuseBaseCallbackHandler
):
    """LlamaIndex callback handler for Langfuse. This version is in alpha and may change in the future."""

    # Shared module logger; all error/debug output from this handler goes to "langfuse".
    log = logging.getLogger("langfuse")
 64    def __init__(
 65        self,
 66        *,
 67        public_key: Optional[str] = None,
 68        secret_key: Optional[str] = None,
 69        host: Optional[str] = None,
 70        debug: bool = False,
 71        session_id: Optional[str] = None,
 72        user_id: Optional[str] = None,
 73        trace_name: Optional[str] = None,
 74        release: Optional[str] = None,
 75        version: Optional[str] = None,
 76        tags: Optional[List[str]] = None,
 77        metadata: Optional[Any] = None,
 78        threads: Optional[int] = None,
 79        flush_at: Optional[int] = None,
 80        flush_interval: Optional[int] = None,
 81        max_retries: Optional[int] = None,
 82        timeout: Optional[int] = None,
 83        event_starts_to_ignore: Optional[List[CBEventType]] = None,
 84        event_ends_to_ignore: Optional[List[CBEventType]] = None,
 85        tokenizer: Optional[Callable[[str], list]] = None,
 86        enabled: Optional[bool] = None,
 87        httpx_client: Optional[httpx.Client] = None,
 88        sdk_integration: Optional[str] = None,
 89    ) -> None:
 90        LlamaIndexBaseCallbackHandler.__init__(
 91            self,
 92            event_starts_to_ignore=event_starts_to_ignore or [],
 93            event_ends_to_ignore=event_ends_to_ignore or [],
 94        )
 95        LangfuseBaseCallbackHandler.__init__(
 96            self,
 97            public_key=public_key,
 98            secret_key=secret_key,
 99            host=host,
100            debug=debug,
101            session_id=session_id,
102            user_id=user_id,
103            trace_name=trace_name,
104            release=release,
105            version=version,
106            tags=tags,
107            metadata=metadata,
108            threads=threads,
109            flush_at=flush_at,
110            flush_interval=flush_interval,
111            max_retries=max_retries,
112            timeout=timeout,
113            enabled=enabled,
114            httpx_client=httpx_client,
115            sdk_integration=sdk_integration or "llama-index_callback",
116        )
117
118        self.event_map: Dict[str, List[CallbackEvent]] = defaultdict(list)
119        self._llama_index_trace_name: Optional[str] = None
120        self._token_counter = TokenCounter(tokenizer)
121
122        # For stream-chat, the last LLM end_event arrives after the trace has ended
123        # Keep track of these orphans to upsert them with the correct trace_id after the trace has ended
124        self._orphaned_LLM_generations: Dict[
125            str, Tuple[StatefulGenerationClient, StatefulTraceClient]
126        ] = {}
127
128    def set_root(
129        self,
130        root: Optional[Union[StatefulTraceClient, StatefulSpanClient]],
131        *,
132        update_root: bool = False,
133    ) -> None:
134        """Set the root trace or span for the callback handler.
135
136        Args:
137            root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to
138                be used for all following operations.
139
140        Keyword Args:
141            update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.
142
143        Returns:
144            None
145        """
146        context_root.set(root)
147
148        if root is None:
149            self.trace = None
150            self.root_span = None
151            self._task_manager = self.langfuse.task_manager if self.langfuse else None
152
153            return
154
155        if isinstance(root, StatefulTraceClient):
156            self.trace = root
157
158        elif isinstance(root, StatefulSpanClient):
159            self.root_span = root
160            self.trace = StatefulTraceClient(
161                root.client,
162                root.trace_id,
163                StateType.TRACE,
164                root.trace_id,
165                root.task_manager,
166            )
167
168        self._task_manager = root.task_manager
169        self.update_stateful_client = update_root
170
171    def set_trace_params(
172        self,
173        name: Optional[str] = None,
174        user_id: Optional[str] = None,
175        session_id: Optional[str] = None,
176        version: Optional[str] = None,
177        release: Optional[str] = None,
178        metadata: Optional[Any] = None,
179        tags: Optional[List[str]] = None,
180        public: Optional[bool] = None,
181    ):
182        """Set the trace params that will be used for all following operations.
183
184        Allows setting params of subsequent traces at any point in the code.
185        Overwrites the default params set in the callback constructor.
186
187        Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.
188
189        Attributes:
190            name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
191            user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
192            session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
193            version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
194            metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
195            tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
196            public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
197
198
199        Returns:
200            None
201        """
202        context_trace_metadata.set(
203            {
204                "name": name,
205                "user_id": user_id,
206                "session_id": session_id,
207                "version": version,
208                "release": release,
209                "metadata": metadata,
210                "tags": tags,
211                "public": public,
212            }
213        )
214
215    def start_trace(self, trace_id: Optional[str] = None) -> None:
216        """Run when an overall trace is launched."""
217        self._llama_index_trace_name = trace_id
218
219    def end_trace(
220        self,
221        trace_id: Optional[str] = None,
222        trace_map: Optional[Dict[str, List[str]]] = None,
223    ) -> None:
224        """Run when an overall trace is exited."""
225        if not trace_map:
226            self.log.debug("No events in trace map to create the observation tree.")
227            return
228
229        # Generate Langfuse observations after trace has ended and full trace_map is available.
230        # For long-running traces this leads to events only being sent to Langfuse after the trace has ended.
231        # Timestamps remain accurate as they are set at the time of the event.
232        self._create_observations_from_trace_map(
233            event_id=BASE_TRACE_EVENT, trace_map=trace_map
234        )
235        self._update_trace_data(trace_map=trace_map)
236
237    def on_event_start(
238        self,
239        event_type: CBEventType,
240        payload: Optional[Dict[str, Any]] = None,
241        event_id: str = "",
242        parent_id: str = "",
243        **kwargs: Any,
244    ) -> str:
245        """Run when an event starts and return id of event."""
246        start_event = CallbackEvent(
247            event_id=event_id, event_type=event_type, payload=payload
248        )
249        self.event_map[event_id].append(start_event)
250
251        return event_id
252
253    def on_event_end(
254        self,
255        event_type: CBEventType,
256        payload: Optional[Dict[str, Any]] = None,
257        event_id: str = "",
258        **kwargs: Any,
259    ) -> None:
260        """Run when an event ends."""
261        end_event = CallbackEvent(
262            event_id=event_id, event_type=event_type, payload=payload
263        )
264        self.event_map[event_id].append(end_event)
265
266        if event_type == CBEventType.LLM and event_id in self._orphaned_LLM_generations:
267            generation, trace = self._orphaned_LLM_generations[event_id]
268            self._handle_orphaned_LLM_end_event(
269                end_event, generation=generation, trace=trace
270            )
271            del self._orphaned_LLM_generations[event_id]
272
273    def _create_observations_from_trace_map(
274        self,
275        event_id: str,
276        trace_map: Dict[str, List[str]],
277        parent: Optional[
278            Union[StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient]
279        ] = None,
280    ) -> None:
281        """Recursively create langfuse observations based on the trace_map."""
282        if event_id != BASE_TRACE_EVENT and not self.event_map.get(event_id):
283            return
284
285        if event_id == BASE_TRACE_EVENT:
286            observation = self._get_root_observation()
287        else:
288            observation = self._create_observation(
289                event_id=event_id, parent=parent, trace_id=self.trace.id
290            )
291
292        for child_event_id in trace_map.get(event_id, []):
293            self._create_observations_from_trace_map(
294                event_id=child_event_id, parent=observation, trace_map=trace_map
295            )
296
297    def _get_root_observation(self) -> Union[StatefulTraceClient, StatefulSpanClient]:
298        user_provided_root = context_root.get()
299
300        # Get trace metadata from contextvars or use default values
301        trace_metadata = context_trace_metadata.get()
302        name = (
303            trace_metadata["name"]
304            or self.trace_name
305            or f"LlamaIndex_{self._llama_index_trace_name}"
306        )
307        version = trace_metadata["version"] or self.version
308        release = trace_metadata["release"] or self.release
309        session_id = trace_metadata["session_id"] or self.session_id
310        user_id = trace_metadata["user_id"] or self.user_id
311        metadata = trace_metadata["metadata"] or self.metadata
312        tags = trace_metadata["tags"] or self.tags
313        public = trace_metadata["public"] or None
314
315        # Make sure that if a user-provided root is set, it has been set in the same trace
316        # and it's not a root from a different trace
317        if (
318            user_provided_root is not None
319            and self.trace
320            and self.trace.id == user_provided_root.trace_id
321        ):
322            if self.update_stateful_client:
323                user_provided_root.update(
324                    name=name,
325                    version=version,
326                    session_id=session_id,
327                    user_id=user_id,
328                    metadata=metadata,
329                    tags=tags,
330                    release=release,
331                    public=public,
332                )
333
334            return user_provided_root
335
336        else:
337            self.trace = self.langfuse.trace(
338                id=str(uuid4()),
339                name=name,
340                version=version,
341                session_id=session_id,
342                user_id=user_id,
343                metadata=metadata,
344                tags=tags,
345                release=release,
346                public=public,
347            )
348
349            return self.trace
350
351    def _create_observation(
352        self,
353        event_id: str,
354        parent: Union[
355            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
356        ],
357        trace_id: str,
358    ) -> Union[StatefulSpanClient, StatefulGenerationClient]:
359        event_type = self.event_map[event_id][0].event_type
360
361        if event_type == CBEventType.LLM:
362            return self._handle_LLM_events(event_id, parent, trace_id)
363        elif event_type == CBEventType.EMBEDDING:
364            return self._handle_embedding_events(event_id, parent, trace_id)
365        else:
366            return self._handle_span_events(event_id, parent, trace_id)
367
368    def _handle_LLM_events(
369        self,
370        event_id: str,
371        parent: Union[
372            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
373        ],
374        trace_id: str,
375    ) -> StatefulGenerationClient:
376        events = self.event_map[event_id]
377        start_event, end_event = events[0], events[-1]
378
379        if start_event.payload and EventPayload.SERIALIZED in start_event.payload:
380            serialized = start_event.payload.get(EventPayload.SERIALIZED, {})
381            name = serialized.get("class_name", "LLM")
382            temperature = serialized.get("temperature", None)
383            max_tokens = serialized.get("max_tokens", None)
384            timeout = serialized.get("timeout", None)
385
386        parsed_end_payload = self._parse_LLM_end_event_payload(end_event)
387        parsed_metadata = self._parse_metadata_from_event(end_event)
388
389        generation = parent.generation(
390            id=event_id,
391            trace_id=trace_id,
392            version=self.version,
393            name=name,
394            start_time=start_event.time,
395            metadata=parsed_metadata,
396            model_parameters={
397                "temperature": temperature,
398                "max_tokens": max_tokens,
399                "request_timeout": timeout,
400            },
401            **parsed_end_payload,
402        )
403
404        # Register orphaned LLM event (only start event, no end event) to be later upserted with the correct trace_id
405        if len(events) == 1:
406            self._orphaned_LLM_generations[event_id] = (generation, self.trace)
407
408        return generation
409
410    def _handle_orphaned_LLM_end_event(
411        self,
412        end_event: CallbackEvent,
413        generation: StatefulGenerationClient,
414        trace: StatefulTraceClient,
415    ) -> None:
416        parsed_end_payload = self._parse_LLM_end_event_payload(end_event)
417
418        generation.update(
419            **parsed_end_payload,
420        )
421
422        if generation.trace_id != trace.id:
423            raise ValueError(
424                f"Generation trace_id {generation.trace_id} does not match trace.id {trace.id}"
425            )
426
427        trace.update(output=parsed_end_payload["output"])
428
429    def _parse_LLM_end_event_payload(
430        self, end_event: CallbackEvent
431    ) -> ParsedLLMEndPayload:
432        result: ParsedLLMEndPayload = {
433            "input": None,
434            "output": None,
435            "usage": None,
436            "model": None,
437            "end_time": end_event.time,
438        }
439
440        if not end_event.payload:
441            return result
442
443        result["input"] = self._parse_input_from_event(end_event)
444        result["output"] = self._parse_output_from_event(end_event)
445        result["model"], result["usage"] = self._parse_usage_from_event_payload(
446            end_event.payload
447        )
448
449        return result
450
451    def _parse_usage_from_event_payload(self, event_payload: Dict):
452        model = usage = None
453
454        if not (
455            EventPayload.MESSAGES in event_payload
456            and EventPayload.RESPONSE in event_payload
457        ):
458            return model, usage
459
460        response = event_payload.get(EventPayload.RESPONSE)
461
462        if hasattr(response, "raw") and response.raw is not None:
463            model = response.raw.get("model", None)
464            token_usage = response.raw.get("usage", {})
465
466            if token_usage:
467                usage = {
468                    "input": getattr(token_usage, "prompt_tokens", None),
469                    "output": getattr(token_usage, "completion_tokens", None),
470                    "total": getattr(token_usage, "total_tokens", None),
471                }
472
473        return model, usage
474
475    def _handle_embedding_events(
476        self,
477        event_id: str,
478        parent: Union[
479            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
480        ],
481        trace_id: str,
482    ) -> StatefulGenerationClient:
483        events = self.event_map[event_id]
484        start_event, end_event = events[0], events[-1]
485
486        if start_event.payload and EventPayload.SERIALIZED in start_event.payload:
487            serialized = start_event.payload.get(EventPayload.SERIALIZED, {})
488            name = serialized.get("class_name", "Embedding")
489            model = serialized.get("model_name", None)
490            timeout = serialized.get("timeout", None)
491
492        if end_event.payload:
493            chunks = end_event.payload.get(EventPayload.CHUNKS, [])
494            token_count = sum(
495                self._token_counter.get_string_tokens(chunk) for chunk in chunks
496            )
497
498            usage = {
499                "input": 0,
500                "output": 0,
501                "total": token_count or None,
502            }
503
504        input = self._parse_input_from_event(end_event)
505        output = self._parse_output_from_event(end_event)
506
507        generation = parent.generation(
508            id=event_id,
509            trace_id=trace_id,
510            name=name,
511            start_time=start_event.time,
512            end_time=end_event.time,
513            version=self.version,
514            model=model,
515            input=input,
516            output=output,
517            usage=usage or None,
518            model_parameters={
519                "request_timeout": timeout,
520            },
521        )
522
523        return generation
524
525    def _handle_span_events(
526        self,
527        event_id: str,
528        parent: Union[
529            StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient
530        ],
531        trace_id: str,
532    ) -> StatefulSpanClient:
533        start_event, end_event = self.event_map[event_id]
534
535        extracted_input = self._parse_input_from_event(start_event)
536        extracted_output = self._parse_output_from_event(end_event)
537        extracted_metadata = self._parse_metadata_from_event(end_event)
538
539        metadata = (
540            extracted_metadata if extracted_output != extracted_metadata else None
541        )
542
543        span = parent.span(
544            id=event_id,
545            trace_id=trace_id,
546            start_time=start_event.time,
547            name=start_event.event_type.value,
548            version=self.version,
549            session_id=self.session_id,
550            input=extracted_input,
551            output=extracted_output,
552            metadata=metadata,
553        )
554
555        if end_event:
556            span.end(end_time=end_event.time)
557
558        return span
559
560    def _update_trace_data(self, trace_map):
561        context_root_value = context_root.get()
562        if context_root_value and not self.update_stateful_client:
563            return
564
565        child_event_ids = trace_map.get(BASE_TRACE_EVENT, [])
566        if not child_event_ids:
567            return
568
569        event_pair = self.event_map.get(child_event_ids[0])
570        if not event_pair or len(event_pair) < 2:
571            return
572
573        start_event, end_event = event_pair
574        input = self._parse_input_from_event(start_event)
575        output = self._parse_output_from_event(end_event)
576
577        if input or output:
578            if context_root_value and self.update_stateful_client:
579                context_root_value.update(input=input, output=output)
580            else:
581                self.trace.update(input=input, output=output)
582
583    def _parse_input_from_event(self, event: CallbackEvent):
584        if event.payload is None:
585            return
586
587        payload = event.payload.copy()
588
589        if EventPayload.SERIALIZED in payload:
590            # Always pop Serialized from payload as it may contain LLM api keys
591            payload.pop(EventPayload.SERIALIZED)
592
593        if event.event_type == CBEventType.EMBEDDING and EventPayload.CHUNKS in payload:
594            chunks = payload.get(EventPayload.CHUNKS)
595            return {"num_chunks": len(chunks)}
596
597        if (
598            event.event_type == CBEventType.NODE_PARSING
599            and EventPayload.DOCUMENTS in payload
600        ):
601            documents = payload.pop(EventPayload.DOCUMENTS)
602            payload["documents"] = [doc.metadata for doc in documents]
603            return payload
604
605        for key in [EventPayload.MESSAGES, EventPayload.QUERY_STR, EventPayload.PROMPT]:
606            if key in payload:
607                return payload.get(key)
608
609        return payload or None
610
    def _parse_output_from_event(self, event: CallbackEvent):
        """Derive the observation output from an event payload.

        Handles the event types with special output shapes (embedding, node
        parsing, chunking, LLM completion/response); falls back to the
        remaining payload, or None when nothing is left.
        """
        if event.payload is None:
            return

        payload = event.payload.copy()

        if EventPayload.SERIALIZED in payload:
            # Always pop Serialized from payload as it may contain LLM api keys
            payload.pop(EventPayload.SERIALIZED)

        if (
            event.event_type == CBEventType.EMBEDDING
            and EventPayload.EMBEDDINGS in payload
        ):
            # Only record the count — the embedding vectors themselves are large.
            embeddings = payload.get(EventPayload.EMBEDDINGS)
            return {"num_embeddings": len(embeddings)}

        if (
            event.event_type == CBEventType.NODE_PARSING
            and EventPayload.NODES in payload
        ):
            # Replace the node objects with their count to keep payloads small.
            nodes = payload.pop(EventPayload.NODES)
            payload["num_nodes"] = len(nodes)
            return payload

        if event.event_type == CBEventType.CHUNKING and EventPayload.CHUNKS in payload:
            # NOTE(review): unlike the NODE_PARSING branch above this does not
            # return — the reduced payload falls through to the checks below.
            # Presumably intentional; confirm.
            chunks = payload.pop(EventPayload.CHUNKS)
            payload["num_chunks"] = len(chunks)

        if EventPayload.COMPLETION in payload:
            return payload.get(EventPayload.COMPLETION)

        if EventPayload.RESPONSE in payload:
            response = payload.get(EventPayload.RESPONSE)

            # Skip streaming responses as consuming them would block the user's execution path
            if "Streaming" in type(response).__name__:
                return None

            if hasattr(response, "response"):
                return response.response

            if hasattr(response, "message"):
                output = dict(response.message)
                # Lift tool_calls out of additional_kwargs, then drop the rest
                # of additional_kwargs from the recorded output.
                if "additional_kwargs" in output:
                    if "tool_calls" in output["additional_kwargs"]:
                        output["tool_calls"] = output["additional_kwargs"]["tool_calls"]

                    del output["additional_kwargs"]

                return output

        return payload or None
    def _parse_metadata_from_event(self, event: CallbackEvent):
        """Collect metadata from an event payload.

        Copies every payload entry that is not already captured as input or
        output elsewhere; the RESPONSE object is flattened into its public,
        non-content attributes. Returns None when nothing qualifies.
        """
        if event.payload is None:
            return

        metadata = {}

        for key in event.payload.keys():
            # Skip keys that are recorded as input/output, plus SERIALIZED
            # (may contain API keys) and additional_kwargs (handled in output).
            if key not in [
                EventPayload.MESSAGES,
                EventPayload.QUERY_STR,
                EventPayload.PROMPT,
                EventPayload.COMPLETION,
                EventPayload.SERIALIZED,
                "additional_kwargs",
            ]:
                if key != EventPayload.RESPONSE:
                    metadata[key] = event.payload[key]
                else:
                    response = event.payload.get(EventPayload.RESPONSE)

                    # Streaming responses must not be consumed here.
                    if "Streaming" in type(response).__name__:
                        continue

                    # Flatten the response object: keep only public attributes
                    # that are not content fields and not lazy generators.
                    for res_key, value in vars(response).items():
                        if (
                            not res_key.startswith("_")
                            and res_key
                            not in [
                                "response",
                                "response_txt",
                                "message",
                                "additional_kwargs",
                                "delta",
                                "raw",
                            ]
                            and not isinstance(value, Generator)
                        ):
                            metadata[res_key] = value

        return metadata or None

LlamaIndex callback handler for Langfuse. This version is in alpha and may change in the future.

LlamaIndexCallbackHandler( *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, debug: bool = False, session_id: Optional[str] = None, user_id: Optional[str] = None, trace_name: Optional[str] = None, release: Optional[str] = None, version: Optional[str] = None, tags: Optional[List[str]] = None, metadata: Optional[Any] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, event_starts_to_ignore: Optional[List[llama_index.core.callbacks.schema.CBEventType]] = None, event_ends_to_ignore: Optional[List[llama_index.core.callbacks.schema.CBEventType]] = None, tokenizer: Optional[Callable[[str], list]] = None, enabled: Optional[bool] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: Optional[str] = None)
 64    def __init__(
 65        self,
 66        *,
 67        public_key: Optional[str] = None,
 68        secret_key: Optional[str] = None,
 69        host: Optional[str] = None,
 70        debug: bool = False,
 71        session_id: Optional[str] = None,
 72        user_id: Optional[str] = None,
 73        trace_name: Optional[str] = None,
 74        release: Optional[str] = None,
 75        version: Optional[str] = None,
 76        tags: Optional[List[str]] = None,
 77        metadata: Optional[Any] = None,
 78        threads: Optional[int] = None,
 79        flush_at: Optional[int] = None,
 80        flush_interval: Optional[int] = None,
 81        max_retries: Optional[int] = None,
 82        timeout: Optional[int] = None,
 83        event_starts_to_ignore: Optional[List[CBEventType]] = None,
 84        event_ends_to_ignore: Optional[List[CBEventType]] = None,
 85        tokenizer: Optional[Callable[[str], list]] = None,
 86        enabled: Optional[bool] = None,
 87        httpx_client: Optional[httpx.Client] = None,
 88        sdk_integration: Optional[str] = None,
 89    ) -> None:
 90        LlamaIndexBaseCallbackHandler.__init__(
 91            self,
 92            event_starts_to_ignore=event_starts_to_ignore or [],
 93            event_ends_to_ignore=event_ends_to_ignore or [],
 94        )
 95        LangfuseBaseCallbackHandler.__init__(
 96            self,
 97            public_key=public_key,
 98            secret_key=secret_key,
 99            host=host,
100            debug=debug,
101            session_id=session_id,
102            user_id=user_id,
103            trace_name=trace_name,
104            release=release,
105            version=version,
106            tags=tags,
107            metadata=metadata,
108            threads=threads,
109            flush_at=flush_at,
110            flush_interval=flush_interval,
111            max_retries=max_retries,
112            timeout=timeout,
113            enabled=enabled,
114            httpx_client=httpx_client,
115            sdk_integration=sdk_integration or "llama-index_callback",
116        )
117
118        self.event_map: Dict[str, List[CallbackEvent]] = defaultdict(list)
119        self._llama_index_trace_name: Optional[str] = None
120        self._token_counter = TokenCounter(tokenizer)
121
122        # For stream-chat, the last LLM end_event arrives after the trace has ended
123        # Keep track of these orphans to upsert them with the correct trace_id after the trace has ended
124        self._orphaned_LLM_generations: Dict[
125            str, Tuple[StatefulGenerationClient, StatefulTraceClient]
126        ] = {}

Initialize the Langfuse LlamaIndex callback handler and its base callback handlers.

log = <Logger langfuse (WARNING)>
event_map: Dict[str, List[langfuse.llama_index.utils.CallbackEvent]]
def set_root( self, root: Union[langfuse.client.StatefulTraceClient, langfuse.client.StatefulSpanClient, NoneType], *, update_root: bool = False) -> None:
128    def set_root(
129        self,
130        root: Optional[Union[StatefulTraceClient, StatefulSpanClient]],
131        *,
132        update_root: bool = False,
133    ) -> None:
134        """Set the root trace or span for the callback handler.
135
136        Args:
137            root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to
138                be used for all following operations.
139
140        Keyword Args:
141            update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.
142
143        Returns:
144            None
145        """
146        context_root.set(root)
147
148        if root is None:
149            self.trace = None
150            self.root_span = None
151            self._task_manager = self.langfuse.task_manager if self.langfuse else None
152
153            return
154
155        if isinstance(root, StatefulTraceClient):
156            self.trace = root
157
158        elif isinstance(root, StatefulSpanClient):
159            self.root_span = root
160            self.trace = StatefulTraceClient(
161                root.client,
162                root.trace_id,
163                StateType.TRACE,
164                root.trace_id,
165                root.task_manager,
166            )
167
168        self._task_manager = root.task_manager
169        self.update_stateful_client = update_root

Set the root trace or span for the callback handler.

Arguments:
  • root (Optional[Union[StatefulTraceClient, StatefulSpanClient]]): The root trace or observation to be used for all following operations.
Keyword Args:

update_root (bool): If True, the root trace or observation will be updated with the outcome of the LlamaIndex run.

Returns:

None

def set_trace_params( self, name: Optional[str] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, version: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, public: Optional[bool] = None):
def set_trace_params(
    self,
    name: Optional[str] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    version: Optional[str] = None,
    release: Optional[str] = None,
    metadata: Optional[Any] = None,
    tags: Optional[List[str]] = None,
    public: Optional[bool] = None,
):
    """Set the trace params that will be used for all following operations.

    Allows setting params of subsequent traces at any point in the code.
    Overwrites the default params set in the callback constructor.

    Attention: If a root trace or span is set on the callback handler, those
    trace params will be used and NOT those set through this method.

    Attributes:
        name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
        user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
        session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
        version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
        metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
        tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
        public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.

    Returns:
        None
    """
    # Stash the params in a context variable so concurrent runs each see
    # their own trace configuration.
    trace_params = dict(
        name=name,
        user_id=user_id,
        session_id=session_id,
        version=version,
        release=release,
        metadata=metadata,
        tags=tags,
        public=public,
    )
    context_trace_metadata.set(trace_params)

Set the trace params that will be used for all following operations.

Allows setting params of subsequent traces at any point in the code. Overwrites the default params set in the callback constructor.

Attention: If a root trace or span is set on the callback handler, those trace params will be used and NOT those set through this method.

Attributes:
  • name (Optional[str]): Identifier of the trace. Useful for sorting/filtering in the UI.
  • user_id (Optional[str]): The id of the user that triggered the execution. Used to provide user-level analytics.
  • session_id (Optional[str]): Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
  • version (Optional[str]): The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
  • metadata (Optional[Any]): Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
  • tags (Optional[List[str]]): Tags are used to categorize or label traces. Traces can be filtered by tags in the Langfuse UI and GET API.
  • public (Optional[bool]): You can make a trace public to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
Returns:
  None

def start_trace(self, trace_id: Optional[str] = None) -> None:
def start_trace(self, trace_id: Optional[str] = None) -> None:
    """Run when an overall trace is launched."""
    # Remember the name LlamaIndex assigned to this trace; it is consumed
    # later when the Langfuse trace is created.
    self._llama_index_trace_name = trace_id

Run when an overall trace is launched.

def end_trace( self, trace_id: Optional[str] = None, trace_map: Optional[Dict[str, List[str]]] = None) -> None:
def end_trace(
    self,
    trace_id: Optional[str] = None,
    trace_map: Optional[Dict[str, List[str]]] = None,
) -> None:
    """Run when an overall trace is exited."""
    # Nothing recorded for this trace — there is no tree to build.
    if not trace_map:
        self.log.debug("No events in trace map to create the observation tree.")
        return

    # Observations are created only after the trace has ended, once the full
    # trace_map is available. Long-running traces therefore reach Langfuse
    # late, but timestamps stay accurate because they were captured per event.
    self._create_observations_from_trace_map(
        event_id=BASE_TRACE_EVENT, trace_map=trace_map
    )
    self._update_trace_data(trace_map=trace_map)

Run when an overall trace is exited.

def on_event_start( self, event_type: llama_index.core.callbacks.schema.CBEventType, payload: Optional[Dict[str, Any]] = None, event_id: str = '', parent_id: str = '', **kwargs: Any) -> str:
def on_event_start(
    self,
    event_type: CBEventType,
    payload: Optional[Dict[str, Any]] = None,
    event_id: str = "",
    parent_id: str = "",
    **kwargs: Any,
) -> str:
    """Run when an event starts and return id of event."""
    # Record the start marker under its event id; the matching end event is
    # appended later by on_event_end, pairing up entries in event_map.
    self.event_map[event_id].append(
        CallbackEvent(event_id=event_id, event_type=event_type, payload=payload)
    )
    return event_id

Run when an event starts and return id of event.

def on_event_end( self, event_type: llama_index.core.callbacks.schema.CBEventType, payload: Optional[Dict[str, Any]] = None, event_id: str = '', **kwargs: Any) -> None:
def on_event_end(
    self,
    event_type: CBEventType,
    payload: Optional[Dict[str, Any]] = None,
    event_id: str = "",
    **kwargs: Any,
) -> None:
    """Run when an event ends."""
    end_event = CallbackEvent(
        event_id=event_id, event_type=event_type, payload=payload
    )
    self.event_map[event_id].append(end_event)

    # LLM end events may arrive after their parent observation has already
    # been closed; such generations were parked in _orphaned_LLM_generations
    # and are finalized (and removed) here.
    if event_type == CBEventType.LLM and event_id in self._orphaned_LLM_generations:
        generation, trace = self._orphaned_LLM_generations.pop(event_id)
        self._handle_orphaned_LLM_end_event(
            end_event, generation=generation, trace=trace
        )

Run when an event ends.

Inherited Members
llama_index.core.callbacks.base_handler.BaseCallbackHandler
event_starts_to_ignore
event_ends_to_ignore
langfuse.utils.base_callback_handler.LangfuseBaseCallbackHandler
version
session_id
user_id
trace_name
release
metadata
tags
root_span
update_stateful_client
langfuse
trace
get_trace_id
get_trace_url
flush
auth_check