langfuse._task_manager.task_manager

@private

  1"""@private"""
  2
  3import atexit
  4import logging
  5import queue
  6from queue import Queue
  7from typing import List, Optional
  8
  9from langfuse.api.client import FernLangfuse
 10from langfuse.request import LangfuseClient
 11from langfuse.types import MaskFunction
 12from langfuse.utils import _get_timestamp
 13
 14from .ingestion_consumer import IngestionConsumer
 15from .media_manager import MediaManager
 16from .media_upload_consumer import MediaUploadConsumer
 17
 18
 19class TaskManager(object):
 20    _log = logging.getLogger(__name__)
 21    _ingestion_consumers: List[IngestionConsumer]
 22    _enabled: bool
 23    _threads: int
 24    _max_task_queue_size: int
 25    _ingestion_queue: Queue
 26    _media_upload_queue: Queue
 27    _client: LangfuseClient
 28    _api_client: FernLangfuse
 29    _flush_at: int
 30    _flush_interval: float
 31    _max_retries: int
 32    _public_key: str
 33    _sdk_name: str
 34    _sdk_version: str
 35    _sdk_integration: str
 36    _sample_rate: float
 37    _mask: Optional[MaskFunction]
 38
 39    def __init__(
 40        self,
 41        *,
 42        client: LangfuseClient,
 43        api_client: FernLangfuse,
 44        flush_at: int,
 45        flush_interval: float,
 46        max_retries: int,
 47        threads: int,
 48        public_key: str,
 49        sdk_name: str,
 50        sdk_version: str,
 51        sdk_integration: str,
 52        enabled: bool = True,
 53        max_task_queue_size: int = 100_000,
 54        sample_rate: float = 1,
 55        mask: Optional[MaskFunction] = None,
 56    ):
 57        self._max_task_queue_size = max_task_queue_size
 58        self._threads = threads
 59        self._ingestion_queue = queue.Queue(self._max_task_queue_size)
 60        self._media_upload_queue = Queue(self._max_task_queue_size)
 61        self._media_manager = MediaManager(
 62            api_client=api_client,
 63            media_upload_queue=self._media_upload_queue,
 64            max_retries=max_retries,
 65        )
 66        self._ingestion_consumers = []
 67        self._media_upload_consumers = []
 68        self._client = client
 69        self._api_client = api_client
 70        self._flush_at = flush_at
 71        self._flush_interval = flush_interval
 72        self._max_retries = max_retries
 73        self._public_key = public_key
 74        self._sdk_name = sdk_name
 75        self._sdk_version = sdk_version
 76        self._sdk_integration = sdk_integration
 77        self._enabled = enabled
 78        self._sample_rate = sample_rate
 79        self._mask = mask
 80
 81        self.init_resources()
 82
 83        # cleans up when the python interpreter closes
 84        atexit.register(self.join)
 85
 86    def init_resources(self):
 87        for i in range(self._threads):
 88            ingestion_consumer = IngestionConsumer(
 89                ingestion_queue=self._ingestion_queue,
 90                identifier=i,
 91                client=self._client,
 92                media_manager=self._media_manager,
 93                flush_at=self._flush_at,
 94                flush_interval=self._flush_interval,
 95                max_retries=self._max_retries,
 96                public_key=self._public_key,
 97                sdk_name=self._sdk_name,
 98                sdk_version=self._sdk_version,
 99                sdk_integration=self._sdk_integration,
100                sample_rate=self._sample_rate,
101                mask=self._mask,
102            )
103            ingestion_consumer.start()
104            self._ingestion_consumers.append(ingestion_consumer)
105
106        for i in range(self._threads):
107            media_upload_consumer = MediaUploadConsumer(
108                identifier=i,
109                media_manager=self._media_manager,
110            )
111            media_upload_consumer.start()
112            self._media_upload_consumers.append(media_upload_consumer)
113
114    def add_task(self, event: dict):
115        if not self._enabled:
116            return
117
118        try:
119            event["timestamp"] = _get_timestamp()
120
121            self._ingestion_queue.put(event, block=False)
122        except queue.Full:
123            self._log.warning("analytics-python queue is full")
124            return False
125        except Exception as e:
126            self._log.exception(f"Exception in adding task {e}")
127
128            return False
129
130    def flush(self):
131        """Force a flush from the internal queue to the server."""
132        self._log.debug("flushing ingestion and media upload queues")
133
134        # Ingestion queue
135        ingestion_queue_size = self._ingestion_queue.qsize()
136        self._ingestion_queue.join()
137        self._log.debug(
138            f"Successfully flushed ~{ingestion_queue_size} items from ingestion queue"
139        )
140
141        # Media upload queue
142        media_upload_queue_size = self._media_upload_queue.qsize()
143        self._media_upload_queue.join()
144        self._log.debug(
145            f"Successfully flushed ~{media_upload_queue_size} items from media upload queue"
146        )
147
148    def join(self):
149        """End the consumer threads once the queue is empty.
150
151        Blocks execution until finished
152        """
153        self._log.debug(
154            f"joining {len(self._ingestion_consumers)} ingestion consumer threads"
155        )
156
157        # pause all consumers before joining them so we don't have to wait for multiple
158        # flush intervals to join them all.
159        for ingestion_consumer in self._ingestion_consumers:
160            ingestion_consumer.pause()
161
162        for ingestion_consumer in self._ingestion_consumers:
163            try:
164                ingestion_consumer.join()
165            except RuntimeError:
166                # consumer thread has not started
167                pass
168
169            self._log.debug(
170                f"IngestionConsumer thread {ingestion_consumer._identifier} joined"
171            )
172
173        self._log.debug(
174            f"joining {len(self._media_upload_consumers)} media upload consumer threads"
175        )
176        for media_upload_consumer in self._media_upload_consumers:
177            media_upload_consumer.pause()
178
179        for media_upload_consumer in self._media_upload_consumers:
180            try:
181                media_upload_consumer.join()
182            except RuntimeError:
183                # consumer thread has not started
184                pass
185
186            self._log.debug(
187                f"MediaUploadConsumer thread {media_upload_consumer._identifier} joined"
188            )
189
190    def shutdown(self):
191        """Flush all messages and cleanly shutdown the client."""
192        self._log.debug("shutdown initiated")
193
194        self.flush()
195        self.join()
196
197        self._log.debug("shutdown completed")
class TaskManager:
 20class TaskManager(object):
 21    _log = logging.getLogger(__name__)
 22    _ingestion_consumers: List[IngestionConsumer]
 23    _enabled: bool
 24    _threads: int
 25    _max_task_queue_size: int
 26    _ingestion_queue: Queue
 27    _media_upload_queue: Queue
 28    _client: LangfuseClient
 29    _api_client: FernLangfuse
 30    _flush_at: int
 31    _flush_interval: float
 32    _max_retries: int
 33    _public_key: str
 34    _sdk_name: str
 35    _sdk_version: str
 36    _sdk_integration: str
 37    _sample_rate: float
 38    _mask: Optional[MaskFunction]
 39
 40    def __init__(
 41        self,
 42        *,
 43        client: LangfuseClient,
 44        api_client: FernLangfuse,
 45        flush_at: int,
 46        flush_interval: float,
 47        max_retries: int,
 48        threads: int,
 49        public_key: str,
 50        sdk_name: str,
 51        sdk_version: str,
 52        sdk_integration: str,
 53        enabled: bool = True,
 54        max_task_queue_size: int = 100_000,
 55        sample_rate: float = 1,
 56        mask: Optional[MaskFunction] = None,
 57    ):
 58        self._max_task_queue_size = max_task_queue_size
 59        self._threads = threads
 60        self._ingestion_queue = queue.Queue(self._max_task_queue_size)
 61        self._media_upload_queue = Queue(self._max_task_queue_size)
 62        self._media_manager = MediaManager(
 63            api_client=api_client,
 64            media_upload_queue=self._media_upload_queue,
 65            max_retries=max_retries,
 66        )
 67        self._ingestion_consumers = []
 68        self._media_upload_consumers = []
 69        self._client = client
 70        self._api_client = api_client
 71        self._flush_at = flush_at
 72        self._flush_interval = flush_interval
 73        self._max_retries = max_retries
 74        self._public_key = public_key
 75        self._sdk_name = sdk_name
 76        self._sdk_version = sdk_version
 77        self._sdk_integration = sdk_integration
 78        self._enabled = enabled
 79        self._sample_rate = sample_rate
 80        self._mask = mask
 81
 82        self.init_resources()
 83
 84        # cleans up when the python interpreter closes
 85        atexit.register(self.join)
 86
 87    def init_resources(self):
 88        for i in range(self._threads):
 89            ingestion_consumer = IngestionConsumer(
 90                ingestion_queue=self._ingestion_queue,
 91                identifier=i,
 92                client=self._client,
 93                media_manager=self._media_manager,
 94                flush_at=self._flush_at,
 95                flush_interval=self._flush_interval,
 96                max_retries=self._max_retries,
 97                public_key=self._public_key,
 98                sdk_name=self._sdk_name,
 99                sdk_version=self._sdk_version,
100                sdk_integration=self._sdk_integration,
101                sample_rate=self._sample_rate,
102                mask=self._mask,
103            )
104            ingestion_consumer.start()
105            self._ingestion_consumers.append(ingestion_consumer)
106
107        for i in range(self._threads):
108            media_upload_consumer = MediaUploadConsumer(
109                identifier=i,
110                media_manager=self._media_manager,
111            )
112            media_upload_consumer.start()
113            self._media_upload_consumers.append(media_upload_consumer)
114
115    def add_task(self, event: dict):
116        if not self._enabled:
117            return
118
119        try:
120            event["timestamp"] = _get_timestamp()
121
122            self._ingestion_queue.put(event, block=False)
123        except queue.Full:
124            self._log.warning("analytics-python queue is full")
125            return False
126        except Exception as e:
127            self._log.exception(f"Exception in adding task {e}")
128
129            return False
130
131    def flush(self):
132        """Force a flush from the internal queue to the server."""
133        self._log.debug("flushing ingestion and media upload queues")
134
135        # Ingestion queue
136        ingestion_queue_size = self._ingestion_queue.qsize()
137        self._ingestion_queue.join()
138        self._log.debug(
139            f"Successfully flushed ~{ingestion_queue_size} items from ingestion queue"
140        )
141
142        # Media upload queue
143        media_upload_queue_size = self._media_upload_queue.qsize()
144        self._media_upload_queue.join()
145        self._log.debug(
146            f"Successfully flushed ~{media_upload_queue_size} items from media upload queue"
147        )
148
149    def join(self):
150        """End the consumer threads once the queue is empty.
151
152        Blocks execution until finished
153        """
154        self._log.debug(
155            f"joining {len(self._ingestion_consumers)} ingestion consumer threads"
156        )
157
158        # pause all consumers before joining them so we don't have to wait for multiple
159        # flush intervals to join them all.
160        for ingestion_consumer in self._ingestion_consumers:
161            ingestion_consumer.pause()
162
163        for ingestion_consumer in self._ingestion_consumers:
164            try:
165                ingestion_consumer.join()
166            except RuntimeError:
167                # consumer thread has not started
168                pass
169
170            self._log.debug(
171                f"IngestionConsumer thread {ingestion_consumer._identifier} joined"
172            )
173
174        self._log.debug(
175            f"joining {len(self._media_upload_consumers)} media upload consumer threads"
176        )
177        for media_upload_consumer in self._media_upload_consumers:
178            media_upload_consumer.pause()
179
180        for media_upload_consumer in self._media_upload_consumers:
181            try:
182                media_upload_consumer.join()
183            except RuntimeError:
184                # consumer thread has not started
185                pass
186
187            self._log.debug(
188                f"MediaUploadConsumer thread {media_upload_consumer._identifier} joined"
189            )
190
191    def shutdown(self):
192        """Flush all messages and cleanly shutdown the client."""
193        self._log.debug("shutdown initiated")
194
195        self.flush()
196        self.join()
197
198        self._log.debug("shutdown completed")
TaskManager( *, client: langfuse.request.LangfuseClient, api_client: langfuse.api.client.FernLangfuse, flush_at: int, flush_interval: float, max_retries: int, threads: int, public_key: str, sdk_name: str, sdk_version: str, sdk_integration: str, enabled: bool = True, max_task_queue_size: int = 100000, sample_rate: float = 1, mask: Optional[langfuse.types.MaskFunction] = None)
def __init__(
    self,
    *,
    client: LangfuseClient,
    api_client: FernLangfuse,
    flush_at: int,
    flush_interval: float,
    max_retries: int,
    threads: int,
    public_key: str,
    sdk_name: str,
    sdk_version: str,
    sdk_integration: str,
    enabled: bool = True,
    max_task_queue_size: int = 100_000,
    sample_rate: float = 1,
    mask: Optional[MaskFunction] = None,
):
    """Record configuration, build both work queues and start the consumers.

    Args:
        client: Handed to every ``IngestionConsumer``.
        api_client: Handed to the ``MediaManager``.
        flush_at, flush_interval: Forwarded to every ``IngestionConsumer``.
        max_retries: Forwarded to the ``MediaManager`` and every
            ``IngestionConsumer``.
        threads: Number of consumer threads started for each queue.
        public_key, sdk_name, sdk_version, sdk_integration: Forwarded to
            every ``IngestionConsumer``.
        enabled: When False, ``add_task`` does nothing.
        max_task_queue_size: ``maxsize`` shared by both queues.
        sample_rate, mask: Forwarded to every ``IngestionConsumer``.
    """
    # Plain settings, read later by init_resources() and add_task().
    self._client = client
    self._api_client = api_client
    self._flush_at = flush_at
    self._flush_interval = flush_interval
    self._max_retries = max_retries
    self._public_key = public_key
    self._sdk_name = sdk_name
    self._sdk_version = sdk_version
    self._sdk_integration = sdk_integration
    self._enabled = enabled
    self._sample_rate = sample_rate
    self._mask = mask
    self._threads = threads
    self._max_task_queue_size = max_task_queue_size

    # Two bounded queues with the same capacity; the media manager is built
    # around the second one.
    self._ingestion_queue = Queue(max_task_queue_size)
    self._media_upload_queue = Queue(max_task_queue_size)
    self._media_manager = MediaManager(
        api_client=api_client,
        media_upload_queue=self._media_upload_queue,
        max_retries=max_retries,
    )

    self._ingestion_consumers, self._media_upload_consumers = [], []

    self.init_resources()

    # Join the consumer threads when the interpreter shuts down.
    atexit.register(self.join)
