langfuse.utils.base_callback_handler

  1from typing import Optional, Union, List, Any
  2import httpx
  3import logging
  4import os
  5import warnings
  6
  7from langfuse.client import Langfuse, StatefulTraceClient, StatefulSpanClient, StateType
  8
  9
 10class LangfuseBaseCallbackHandler:
 11    log = logging.getLogger("langfuse")
 12
 13    def __init__(
 14        self,
 15        *,
 16        public_key: Optional[str] = None,
 17        secret_key: Optional[str] = None,
 18        host: Optional[str] = None,
 19        debug: bool = False,
 20        stateful_client: Optional[
 21            Union[StatefulTraceClient, StatefulSpanClient]
 22        ] = None,
 23        update_stateful_client: bool = False,
 24        version: Optional[str] = None,
 25        session_id: Optional[str] = None,
 26        user_id: Optional[str] = None,
 27        trace_name: Optional[str] = None,
 28        release: Optional[str] = None,
 29        metadata: Optional[Any] = None,
 30        tags: Optional[List[str]] = None,
 31        threads: Optional[int] = None,
 32        flush_at: Optional[int] = None,
 33        flush_interval: Optional[int] = None,
 34        max_retries: Optional[int] = None,
 35        timeout: Optional[int] = None,
 36        enabled: Optional[bool] = None,
 37        httpx_client: Optional[httpx.Client] = None,
 38        sdk_integration: str,
 39        sample_rate: Optional[float] = None,
 40    ) -> None:
 41        self.version = version
 42        self.session_id = session_id
 43        self.user_id = user_id
 44        self.trace_name = trace_name
 45        self.release = release
 46        self.metadata = metadata
 47        self.tags = tags
 48
 49        self.root_span = None
 50        self.update_stateful_client = update_stateful_client
 51        self.langfuse = None
 52
 53        prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
 54        prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
 55        prio_host = host or os.environ.get(
 56            "LANGFUSE_HOST", "https://cloud.langfuse.com"
 57        )
 58
 59        prio_sample_rate = (
 60            sample_rate
 61            if sample_rate is not None
 62            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
 63        )
 64
 65        if stateful_client and isinstance(stateful_client, StatefulTraceClient):
 66            self.trace = stateful_client
 67            self._task_manager = stateful_client.task_manager
 68
 69            return
 70
 71        elif stateful_client and isinstance(stateful_client, StatefulSpanClient):
 72            self.root_span = stateful_client
 73            self.trace = StatefulTraceClient(
 74                stateful_client.client,
 75                stateful_client.trace_id,
 76                StateType.TRACE,
 77                stateful_client.trace_id,
 78                stateful_client.task_manager,
 79            )
 80            self._task_manager = stateful_client.task_manager
 81
 82            return
 83
 84        args = {
 85            "public_key": prio_public_key,
 86            "secret_key": prio_secret_key,
 87            "host": prio_host,
 88            "debug": debug,
 89        }
 90
 91        if release is not None:
 92            args["release"] = release
 93        if threads is not None:
 94            args["threads"] = threads
 95        if flush_at is not None:
 96            args["flush_at"] = flush_at
 97        if flush_interval is not None:
 98            args["flush_interval"] = flush_interval
 99        if max_retries is not None:
100            args["max_retries"] = max_retries
101        if timeout is not None:
102            args["timeout"] = timeout
103        if enabled is not None:
104            args["enabled"] = enabled
105        if httpx_client is not None:
106            args["httpx_client"] = httpx_client
107        if prio_sample_rate is not None:
108            args["sample_rate"] = prio_sample_rate
109
110        args["sdk_integration"] = sdk_integration
111
112        self.langfuse = Langfuse(**args)
113        self.trace: Optional[StatefulTraceClient] = None
114        self._task_manager = self.langfuse.task_manager
115
116    def get_trace_id(self):
117        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
118        Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace.
119
120        Returns:
121            The ID of the current/last trace or None if no trace is available.
122        """
123        warnings.warn(
124            "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
125            DeprecationWarning,
126        )
127        return self.trace.id if self.trace else None
128
129    def get_trace_url(self):
130        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
131        Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information.
132
133        Returns:
134            The URL of the current/last trace or None if no trace is available.
135        """
136        warnings.warn(
137            "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
138            DeprecationWarning,
139        )
140        return self.trace.get_trace_url() if self.trace else None
141
142    def flush(self):
143        if self.trace is not None:
144            self.trace.task_manager.flush()
145        elif self.root_span is not None:
146            self.root_span.task_manager.flush()
147        else:
148            self.log.debug("There was no trace yet, hence no flushing possible.")
149
150    def auth_check(self):
151        if self.langfuse is not None:
152            return self.langfuse.auth_check()
153        elif self.trace is not None:
154            projects = self.trace.client.projects.get()
155            if len(projects.data) == 0:
156                raise Exception("No projects found for the keys.")
157            return True
158        elif self.root_span is not None:
159            projects = self.root_span.client.projects.get()
160            if len(projects) == 0:
161                raise Exception("No projects found for the keys.")
162            return True
163
164        return False
class LangfuseBaseCallbackHandler:
 11class LangfuseBaseCallbackHandler:
 12    log = logging.getLogger("langfuse")
 13
 14    def __init__(
 15        self,
 16        *,
 17        public_key: Optional[str] = None,
 18        secret_key: Optional[str] = None,
 19        host: Optional[str] = None,
 20        debug: bool = False,
 21        stateful_client: Optional[
 22            Union[StatefulTraceClient, StatefulSpanClient]
 23        ] = None,
 24        update_stateful_client: bool = False,
 25        version: Optional[str] = None,
 26        session_id: Optional[str] = None,
 27        user_id: Optional[str] = None,
 28        trace_name: Optional[str] = None,
 29        release: Optional[str] = None,
 30        metadata: Optional[Any] = None,
 31        tags: Optional[List[str]] = None,
 32        threads: Optional[int] = None,
 33        flush_at: Optional[int] = None,
 34        flush_interval: Optional[int] = None,
 35        max_retries: Optional[int] = None,
 36        timeout: Optional[int] = None,
 37        enabled: Optional[bool] = None,
 38        httpx_client: Optional[httpx.Client] = None,
 39        sdk_integration: str,
 40        sample_rate: Optional[float] = None,
 41    ) -> None:
 42        self.version = version
 43        self.session_id = session_id
 44        self.user_id = user_id
 45        self.trace_name = trace_name
 46        self.release = release
 47        self.metadata = metadata
 48        self.tags = tags
 49
 50        self.root_span = None
 51        self.update_stateful_client = update_stateful_client
 52        self.langfuse = None
 53
 54        prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
 55        prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
 56        prio_host = host or os.environ.get(
 57            "LANGFUSE_HOST", "https://cloud.langfuse.com"
 58        )
 59
 60        prio_sample_rate = (
 61            sample_rate
 62            if sample_rate is not None
 63            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
 64        )
 65
 66        if stateful_client and isinstance(stateful_client, StatefulTraceClient):
 67            self.trace = stateful_client
 68            self._task_manager = stateful_client.task_manager
 69
 70            return
 71
 72        elif stateful_client and isinstance(stateful_client, StatefulSpanClient):
 73            self.root_span = stateful_client
 74            self.trace = StatefulTraceClient(
 75                stateful_client.client,
 76                stateful_client.trace_id,
 77                StateType.TRACE,
 78                stateful_client.trace_id,
 79                stateful_client.task_manager,
 80            )
 81            self._task_manager = stateful_client.task_manager
 82
 83            return
 84
 85        args = {
 86            "public_key": prio_public_key,
 87            "secret_key": prio_secret_key,
 88            "host": prio_host,
 89            "debug": debug,
 90        }
 91
 92        if release is not None:
 93            args["release"] = release
 94        if threads is not None:
 95            args["threads"] = threads
 96        if flush_at is not None:
 97            args["flush_at"] = flush_at
 98        if flush_interval is not None:
 99            args["flush_interval"] = flush_interval
