langfuse.utils.base_callback_handler
import logging
import os
import warnings
from typing import Any, Callable, List, Optional, Union

import httpx

from langfuse.client import Langfuse, StatefulSpanClient, StatefulTraceClient, StateType


class LangfuseBaseCallbackHandler:
    """Base class holding Langfuse client and trace state for callback handlers.

    An instance either wraps an existing stateful trace/span client handed in
    by the caller, or creates its own ``Langfuse`` client from explicit
    arguments and ``LANGFUSE_*`` environment variables.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        debug: bool = False,
        stateful_client: Optional[
            Union[StatefulTraceClient, StatefulSpanClient]
        ] = None,
        update_stateful_client: bool = False,
        version: Optional[str] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        trace_name: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Any] = None,
        tags: Optional[List[str]] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        enabled: Optional[bool] = None,
        httpx_client: Optional[httpx.Client] = None,
        sdk_integration: str,
        sample_rate: Optional[float] = None,
        mask: Optional[Callable] = None,
        environment: Optional[str] = None,
    ) -> None:
        """Set up trace state, reusing ``stateful_client`` or creating a client.

        Args:
            public_key: Falls back to the ``LANGFUSE_PUBLIC_KEY`` env variable.
            secret_key: Falls back to the ``LANGFUSE_SECRET_KEY`` env variable.
            host: Falls back to ``LANGFUSE_HOST``, then to
                ``https://cloud.langfuse.com``.
            debug: Always forwarded to ``Langfuse``.
            stateful_client: Existing trace or span client to attach to. When
                given, no ``Langfuse`` client is created and the connection
                arguments are not used.
            update_stateful_client: Stored on the instance unchanged.
            version, session_id, user_id, trace_name, release, metadata, tags:
                Stored on the instance unchanged; ``release`` is additionally
                forwarded to ``Langfuse`` when not ``None``.
            threads, flush_at, flush_interval, max_retries, timeout, enabled,
            httpx_client, mask, environment: Forwarded to ``Langfuse`` only
                when not ``None``.
            sdk_integration: Always forwarded to ``Langfuse``.
            sample_rate: Falls back to ``LANGFUSE_SAMPLE_RATE`` (parsed as
                float), then to ``1.0``.

        Raises:
            ValueError: If ``LANGFUSE_SAMPLE_RATE`` is consulted and is not a
                valid float.
        """
        self.version = version
        self.session_id = session_id
        self.user_id = user_id
        self.trace_name = trace_name
        self.release = release
        self.metadata = metadata
        self.tags = tags

        self.root_span = None
        self.update_stateful_client = update_stateful_client
        self.langfuse = None

        if isinstance(stateful_client, StatefulTraceClient):
            self.trace = stateful_client
            self._task_manager = stateful_client.task_manager
            return

        if isinstance(stateful_client, StatefulSpanClient):
            self.root_span = stateful_client
            # Build a trace handle that shares the span's client, trace id
            # and task manager.
            self.trace = StatefulTraceClient(
                stateful_client.client,
                stateful_client.trace_id,
                StateType.TRACE,
                stateful_client.trace_id,
                stateful_client.task_manager,
            )
            self._task_manager = stateful_client.task_manager
            return

        # Environment fallbacks are resolved only here, so a malformed
        # LANGFUSE_SAMPLE_RATE cannot break handlers that reuse an existing
        # stateful client and never need it.
        if sample_rate is None:
            sample_rate = float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))

        args = {
            "public_key": public_key or os.environ.get("LANGFUSE_PUBLIC_KEY"),
            "secret_key": secret_key or os.environ.get("LANGFUSE_SECRET_KEY"),
            "host": host
            or os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
            "debug": debug,
            "sample_rate": sample_rate,
            "sdk_integration": sdk_integration,
        }

        # Unset options are omitted so that Langfuse applies its own defaults.
        optional_args = {
            "release": release,
            "threads": threads,
            "flush_at": flush_at,
            "flush_interval": flush_interval,
            "max_retries": max_retries,
            "timeout": timeout,
            "enabled": enabled,
            "httpx_client": httpx_client,
            "mask": mask,
            "environment": environment,
        }
        args.update(
            {name: value for name, value in optional_args.items() if value is not None}
        )

        self.langfuse = Langfuse(**args)
        self.trace: Optional[StatefulTraceClient] = None
        self._task_manager = self.langfuse.task_manager

    def get_trace_id(self):
        """Return the id of the current/last trace (deprecated).

        This method is deprecated and will be removed in a future version as it is not concurrency-safe.
        Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace.

        Returns:
            The ID of the current/last trace or None if no trace is available.
        """
        warnings.warn(
            "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller, not this module
        )
        return self.trace.id if self.trace else None

    def get_trace_url(self):
        """Return the URL of the current/last trace (deprecated).

        This method is deprecated and will be removed in a future version as it is not concurrency-safe.
        Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information.

        Returns:
            The URL of the current/last trace or None if no trace is available.
        """
        warnings.warn(
            "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller, not this module
        )
        return self.trace.get_trace_url() if self.trace else None

    def flush(self):
        """Flush the task manager of the current trace, else of the root span.

        Only logs a debug message when neither a trace nor a root span exists.
        """
        if self.trace is not None:
            self.trace.task_manager.flush()
        elif self.root_span is not None:
            self.root_span.task_manager.flush()
        else:
            self.log.debug("There was no trace yet, hence no flushing possible.")

    def auth_check(self):
        """Check that the handler's credentials give access to a project.

        Returns:
            The result of ``self.langfuse.auth_check()`` when the handler owns
            a ``Langfuse`` client. Otherwise ``True`` if the project lookup
            through the trace (or root span) client returns at least one
            project, and ``False`` when no client is available at all.

        Raises:
            Exception: If the project lookup returns no projects.
        """
        if self.langfuse is not None:
            return self.langfuse.auth_check()

        if self.trace is not None:
            stateful_client = self.trace
        elif self.root_span is not None:
            stateful_client = self.root_span
        else:
            return False

        # Both stateful clients expose the same API client, so the response
        # is read through ``.data`` in either case.
        projects = stateful_client.client.projects.get()
        if not projects.data:
            raise Exception("No projects found for the keys.")
        return True
class
LangfuseBaseCallbackHandler:
class LangfuseBaseCallbackHandler:
    """Base class holding Langfuse client and trace state for callback handlers.

    An instance either wraps an existing stateful trace/span client handed in
    by the caller, or creates its own ``Langfuse`` client from explicit
    arguments and ``LANGFUSE_*`` environment variables.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        debug: bool = False,
        stateful_client: Optional[
            Union[StatefulTraceClient, StatefulSpanClient]
        ] = None,
        update_stateful_client: bool = False,
        version: Optional[str] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        trace_name: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Any] = None,
        tags: Optional[List[str]] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        enabled: Optional[bool] = None,
        httpx_client: Optional[httpx.Client] = None,
        sdk_integration: str,
        sample_rate: Optional[float] = None,
        mask: Optional[Callable] = None,
        environment: Optional[str] = None,
    ) -> None:
        """Set up trace state, reusing ``stateful_client`` or creating a client.

        Args:
            public_key: Falls back to the ``LANGFUSE_PUBLIC_KEY`` env variable.
            secret_key: Falls back to the ``LANGFUSE_SECRET_KEY`` env variable.
            host: Falls back to ``LANGFUSE_HOST``, then to
                ``https://cloud.langfuse.com``.
            debug: Always forwarded to ``Langfuse``.
            stateful_client: Existing trace or span client to attach to. When
                given, no ``Langfuse`` client is created and the connection
                arguments are not used.
            update_stateful_client: Stored on the instance unchanged.
            version, session_id, user_id, trace_name, release, metadata, tags:
                Stored on the instance unchanged; ``release`` is additionally
                forwarded to ``Langfuse`` when not ``None``.
            threads, flush_at, flush_interval, max_retries, timeout, enabled,
            httpx_client, mask, environment: Forwarded to ``Langfuse`` only
                when not ``None``.
            sdk_integration: Always forwarded to ``Langfuse``.
            sample_rate: Falls back to ``LANGFUSE_SAMPLE_RATE`` (parsed as
                float), then to ``1.0``.

        Raises:
            ValueError: If ``LANGFUSE_SAMPLE_RATE`` is consulted and is not a
                valid float.
        """
        self.version = version
        self.session_id = session_id
        self.user_id = user_id
        self.trace_name = trace_name
        self.release = release
        self.metadata = metadata
        self.tags = tags

        self.root_span = None
        self.update_stateful_client = update_stateful_client
        self.langfuse = None

        if isinstance(stateful_client, StatefulTraceClient):
            self.trace = stateful_client
            self._task_manager = stateful_client.task_manager
            return

        if isinstance(stateful_client, StatefulSpanClient):
            self.root_span = stateful_client
            # Build a trace handle that shares the span's client, trace id
            # and task manager.
            self.trace = StatefulTraceClient(
                stateful_client.client,
                stateful_client.trace_id,
                StateType.TRACE,
                stateful_client.trace_id,
                stateful_client.task_manager,
            )
            self._task_manager = stateful_client.task_manager
            return

        # Environment fallbacks are resolved only here, so a malformed
        # LANGFUSE_SAMPLE_RATE cannot break handlers that reuse an existing
        # stateful client and never need it.
        if sample_rate is None:
            sample_rate = float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))

        args = {
            "public_key": public_key or os.environ.get("LANGFUSE_PUBLIC_KEY"),
            "secret_key": secret_key or os.environ.get("LANGFUSE_SECRET_KEY"),
            "host": host
            or os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
            "debug": debug,
            "sample_rate": sample_rate,
            "sdk_integration": sdk_integration,
        }

        # Unset options are omitted so that Langfuse applies its own defaults.
        optional_args = {
            "release": release,
            "threads": threads,
            "flush_at": flush_at,
            "flush_interval": flush_interval,
            "max_retries": max_retries,
            "timeout": timeout,
            "enabled": enabled,
            "httpx_client": httpx_client,
            "mask": mask,
            "environment": environment,
        }
        args.update(
            {name: value for name, value in optional_args.items() if value is not None}
        )

        self.langfuse = Langfuse(**args)
        self.trace: Optional[StatefulTraceClient] = None
        self._task_manager = self.langfuse.task_manager

    def get_trace_id(self):
        """Return the id of the current/last trace (deprecated).

        This method is deprecated and will be removed in a future version as it is not concurrency-safe.
        Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace.

        Returns:
            The ID of the current/last trace or None if no trace is available.
        """
        warnings.warn(
            "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller, not this module
        )
        return self.trace.id if self.trace else None

    def get_trace_url(self):
        """Return the URL of the current/last trace (deprecated).

        This method is deprecated and will be removed in a future version as it is not concurrency-safe.
        Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information.

        Returns:
            The URL of the current/last trace or None if no trace is available.
        """
        warnings.warn(
            "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller, not this module
        )
        return self.trace.get_trace_url() if self.trace else None

    def flush(self):
        """Flush the task manager of the current trace, else of the root span.

        Only logs a debug message when neither a trace nor a root span exists.
        """
        if self.trace is not None:
            self.trace.task_manager.flush()
        elif self.root_span is not None:
            self.root_span.task_manager.flush()
        else:
            self.log.debug("There was no trace yet, hence no flushing possible.")

    def auth_check(self):
        """Check that the handler's credentials give access to a project.

        Returns:
            The result of ``self.langfuse.auth_check()`` when the handler owns
            a ``Langfuse`` client. Otherwise ``True`` if the project lookup
            through the trace (or root span) client returns at least one
            project, and ``False`` when no client is available at all.

        Raises:
            Exception: If the project lookup returns no projects.
        """
        if self.langfuse is not None:
            return self.langfuse.auth_check()

        if self.trace is not None:
            stateful_client = self.trace
        elif self.root_span is not None:
            stateful_client = self.root_span
        else:
            return False

        # Both stateful clients expose the same API client, so the response
        # is read through ``.data`` in either case.
        projects = stateful_client.client.projects.get()
        if not projects.data:
            raise Exception("No projects found for the keys.")
        return True
