langfuse.utils.base_callback_handler

  1import logging
  2import os
  3import warnings
  4from typing import Any, Callable, List, Optional, Union
  5
  6import httpx
  7
  8from langfuse.client import Langfuse, StatefulSpanClient, StatefulTraceClient, StateType
  9
 10
 11class LangfuseBaseCallbackHandler:
 12    log = logging.getLogger("langfuse")
 13
 14    def __init__(
 15        self,
 16        *,
 17        public_key: Optional[str] = None,
 18        secret_key: Optional[str] = None,
 19        host: Optional[str] = None,
 20        debug: bool = False,
 21        stateful_client: Optional[
 22            Union[StatefulTraceClient, StatefulSpanClient]
 23        ] = None,
 24        update_stateful_client: bool = False,
 25        version: Optional[str] = None,
 26        session_id: Optional[str] = None,
 27        user_id: Optional[str] = None,
 28        trace_name: Optional[str] = None,
 29        release: Optional[str] = None,
 30        metadata: Optional[Any] = None,
 31        tags: Optional[List[str]] = None,
 32        threads: Optional[int] = None,
 33        flush_at: Optional[int] = None,
 34        flush_interval: Optional[int] = None,
 35        max_retries: Optional[int] = None,
 36        timeout: Optional[int] = None,
 37        enabled: Optional[bool] = None,
 38        httpx_client: Optional[httpx.Client] = None,
 39        sdk_integration: str,
 40        sample_rate: Optional[float] = None,
 41        mask: Optional[Callable] = None,
 42        environment: Optional[str] = None,
 43    ) -> None:
 44        self.version = version
 45        self.session_id = session_id
 46        self.user_id = user_id
 47        self.trace_name = trace_name
 48        self.release = release
 49        self.metadata = metadata
 50        self.tags = tags
 51
 52        self.root_span = None
 53        self.update_stateful_client = update_stateful_client
 54        self.langfuse = None
 55
 56        prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
 57        prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
 58        prio_host = host or os.environ.get(
 59            "LANGFUSE_HOST", "https://cloud.langfuse.com"
 60        )
 61
 62        prio_sample_rate = (
 63            sample_rate
 64            if sample_rate is not None
 65            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
 66        )
 67
 68        if stateful_client and isinstance(stateful_client, StatefulTraceClient):
 69            self.trace = stateful_client
 70            self._task_manager = stateful_client.task_manager
 71
 72            return
 73
 74        elif stateful_client and isinstance(stateful_client, StatefulSpanClient):
 75            self.root_span = stateful_client
 76            self.trace = StatefulTraceClient(
 77                stateful_client.client,
 78                stateful_client.trace_id,
 79                StateType.TRACE,
 80                stateful_client.trace_id,
 81                stateful_client.task_manager,
 82            )
 83            self._task_manager = stateful_client.task_manager
 84
 85            return
 86
 87        args = {
 88            "public_key": prio_public_key,
 89            "secret_key": prio_secret_key,
 90            "host": prio_host,
 91            "debug": debug,
 92        }
 93
 94        if release is not None:
 95            args["release"] = release
 96        if threads is not None:
 97            args["threads"] = threads
 98        if flush_at is not None:
 99            args["flush_at"] = flush_at
