langfuse.utils.base_callback_handler

  1from typing import Optional, Union, List, Any, Callable
  2import httpx
  3import logging
  4import os
  5import warnings
  6
  7from langfuse.client import Langfuse, StatefulTraceClient, StatefulSpanClient, StateType
  8
  9
 10class LangfuseBaseCallbackHandler:
 11    log = logging.getLogger("langfuse")
 12
 13    def __init__(
 14        self,
 15        *,
 16        public_key: Optional[str] = None,
 17        secret_key: Optional[str] = None,
 18        host: Optional[str] = None,
 19        debug: bool = False,
 20        stateful_client: Optional[
 21            Union[StatefulTraceClient, StatefulSpanClient]
 22        ] = None,
 23        update_stateful_client: bool = False,
 24        version: Optional[str] = None,
 25        session_id: Optional[str] = None,
 26        user_id: Optional[str] = None,
 27        trace_name: Optional[str] = None,
 28        release: Optional[str] = None,
 29        metadata: Optional[Any] = None,
 30        tags: Optional[List[str]] = None,
 31        threads: Optional[int] = None,
 32        flush_at: Optional[int] = None,
 33        flush_interval: Optional[int] = None,
 34        max_retries: Optional[int] = None,
 35        timeout: Optional[int] = None,
 36        enabled: Optional[bool] = None,
 37        httpx_client: Optional[httpx.Client] = None,
 38        sdk_integration: str,
 39        sample_rate: Optional[float] = None,
 40        mask: Optional[Callable] = None,
 41    ) -> None:
 42        self.version = version
 43        self.session_id = session_id
 44        self.user_id = user_id
 45        self.trace_name = trace_name
 46        self.release = release
 47        self.metadata = metadata
 48        self.tags = tags
 49
 50        self.root_span = None
 51        self.update_stateful_client = update_stateful_client
 52        self.langfuse = None
 53
 54        prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
 55        prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
 56        prio_host = host or os.environ.get(
 57            "LANGFUSE_HOST", "https://cloud.langfuse.com"
 58        )
 59
 60        prio_sample_rate = (
 61            sample_rate
 62            if sample_rate is not None
 63            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
 64        )
 65
 66        if stateful_client and isinstance(stateful_client, StatefulTraceClient):
 67            self.trace = stateful_client
 68            self._task_manager = stateful_client.task_manager
 69
 70            return
 71
 72        elif stateful_client and isinstance(stateful_client, StatefulSpanClient):
 73            self.root_span = stateful_client
 74            self.trace = StatefulTraceClient(
 75                stateful_client.client,
 76                stateful_client.trace_id,
 77                StateType.TRACE,
 78                stateful_client.trace_id,
 79                stateful_client.task_manager,
 80            )
 81            self._task_manager = stateful_client.task_manager
 82
 83            return
 84
 85        args = {
 86            "public_key": prio_public_key,
 87            "secret_key": prio_secret_key,
 88            "host": prio_host,
 89            "debug": debug,
 90        }
 91
 92        if release is not None:
 93            args["release"] = release
 94        if threads is not None:
 95            args["threads"] = threads
 96        if flush_at is not None:
 97            args["flush_at"] = flush_at
 98        if flush_interval is not None:
 99            args["flush_interval"] = flush_interval
