langfuse.utils.langfuse_singleton
import httpx
import threading
from typing import Optional


from langfuse import Langfuse


class LangfuseSingleton:
    """Process-wide holder for a single, lazily created ``Langfuse`` client.

    Every ``LangfuseSingleton()`` call returns the same object. That object
    caches the client built by the first ``get()`` call and keeps returning
    it until ``reset()`` is called.
    """

    # The one shared instance; created on first construction.
    _instance = None
    # Guards instance creation as well as client creation and reset.
    _lock = threading.Lock()
    # Cached client; ``None`` until ``get()`` builds it and again after ``reset()``.
    _langfuse: Optional[Langfuse] = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        # Check without the lock first so the common path stays lock-free,
        # then re-check under the lock so only one thread creates the instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def get(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        release: Optional[str] = None,
        debug: Optional[bool] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        sdk_integration: Optional[str] = None,
        enabled: Optional[bool] = None,
        sample_rate: Optional[float] = None,
    ) -> Langfuse:
        """Return the cached ``Langfuse`` client, building it on first call.

        All parameters are keyword-only and are forwarded under the same
        names to ``Langfuse(...)``. Parameters left as ``None`` are omitted
        from that call. Once a client is cached, the arguments of later
        calls are ignored and the cached client is returned as-is.

        Returns:
            The cached ``Langfuse`` client.
        """
        # Compare against None explicitly: the cache must be honoured even
        # if the client object happens to evaluate as falsy.
        if self._langfuse is not None:
            return self._langfuse

        with self._lock:
            # Another thread may have built the client while we waited.
            if self._langfuse is None:
                langfuse_init_args = {
                    "public_key": public_key,
                    "secret_key": secret_key,
                    "host": host,
                    "release": release,
                    "debug": debug,
                    "threads": threads,
                    "flush_at": flush_at,
                    "flush_interval": flush_interval,
                    "max_retries": max_retries,
                    "timeout": timeout,
                    "httpx_client": httpx_client,
                    "sdk_integration": sdk_integration,
                    "enabled": enabled,
                    "sample_rate": sample_rate,
                }

                # Only explicitly provided values are passed on; explicit
                # False/0 values are kept because only None is filtered.
                self._langfuse = Langfuse(
                    **{k: v for k, v in langfuse_init_args.items() if v is not None}
                )

        return self._langfuse

    def reset(self) -> None:
        """Flush the cached client, if there is one, and drop it.

        The next ``get()`` call builds a new client.
        """
        with self._lock:
            if self._langfuse is not None:
                self._langfuse.flush()

            self._langfuse = None
class
LangfuseSingleton:
class LangfuseSingleton:
    """Process-wide holder for a single, lazily created ``Langfuse`` client.

    Every ``LangfuseSingleton()`` call returns the same object. That object
    caches the client built by the first ``get()`` call and keeps returning
    it until ``reset()`` is called.
    """

    # The one shared instance; created on first construction.
    _instance = None
    # Guards instance creation as well as client creation and reset.
    _lock = threading.Lock()
    # Cached client; ``None`` until ``get()`` builds it and again after ``reset()``.
    _langfuse: Optional[Langfuse] = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        # Check without the lock first so the common path stays lock-free,
        # then re-check under the lock so only one thread creates the instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def get(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        release: Optional[str] = None,
        debug: Optional[bool] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        sdk_integration: Optional[str] = None,
        enabled: Optional[bool] = None,
        sample_rate: Optional[float] = None,
    ) -> Langfuse:
        """Return the cached ``Langfuse`` client, building it on first call.

        All parameters are keyword-only and are forwarded under the same
        names to ``Langfuse(...)``. Parameters left as ``None`` are omitted
        from that call. Once a client is cached, the arguments of later
        calls are ignored and the cached client is returned as-is.

        Returns:
            The cached ``Langfuse`` client.
        """
        # Compare against None explicitly: the cache must be honoured even
        # if the client object happens to evaluate as falsy.
        if self._langfuse is not None:
            return self._langfuse

        with self._lock:
            # Another thread may have built the client while we waited.
            if self._langfuse is None:
                langfuse_init_args = {
                    "public_key": public_key,
                    "secret_key": secret_key,
                    "host": host,
                    "release": release,
                    "debug": debug,
                    "threads": threads,
                    "flush_at": flush_at,
                    "flush_interval": flush_interval,
                    "max_retries": max_retries,
                    "timeout": timeout,
                    "httpx_client": httpx_client,
                    "sdk_integration": sdk_integration,
                    "enabled": enabled,
                    "sample_rate": sample_rate,
                }

                # Only explicitly provided values are passed on; explicit
                # False/0 values are kept because only None is filtered.
                self._langfuse = Langfuse(
                    **{k: v for k, v in langfuse_init_args.items() if v is not None}
                )

        return self._langfuse

    def reset(self) -> None:
        """Flush the cached client, if there is one, and drop it.

        The next ``get()`` call builds a new client.
        """
        with self._lock:
            if self._langfuse is not None:
                self._langfuse.flush()

            self._langfuse = None
def
get( self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, release: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: Optional[str] = None, enabled: Optional[bool] = None, sample_rate: Optional[float] = None) -> langfuse.client.Langfuse:
def get(
    self,
    *,
    public_key: Optional[str] = None,
    secret_key: Optional[str] = None,
    host: Optional[str] = None,
    release: Optional[str] = None,
    debug: Optional[bool] = None,
    threads: Optional[int] = None,
    flush_at: Optional[int] = None,
    flush_interval: Optional[int] = None,
    max_retries: Optional[int] = None,
    timeout: Optional[int] = None,
    httpx_client: Optional[httpx.Client] = None,
    sdk_integration: Optional[str] = None,
    enabled: Optional[bool] = None,
    sample_rate: Optional[float] = None,
) -> Langfuse:
    """Return the cached ``Langfuse`` client, building it on first call.

    All parameters are keyword-only and are forwarded under the same names
    to ``Langfuse(...)``. Parameters left as ``None`` are omitted from that
    call. Once a client is cached on ``self._langfuse``, the arguments of
    later calls are ignored and the cached client is returned as-is.

    Returns:
        The cached ``Langfuse`` client.
    """
    # Compare against None explicitly: the cache must be honoured even if
    # the client object happens to evaluate as falsy.
    if self._langfuse is not None:
        return self._langfuse

    with self._lock:
        # Another thread may have built the client while we waited.
        if self._langfuse is None:
            langfuse_init_args = {
                "public_key": public_key,
                "secret_key": secret_key,
                "host": host,
                "release": release,
                "debug": debug,
                "threads": threads,
                "flush_at": flush_at,
                "flush_interval": flush_interval,
                "max_retries": max_retries,
                "timeout": timeout,
                "httpx_client": httpx_client,
                "sdk_integration": sdk_integration,
                "enabled": enabled,
                "sample_rate": sample_rate,
            }

            # Only explicitly provided values are passed on; explicit
            # False/0 values are kept because only None is filtered.
            self._langfuse = Langfuse(
                **{k: v for k, v in langfuse_init_args.items() if v is not None}
            )

    return self._langfuse