def init_resources(self):
 87    def init_resources(self):
 88        for i in range(self._threads):
 89            ingestion_consumer = IngestionConsumer(
 90                ingestion_queue=self._ingestion_queue,
 91                identifier=i,
 92                client=self._client,
 93                media_manager=self._media_manager,
 94                flush_at=self._flush_at,
 95                flush_interval=self._flush_interval,
 96                max_retries=self._max_retries,
 97                public_key=self._public_key,
 98                sdk_name=self._sdk_name,
 99                sdk_version=self._sdk_version,
100                sdk_integration=self._sdk_integration,
101                sample_rate=self._sample_rate,
102                mask=self._mask,
103            )
104            ingestion_consumer.start()
105            self._ingestion_consumers.append(ingestion_consumer)
106
107        for i in range(self._threads):
108            media_upload_consumer = MediaUploadConsumer(
109                identifier=i,
110                media_manager=self._media_manager,
111            )
112            media_upload_consumer.start()
113            self._media_upload_consumers.append(media_upload_consumer)
def add_task(self, event: dict):
def add_task(self, event: dict) -> Optional[bool]:
    """Stamp ``event`` with the current timestamp and enqueue it.

    The dict is modified in place: its ``"timestamp"`` key is overwritten.
    Enqueueing never blocks. When the ingestion queue is full, the event is
    dropped.

    Args:
        event: Event payload to hand to the ingestion consumers.

    Returns:
        ``True`` if the event was enqueued. ``False`` if it was dropped
        because the queue was full or an unexpected error occurred. ``None``
        when the task manager is disabled.
    """
    if not self._enabled:
        return None

    try:
        event["timestamp"] = _get_timestamp()

        self._ingestion_queue.put(event, block=False)
    except queue.Full:
        self._log.warning("Langfuse ingestion queue is full; dropping event")
        return False
    except Exception as e:
        # Best effort: never let event submission raise into user code.
        self._log.exception(f"Exception in adding task {e}")
        return False

    return True