100        if max_retries is not None:
101            args["max_retries"] = max_retries
102        if timeout is not None:
103            args["timeout"] = timeout
104        if enabled is not None:
105            args["enabled"] = enabled
106        if httpx_client is not None:
107            args["httpx_client"] = httpx_client
108        if prio_sample_rate is not None:
109            args["sample_rate"] = prio_sample_rate
110
111        args["sdk_integration"] = sdk_integration
112
113        self.langfuse = Langfuse(**args)
114        self.trace: Optional[StatefulTraceClient] = None
115        self._task_manager = self.langfuse.task_manager
116
117    def get_trace_id(self):
118        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
119        Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace.
120
121        Returns:
122            The ID of the current/last trace or None if no trace is available.
123        """
124        warnings.warn(
125            "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
126            DeprecationWarning,
127        )
128        return self.trace.id if self.trace else None
129
130    def get_trace_url(self):
131        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
132        Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information.
133
134        Returns:
135            The URL of the current/last trace or None if no trace is available.
136        """
137        warnings.warn(
138            "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
139            DeprecationWarning,
140        )
141        return self.trace.get_trace_url() if self.trace else None
142
143    def flush(self):
144        if self.trace is not None:
145            self.trace.task_manager.flush()
146        elif self.root_span is not None:
147            self.root_span.task_manager.flush()
148        else:
149            self.log.debug("There was no trace yet, hence no flushing possible.")
150
151    def auth_check(self):
152        if self.langfuse is not None:
153            return self.langfuse.auth_check()
154        elif self.trace is not None:
155            projects = self.trace.client.projects.get()
156            if len(projects.data) == 0:
157                raise Exception("No projects found for the keys.")
158            return True
159        elif self.root_span is not None:
160            projects = self.root_span.client.projects.get()
161            if len(projects) == 0:
162                raise Exception("No projects found for the keys.")
163            return True
164
165        return False
LangfuseBaseCallbackHandler( *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, debug: bool = False, stateful_client: Union[langfuse.client.StatefulTraceClient, langfuse.client.StatefulSpanClient, NoneType] = None, update_stateful_client: bool = False, version: Optional[str] = None, session_id: Optional[str] = None, user_id: Optional[str] = None, trace_name: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, enabled: Optional[bool] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: str, sample_rate: Optional[float] = None)
 14    def __init__(
 15        self,
 16        *,
 17        public_key: Optional[str] = None,
 18        secret_key: Optional[str] = None,
 19        host: Optional[str] = None,
 20        debug: bool = False,
 21        stateful_client: Optional[
 22            Union[StatefulTraceClient, StatefulSpanClient]
 23        ] = None,
 24        update_stateful_client: bool = False,
 25        version: Optional[str] = None,
 26        session_id: Optional[str] = None,
 27        user_id: Optional[str] = None,
 28        trace_name: Optional[str] = None,
 29        release: Optional[str] = None,
 30        metadata: Optional[Any] = None,
 31        tags: Optional[List[str]] = None,
 32        threads: Optional[int] = None,
 33        flush_at: Optional[int] = None,
 34        flush_interval: Optional[int] = None,
 35        max_retries: Optional[int] = None,
 36        timeout: Optional[int] = None,
 37        enabled: Optional[bool] = None,
 38        httpx_client: Optional[httpx.Client] = None,
 39        sdk_integration: str,
 40        sample_rate: Optional[float] = None,
 41    ) -> None:
 42        self.version = version
 43        self.session_id = session_id
 44        self.user_id = user_id
 45        self.trace_name = trace_name
 46        self.release = release
 47        self.metadata = metadata
 48        self.tags = tags
 49
 50        self.root_span = None
 51        self.update_stateful_client = update_stateful_client
 52        self.langfuse = None
 53
 54        prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
 55        prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
 56        prio_host = host or os.environ.get(
 57            "LANGFUSE_HOST", "https://cloud.langfuse.com"
 58        )
 59
 60        prio_sample_rate = (
 61            sample_rate
 62            if sample_rate is not None
 63            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
 64        )
 65
 66        if stateful_client and isinstance(stateful_client, StatefulTraceClient):
 67            self.trace = stateful_client
 68            self._task_manager = stateful_client.task_manager
 69
 70            return
 71
 72        elif stateful_client and isinstance(stateful_client, StatefulSpanClient):
 73            self.root_span = stateful_client
 74            self.trace = StatefulTraceClient(
 75                stateful_client.client,
 76                stateful_client.trace_id,
 77                StateType.TRACE,
 78                stateful_client.trace_id,
 79                stateful_client.task_manager,
 80            )
 81            self._task_manager = stateful_client.task_manager
 82
 83            return
 84
 85        args = {
 86            "public_key": prio_public_key,
 87            "secret_key": prio_secret_key,
 88            "host": prio_host,
 89            "debug": debug,
 90        }
 91
 92        if release is not None:
 93            args["release"] = release
 94        if threads is not None:
 95            args["threads"] = threads
 96        if flush_at is not None:
 97            args["flush_at"] = flush_at
 98        if flush_interval is not None:
 99            args["flush_interval"] = flush_interval
