langfuse.utils.base_callback_handler
1from typing import Optional, Union, List, Any 2import httpx 3import logging 4import os 5import warnings 6 7from langfuse.client import Langfuse, StatefulTraceClient, StatefulSpanClient, StateType 8 9 10class LangfuseBaseCallbackHandler: 11 log = logging.getLogger("langfuse") 12 13 def __init__( 14 self, 15 *, 16 public_key: Optional[str] = None, 17 secret_key: Optional[str] = None, 18 host: Optional[str] = None, 19 debug: bool = False, 20 stateful_client: Optional[ 21 Union[StatefulTraceClient, StatefulSpanClient] 22 ] = None, 23 update_stateful_client: bool = False, 24 version: Optional[str] = None, 25 session_id: Optional[str] = None, 26 user_id: Optional[str] = None, 27 trace_name: Optional[str] = None, 28 release: Optional[str] = None, 29 metadata: Optional[Any] = None, 30 tags: Optional[List[str]] = None, 31 threads: Optional[int] = None, 32 flush_at: Optional[int] = None, 33 flush_interval: Optional[int] = None, 34 max_retries: Optional[int] = None, 35 timeout: Optional[int] = None, 36 enabled: Optional[bool] = None, 37 httpx_client: Optional[httpx.Client] = None, 38 sdk_integration: str, 39 sample_rate: Optional[float] = None, 40 ) -> None: 41 self.version = version 42 self.session_id = session_id 43 self.user_id = user_id 44 self.trace_name = trace_name 45 self.release = release 46 self.metadata = metadata 47 self.tags = tags 48 49 self.root_span = None 50 self.update_stateful_client = update_stateful_client 51 self.langfuse = None 52 53 prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY") 54 prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY") 55 prio_host = host or os.environ.get( 56 "LANGFUSE_HOST", "https://cloud.langfuse.com" 57 ) 58 59 prio_sample_rate = ( 60 sample_rate 61 if sample_rate is not None 62 else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0)) 63 ) 64 65 if stateful_client and isinstance(stateful_client, StatefulTraceClient): 66 self.trace = stateful_client 67 self._task_manager = stateful_client.task_manager 68 69 return 70 71 elif stateful_client and isinstance(stateful_client, StatefulSpanClient): 72 self.root_span = stateful_client 73 self.trace = StatefulTraceClient( 74 stateful_client.client, 75 stateful_client.trace_id, 76 StateType.TRACE, 77 stateful_client.trace_id, 78 stateful_client.task_manager, 79 ) 80 self._task_manager = stateful_client.task_manager 81 82 return 83 84 args = { 85 "public_key": prio_public_key, 86 "secret_key": prio_secret_key, 87 "host": prio_host, 88 "debug": debug, 89 } 90 91 if release is not None: 92 args["release"] = release 93 if threads is not None: 94 args["threads"] = threads 95 if flush_at is not None: 96 args["flush_at"] = flush_at 97 if flush_interval is not None: 98 args["flush_interval"] = flush_interval 99 if max_retries is not None: 100 args["max_retries"] = max_retries 101 if timeout is not None: 102 args["timeout"] = timeout 103 if enabled is not None: 104 args["enabled"] = enabled 105 if httpx_client is not None: 106 args["httpx_client"] = httpx_client 107 if prio_sample_rate is not None: 108 args["sample_rate"] = prio_sample_rate 109 110 args["sdk_integration"] = sdk_integration 111 112 self.langfuse = Langfuse(**args) 113 self.trace: Optional[StatefulTraceClient] = None 114 self._task_manager = self.langfuse.task_manager 115 116 def get_trace_id(self): 117 """This method is deprecated and will be removed in a future version as it is not concurrency-safe. 118 Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace. 119 120 Returns: 121 The ID of the current/last trace or None if no trace is available. 122 """ 123 warnings.warn( 124 "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.", 125 DeprecationWarning, 126 ) 127 return self.trace.id if self.trace else None 128 129 def get_trace_url(self): 130 """This method is deprecated and will be removed in a future version as it is not concurrency-safe. 131 Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information. 132 133 Returns: 134 The URL of the current/last trace or None if no trace is available. 135 """ 136 warnings.warn( 137 "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.", 138 DeprecationWarning, 139 ) 140 return self.trace.get_trace_url() if self.trace else None 141 142 def flush(self): 143 if self.trace is not None: 144 self.trace.task_manager.flush() 145 elif self.root_span is not None: 146 self.root_span.task_manager.flush() 147 else: 148 self.log.debug("There was no trace yet, hence no flushing possible.") 149 150 def auth_check(self): 151 if self.langfuse is not None: 152 return self.langfuse.auth_check() 153 elif self.trace is not None: 154 projects = self.trace.client.projects.get() 155 if len(projects.data) == 0: 156 raise Exception("No projects found for the keys.") 157 return True 158 elif self.root_span is not None: 159 projects = self.root_span.client.projects.get() 160 if len(projects) == 0: 161 raise Exception("No projects found for the keys.") 162 return True 163 164 return False
class
LangfuseBaseCallbackHandler:
11class LangfuseBaseCallbackHandler: 12 log = logging.getLogger("langfuse") 13 14 def __init__( 15 self, 16 *, 17 public_key: Optional[str] = None, 18 secret_key: Optional[str] = None, 19 host: Optional[str] = None, 20 debug: bool = False, 21 stateful_client: Optional[ 22 Union[StatefulTraceClient, StatefulSpanClient] 23 ] = None, 24 update_stateful_client: bool = False, 25 version: Optional[str] = None, 26 session_id: Optional[str] = None, 27 user_id: Optional[str] = None, 28 trace_name: Optional[str] = None, 29 release: Optional[str] = None, 30 metadata: Optional[Any] = None, 31 tags: Optional[List[str]] = None, 32 threads: Optional[int] = None, 33 flush_at: Optional[int] = None, 34 flush_interval: Optional[int] = None, 35 max_retries: Optional[int] = None, 36 timeout: Optional[int] = None, 37 enabled: Optional[bool] = None, 38 httpx_client: Optional[httpx.Client] = None, 39 sdk_integration: str, 40 sample_rate: Optional[float] = None, 41 ) -> None: 42 self.version = version 43 self.session_id = session_id 44 self.user_id = user_id 45 self.trace_name = trace_name 46 self.release = release 47 self.metadata = metadata 48 self.tags = tags 49 50 self.root_span = None 51 self.update_stateful_client = update_stateful_client 52 self.langfuse = None 53 54 prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY") 55 prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY") 56 prio_host = host or os.environ.get( 57 "LANGFUSE_HOST", "https://cloud.langfuse.com" 58 ) 59 60 prio_sample_rate = ( 61 sample_rate 62 if sample_rate is not None 63 else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0)) 64 ) 65 66 if stateful_client and isinstance(stateful_client, StatefulTraceClient): 67 self.trace = stateful_client 68 self._task_manager = stateful_client.task_manager 69 70 return 71 72 elif stateful_client and isinstance(stateful_client, StatefulSpanClient): 73 self.root_span = stateful_client 74 self.trace = StatefulTraceClient( 75 stateful_client.client, 76 stateful_client.trace_id, 77 StateType.TRACE, 78 stateful_client.trace_id, 79 stateful_client.task_manager, 80 ) 81 self._task_manager = stateful_client.task_manager 82 83 return 84 85 args = { 86 "public_key": prio_public_key, 87 "secret_key": prio_secret_key, 88 "host": prio_host, 89 "debug": debug, 90 } 91 92 if release is not None: 93 args["release"] = release 94 if threads is not None: 95 args["threads"] = threads 96 if flush_at is not None: 97 args["flush_at"] = flush_at 98 if flush_interval is not None: 99 args["flush_interval"] = flush_interval 100 if max_retries is not None: 101 args["max_retries"] = max_retries 102 if timeout is not None: 103 args["timeout"] = timeout 104 if enabled is not None: 105 args["enabled"] = enabled 106 if httpx_client is not None: 107 args["httpx_client"] = httpx_client 108 if prio_sample_rate is not None: 109 args["sample_rate"] = prio_sample_rate 110 111 args["sdk_integration"] = sdk_integration 112 113 self.langfuse = Langfuse(**args) 114 self.trace: Optional[StatefulTraceClient] = None 115 self._task_manager = self.langfuse.task_manager 116 117 def get_trace_id(self): 118 """This method is deprecated and will be removed in a future version as it is not concurrency-safe. 119 Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace. 120 121 Returns: 122 The ID of the current/last trace or None if no trace is available. 123 """ 124 warnings.warn( 125 "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.", 126 DeprecationWarning, 127 ) 128 return self.trace.id if self.trace else None 129 130 def get_trace_url(self): 131 """This method is deprecated and will be removed in a future version as it is not concurrency-safe. 132 Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information. 133 134 Returns: 135 The URL of the current/last trace or None if no trace is available. 136 """ 137 warnings.warn( 138 "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.", 139 DeprecationWarning, 140 ) 141 return self.trace.get_trace_url() if self.trace else None 142 143 def flush(self): 144 if self.trace is not None: 145 self.trace.task_manager.flush() 146 elif self.root_span is not None: 147 self.root_span.task_manager.flush() 148 else: 149 self.log.debug("There was no trace yet, hence no flushing possible.") 150 151 def auth_check(self): 152 if self.langfuse is not None: 153 return self.langfuse.auth_check() 154 elif self.trace is not None: 155 projects = self.trace.client.projects.get() 156 if len(projects.data) == 0: 157 raise Exception("No projects found for the keys.") 158 return True 159 elif self.root_span is not None: 160 projects = self.root_span.client.projects.get() 161 if len(projects) == 0: 162 raise Exception("No projects found for the keys.") 163 return True 164 165 return False
LangfuseBaseCallbackHandler( *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, debug: bool = False, stateful_client: Union[langfuse.client.StatefulTraceClient, langfuse.client.StatefulSpanClient, NoneType] = None, update_stateful_client: bool = False, version: Optional[str] = None, session_id: Optional[str] = None, user_id: Optional[str] = None, trace_name: Optional[str] = None, release: Optional[str] = None, metadata: Optional[Any] = None, tags: Optional[List[str]] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, enabled: Optional[bool] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: str, sample_rate: Optional[float] = None)
14 def __init__( 15 self, 16 *, 17 public_key: Optional[str] = None, 18 secret_key: Optional[str] = None, 19 host: Optional[str] = None, 20 debug: bool = False, 21 stateful_client: Optional[ 22 Union[StatefulTraceClient, StatefulSpanClient] 23 ] = None, 24 update_stateful_client: bool = False, 25 version: Optional[str] = None, 26 session_id: Optional[str] = None, 27 user_id: Optional[str] = None, 28 trace_name: Optional[str] = None, 29 release: Optional[str] = None, 30 metadata: Optional[Any] = None, 31 tags: Optional[List[str]] = None, 32 threads: Optional[int] = None, 33 flush_at: Optional[int] = None, 34 flush_interval: Optional[int] = None, 35 max_retries: Optional[int] = None, 36 timeout: Optional[int] = None, 37 enabled: Optional[bool] = None, 38 httpx_client: Optional[httpx.Client] = None, 39 sdk_integration: str, 40 sample_rate: Optional[float] = None, 41 ) -> None: 42 self.version = version 43 self.session_id = session_id 44 self.user_id = user_id 45 self.trace_name = trace_name 46 self.release = release 47 self.metadata = metadata 48 self.tags = tags 49 50 self.root_span = None 51 self.update_stateful_client = update_stateful_client 52 self.langfuse = None 53 54 prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY") 55 prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY") 56 prio_host = host or os.environ.get( 57 "LANGFUSE_HOST", "https://cloud.langfuse.com" 58 ) 59 60 prio_sample_rate = ( 61 sample_rate 62 if sample_rate is not None 63 else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0)) 64 ) 65 66 if stateful_client and isinstance(stateful_client, StatefulTraceClient): 67 self.trace = stateful_client 68 self._task_manager = stateful_client.task_manager 69 70 return 71 72 elif stateful_client and isinstance(stateful_client, StatefulSpanClient): 73 self.root_span = stateful_client 74 self.trace = StatefulTraceClient( 75 stateful_client.client, 76 stateful_client.trace_id, 77 StateType.TRACE, 78 stateful_client.trace_id, 79 stateful_client.task_manager, 80 ) 81 self._task_manager = stateful_client.task_manager 82 83 return 84 85 args = { 86 "public_key": prio_public_key, 87 "secret_key": prio_secret_key, 88 "host": prio_host, 89 "debug": debug, 90 } 91 92 if release is not None: 93 args["release"] = release 94 if threads is not None: 95 args["threads"] = threads 96 if flush_at is not None: 97 args["flush_at"] = flush_at 98 if flush_interval is not None: 99 args["flush_interval"] = flush_interval 100 if max_retries is not None: 101 args["max_retries"] = max_retries 102 if timeout is not None: 103 args["timeout"] = timeout 104 if enabled is not None: 105 args["enabled"] = enabled 106 if httpx_client is not None: 107 args["httpx_client"] = httpx_client 108 if prio_sample_rate is not None: 109 args["sample_rate"] = prio_sample_rate 110 111 args["sdk_integration"] = sdk_integration 112 113 self.langfuse = Langfuse(**args) 114 self.trace: Optional[StatefulTraceClient] = None 115 self._task_manager = self.langfuse.task_manager
trace: Optional[langfuse.client.StatefulTraceClient]
def
get_trace_id(self):
117 def get_trace_id(self): 118 """This method is deprecated and will be removed in a future version as it is not concurrency-safe. 119 Please refer to the [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability) on how to use interop with the Langfuse SDK to get the id of a trace. 120 121 Returns: 122 The ID of the current/last trace or None if no trace is available. 123 """ 124 warnings.warn( 125 "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.", 126 DeprecationWarning, 127 ) 128 return self.trace.id if self.trace else None
This method is deprecated and will be removed in a future version as it is not concurrency-safe. Please refer to the documentation on how to use interop with the Langfuse SDK to get the id of a trace.
Returns:
The ID of the current/last trace or None if no trace is available.
def
get_trace_url(self):
130 def get_trace_url(self): 131 """This method is deprecated and will be removed in a future version as it is not concurrency-safe. 132 Please refer to the [documentation](https://langfuse.com/docs/tracing/url) for more information. 133 134 Returns: 135 The URL of the current/last trace or None if no trace is available. 136 """ 137 warnings.warn( 138 "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.", 139 DeprecationWarning, 140 ) 141 return self.trace.get_trace_url() if self.trace else None
This method is deprecated and will be removed in a future version as it is not concurrency-safe. Please refer to the documentation for more information.
Returns:
The URL of the current/last trace or None if no trace is available.
def
auth_check(self):
151 def auth_check(self): 152 if self.langfuse is not None: 153 return self.langfuse.auth_check() 154 elif self.trace is not None: 155 projects = self.trace.client.projects.get() 156 if len(projects.data) == 0: 157 raise Exception("No projects found for the keys.") 158 return True 159 elif self.root_span is not None: 160 projects = self.root_span.client.projects.get() 161 if len(projects) == 0: 162 raise Exception("No projects found for the keys.") 163 return True 164 165 return False