LangfuseBaseCallbackHandler( *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, debug: bool = False, stateful_client: Union[langfuse.client.StatefulTraceClient, langfuse.client.StatefulSpanClient, NoneType] = None, update_stateful_client: bool = False, version: Optional[str] = None, session_id: Optional[str] = None, user_id: Optional[str] = None, trace_name: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, enabled: Optional[bool] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: str, sample_rate: Optional[float] = None, mask: Optional[Callable] = None, environment: Optional[str] = None)
def __init__(
    self,
    *,
    public_key: Optional[str] = None,
    secret_key: Optional[str] = None,
    host: Optional[str] = None,
    debug: bool = False,
    stateful_client: Optional[
        Union[StatefulTraceClient, StatefulSpanClient]
    ] = None,
    update_stateful_client: bool = False,
    version: Optional[str] = None,
    session_id: Optional[str] = None,
    user_id: Optional[str] = None,
    trace_name: Optional[str] = None,
    release: Optional[str] = None,
    metadata: Optional[Any] = None,
    tags: Optional[List[str]] = None,
    threads: Optional[int] = None,
    flush_at: Optional[int] = None,
    flush_interval: Optional[int] = None,
    max_retries: Optional[int] = None,
    timeout: Optional[int] = None,
    enabled: Optional[bool] = None,
    httpx_client: Optional[httpx.Client] = None,
    sdk_integration: str,
    sample_rate: Optional[float] = None,
    mask: Optional[Callable] = None,
    environment: Optional[str] = None,
) -> None:
    """Set up trace state, reusing ``stateful_client`` or creating a client.

    Args:
        public_key: Falls back to the ``LANGFUSE_PUBLIC_KEY`` env variable.
        secret_key: Falls back to the ``LANGFUSE_SECRET_KEY`` env variable.
        host: Falls back to ``LANGFUSE_HOST``, then to
            ``https://cloud.langfuse.com``.
        debug: Always forwarded to ``Langfuse``.
        stateful_client: Existing trace or span client to attach to. When
            given, no ``Langfuse`` client is created and the connection
            arguments are not used.
        update_stateful_client: Stored on the instance unchanged.
        version, session_id, user_id, trace_name, release, metadata, tags:
            Stored on the instance unchanged; ``release`` is additionally
            forwarded to ``Langfuse`` when not ``None``.
        threads, flush_at, flush_interval, max_retries, timeout, enabled,
        httpx_client, mask, environment: Forwarded to ``Langfuse`` only when
            not ``None``.
        sdk_integration: Always forwarded to ``Langfuse``.
        sample_rate: Falls back to ``LANGFUSE_SAMPLE_RATE`` (parsed as float),
            then to ``1.0``.

    Raises:
        ValueError: If ``LANGFUSE_SAMPLE_RATE`` is consulted and is not a
            valid float.
    """
    self.version = version
    self.session_id = session_id
    self.user_id = user_id
    self.trace_name = trace_name
    self.release = release
    self.metadata = metadata
    self.tags = tags

    self.root_span = None
    self.update_stateful_client = update_stateful_client
    self.langfuse = None

    if isinstance(stateful_client, StatefulTraceClient):
        self.trace = stateful_client
        self._task_manager = stateful_client.task_manager
        return

    if isinstance(stateful_client, StatefulSpanClient):
        self.root_span = stateful_client
        # Build a trace handle that shares the span's client, trace id and
        # task manager.
        self.trace = StatefulTraceClient(
            stateful_client.client,
            stateful_client.trace_id,
            StateType.TRACE,
            stateful_client.trace_id,
            stateful_client.task_manager,
        )
        self._task_manager = stateful_client.task_manager
        return

    # Environment fallbacks are resolved only here, so a malformed
    # LANGFUSE_SAMPLE_RATE cannot break handlers that reuse an existing
    # stateful client and never need it.
    if sample_rate is None:
        sample_rate = float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))

    args = {
        "public_key": public_key or os.environ.get("LANGFUSE_PUBLIC_KEY"),
        "secret_key": secret_key or os.environ.get("LANGFUSE_SECRET_KEY"),
        "host": host
        or os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
        "debug": debug,
        "sample_rate": sample_rate,
        "sdk_integration": sdk_integration,
    }

    # Unset options are omitted so that Langfuse applies its own defaults.
    optional_args = {
        "release": release,
        "threads": threads,
        "flush_at": flush_at,
        "flush_interval": flush_interval,
        "max_retries": max_retries,
        "timeout": timeout,
        "enabled": enabled,
        "httpx_client": httpx_client,
        "mask": mask,
        "environment": environment,
    }
    args.update(
        {name: value for name, value in optional_args.items() if value is not None}
    )

    self.langfuse = Langfuse(**args)
    self.trace: Optional[StatefulTraceClient] = None
    self._task_manager = self.langfuse.task_manager