100        if flush_interval is not None:
101            args["flush_interval"] = flush_interval
102        if max_retries is not None:
103            args["max_retries"] = max_retries
104        if timeout is not None:
105            args["timeout"] = timeout
106        if enabled is not None:
107            args["enabled"] = enabled
108        if httpx_client is not None:
109            args["httpx_client"] = httpx_client
110        if prio_sample_rate is not None:
111            args["sample_rate"] = prio_sample_rate
112        if mask is not None:
113            args["mask"] = mask
114        if environment is not None:
115            args["environment"] = environment
116
117        args["sdk_integration"] = sdk_integration
118
119        self.langfuse = Langfuse(**args)
120        self.trace: Optional[StatefulTraceClient] = None
121        self._task_manager = self.langfuse.task_manager
122
123    def get_trace_id(self):
124        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
125        Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace.
126
127        Returns:
128            The ID of the current/last trace or None if no trace is available.
129        """
130        warnings.warn(
131            "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
132            DeprecationWarning,
133        )
134        return self.trace.id if self.trace else None
135
136    def get_trace_url(self):
137        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
138        Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information.
139
140        Returns:
141            The URL of the current/last trace or None if no trace is available.
142        """
143        warnings.warn(
144            "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
145            DeprecationWarning,
146        )
147        return self.trace.get_trace_url() if self.trace else None
148
149    def flush(self):
150        if self.trace is not None:
151            self.trace.task_manager.flush()
152        elif self.root_span is not None:
153            self.root_span.task_manager.flush()
154        else:
155            self.log.debug("There was no trace yet, hence no flushing possible.")
156
157    def auth_check(self):
158        if self.langfuse is not None:
159            return self.langfuse.auth_check()
160        elif self.trace is not None:
161            projects = self.trace.client.projects.get()
162            if len(projects.data) == 0:
163                raise Exception("No projects found for the keys.")
164            return True
165        elif self.root_span is not None:
166            projects = self.root_span.client.projects.get()
167            if len(projects) == 0:
168                raise Exception("No projects found for the keys.")
169            return True
170
171        return False
class LangfuseBaseCallbackHandler:
 12class LangfuseBaseCallbackHandler:
 13    log = logging.getLogger("langfuse")
 14
 15    def __init__(
 16        self,
 17        *,
 18        public_key: Optional[str] = None,
 19        secret_key: Optional[str] = None,
 20        host: Optional[str] = None,
 21        debug: bool = False,
 22        stateful_client: Optional[
 23            Union[StatefulTraceClient, StatefulSpanClient]
 24        ] = None,
 25        update_stateful_client: bool = False,
 26        version: Optional[str] = None,
 27        session_id: Optional[str] = None,
 28        user_id: Optional[str] = None,
 29        trace_name: Optional[str] = None,
 30        release: Optional[str] = None,
 31        metadata: Optional[Any] = None,
 32        tags: Optional[List[str]] = None,
 33        threads: Optional[int] = None,
 34        flush_at: Optional[int] = None,
 35        flush_interval: Optional[int] = None,
 36        max_retries: Optional[int] = None,
 37        timeout: Optional[int] = None,
 38        enabled: Optional[bool] = None,
 39        httpx_client: Optional[httpx.Client] = None,
 40        sdk_integration: str,
 41        sample_rate: Optional[float] = None,
 42        mask: Optional[Callable] = None,
 43        environment: Optional[str] = None,
 44    ) -> None:
 45        self.version = version
 46        self.session_id = session_id
 47        self.user_id = user_id
 48        self.trace_name = trace_name
 49        self.release = release
 50        self.metadata = metadata
 51        self.tags = tags
 52
 53        self.root_span = None
 54        self.update_stateful_client = update_stateful_client
 55        self.langfuse = None
 56
 57        prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
 58        prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
 59        prio_host = host or os.environ.get(
 60            "LANGFUSE_HOST", "https://cloud.langfuse.com"
 61        )
 62
 63        prio_sample_rate = (
 64            sample_rate
 65            if sample_rate is not None
 66            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
 67        )
 68
 69        if stateful_client and isinstance(stateful_client, StatefulTraceClient):
 70            self.trace = stateful_client
 71            self._task_manager = stateful_client.task_manager
 72
 73            return
 74
 75        elif stateful_client and isinstance(stateful_client, StatefulSpanClient):
 76            self.root_span = stateful_client
 77            self.trace = StatefulTraceClient(
 78                stateful_client.client,
 79                stateful_client.trace_id,
 80                StateType.TRACE,
 81                stateful_client.trace_id,
 82                stateful_client.task_manager,
 83            )
 84            self._task_manager = stateful_client.task_manager
 85
 86            return
 87
 88        args = {
 89            "public_key": prio_public_key,
 90            "secret_key": prio_secret_key,
 91            "host": prio_host,
 92            "debug": debug,
 93        }
 94
 95        if release is not None:
 96            args["release"] = release
 97        if threads is not None:
 98            args["threads"] = threads
 99        if flush_at is not None:
100            args["flush_at"] = flush_at
101        if flush_interval is not None:
102            args["flush_interval"] = flush_interval
103        if max_retries is not None:
104            args["max_retries"] = max_retries
105        if timeout is not None:
106            args["timeout"] = timeout
107        if enabled is not None:
108            args["enabled"] = enabled
109        if httpx_client is not None:
110            args["httpx_client"] = httpx_client
111        if prio_sample_rate is not None:
112            args["sample_rate"] = prio_sample_rate
113        if mask is not None:
114            args["mask"] = mask
115        if environment is not None:
116            args["environment"] = environment
117
118        args["sdk_integration"] = sdk_integration
119
120        self.langfuse = Langfuse(**args)
121        self.trace: Optional[StatefulTraceClient] = None
122        self._task_manager = self.langfuse.task_manager
123
124    def get_trace_id(self):
125        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
126        Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace.
127
128        Returns:
129            The ID of the current/last trace or None if no trace is available.
130        """
131        warnings.warn(
132            "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
133            DeprecationWarning,
134        )
135        return self.trace.id if self.trace else None
136
137    def get_trace_url(self):
138        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
139        Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information.
140
141        Returns:
142            The URL of the current/last trace or None if no trace is available.
143        """
144        warnings.warn(
145            "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
146            DeprecationWarning,
147        )
148        return self.trace.get_trace_url() if self.trace else None
149
150    def flush(self):
151        if self.trace is not None:
152            self.trace.task_manager.flush()
153        elif self.root_span is not None:
154            self.root_span.task_manager.flush()
155        else:
156            self.log.debug("There was no trace yet, hence no flushing possible.")
157
158    def auth_check(self):
159        if self.langfuse is not None:
160            return self.langfuse.auth_check()
161        elif self.trace is not None:
162            projects = self.trace.client.projects.get()
163            if len(projects.data) == 0:
164                raise Exception("No projects found for the keys.")
165            return True
166        elif self.root_span is not None:
167            projects = self.root_span.client.projects.get()
168            if len(projects) == 0:
169                raise Exception("No projects found for the keys.")
170            return True
171
172        return False
LangfuseBaseCallbackHandler( *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, debug: bool = False, stateful_client: Union[langfuse.client.StatefulTraceClient, langfuse.client.StatefulSpanClient, NoneType] = None, update_stateful_client: bool = False, version: Optional[str] = None, session_id: Optional[str] = None, user_id: Optional[str] = None, trace_name: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, enabled: Optional[bool] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: str, sample_rate: Optional[float] = None, mask: Optional[Callable] = None, environment: Optional[str] = None)
 15    def __init__(
 16        self,
 17        *,
 18        public_key: Optional[str] = None,
 19        secret_key: Optional[str] = None,
 20        host: Optional[str] = None,
 21        debug: bool = False,
 22        stateful_client: Optional[
 23            Union[StatefulTraceClient, StatefulSpanClient]
 24        ] = None,
 25        update_stateful_client: bool = False,
 26        version: Optional[str] = None,
 27        session_id: Optional[str] = None,
 28        user_id: Optional[str] = None,
 29        trace_name: Optional[str] = None,
 30        release: Optional[str] = None,
 31        metadata: Optional[Any] = None,
 32        tags: Optional[List[str]] = None,
 33        threads: Optional[int] = None,
 34        flush_at: Optional[int] = None,
 35        flush_interval: Optional[int] = None,
 36        max_retries: Optional[int] = None,
 37        timeout: Optional[int] = None,
 38        enabled: Optional[bool] = None,
 39        httpx_client: Optional[httpx.Client] = None,
 40        sdk_integration: str,
 41        sample_rate: Optional[float] = None,
 42        mask: Optional[Callable] = None,
 43        environment: Optional[str] = None,
 44    ) -> None:
 45        self.version = version
 46        self.session_id = session_id
 47        self.user_id = user_id
 48        self.trace_name = trace_name
 49        self.release = release
 50        self.metadata = metadata
 51        self.tags = tags
 52
 53        self.root_span = None
 54        self.update_stateful_client = update_stateful_client
 55        self.langfuse = None
 56
 57        prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
 58        prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
 59        prio_host = host or os.environ.get(
 60            "LANGFUSE_HOST", "https://cloud.langfuse.com"
 61        )
 62
 63        prio_sample_rate = (
 64            sample_rate
 65            if sample_rate is not None
 66            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
 67        )
 68
 69        if stateful_client and isinstance(stateful_client, StatefulTraceClient):
 70            self.trace = stateful_client
 71            self._task_manager = stateful_client.task_manager
 72
 73            return
 74
 75        elif stateful_client and isinstance(stateful_client, StatefulSpanClient):
 76            self.root_span = stateful_client
 77            self.trace = StatefulTraceClient(
 78                stateful_client.client,
 79                stateful_client.trace_id,
 80                StateType.TRACE,
 81                stateful_client.trace_id,
 82                stateful_client.task_manager,
 83            )
 84            self._task_manager = stateful_client.task_manager
 85
 86            return
 87
 88        args = {
 89            "public_key": prio_public_key,
 90            "secret_key": prio_secret_key,
 91            "host": prio_host,
 92            "debug": debug,
 93        }
 94
 95        if release is not None:
 96            args["release"] = release
 97        if threads is not None:
 98            args["threads"] = threads
 99        if flush_at is not None:
100            args["flush_at"] = flush_at
101        if flush_interval is not None:
102            args["flush_interval"] = flush_interval
103        if max_retries is not None:
104            args["max_retries"] = max_retries
105        if timeout is not None:
106            args["timeout"] = timeout
107        if enabled is not None:
108            args["enabled"] = enabled
109        if httpx_client is not None:
110            args["httpx_client"] = httpx_client
111        if prio_sample_rate is not None:
112            args["sample_rate"] = prio_sample_rate
113        if mask is not None:
114            args["mask"] = mask
115        if environment is not None:
116            args["environment"] = environment
117
118        args["sdk_integration"] = sdk_integration
119
120        self.langfuse = Langfuse(**args)
121        self.trace: Optional[StatefulTraceClient] = None
122        self._task_manager = self.langfuse.task_manager
log = <Logger langfuse (WARNING)>
version
session_id
user_id
trace_name
release
metadata
tags
root_span
update_stateful_client
langfuse
def get_trace_id(self):
124    def get_trace_id(self):
125        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
126        Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace.
127
128        Returns:
129            The ID of the current/last trace or None if no trace is available.
130        """
131        warnings.warn(
132            "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
133            DeprecationWarning,
134        )
135        return self.trace.id if self.trace else None

This method is deprecated and will be removed in a future version as it is not concurrency-safe. Please refer to the documentation on how to use interop with the Langfuse SDK to get the id of a trace.

Returns:

The ID of the current/last trace or None if no trace is available.

def get_trace_url(self):
137    def get_trace_url(self):
138        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
139        Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information.
140
141        Returns:
142            The URL of the current/last trace or None if no trace is available.
143        """
144        warnings.warn(
145            "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
146            DeprecationWarning,
147        )
148        return self.trace.get_trace_url() if self.trace else None

This method is deprecated and will be removed in a future version as it is not concurrency-safe. Please refer to the documentation for more information.

Returns:

The URL of the current/last trace or None if no trace is available.

def flush(self):
150    def flush(self):
151        if self.trace is not None:
152            self.trace.task_manager.flush()
153        elif self.root_span is not None:
154            self.root_span.task_manager.flush()
155        else:
156            self.log.debug("There was no trace yet, hence no flushing possible.")
def auth_check(self):
158    def auth_check(self):
159        if self.langfuse is not None:
160            return self.langfuse.auth_check()
161        elif self.trace is not None:
162            projects = self.trace.client.projects.get()
163            if len(projects.data) == 0:
164                raise Exception("No projects found for the keys.")
165            return True
166        elif self.root_span is not None:
167            projects = self.root_span.client.projects.get()
168            if len(projects) == 0:
169                raise Exception("No projects found for the keys.")
170            return True
171
172        return False