langfuse.utils.langfuse_singleton

 1import httpx
 2import threading
 3from typing import Optional
 4
 5
 6from langfuse import Langfuse
 7
 8
 9class LangfuseSingleton:
10    _instance = None
11    _lock = threading.Lock()
12    _langfuse: Optional[Langfuse] = None
13
14    def __new__(cls):
15        if not cls._instance:
16            with cls._lock:
17                if not cls._instance:
18                    cls._instance = super(LangfuseSingleton, cls).__new__(cls)
19        return cls._instance
20
21    def get(
22        self,
23        *,
24        public_key: Optional[str] = None,
25        secret_key: Optional[str] = None,
26        host: Optional[str] = None,
27        release: Optional[str] = None,
28        debug: Optional[bool] = None,
29        threads: Optional[int] = None,
30        flush_at: Optional[int] = None,
31        flush_interval: Optional[int] = None,
32        max_retries: Optional[int] = None,
33        timeout: Optional[int] = None,
34        httpx_client: Optional[httpx.Client] = None,
35        sdk_integration: Optional[str] = None,
36        enabled: Optional[bool] = None,
37        sample_rate: Optional[float] = None,
38    ) -> Langfuse:
39        if self._langfuse:
40            return self._langfuse
41
42        with self._lock:
43            if self._langfuse:
44                return self._langfuse
45
46            langfuse_init_args = {
47                "public_key": public_key,
48                "secret_key": secret_key,
49                "host": host,
50                "release": release,
51                "debug": debug,
52                "threads": threads,
53                "flush_at": flush_at,
54                "flush_interval": flush_interval,
55                "max_retries": max_retries,
56                "timeout": timeout,
57                "httpx_client": httpx_client,
58                "sdk_integration": sdk_integration,
59                "enabled": enabled,
60                "sample_rate": sample_rate,
61            }
62
63            self._langfuse = Langfuse(
64                **{k: v for k, v in langfuse_init_args.items() if v is not None}
65            )
66
67            return self._langfuse
68
69    def reset(self) -> None:
70        with self._lock:
71            if self._langfuse:
72                self._langfuse.flush()
73
74            self._langfuse = None
class LangfuseSingleton:
class LangfuseSingleton:
    """Process-wide holder for one shared ``Langfuse`` client.

    Constructing ``LangfuseSingleton()`` always yields the same object. The
    client built by the first ``get()`` call is reused by later calls until
    ``reset()`` discards it.
    """

    # The single shared LangfuseSingleton object; created on first construction.
    _instance = None
    # Serializes creation of the instance and creation/reset of the client.
    _lock = threading.Lock()
    # Lazily created client; None until get() builds it and again after reset().
    _langfuse: Optional[Langfuse] = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        # Check once without the lock so the common path stays cheap, then
        # re-check while holding it so only one thread creates the instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def get(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        release: Optional[str] = None,
        debug: Optional[bool] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        sdk_integration: Optional[str] = None,
        enabled: Optional[bool] = None,
        sample_rate: Optional[float] = None,
    ) -> Langfuse:
        """Return the shared client, building it on the first call.

        All parameters are keyword-only and are forwarded to ``Langfuse(...)``
        under the same names, but only those that are not ``None``. They are
        used solely when no client exists yet; once a client has been created
        the arguments are ignored and the existing client is returned.

        Returns:
            The shared ``Langfuse`` client.
        """
        if self._langfuse:
            return self._langfuse

        with self._lock:
            # Another thread may have created the client while we waited.
            if self._langfuse:
                return self._langfuse

            langfuse_init_args = {
                "public_key": public_key,
                "secret_key": secret_key,
                "host": host,
                "release": release,
                "debug": debug,
                "threads": threads,
                "flush_at": flush_at,
                "flush_interval": flush_interval,
                "max_retries": max_retries,
                "timeout": timeout,
                "httpx_client": httpx_client,
                "sdk_integration": sdk_integration,
                "enabled": enabled,
                "sample_rate": sample_rate,
            }

            # Omit unset options so Langfuse applies its own defaults.
            self._langfuse = Langfuse(
                **{k: v for k, v in langfuse_init_args.items() if v is not None}
            )

            return self._langfuse

    def reset(self) -> None:
        """Flush the current client, if any, and discard it.

        The stored client is cleared even when ``flush()`` raises, so a
        failing flush cannot leave the old client installed for later
        ``get()`` calls. The exception still propagates to the caller.
        """
        with self._lock:
            try:
                if self._langfuse:
                    self._langfuse.flush()
            finally:
                self._langfuse = None
def get( self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, release: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: Optional[str] = None, enabled: Optional[bool] = None, sample_rate: Optional[float] = None) -> langfuse.client.Langfuse:
22    def get(
23        self,
24        *,
25        public_key: Optional[str] = None,
26        secret_key: Optional[str] = None,
27        host: Optional[str] = None,
28        release: Optional[str] = None,
29        debug: Optional[bool] = None,
30        threads: Optional[int] = None,
31        flush_at: Optional[int] = None,
32        flush_interval: Optional[int] = None,
33        max_retries: Optional[int] = None,
34        timeout: Optional[int] = None,
35        httpx_client: Optional[httpx.Client] = None,
36        sdk_integration: Optional[str] = None,
37        enabled: Optional[bool] = None,
38        sample_rate: Optional[float] = None,
39    ) -> Langfuse:
40        if self._langfuse:
41            return self._langfuse
42
43        with self._lock:
44            if self._langfuse:
45                return self._langfuse
46
47            langfuse_init_args = {
48                "public_key": public_key,
49                "secret_key": secret_key,
50                "host": host,
51                "release": release,
52                "debug": debug,
53                "threads": threads,
54                "flush_at": flush_at,
55                "flush_interval": flush_interval,
56                "max_retries": max_retries,
57                "timeout": timeout,
58                "httpx_client": httpx_client,
59                "sdk_integration": sdk_integration,
60                "enabled": enabled,
61                "sample_rate": sample_rate,
62            }
63
64            self._langfuse = Langfuse(
65                **{k: v for k, v in langfuse_init_args.items() if v is not None}
66            )
67
68            return self._langfuse
def reset(self) -> None:
70    def reset(self) -> None:
71        with self._lock:
72            if self._langfuse:
73                self._langfuse.flush()
74
75            self._langfuse = None