langfuse.utils.langfuse_singleton
1import httpx 2import threading 3from typing import Optional 4 5 6from langfuse import Langfuse 7from langfuse.types import MaskFunction 8 9 10class LangfuseSingleton: 11 _instance = None 12 _lock = threading.Lock() 13 _langfuse: Optional[Langfuse] = None 14 15 def __new__(cls): 16 if not cls._instance: 17 with cls._lock: 18 if not cls._instance: 19 cls._instance = super(LangfuseSingleton, cls).__new__(cls) 20 return cls._instance 21 22 def get( 23 self, 24 *, 25 public_key: Optional[str] = None, 26 secret_key: Optional[str] = None, 27 host: Optional[str] = None, 28 release: Optional[str] = None, 29 debug: Optional[bool] = None, 30 threads: Optional[int] = None, 31 flush_at: Optional[int] = None, 32 flush_interval: Optional[int] = None, 33 max_retries: Optional[int] = None, 34 timeout: Optional[int] = None, 35 httpx_client: Optional[httpx.Client] = None, 36 sdk_integration: Optional[str] = None, 37 enabled: Optional[bool] = None, 38 sample_rate: Optional[float] = None, 39 mask: Optional[MaskFunction] = None, 40 ) -> Langfuse: 41 if self._langfuse: 42 return self._langfuse 43 44 with self._lock: 45 if self._langfuse: 46 return self._langfuse 47 48 langfuse_init_args = { 49 "public_key": public_key, 50 "secret_key": secret_key, 51 "host": host, 52 "release": release, 53 "debug": debug, 54 "threads": threads, 55 "flush_at": flush_at, 56 "flush_interval": flush_interval, 57 "max_retries": max_retries, 58 "timeout": timeout, 59 "httpx_client": httpx_client, 60 "sdk_integration": sdk_integration, 61 "enabled": enabled, 62 "sample_rate": sample_rate, 63 "mask": mask, 64 } 65 66 self._langfuse = Langfuse( 67 **{k: v for k, v in langfuse_init_args.items() if v is not None} 68 ) 69 70 return self._langfuse 71 72 def reset(self) -> None: 73 with self._lock: 74 if self._langfuse: 75 self._langfuse.flush() 76 77 self._langfuse = None
class
LangfuseSingleton:
11class LangfuseSingleton: 12 _instance = None 13 _lock = threading.Lock() 14 _langfuse: Optional[Langfuse] = None 15 16 def __new__(cls): 17 if not cls._instance: 18 with cls._lock: 19 if not cls._instance: 20 cls._instance = super(LangfuseSingleton, cls).__new__(cls) 21 return cls._instance 22 23 def get( 24 self, 25 *, 26 public_key: Optional[str] = None, 27 secret_key: Optional[str] = None, 28 host: Optional[str] = None, 29 release: Optional[str] = None, 30 debug: Optional[bool] = None, 31 threads: Optional[int] = None, 32 flush_at: Optional[int] = None, 33 flush_interval: Optional[int] = None, 34 max_retries: Optional[int] = None, 35 timeout: Optional[int] = None, 36 httpx_client: Optional[httpx.Client] = None, 37 sdk_integration: Optional[str] = None, 38 enabled: Optional[bool] = None, 39 sample_rate: Optional[float] = None, 40 mask: Optional[MaskFunction] = None, 41 ) -> Langfuse: 42 if self._langfuse: 43 return self._langfuse 44 45 with self._lock: 46 if self._langfuse: 47 return self._langfuse 48 49 langfuse_init_args = { 50 "public_key": public_key, 51 "secret_key": secret_key, 52 "host": host, 53 "release": release, 54 "debug": debug, 55 "threads": threads, 56 "flush_at": flush_at, 57 "flush_interval": flush_interval, 58 "max_retries": max_retries, 59 "timeout": timeout, 60 "httpx_client": httpx_client, 61 "sdk_integration": sdk_integration, 62 "enabled": enabled, 63 "sample_rate": sample_rate, 64 "mask": mask, 65 } 66 67 self._langfuse = Langfuse( 68 **{k: v for k, v in langfuse_init_args.items() if v is not None} 69 ) 70 71 return self._langfuse 72 73 def reset(self) -> None: 74 with self._lock: 75 if self._langfuse: 76 self._langfuse.flush() 77 78 self._langfuse = None
def
get( self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, release: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: Optional[str] = None, enabled: Optional[bool] = None, sample_rate: Optional[float] = None, mask: Optional[langfuse.types.MaskFunction] = None) -> langfuse.client.Langfuse:
23 def get( 24 self, 25 *, 26 public_key: Optional[str] = None, 27 secret_key: Optional[str] = None, 28 host: Optional[str] = None, 29 release: Optional[str] = None, 30 debug: Optional[bool] = None, 31 threads: Optional[int] = None, 32 flush_at: Optional[int] = None, 33 flush_interval: Optional[int] = None, 34 max_retries: Optional[int] = None, 35 timeout: Optional[int] = None, 36 httpx_client: Optional[httpx.Client] = None, 37 sdk_integration: Optional[str] = None, 38 enabled: Optional[bool] = None, 39 sample_rate: Optional[float] = None, 40 mask: Optional[MaskFunction] = None, 41 ) -> Langfuse: 42 if self._langfuse: 43 return self._langfuse 44 45 with self._lock: 46 if self._langfuse: 47 return self._langfuse 48 49 langfuse_init_args = { 50 "public_key": public_key, 51 "secret_key": secret_key, 52 "host": host, 53 "release": release, 54 "debug": debug, 55 "threads": threads, 56 "flush_at": flush_at, 57 "flush_interval": flush_interval, 58 "max_retries": max_retries, 59 "timeout": timeout, 60 "httpx_client": httpx_client, 61 "sdk_integration": sdk_integration, 62 "enabled": enabled, 63 "sample_rate": sample_rate, 64 "mask": mask, 65 } 66 67 self._langfuse = Langfuse( 68 **{k: v for k, v in langfuse_init_args.items() if v is not None} 69 ) 70 71 return self._langfuse