trace: Optional[langfuse.client.StatefulTraceClient]
def
get_trace_id(self):
def get_trace_id(self):
    """Return the id of the current/last trace (deprecated).

    This method is deprecated and will be removed in a future version as it is not concurrency-safe.
    Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace.

    Returns:
        The ID of the current/last trace or None if no trace is available.
    """
    warnings.warn(
        "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller, not this module
    )
    return self.trace.id if self.trace else None
This method is deprecated and will be removed in a future version as it is not concurrency-safe. Please refer to the documentation on how to use interop with the Langfuse SDK to get the id of a trace.
Returns:
The ID of the current/last trace or None if no trace is available.
def
get_trace_url(self):
def get_trace_url(self):
    """Return the URL of the current/last trace (deprecated).

    This method is deprecated and will be removed in a future version as it is not concurrency-safe.
    Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information.

    Returns:
        The URL of the current/last trace or None if no trace is available.
    """
    warnings.warn(
        "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller, not this module
    )
    return self.trace.get_trace_url() if self.trace else None
This method is deprecated and will be removed in a future version as it is not concurrency-safe. Please refer to the documentation for more information.
Returns:
The URL of the current/last trace or None if no trace is available.
def
auth_check(self):
def auth_check(self):
    """Check that the handler's credentials give access to a project.

    Returns:
        The result of ``self.langfuse.auth_check()`` when the handler owns a
        ``Langfuse`` client. Otherwise ``True`` if the project lookup through
        the trace (or root span) client returns at least one project, and
        ``False`` when no client is available at all.

    Raises:
        Exception: If the project lookup returns no projects.
    """
    if self.langfuse is not None:
        return self.langfuse.auth_check()

    if self.trace is not None:
        stateful_client = self.trace
    elif self.root_span is not None:
        stateful_client = self.root_span
    else:
        return False

    # The project list lives on the response's ``data`` attribute; the root
    # span path previously called len() on the response object itself.
    projects = stateful_client.client.projects.get()
    if not projects.data:
        raise Exception("No projects found for the keys.")
    return True