langfuse.utils.base_callback_handler
1from typing import Optional, Union, List, Any, Callable 2import httpx 3import logging 4import os 5import warnings 6 7from langfuse.client import Langfuse, StatefulTraceClient, StatefulSpanClient, StateType 8 9 10class LangfuseBaseCallbackHandler: 11 log = logging.getLogger("langfuse") 12 13 def __init__( 14 self, 15 *, 16 public_key: Optional[str] = None, 17 secret_key: Optional[str] = None, 18 host: Optional[str] = None, 19 debug: bool = False, 20 stateful_client: Optional[ 21 Union[StatefulTraceClient, StatefulSpanClient] 22 ] = None, 23 update_stateful_client: bool = False, 24 version: Optional[str] = None, 25 session_id: Optional[str] = None, 26 user_id: Optional[str] = None, 27 trace_name: Optional[str] = None, 28 release: Optional[str] = None, 29 metadata: Optional[Any] = None, 30 tags: Optional[List[str]] = None, 31 threads: Optional[int] = None, 32 flush_at: Optional[int] = None, 33 flush_interval: Optional[int] = None, 34 max_retries: Optional[int] = None, 35 timeout: Optional[int] = None, 36 enabled: Optional[bool] = None, 37 httpx_client: Optional[httpx.Client] = None, 38 sdk_integration: str, 39 sample_rate: Optional[float] = None, 40 mask: Optional[Callable] = None, 41 ) -> None: 42 self.version = version 43 self.session_id = session_id 44 self.user_id = user_id 45 self.trace_name = trace_name 46 self.release = release 47 self.metadata = metadata 48 self.tags = tags 49 50 self.root_span = None 51 self.update_stateful_client = update_stateful_client 52 self.langfuse = None 53 54 prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY") 55 prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY") 56 prio_host = host or os.environ.get( 57 "LANGFUSE_HOST", "https://cloud.langfuse.com" 58 ) 59 60 prio_sample_rate = ( 61 sample_rate 62 if sample_rate is not None 63 else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0)) 64 ) 65 66 if stateful_client and isinstance(stateful_client, StatefulTraceClient): 67 self.trace = stateful_client 68 self._task_manager = stateful_client.task_manager 69 70 return 71 72 elif stateful_client and isinstance(stateful_client, StatefulSpanClient): 73 self.root_span = stateful_client 74 self.trace = StatefulTraceClient( 75 stateful_client.client, 76 stateful_client.trace_id, 77 StateType.TRACE, 78 stateful_client.trace_id, 79 stateful_client.task_manager, 80 ) 81 self._task_manager = stateful_client.task_manager 82 83 return 84 85 args = { 86 "public_key": prio_public_key, 87 "secret_key": prio_secret_key, 88 "host": prio_host, 89 "debug": debug, 90 } 91 92 if release is not None: 93 args["release"] = release 94 if threads is not None: 95 args["threads"] = threads 96 if flush_at is not None: 97 args["flush_at"] = flush_at 98 if flush_interval is not None: 99 args["flush_interval"] = flush_interval 100 if max_retries is not None: 101 args["max_retries"] = max_retries 102 if timeout is not None: 103 args["timeout"] = timeout 104 if enabled is not None: 105 args["enabled"] = enabled 106 if httpx_client is not None: 107 args["httpx_client"] = httpx_client 108 if prio_sample_rate is not None: 109 args["sample_rate"] = prio_sample_rate 110 if mask is not None: 111 args["mask"] = mask 112 113 args["sdk_integration"] = sdk_integration 114 115 self.langfuse = Langfuse(**args) 116 self.trace: Optional[StatefulTraceClient] = None 117 self._task_manager = self.langfuse.task_manager 118 119 def get_trace_id(self): 120 """This method is deprecated and will be removed in a future version as it is not concurrency-safe. 121 Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace. 122 123 Returns: 124 The ID of the current/last trace or None if no trace is available. 125 """ 126 warnings.warn( 127 "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.", 128 DeprecationWarning, 129 ) 130 return self.trace.id if self.trace else None 131 132 def get_trace_url(self): 133 """This method is deprecated and will be removed in a future version as it is not concurrency-safe. 134 Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information. 135 136 Returns: 137 The URL of the current/last trace or None if no trace is available. 138 """ 139 warnings.warn( 140 "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.", 141 DeprecationWarning, 142 ) 143 return self.trace.get_trace_url() if self.trace else None 144 145 def flush(self): 146 if self.trace is not None: 147 self.trace.task_manager.flush() 148 elif self.root_span is not None: 149 self.root_span.task_manager.flush() 150 else: 151 self.log.debug("There was no trace yet, hence no flushing possible.") 152 153 def auth_check(self): 154 if self.langfuse is not None: 155 return self.langfuse.auth_check() 156 elif self.trace is not None: 157 projects = self.trace.client.projects.get() 158 if len(projects.data) == 0: 159 raise Exception("No projects found for the keys.") 160 return True 161 elif self.root_span is not None: 162 projects = self.root_span.client.projects.get() 163 if len(projects) == 0: 164 raise Exception("No projects found for the keys.") 165 return True 166 167 return False
class
LangfuseBaseCallbackHandler:
11class LangfuseBaseCallbackHandler: 12 log = logging.getLogger("langfuse") 13 14 def __init__( 15 self, 16 *, 17 public_key: Optional[str] = None, 18 secret_key: Optional[str] = None, 19 host: Optional[str] = None, 20 debug: bool = False, 21 stateful_client: Optional[ 22 Union[StatefulTraceClient, StatefulSpanClient] 23 ] = None, 24 update_stateful_client: bool = False, 25 version: Optional[str] = None, 26 session_id: Optional[str] = None, 27 user_id: Optional[str] = None, 28 trace_name: Optional[str] = None, 29 release: Optional[str] = None, 30 metadata: Optional[Any] = None, 31 tags: Optional[List[str]] = None, 32 threads: Optional[int] = None, 33 flush_at: Optional[int] = None, 34 flush_interval: Optional[int] = None, 35 max_retries: Optional[int] = None, 36 timeout: Optional[int] = None, 37 enabled: Optional[bool] = None, 38 httpx_client: Optional[httpx.Client] = None, 39 sdk_integration: str, 40 sample_rate: Optional[float] = None, 41 mask: Optional[Callable] = None, 42 ) -> None: 43 self.version = version 44 self.session_id = session_id 45 self.user_id = user_id 46 self.trace_name = trace_name 47 self.release = release 48 self.metadata = metadata 49 self.tags = tags 50 51 self.root_span = None 52 self.update_stateful_client = update_stateful_client 53 self.langfuse = None 54 55 prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY") 56 prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY") 57 prio_host = host or os.environ.get( 58 "LANGFUSE_HOST", "https://cloud.langfuse.com" 59 ) 60 61 prio_sample_rate = ( 62 sample_rate 63 if sample_rate is not None 64 else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0)) 65 ) 66 67 if stateful_client and isinstance(stateful_client, StatefulTraceClient): 68 self.trace = stateful_client 69 self._task_manager = stateful_client.task_manager 70 71 return 72 73 elif stateful_client and isinstance(stateful_client, StatefulSpanClient): 74 self.root_span = stateful_client 75 self.trace = StatefulTraceClient( 76 stateful_client.client, 77 stateful_client.trace_id, 78 StateType.TRACE, 79 stateful_client.trace_id, 80 stateful_client.task_manager, 81 ) 82 self._task_manager = stateful_client.task_manager 83 84 return 85 86 args = { 87 "public_key": prio_public_key, 88 "secret_key": prio_secret_key, 89 "host": prio_host, 90 "debug": debug, 91 } 92 93 if release is not None: 94 args["release"] = release 95 if threads is not None: 96 args["threads"] = threads 97 if flush_at is not None: 98 args["flush_at"] = flush_at 99 if flush_interval is not None: 100 args["flush_interval"] = flush_interval 101 if max_retries is not None: 102 args["max_retries"] = max_retries 103 if timeout is not None: 104 args["timeout"] = timeout 105 if enabled is not None: 106 args["enabled"] = enabled 107 if httpx_client is not None: 108 args["httpx_client"] = httpx_client 109 if prio_sample_rate is not None: 110 args["sample_rate"] = prio_sample_rate 111 if mask is not None: 112 args["mask"] = mask 113 114 args["sdk_integration"] = sdk_integration 115 116 self.langfuse = Langfuse(**args) 117 self.trace: Optional[StatefulTraceClient] = None 118 self._task_manager = self.langfuse.task_manager 119 120 def get_trace_id(self): 121 """This method is deprecated and will be removed in a future version as it is not concurrency-safe. 122 Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace. 123 124 Returns: 125 The ID of the current/last trace or None if no trace is available. 126 """ 127 warnings.warn( 128 "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.", 129 DeprecationWarning, 130 ) 131 return self.trace.id if self.trace else None 132 133 def get_trace_url(self): 134 """This method is deprecated and will be removed in a future version as it is not concurrency-safe. 135 Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information. 136 137 Returns: 138 The URL of the current/last trace or None if no trace is available. 139 """ 140 warnings.warn( 141 "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.", 142 DeprecationWarning, 143 ) 144 return self.trace.get_trace_url() if self.trace else None 145 146 def flush(self): 147 if self.trace is not None: 148 self.trace.task_manager.flush() 149 elif self.root_span is not None: 150 self.root_span.task_manager.flush() 151 else: 152 self.log.debug("There was no trace yet, hence no flushing possible.") 153 154 def auth_check(self): 155 if self.langfuse is not None: 156 return self.langfuse.auth_check() 157 elif self.trace is not None: 158 projects = self.trace.client.projects.get() 159 if len(projects.data) == 0: 160 raise Exception("No projects found for the keys.") 161 return True 162 elif self.root_span is not None: 163 projects = self.root_span.client.projects.get() 164 if len(projects) == 0: 165 raise Exception("No projects found for the keys.") 166 return True 167 168 return False
LangfuseBaseCallbackHandler( *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, debug: bool = False, stateful_client: Union[langfuse.client.StatefulTraceClient, langfuse.client.StatefulSpanClient, NoneType] = None, update_stateful_client: bool = False, version: Optional[str] = None, session_id: Optional[str] = None, user_id: Optional[str] = None, trace_name: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, enabled: Optional[bool] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: str, sample_rate: Optional[float] = None, mask: Optional[Callable] = None)
14 def __init__( 15 self, 16 *, 17 public_key: Optional[str] = None, 18 secret_key: Optional[str] = None, 19 host: Optional[str] = None, 20 debug: bool = False, 21 stateful_client: Optional[ 22 Union[StatefulTraceClient, StatefulSpanClient] 23 ] = None, 24 update_stateful_client: bool = False, 25 version: Optional[str] = None, 26 session_id: Optional[str] = None, 27 user_id: Optional[str] = None, 28 trace_name: Optional[str] = None, 29 release: Optional[str] = None, 30 metadata: Optional[Any] = None, 31 tags: Optional[List[str]] = None, 32 threads: Optional[int] = None, 33 flush_at: Optional[int] = None, 34 flush_interval: Optional[int] = None, 35 max_retries: Optional[int] = None, 36 timeout: Optional[int] = None, 37 enabled: Optional[bool] = None, 38 httpx_client: Optional[httpx.Client] = None, 39 sdk_integration: str, 40 sample_rate: Optional[float] = None, 41 mask: Optional[Callable] = None, 42 ) -> None: 43 self.version = version 44 self.session_id = session_id 45 self.user_id = user_id 46 self.trace_name = trace_name 47 self.release = release 48 self.metadata = metadata 49 self.tags = tags 50 51 self.root_span = None 52 self.update_stateful_client = update_stateful_client 53 self.langfuse = None 54 55 prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY") 56 prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY") 57 prio_host = host or os.environ.get( 58 "LANGFUSE_HOST", "https://cloud.langfuse.com" 59 ) 60 61 prio_sample_rate = ( 62 sample_rate 63 if sample_rate is not None 64 else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0)) 65 ) 66 67 if stateful_client and isinstance(stateful_client, StatefulTraceClient): 68 self.trace = stateful_client 69 self._task_manager = stateful_client.task_manager 70 71 return 72 73 elif stateful_client and isinstance(stateful_client, StatefulSpanClient): 74 self.root_span = stateful_client 75 self.trace = StatefulTraceClient( 76 stateful_client.client, 77 stateful_client.trace_id, 78 StateType.TRACE, 79 stateful_client.trace_id, 80 stateful_client.task_manager, 81 ) 82 self._task_manager = stateful_client.task_manager 83 84 return 85 86 args = { 87 "public_key": prio_public_key, 88 "secret_key": prio_secret_key, 89 "host": prio_host, 90 "debug": debug, 91 } 92 93 if release is not None: 94 args["release"] = release 95 if threads is not None: 96 args["threads"] = threads 97 if flush_at is not None: 98 args["flush_at"] = flush_at 99 if flush_interval is not None: 100 args["flush_interval"] = flush_interval 101 if max_retries is not None: 102 args["max_retries"] = max_retries 103 if timeout is not None: 104 args["timeout"] = timeout 105 if enabled is not None: 106 args["enabled"] = enabled 107 if httpx_client is not None: 108 args["httpx_client"] = httpx_client 109 if prio_sample_rate is not None: 110 args["sample_rate"] = prio_sample_rate 111 if mask is not None: 112 args["mask"] = mask 113 114 args["sdk_integration"] = sdk_integration 115 116 self.langfuse = Langfuse(**args) 117 self.trace: Optional[StatefulTraceClient] = None 118 self._task_manager = self.langfuse.task_manager
trace: Optional[langfuse.client.StatefulTraceClient]
def
get_trace_id(self):
120 def get_trace_id(self): 121 """This method is deprecated and will be removed in a future version as it is not concurrency-safe. 122 Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace. 123 124 Returns: 125 The ID of the current/last trace or None if no trace is available. 126 """ 127 warnings.warn( 128 "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.", 129 DeprecationWarning, 130 ) 131 return self.trace.id if self.trace else None
This method is deprecated and will be removed in a future version as it is not concurrency-safe. Please refer to the documentation on how to use interop with the Langfuse SDK to get the id of a trace.
Returns:
The ID of the current/last trace or None if no trace is available.
def
get_trace_url(self):
133 def get_trace_url(self): 134 """This method is deprecated and will be removed in a future version as it is not concurrency-safe. 135 Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information. 136 137 Returns: 138 The URL of the current/last trace or None if no trace is available. 139 """ 140 warnings.warn( 141 "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.", 142 DeprecationWarning, 143 ) 144 return self.trace.get_trace_url() if self.trace else None
This method is deprecated and will be removed in a future version as it is not concurrency-safe. Please refer to the documentation for more information.
Returns:
The URL of the current/last trace or None if no trace is available.
def
auth_check(self):
154 def auth_check(self): 155 if self.langfuse is not None: 156 return self.langfuse.auth_check() 157 elif self.trace is not None: 158 projects = self.trace.client.projects.get() 159 if len(projects.data) == 0: 160 raise Exception("No projects found for the keys.") 161 return True 162 elif self.root_span is not None: 163 projects = self.root_span.client.projects.get() 164 if len(projects) == 0: 165 raise Exception("No projects found for the keys.") 166 return True 167 168 return False