100        if max_retries is not None:
101            args["max_retries"] = max_retries
102        if timeout is not None:
103            args["timeout"] = timeout
104        if enabled is not None:
105            args["enabled"] = enabled
106        if httpx_client is not None:
107            args["httpx_client"] = httpx_client
108        if prio_sample_rate is not None:
109            args["sample_rate"] = prio_sample_rate
110        if mask is not None:
111            args["mask"] = mask
112
113        args["sdk_integration"] = sdk_integration
114
115        self.langfuse = Langfuse(**args)
116        self.trace: Optional[StatefulTraceClient] = None
117        self._task_manager = self.langfuse.task_manager
118
119    def get_trace_id(self):
120        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
121        Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace.
122
123        Returns:
124            The ID of the current/last trace or None if no trace is available.
125        """
126        warnings.warn(
127            "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
128            DeprecationWarning,
129        )
130        return self.trace.id if self.trace else None
131
132    def get_trace_url(self):
133        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
134        Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information.
135
136        Returns:
137            The URL of the current/last trace or None if no trace is available.
138        """
139        warnings.warn(
140            "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
141            DeprecationWarning,
142        )
143        return self.trace.get_trace_url() if self.trace else None
144
145    def flush(self):
146        if self.trace is not None:
147            self.trace.task_manager.flush()
148        elif self.root_span is not None:
149            self.root_span.task_manager.flush()
150        else:
151            self.log.debug("There was no trace yet, hence no flushing possible.")
152
153    def auth_check(self):
154        if self.langfuse is not None:
155            return self.langfuse.auth_check()
156        elif self.trace is not None:
157            projects = self.trace.client.projects.get()
158            if len(projects.data) == 0:
159                raise Exception("No projects found for the keys.")
160            return True
161        elif self.root_span is not None:
162            projects = self.root_span.client.projects.get()
163            if len(projects) == 0:
164                raise Exception("No projects found for the keys.")
165            return True
166
167        return False
class LangfuseBaseCallbackHandler:
 11class LangfuseBaseCallbackHandler:
 12    log = logging.getLogger("langfuse")
 13
 14    def __init__(
 15        self,
 16        *,
 17        public_key: Optional[str] = None,
 18        secret_key: Optional[str] = None,
 19        host: Optional[str] = None,
 20        debug: bool = False,
 21        stateful_client: Optional[
 22            Union[StatefulTraceClient, StatefulSpanClient]
 23        ] = None,
 24        update_stateful_client: bool = False,
 25        version: Optional[str] = None,
 26        session_id: Optional[str] = None,
 27        user_id: Optional[str] = None,
 28        trace_name: Optional[str] = None,
 29        release: Optional[str] = None,
 30        metadata: Optional[Any] = None,
 31        tags: Optional[List[str]] = None,
 32        threads: Optional[int] = None,
 33        flush_at: Optional[int] = None,
 34        flush_interval: Optional[int] = None,
 35        max_retries: Optional[int] = None,
 36        timeout: Optional[int] = None,
 37        enabled: Optional[bool] = None,
 38        httpx_client: Optional[httpx.Client] = None,
 39        sdk_integration: str,
 40        sample_rate: Optional[float] = None,
 41        mask: Optional[Callable] = None,
 42    ) -> None:
 43        self.version = version
 44        self.session_id = session_id
 45        self.user_id = user_id
 46        self.trace_name = trace_name
 47        self.release = release
 48        self.metadata = metadata
 49        self.tags = tags
 50
 51        self.root_span = None
 52        self.update_stateful_client = update_stateful_client
 53        self.langfuse = None
 54
 55        prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
 56        prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
 57        prio_host = host or os.environ.get(
 58            "LANGFUSE_HOST", "https://cloud.langfuse.com"
 59        )
 60
 61        prio_sample_rate = (
 62            sample_rate
 63            if sample_rate is not None
 64            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
 65        )
 66
 67        if stateful_client and isinstance(stateful_client, StatefulTraceClient):
 68            self.trace = stateful_client
 69            self._task_manager = stateful_client.task_manager
 70
 71            return
 72
 73        elif stateful_client and isinstance(stateful_client, StatefulSpanClient):
 74            self.root_span = stateful_client
 75            self.trace = StatefulTraceClient(
 76                stateful_client.client,
 77                stateful_client.trace_id,
 78                StateType.TRACE,
 79                stateful_client.trace_id,
 80                stateful_client.task_manager,
 81            )
 82            self._task_manager = stateful_client.task_manager
 83
 84            return
 85
 86        args = {
 87            "public_key": prio_public_key,
 88            "secret_key": prio_secret_key,
 89            "host": prio_host,
 90            "debug": debug,
 91        }
 92
 93        if release is not None:
 94            args["release"] = release
 95        if threads is not None:
 96            args["threads"] = threads
 97        if flush_at is not None:
 98            args["flush_at"] = flush_at
 99        if flush_interval is not None:
100            args["flush_interval"] = flush_interval
101        if max_retries is not None:
102            args["max_retries"] = max_retries
103        if timeout is not None:
104            args["timeout"] = timeout
105        if enabled is not None:
106            args["enabled"] = enabled
107        if httpx_client is not None:
108            args["httpx_client"] = httpx_client
109        if prio_sample_rate is not None:
110            args["sample_rate"] = prio_sample_rate
111        if mask is not None:
112            args["mask"] = mask
113
114        args["sdk_integration"] = sdk_integration
115
116        self.langfuse = Langfuse(**args)
117        self.trace: Optional[StatefulTraceClient] = None
118        self._task_manager = self.langfuse.task_manager
119
120    def get_trace_id(self):
121        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
122        Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace.
123
124        Returns:
125            The ID of the current/last trace or None if no trace is available.
126        """
127        warnings.warn(
128            "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
129            DeprecationWarning,
130        )
131        return self.trace.id if self.trace else None
132
133    def get_trace_url(self):
134        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
135        Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information.
136
137        Returns:
138            The URL of the current/last trace or None if no trace is available.
139        """
140        warnings.warn(
141            "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
142            DeprecationWarning,
143        )
144        return self.trace.get_trace_url() if self.trace else None
145
146    def flush(self):
147        if self.trace is not None:
148            self.trace.task_manager.flush()
149        elif self.root_span is not None:
150            self.root_span.task_manager.flush()
151        else:
152            self.log.debug("There was no trace yet, hence no flushing possible.")
153
154    def auth_check(self):
155        if self.langfuse is not None:
156            return self.langfuse.auth_check()
157        elif self.trace is not None:
158            projects = self.trace.client.projects.get()
159            if len(projects.data) == 0:
160                raise Exception("No projects found for the keys.")
161            return True
162        elif self.root_span is not None:
163            projects = self.root_span.client.projects.get()
164            if len(projects) == 0:
165                raise Exception("No projects found for the keys.")
166            return True
167
168        return False
LangfuseBaseCallbackHandler( *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, debug: bool = False, stateful_client: Union[langfuse.client.StatefulTraceClient, langfuse.client.StatefulSpanClient, NoneType] = None, update_stateful_client: bool = False, version: Optional[str] = None, session_id: Optional[str] = None, user_id: Optional[str] = None, trace_name: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, enabled: Optional[bool] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: str, sample_rate: Optional[float] = None, mask: Optional[Callable] = None)
 14    def __init__(
 15        self,
 16        *,
 17        public_key: Optional[str] = None,
 18        secret_key: Optional[str] = None,
 19        host: Optional[str] = None,
 20        debug: bool = False,
 21        stateful_client: Optional[
 22            Union[StatefulTraceClient, StatefulSpanClient]
 23        ] = None,
 24        update_stateful_client: bool = False,
 25        version: Optional[str] = None,
 26        session_id: Optional[str] = None,
 27        user_id: Optional[str] = None,
 28        trace_name: Optional[str] = None,
 29        release: Optional[str] = None,
 30        metadata: Optional[Any] = None,
 31        tags: Optional[List[str]] = None,
 32        threads: Optional[int] = None,
 33        flush_at: Optional[int] = None,
 34        flush_interval: Optional[int] = None,
 35        max_retries: Optional[int] = None,
 36        timeout: Optional[int] = None,
 37        enabled: Optional[bool] = None,
 38        httpx_client: Optional[httpx.Client] = None,
 39        sdk_integration: str,
 40        sample_rate: Optional[float] = None,
 41        mask: Optional[Callable] = None,
 42    ) -> None:
 43        self.version = version
 44        self.session_id = session_id
 45        self.user_id = user_id
 46        self.trace_name = trace_name
 47        self.release = release
 48        self.metadata = metadata
 49        self.tags = tags
 50
 51        self.root_span = None
 52        self.update_stateful_client = update_stateful_client
 53        self.langfuse = None
 54
 55        prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
 56        prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
 57        prio_host = host or os.environ.get(
 58            "LANGFUSE_HOST", "https://cloud.langfuse.com"
 59        )
 60
 61        prio_sample_rate = (
 62            sample_rate
 63            if sample_rate is not None
 64            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
 65        )
 66
 67        if stateful_client and isinstance(stateful_client, StatefulTraceClient):
 68            self.trace = stateful_client
 69            self._task_manager = stateful_client.task_manager
 70
 71            return
 72
 73        elif stateful_client and isinstance(stateful_client, StatefulSpanClient):
 74            self.root_span = stateful_client
 75            self.trace = StatefulTraceClient(
 76                stateful_client.client,
 77                stateful_client.trace_id,
 78                StateType.TRACE,
 79                stateful_client.trace_id,
 80                stateful_client.task_manager,
 81            )
 82            self._task_manager = stateful_client.task_manager
 83
 84            return
 85
 86        args = {
 87            "public_key": prio_public_key,
 88            "secret_key": prio_secret_key,
 89            "host": prio_host,
 90            "debug": debug,
 91        }
 92
 93        if release is not None:
 94            args["release"] = release
 95        if threads is not None:
 96            args["threads"] = threads
 97        if flush_at is not None:
 98            args["flush_at"] = flush_at
 99        if flush_interval is not None:
100            args["flush_interval"] = flush_interval
101        if max_retries is not None:
102            args["max_retries"] = max_retries
103        if timeout is not None:
104            args["timeout"] = timeout
105        if enabled is not None:
106            args["enabled"] = enabled
107        if httpx_client is not None:
108            args["httpx_client"] = httpx_client
109        if prio_sample_rate is not None:
110            args["sample_rate"] = prio_sample_rate
111        if mask is not None:
112            args["mask"] = mask
113
114        args["sdk_integration"] = sdk_integration
115
116        self.langfuse = Langfuse(**args)
117        self.trace: Optional[StatefulTraceClient] = None
118        self._task_manager = self.langfuse.task_manager
log = <Logger langfuse (WARNING)>
version
session_id
user_id
trace_name
release
metadata
tags
root_span
update_stateful_client
langfuse
def get_trace_id(self):
120    def get_trace_id(self):
121        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
122        Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace.
123
124        Returns:
125            The ID of the current/last trace or None if no trace is available.
126        """
127        warnings.warn(
128            "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
129            DeprecationWarning,
130        )
131        return self.trace.id if self.trace else None

This method is deprecated and will be removed in a future version as it is not concurrency-safe. Please refer to the documentation on how to use interop with the Langfuse SDK to get the id of a trace.

Returns:

The ID of the current/last trace or None if no trace is available.

def get_trace_url(self):
133    def get_trace_url(self):
134        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
135        Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information.
136
137        Returns:
138            The URL of the current/last trace or None if no trace is available.
139        """
140        warnings.warn(
141            "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
142            DeprecationWarning,
143        )
144        return self.trace.get_trace_url() if self.trace else None

This method is deprecated and will be removed in a future version as it is not concurrency-safe. Please refer to the documentation for more information.

Returns:

The URL of the current/last trace or None if no trace is available.

def flush(self):
146    def flush(self):
147        if self.trace is not None:
148            self.trace.task_manager.flush()
149        elif self.root_span is not None:
150            self.root_span.task_manager.flush()
151        else:
152            self.log.debug("There was no trace yet, hence no flushing possible.")
def auth_check(self):
154    def auth_check(self):
155        if self.langfuse is not None:
156            return self.langfuse.auth_check()
157        elif self.trace is not None:
158            projects = self.trace.client.projects.get()
159            if len(projects.data) == 0:
160                raise Exception("No projects found for the keys.")
161            return True
162        elif self.root_span is not None:
163            projects = self.root_span.client.projects.get()
164            if len(projects) == 0:
165                raise Exception("No projects found for the keys.")
166            return True
167
168        return False