langfuse.utils.langfuse_singleton

 1import httpx
 2import threading
 3from typing import Optional
 4
 5
 6from langfuse import Langfuse
 7from langfuse.types import MaskFunction
 8
 9
10class LangfuseSingleton:
11    _instance = None
12    _lock = threading.Lock()
13    _langfuse: Optional[Langfuse] = None
14
15    def __new__(cls):
16        if not cls._instance:
17            with cls._lock:
18                if not cls._instance:
19                    cls._instance = super(LangfuseSingleton, cls).__new__(cls)
20        return cls._instance
21
22    def get(
23        self,
24        *,
25        public_key: Optional[str] = None,
26        secret_key: Optional[str] = None,
27        host: Optional[str] = None,
28        release: Optional[str] = None,
29        debug: Optional[bool] = None,
30        threads: Optional[int] = None,
31        flush_at: Optional[int] = None,
32        flush_interval: Optional[int] = None,
33        max_retries: Optional[int] = None,
34        timeout: Optional[int] = None,
35        httpx_client: Optional[httpx.Client] = None,
36        sdk_integration: Optional[str] = None,
37        enabled: Optional[bool] = None,
38        sample_rate: Optional[float] = None,
39        mask: Optional[MaskFunction] = None,
40    ) -> Langfuse:
41        if self._langfuse:
42            return self._langfuse
43
44        with self._lock:
45            if self._langfuse:
46                return self._langfuse
47
48            langfuse_init_args = {
49                "public_key": public_key,
50                "secret_key": secret_key,
51                "host": host,
52                "release": release,
53                "debug": debug,
54                "threads": threads,
55                "flush_at": flush_at,
56                "flush_interval": flush_interval,
57                "max_retries": max_retries,
58                "timeout": timeout,
59                "httpx_client": httpx_client,
60                "sdk_integration": sdk_integration,
61                "enabled": enabled,
62                "sample_rate": sample_rate,
63                "mask": mask,
64            }
65
66            self._langfuse = Langfuse(
67                **{k: v for k, v in langfuse_init_args.items() if v is not None}
68            )
69
70            return self._langfuse
71
72    def reset(self) -> None:
73        with self._lock:
74            if self._langfuse:
75                self._langfuse.flush()
76
77            self._langfuse = None
class LangfuseSingleton:
11class LangfuseSingleton:
12    _instance = None
13    _lock = threading.Lock()
14    _langfuse: Optional[Langfuse] = None
15
16    def __new__(cls):
17        if not cls._instance:
18            with cls._lock:
19                if not cls._instance:
20                    cls._instance = super(LangfuseSingleton, cls).__new__(cls)
21        return cls._instance
22
23    def get(
24        self,
25        *,
26        public_key: Optional[str] = None,
27        secret_key: Optional[str] = None,
28        host: Optional[str] = None,
29        release: Optional[str] = None,
30        debug: Optional[bool] = None,
31        threads: Optional[int] = None,
32        flush_at: Optional[int] = None,
33        flush_interval: Optional[int] = None,
34        max_retries: Optional[int] = None,
35        timeout: Optional[int] = None,
36        httpx_client: Optional[httpx.Client] = None,
37        sdk_integration: Optional[str] = None,
38        enabled: Optional[bool] = None,
39        sample_rate: Optional[float] = None,
40        mask: Optional[MaskFunction] = None,
41    ) -> Langfuse:
42        if self._langfuse:
43            return self._langfuse
44
45        with self._lock:
46            if self._langfuse:
47                return self._langfuse
48
49            langfuse_init_args = {
50                "public_key": public_key,
51                "secret_key": secret_key,
52                "host": host,
53                "release": release,
54                "debug": debug,
55                "threads": threads,
56                "flush_at": flush_at,
57                "flush_interval": flush_interval,
58                "max_retries": max_retries,
59                "timeout": timeout,
60                "httpx_client": httpx_client,
61                "sdk_integration": sdk_integration,
62                "enabled": enabled,
63                "sample_rate": sample_rate,
64                "mask": mask,
65            }
66
67            self._langfuse = Langfuse(
68                **{k: v for k, v in langfuse_init_args.items() if v is not None}
69            )
70
71            return self._langfuse
72
73    def reset(self) -> None:
74        with self._lock:
75            if self._langfuse:
76                self._langfuse.flush()
77
78            self._langfuse = None
def get( self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, release: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: Optional[str] = None, enabled: Optional[bool] = None, sample_rate: Optional[float] = None, mask: Optional[langfuse.types.MaskFunction] = None) -> langfuse.client.Langfuse:
23    def get(
24        self,
25        *,
26        public_key: Optional[str] = None,
27        secret_key: Optional[str] = None,
28        host: Optional[str] = None,
29        release: Optional[str] = None,
30        debug: Optional[bool] = None,
31        threads: Optional[int] = None,
32        flush_at: Optional[int] = None,
33        flush_interval: Optional[int] = None,
34        max_retries: Optional[int] = None,
35        timeout: Optional[int] = None,
36        httpx_client: Optional[httpx.Client] = None,
37        sdk_integration: Optional[str] = None,
38        enabled: Optional[bool] = None,
39        sample_rate: Optional[float] = None,
40        mask: Optional[MaskFunction] = None,
41    ) -> Langfuse:
42        if self._langfuse:
43            return self._langfuse
44
45        with self._lock:
46            if self._langfuse:
47                return self._langfuse
48
49            langfuse_init_args = {
50                "public_key": public_key,
51                "secret_key": secret_key,
52                "host": host,
53                "release": release,
54                "debug": debug,
55                "threads": threads,
56                "flush_at": flush_at,
57                "flush_interval": flush_interval,
58                "max_retries": max_retries,
59                "timeout": timeout,
60                "httpx_client": httpx_client,
61                "sdk_integration": sdk_integration,
62                "enabled": enabled,
63                "sample_rate": sample_rate,
64                "mask": mask,
65            }
66
67            self._langfuse = Langfuse(
68                **{k: v for k, v in langfuse_init_args.items() if v is not None}
69            )
70
71            return self._langfuse
def reset(self) -> None:
73    def reset(self) -> None:
74        with self._lock:
75            if self._langfuse:
76                self._langfuse.flush()
77
78            self._langfuse = None