langfuse.utils.langfuse_singleton

 1import threading
 2from typing import Optional
 3
 4import httpx
 5
 6from langfuse import Langfuse
 7from langfuse.types import MaskFunction
 8
 9
class LangfuseSingleton:
    """Holder that hands out one shared ``Langfuse`` client.

    Every ``LangfuseSingleton()`` call returns the same object. The client
    itself is built lazily by the first ``get()`` call and stays cached until
    ``reset()`` is called.
    """

    _instance: Optional["LangfuseSingleton"] = None
    # A single lock guards singleton creation as well as client
    # creation and teardown.
    _lock = threading.Lock()
    _langfuse: Optional[Langfuse] = None

    def __new__(cls):
        """Return the one shared instance, creating it on first use."""
        # Unlocked fast path; the check is repeated under the lock so two
        # threads that both pass the first check cannot both create an instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def get(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        release: Optional[str] = None,
        debug: Optional[bool] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        sdk_integration: Optional[str] = None,
        enabled: Optional[bool] = None,
        sample_rate: Optional[float] = None,
        mask: Optional[MaskFunction] = None,
        environment: Optional[str] = None,
    ) -> Langfuse:
        """Return the cached ``Langfuse`` client, building it if necessary.

        The keyword arguments are only consulted when no client is cached;
        once a client exists they are ignored and the cached client is
        returned unchanged. Arguments left as ``None`` are not passed to the
        ``Langfuse`` constructor at all.

        Returns:
            The shared ``Langfuse`` client.
        """
        # Identity checks rather than truthiness: a client object that
        # happens to evaluate as falsy must still count as "already created".
        if self._langfuse is not None:
            return self._langfuse

        with self._lock:
            # Another thread may have built the client while we were waiting.
            if self._langfuse is not None:
                return self._langfuse

            langfuse_init_args = {
                "public_key": public_key,
                "secret_key": secret_key,
                "host": host,
                "release": release,
                "debug": debug,
                "threads": threads,
                "flush_at": flush_at,
                "flush_interval": flush_interval,
                "max_retries": max_retries,
                "timeout": timeout,
                "httpx_client": httpx_client,
                "sdk_integration": sdk_integration,
                "enabled": enabled,
                "sample_rate": sample_rate,
                "mask": mask,
                "environment": environment,
            }

            # Forward only the values the caller actually supplied.
            self._langfuse = Langfuse(
                **{k: v for k, v in langfuse_init_args.items() if v is not None}
            )

            return self._langfuse

    def reset(self) -> None:
        """Shut down the cached client, if any, and forget it.

        The cached reference is cleared even when ``shutdown()`` raises, so a
        later ``get()`` builds a fresh client; the exception still propagates.
        """
        with self._lock:
            try:
                if self._langfuse is not None:
                    self._langfuse.shutdown()
            finally:
                self._langfuse = None
class LangfuseSingleton:
class LangfuseSingleton:
    """Holder that hands out one shared ``Langfuse`` client.

    Every ``LangfuseSingleton()`` call returns the same object. The client
    itself is built lazily by the first ``get()`` call and stays cached until
    ``reset()`` is called.
    """

    _instance: Optional["LangfuseSingleton"] = None
    # A single lock guards singleton creation as well as client
    # creation and teardown.
    _lock = threading.Lock()
    _langfuse: Optional[Langfuse] = None

    def __new__(cls):
        """Return the one shared instance, creating it on first use."""
        # Unlocked fast path; the check is repeated under the lock so two
        # threads that both pass the first check cannot both create an instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def get(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        release: Optional[str] = None,
        debug: Optional[bool] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        httpx_client: Optional[httpx.Client] = None,
        sdk_integration: Optional[str] = None,
        enabled: Optional[bool] = None,
        sample_rate: Optional[float] = None,
        mask: Optional[MaskFunction] = None,
        environment: Optional[str] = None,
    ) -> Langfuse:
        """Return the cached ``Langfuse`` client, building it if necessary.

        The keyword arguments are only consulted when no client is cached;
        once a client exists they are ignored and the cached client is
        returned unchanged. Arguments left as ``None`` are not passed to the
        ``Langfuse`` constructor at all.

        Returns:
            The shared ``Langfuse`` client.
        """
        # Identity checks rather than truthiness: a client object that
        # happens to evaluate as falsy must still count as "already created".
        if self._langfuse is not None:
            return self._langfuse

        with self._lock:
            # Another thread may have built the client while we were waiting.
            if self._langfuse is not None:
                return self._langfuse

            langfuse_init_args = {
                "public_key": public_key,
                "secret_key": secret_key,
                "host": host,
                "release": release,
                "debug": debug,
                "threads": threads,
                "flush_at": flush_at,
                "flush_interval": flush_interval,
                "max_retries": max_retries,
                "timeout": timeout,
                "httpx_client": httpx_client,
                "sdk_integration": sdk_integration,
                "enabled": enabled,
                "sample_rate": sample_rate,
                "mask": mask,
                "environment": environment,
            }

            # Forward only the values the caller actually supplied.
            self._langfuse = Langfuse(
                **{k: v for k, v in langfuse_init_args.items() if v is not None}
            )

            return self._langfuse

    def reset(self) -> None:
        """Shut down the cached client, if any, and forget it.

        The cached reference is cleared even when ``shutdown()`` raises, so a
        later ``get()`` builds a fresh client; the exception still propagates.
        """
        with self._lock:
            try:
                if self._langfuse is not None:
                    self._langfuse.shutdown()
            finally:
                self._langfuse = None
def get( self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, release: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: Optional[str] = None, enabled: Optional[bool] = None, sample_rate: Optional[float] = None, mask: Optional[langfuse.types.MaskFunction] = None, environment: Optional[str] = None) -> langfuse.client.Langfuse:
23    def get(
24        self,
25        *,
26        public_key: Optional[str] = None,
27        secret_key: Optional[str] = None,
28        host: Optional[str] = None,
29        release: Optional[str] = None,
30        debug: Optional[bool] = None,
31        threads: Optional[int] = None,
32        flush_at: Optional[int] = None,
33        flush_interval: Optional[int] = None,
34        max_retries: Optional[int] = None,
35        timeout: Optional[int] = None,
36        httpx_client: Optional[httpx.Client] = None,
37        sdk_integration: Optional[str] = None,
38        enabled: Optional[bool] = None,
39        sample_rate: Optional[float] = None,
40        mask: Optional[MaskFunction] = None,
41        environment: Optional[str] = None,
42    ) -> Langfuse:
43        if self._langfuse:
44            return self._langfuse
45
46        with self._lock:
47            if self._langfuse:
48                return self._langfuse
49
50            langfuse_init_args = {
51                "public_key": public_key,
52                "secret_key": secret_key,
53                "host": host,
54                "release": release,
55                "debug": debug,
56                "threads": threads,
57                "flush_at": flush_at,
58                "flush_interval": flush_interval,
59                "max_retries": max_retries,
60                "timeout": timeout,
61                "httpx_client": httpx_client,
62                "sdk_integration": sdk_integration,
63                "enabled": enabled,
64                "sample_rate": sample_rate,
65                "mask": mask,
66                "environment": environment,
67            }
68
69            self._langfuse = Langfuse(
70                **{k: v for k, v in langfuse_init_args.items() if v is not None}
71            )
72
73            return self._langfuse
def reset(self) -> None:
75    def reset(self) -> None:
76        with self._lock:
77            if self._langfuse:
78                self._langfuse.shutdown()
79
80            self._langfuse = None