100        if max_retries is not None:
101            args["max_retries"] = max_retries
102        if timeout is not None:
103            args["timeout"] = timeout
104        if enabled is not None:
105            args["enabled"] = enabled
106        if httpx_client is not None:
107            args["httpx_client"] = httpx_client
108        if prio_sample_rate is not None:
109            args["sample_rate"] = prio_sample_rate
110
111        args["sdk_integration"] = sdk_integration
112
113        self.langfuse = Langfuse(**args)
114        self.trace: Optional[StatefulTraceClient] = None
115        self._task_manager = self.langfuse.task_manager
log = <Logger langfuse (WARNING)>
version
session_id
user_id
trace_name
release
metadata
tags
root_span
update_stateful_client
langfuse
def get_trace_id(self):
117    def get_trace_id(self):
118        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
119        Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace.
120
121        Returns:
122            The ID of the current/last trace or None if no trace is available.
123        """
124        warnings.warn(
125            "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
126            DeprecationWarning,
127        )
128        return self.trace.id if self.trace else None

This method is deprecated and will be removed in a future version as it is not concurrency-safe. Please refer to the documentation on how to use interop with the Langfuse SDK to get the id of a trace.

Returns:

The ID of the current/last trace or None if no trace is available.

def get_trace_url(self):
130    def get_trace_url(self):
131        """This method is deprecated and will be removed in a future version as it is not concurrency-safe.
132        Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information.
133
134        Returns:
135            The URL of the current/last trace or None if no trace is available.
136        """
137        warnings.warn(
138            "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
139            DeprecationWarning,
140        )
141        return self.trace.get_trace_url() if self.trace else None

This method is deprecated and will be removed in a future version as it is not concurrency-safe. Please refer to the documentation for more information.

Returns:

The URL of the current/last trace or None if no trace is available.

def flush(self):
143    def flush(self):
144        if self.trace is not None:
145            self.trace.task_manager.flush()
146        elif self.root_span is not None:
147            self.root_span.task_manager.flush()
148        else:
149            self.log.debug("There was no trace yet, hence no flushing possible.")
def auth_check(self):
151    def auth_check(self):
152        if self.langfuse is not None:
153            return self.langfuse.auth_check()
154        elif self.trace is not None:
155            projects = self.trace.client.projects.get()
156            if len(projects.data) == 0:
157                raise Exception("No projects found for the keys.")
158            return True
159        elif self.root_span is not None:
160            projects = self.root_span.client.projects.get()
161            if len(projects) == 0:
162                raise Exception("No projects found for the keys.")
163            return True
164
165        return False