def flush(self):
def flush(self):
    """Force a flush from the internal queue to the server.

    Each queue is drained in turn, ingestion before media upload, by blocking
    on ``Queue.join()`` until all of its items are marked done. The item
    counts in the debug log come from ``qsize()`` before waiting, so they are
    approximate.
    """
    self._log.debug("flushing ingestion and media upload queues")

    stages = (
        (self._ingestion_queue, "ingestion"),
        (self._media_upload_queue, "media upload"),
    )
    for pending_queue, label in stages:
        approx_pending = pending_queue.qsize()
        pending_queue.join()
        self._log.debug(
            f"Successfully flushed ~{approx_pending} items from {label} queue"
        )

Force a flush from the internal queue to the server.

def join(self):
def join(self):
    """Pause every consumer thread, then wait for each one to finish.

    Blocks execution until finished. Ingestion consumers are processed before
    media upload consumers. ``Thread.join()`` raises ``RuntimeError`` for a
    thread that was never started; that case is ignored.
    """
    consumer_groups = (
        ("ingestion", "IngestionConsumer", self._ingestion_consumers),
        ("media upload", "MediaUploadConsumer", self._media_upload_consumers),
    )
    for label, kind, consumers in consumer_groups:
        self._log.debug(f"joining {len(consumers)} {label} consumer threads")

        # Pause the whole group first so the joins below don't each wait out
        # a separate flush interval.
        for consumer in consumers:
            consumer.pause()

        for consumer in consumers:
            try:
                consumer.join()
            except RuntimeError:
                # The thread was never started; nothing to wait for.
                pass
            self._log.debug(f"{kind} thread {consumer._identifier} joined")

End the consumer threads once the queue is empty.

Blocks execution until finished

def shutdown(self):
def shutdown(self):
    """Flush all messages and cleanly shutdown the client.

    Runs ``flush()`` and then ``join()``, with a debug log line before and
    after.
    """
    self._log.debug("shutdown initiated")
    for step in (self.flush, self.join):
        step()
    self._log.debug("shutdown completed")

Flush all messages and cleanly shutdown the client.