langfuse.task_manager

@private

  1"""@private"""
  2
  3import atexit
  4import json
  5import logging
  6import queue
  7import threading
  8from queue import Empty, Queue
  9import time
 10from typing import List, Any
 11import typing
 12
 13from langfuse.Sampler import Sampler
 14from langfuse.parse_error import handle_exception
 15from langfuse.request import APIError
 16from langfuse.utils import _get_timestamp
 17
 18try:
 19    import pydantic.v1 as pydantic  # type: ignore
 20except ImportError:
 21    import pydantic  # type: ignore
 22
 23
 24import backoff
 25
 26from langfuse.request import LangfuseClient
 27from langfuse.serializer import EventSerializer
 28
 29# largest message size in db is 331_000 bytes right now
 30MAX_MSG_SIZE = 1_000_000
 31
 32# https://vercel.com/docs/functions/serverless-functions/runtimes#request-body-size
 33# The maximum payload size for the request body or the response body of a Serverless Function is 4.5 MB
 34# 4_500_000 Bytes = 4.5 MB
 35# configured to be 3 MB to be safe
 36
 37BATCH_SIZE_LIMIT = 2_500_000
 38
 39
class LangfuseMetadata(pydantic.BaseModel):
    batch_size: int
    sdk_integration: typing.Optional[str] = None
    sdk_name: typing.Optional[str] = None
    sdk_version: typing.Optional[str] = None
    public_key: typing.Optional[str] = None


class Consumer(threading.Thread):
    _log = logging.getLogger("langfuse")
    _queue: Queue
    _identifier: int
    _client: LangfuseClient
    _flush_at: int
    _flush_interval: float
    _max_retries: int
    _public_key: str
    _sdk_name: str
    _sdk_version: str
    _sdk_integration: str

    def __init__(
        self,
        queue: Queue,
        identifier: int,
        client: LangfuseClient,
        flush_at: int,
        flush_interval: float,
        max_retries: int,
        public_key: str,
        sdk_name: str,
        sdk_version: str,
        sdk_integration: str,
    ):
        """Create a consumer thread."""
        threading.Thread.__init__(self)
        # Make consumer a daemon thread so that it doesn't block program exit
        self.daemon = True
        self._queue = queue
        # It's important to set running in the constructor: if we are asked to
        # pause immediately after construction, we might set running to True in
        # run() *after* we set it to False in pause... and keep running
        # forever.
        self.running = True
        self._identifier = identifier
        self._client = client
        self._flush_at = flush_at
        self._flush_interval = flush_interval
        self._max_retries = max_retries
        self._public_key = public_key
        self._sdk_name = sdk_name
        self._sdk_version = sdk_version
        self._sdk_integration = sdk_integration

    def _next(self):
        """Return the next batch of items to upload."""
        queue = self._queue
        items = []

        start_time = time.monotonic()
        total_size = 0

        while len(items) < self._flush_at:
            elapsed = time.monotonic() - start_time
            if elapsed >= self._flush_interval:
                break
            try:
                item = queue.get(block=True, timeout=self._flush_interval - elapsed)

                item_size = self._truncate_item_in_place(
                    item=item,
                    max_size=MAX_MSG_SIZE,
                    log_message="<truncated due to size exceeding limit>",
                )

                items.append(item)
                total_size += item_size
                if total_size >= BATCH_SIZE_LIMIT:
                    self._log.debug("hit batch size limit (size: %d)", total_size)
                    break

            except Empty:
                break
        self._log.debug("~%d items in the Langfuse queue", self._queue.qsize())

        return items

    def _truncate_item_in_place(
        self,
        *,
        item: typing.Any,
        max_size: int,
        log_message: typing.Optional[str] = None,
    ) -> int:
        """Truncate the item in place to fit within the size limit."""
        item_size = self._get_item_size(item)
        self._log.debug(f"item size {item_size}")

        if item_size > max_size:
            self._log.warning(
                "Item exceeds size limit (size: %s), dropping input / output / metadata of item until it fits.",
                item_size,
            )

            if "body" in item:
                drop_candidates = ["input", "output", "metadata"]
                sorted_field_sizes = sorted(
                    [
                        (
                            field,
                            self._get_item_size(item["body"][field])
                            if field in item["body"]
                            else 0,
                        )
                        for field in drop_candidates
                    ],
                    key=lambda x: x[1],
                )

                # drop the largest field until the item size is within the limit
                for _ in range(len(sorted_field_sizes)):
                    field_to_drop, size_to_drop = sorted_field_sizes.pop()

                    if field_to_drop not in item["body"]:
                        continue

                    item["body"][field_to_drop] = log_message
                    item_size -= size_to_drop

                    self._log.debug(
                        f"Dropped field {field_to_drop}, new item size {item_size}"
                    )

                    if item_size <= max_size:
                        break

            # if item does not have body or input/output fields, drop the event
            if "body" not in item or (
                "input" not in item["body"] and "output" not in item["body"]
            ):
                self._log.warning(
                    "Item does not have body or input/output fields, dropping item."
                )
                self._queue.task_done()
                return 0

        return self._get_item_size(item)

    def _get_item_size(self, item: typing.Any) -> int:
        """Return the size of the item in bytes."""
        return len(json.dumps(item, cls=EventSerializer).encode())

    def run(self):
        """Run the consumer."""
        self._log.debug("consumer is running...")
        while self.running:
            self.upload()

    def upload(self):
        """Upload the next batch of items."""
        batch = self._next()
        if len(batch) == 0:
            return

        try:
            self._upload_batch(batch)
        except Exception as e:
            handle_exception(e)
        finally:
            # mark items as acknowledged from queue
            for _ in batch:
                self._queue.task_done()

    def pause(self):
        """Pause the consumer."""
        self.running = False

    def _upload_batch(self, batch: List[Any]):
        self._log.debug("uploading batch of %d items", len(batch))

        metadata = LangfuseMetadata(
            batch_size=len(batch),
            sdk_integration=self._sdk_integration,
            sdk_name=self._sdk_name,
            sdk_version=self._sdk_version,
            public_key=self._public_key,
        ).dict()

        @backoff.on_exception(
            backoff.expo, Exception, max_tries=self._max_retries, logger=None
        )
        def execute_task_with_backoff(batch: List[Any]):
            try:
                self._client.batch_post(batch=batch, metadata=metadata)
            except Exception as e:
                if (
                    isinstance(e, APIError)
                    and 400 <= int(e.status) < 500
                    and int(e.status) != 429  # 429 (rate limit) is retried
                ):
                    # non-retryable client error: give up on this batch
                    return

                raise e

        execute_task_with_backoff(batch)
        self._log.debug("successfully uploaded batch of %d items", len(batch))


class TaskManager(object):
    _log = logging.getLogger("langfuse")
    _consumers: List[Consumer]
    _enabled: bool
    _threads: int
    _max_task_queue_size: int
    _queue: Queue
    _client: LangfuseClient
    _flush_at: int
    _flush_interval: float
    _max_retries: int
    _public_key: str
    _sdk_name: str
    _sdk_version: str
    _sdk_integration: str
    _sampler: Sampler

    def __init__(
        self,
        client: LangfuseClient,
        flush_at: int,
        flush_interval: float,
        max_retries: int,
        threads: int,
        public_key: str,
        sdk_name: str,
        sdk_version: str,
        sdk_integration: str,
        enabled: bool = True,
        max_task_queue_size: int = 100_000,
        sample_rate: float = 1,
    ):
        self._max_task_queue_size = max_task_queue_size
        self._threads = threads
        self._queue = queue.Queue(self._max_task_queue_size)
        self._consumers = []
        self._client = client
        self._flush_at = flush_at
        self._flush_interval = flush_interval
        self._max_retries = max_retries
        self._public_key = public_key
        self._sdk_name = sdk_name
        self._sdk_version = sdk_version
        self._sdk_integration = sdk_integration
        self._enabled = enabled
        self._sampler = Sampler(sample_rate)

        self.init_resources()

        # cleans up when the python interpreter closes
        atexit.register(self.join)

    def init_resources(self):
        for i in range(self._threads):
            consumer = Consumer(
                queue=self._queue,
                identifier=i,
                client=self._client,
                flush_at=self._flush_at,
                flush_interval=self._flush_interval,
                max_retries=self._max_retries,
                public_key=self._public_key,
                sdk_name=self._sdk_name,
                sdk_version=self._sdk_version,
                sdk_integration=self._sdk_integration,
            )
            consumer.start()
            self._consumers.append(consumer)

    def add_task(self, event: dict):
        if not self._enabled:
            return

        try:
            if not self._sampler.sample_event(event):
                return  # event was sampled out

            # fail fast if the event is not serializable
            json.dumps(event, cls=EventSerializer)
            event["timestamp"] = _get_timestamp()

            self._queue.put(event, block=False)
        except queue.Full:
            self._log.warning("Langfuse event queue is full, dropping event")
            return False
        except Exception as e:
            self._log.exception(f"Exception while adding task: {e}")

            return False

    def flush(self):
        """Force a flush from the internal queue to the server."""
        self._log.debug("flushing queue")
        queue = self._queue
        size = queue.qsize()
        queue.join()
        # Note that this message may not be precise, because of threading.
        self._log.debug("successfully flushed about %s items.", size)

    def join(self):
        """End the consumer threads once the queue is empty.

        Blocks execution until finished.
        """
        self._log.debug(f"joining {len(self._consumers)} consumer threads")

        # pause all consumers before joining them so we don't have to wait for
        # multiple flush intervals to join them all.
        for consumer in self._consumers:
            consumer.pause()

        for consumer in self._consumers:
            try:
                consumer.join()
            except RuntimeError:
                # consumer thread has not started
                pass

            self._log.debug(f"consumer thread {consumer._identifier} joined")

    def shutdown(self):
        """Flush all messages and cleanly shut down the client."""
        self._log.debug("shutdown initiated")

        self.flush()
        self.join()

        self._log.debug("shutdown completed")
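
For orientation, the following is a minimal usage sketch of TaskManager. It is an illustration under stated assumptions, not part of the module: client stands for an already-configured langfuse.request.LangfuseClient (its constructor arguments are not shown here), and the public key, SDK values, and event payload are placeholders.

# Hypothetical usage sketch; assumes `client` is a configured LangfuseClient.
from langfuse.task_manager import TaskManager

task_manager = TaskManager(
    client=client,        # assumption: built elsewhere from your credentials
    flush_at=15,          # upload once 15 events are batched...
    flush_interval=0.5,   # ...or every 0.5 seconds, whichever comes first
    max_retries=3,        # backoff retries per batch upload
    threads=1,            # number of Consumer threads
    public_key="pk-lf-placeholder",
    sdk_name="python",
    sdk_version="0.0.0",  # placeholder
    sdk_integration="default",
)

# Enqueues without blocking; oversized events are truncated by the consumer.
task_manager.add_task({"id": "event-1", "type": "trace-create", "body": {"name": "demo"}})

# Flush pending events and join the consumer threads.
task_manager.shutdown()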
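
The truncation policy in _truncate_item_in_place can also be illustrated in isolation. This is a simplified sketch, not the module's code path: it uses plain json.dumps instead of EventSerializer, an artificially small max_size so the effect is visible, and a hypothetical event shape.

# Simplified sketch of the drop-largest-field truncation policy.
import json

def item_size(obj) -> int:
    # same measurement as Consumer._get_item_size, minus EventSerializer
    return len(json.dumps(obj).encode())

item = {"body": {"input": "x" * 200, "output": "y" * 50, "metadata": {"k": "v"}}}
max_size = 100  # artificially small; the module uses MAX_MSG_SIZE = 1_000_000

if item_size(item) > max_size:
    # sort drop candidates by size, then replace the largest ones first
    candidates = sorted(
        ["input", "output", "metadata"],
        key=lambda f: item_size(item["body"].get(f, "")),
    )
    while candidates and item_size(item) > max_size:
        item["body"][candidates.pop()] = "<truncated due to size exceeding limit>"

print(item["body"]["input"])  # -> <truncated due to size exceeding limit>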
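
The retry wrapper in _upload_batch returns early for non-retryable client errors, which silently drops the batch after logging. The same classification can be expressed with backoff's giveup parameter; the sketch below is a hypothetical alternative, not the module's implementation, and it behaves slightly differently: on giveup, backoff re-raises the final exception instead of swallowing it.

# Hypothetical alternative using backoff's giveup predicate.
import backoff
from langfuse.request import APIError

def _nonretryable_client_error(e: Exception) -> bool:
    # 4xx responses are not worth retrying, except 429 (rate limit)
    return (
        isinstance(e, APIError)
        and 400 <= int(e.status) < 500
        and int(e.status) != 429
    )

@backoff.on_exception(
    backoff.expo,
    Exception,
    max_tries=3,
    giveup=_nonretryable_client_error,
    logger=None,
)
def post_batch(client, batch, metadata):
    client.batch_post(batch=batch, metadata=metadata)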