langfuse.task_manager
@private
"""@private"""

import atexit
import json
import logging
import queue
import threading
from queue import Empty, Queue
import time
from typing import List, Any
import typing

from langfuse.Sampler import Sampler
from langfuse.parse_error import handle_exception
from langfuse.request import APIError
from langfuse.utils import _get_timestamp

try:
    import pydantic.v1 as pydantic  # type: ignore
except ImportError:
    import pydantic  # type: ignore


import backoff

from langfuse.request import LangfuseClient
from langfuse.serializer import EventSerializer

# largest message size in db is 331_000 bytes right now
MAX_MSG_SIZE = 1_000_000

# https://vercel.com/docs/functions/serverless-functions/runtimes#request-body-size
# The maximum payload size for the request body or the response body of a Serverless Function is 4.5 MB
# 4_500_000 Bytes = 4.5 MB
# configured to be 2.5 MB to be safe

BATCH_SIZE_LIMIT = 2_500_000


class LangfuseMetadata(pydantic.BaseModel):
    """Metadata passed to the client together with every uploaded batch."""

    batch_size: int
    sdk_integration: typing.Optional[str] = None
    sdk_name: typing.Optional[str] = None
    sdk_version: typing.Optional[str] = None
    public_key: typing.Optional[str] = None


class Consumer(threading.Thread):
    """Daemon thread that drains the shared queue and uploads batches.

    Each pass collects up to ``flush_at`` items (bounded by ``flush_interval``
    seconds and ``BATCH_SIZE_LIMIT`` bytes), truncates oversized items and
    posts the batch through the client.
    """

    _log = logging.getLogger("langfuse")
    _queue: Queue
    _identifier: int
    _client: LangfuseClient
    _flush_at: int
    _flush_interval: float
    _max_retries: int
    _public_key: str
    _sdk_name: str
    _sdk_version: str
    _sdk_integration: str

    def __init__(
        self,
        queue: Queue,
        identifier: int,
        client: LangfuseClient,
        flush_at: int,
        flush_interval: float,
        max_retries: int,
        public_key: str,
        sdk_name: str,
        sdk_version: str,
        sdk_integration: str,
    ):
        """Create a consumer thread."""
        threading.Thread.__init__(self)
        # Make consumer a daemon thread so that it doesn't block program exit
        self.daemon = True
        self._queue = queue
        # It's important to set running in the constructor: if we are asked to
        # pause immediately after construction, we might set running to True in
        # run() *after* we set it to False in pause... and keep running
        # forever.
        self.running = True
        self._identifier = identifier
        self._client = client
        self._flush_at = flush_at
        self._flush_interval = flush_interval
        self._max_retries = max_retries
        self._public_key = public_key
        self._sdk_name = sdk_name
        self._sdk_version = sdk_version
        self._sdk_integration = sdk_integration

    def _next(self):
        """Return the next batch of items to upload.

        Waits on the queue for at most ``flush_interval`` seconds in total and
        stops early once ``flush_at`` items were collected or the accumulated
        size reaches ``BATCH_SIZE_LIMIT``. Items dropped by
        ``_truncate_item_in_place`` are not part of the returned batch.
        """
        pending = self._queue
        items = []

        start_time = time.monotonic()
        total_size = 0

        while len(items) < self._flush_at:
            elapsed = time.monotonic() - start_time
            if elapsed >= self._flush_interval:
                break
            try:
                item = pending.get(block=True, timeout=self._flush_interval - elapsed)
            except Empty:
                break

            item_size = self._truncate_item_in_place(
                item=item,
                max_size=MAX_MSG_SIZE,
                log_message="<truncated due to size exceeding limit>",
            )

            if item_size == 0:
                # The item was dropped and already acknowledged with
                # task_done(); batching it would upload it anyway and
                # acknowledge it a second time in upload().
                continue

            items.append(item)
            total_size += item_size
            if total_size >= BATCH_SIZE_LIMIT:
                self._log.debug("hit batch size limit (size: %d)", total_size)
                break

        self._log.debug("~%d items in the Langfuse queue", self._queue.qsize())

        return items

    def _truncate_item_in_place(
        self,
        *,
        item: typing.Any,
        max_size: int,
        log_message: typing.Optional[str] = None,
    ) -> int:
        """Truncate the item in place to fit within the size limit.

        Args:
            item: The queued event; mutated when it is too large.
            max_size: Maximum serialized size in bytes.
            log_message: Placeholder stored in place of every dropped field.

        Returns:
            The serialized size of the (possibly truncated) item in bytes, or
            0 when the item was dropped. A dropped item has already been
            acknowledged on the queue with ``task_done()``.
        """
        item_size = self._get_item_size(item)
        self._log.debug(f"item size {item_size}")

        if item_size > max_size:
            self._log.warning(
                "Item exceeds size limit (size: %s), dropping input / output / metadata of item until it fits.",
                item_size,
            )

            if "body" in item:
                drop_candidates = ["input", "output", "metadata"]
                sorted_field_sizes = sorted(
                    [
                        (
                            field,
                            self._get_item_size((item["body"][field]))
                            if field in item["body"]
                            else 0,
                        )
                        for field in drop_candidates
                    ],
                    key=lambda x: x[1],
                )

                # drop the largest field until the item size is within the limit
                for field_to_drop, size_to_drop in reversed(sorted_field_sizes):
                    if field_to_drop not in item["body"]:
                        continue

                    item["body"][field_to_drop] = log_message
                    # running estimate only; the exact size is recomputed below
                    item_size -= size_to_drop

                    self._log.debug(
                        f"Dropped field {field_to_drop}, new item size {item_size}"
                    )

                    if item_size <= max_size:
                        break

            # if item does not have body or input/output fields, drop the event
            if "body" not in item or (
                "input" not in item["body"] and "output" not in item["body"]
            ):
                self._log.warning(
                    "Item does not have body or input/output fields, dropping item."
                )
                self._queue.task_done()
                return 0

        return self._get_item_size(item)

    def _get_item_size(self, item: typing.Any) -> int:
        """Return the size of the item in bytes."""
        return len(json.dumps(item, cls=EventSerializer).encode())

    def run(self):
        """Runs the consumer."""
        self._log.debug("consumer is running...")
        while self.running:
            self.upload()

    def upload(self):
        """Upload the next batch of items, return whether successful.

        Returns:
            True if a batch was uploaded without raising, False if there was
            nothing to upload or the upload failed.
        """
        batch = self._next()
        if not batch:
            return False

        success = False
        try:
            self._upload_batch(batch)
            success = True
        except Exception as e:
            handle_exception(e)
        finally:
            # mark items as acknowledged from queue, also after a failed
            # upload, so that Queue.join() does not wait for them forever
            for _ in batch:
                self._queue.task_done()

        return success

    def pause(self):
        """Pause the consumer."""
        self.running = False

    def _upload_batch(self, batch: List[Any]):
        """Post ``batch`` through the client, retrying with exponential backoff.

        API errors with a 4xx status other than 429 are not retried and are
        swallowed. Any other exception is retried by ``backoff`` (with
        ``max_tries=max_retries``) and finally propagates to the caller.
        """
        self._log.debug("uploading batch of %d items", len(batch))

        metadata = LangfuseMetadata(
            batch_size=len(batch),
            sdk_integration=self._sdk_integration,
            sdk_name=self._sdk_name,
            sdk_version=self._sdk_version,
            public_key=self._public_key,
        ).dict()

        @backoff.on_exception(
            backoff.expo, Exception, max_tries=self._max_retries, logger=None
        )
        def execute_task_with_backoff(batch: List[Any]):
            try:
                self._client.batch_post(batch=batch, metadata=metadata)
            except Exception as e:
                if (
                    isinstance(e, APIError)
                    and 400 <= int(e.status) < 500
                    and int(e.status) != 429  # retry if rate-limited
                ):
                    return

                raise

        execute_task_with_backoff(batch)
        self._log.debug("successfully uploaded batch of %d items", len(batch))


class TaskManager(object):
    """Owns the event queue and the consumer threads that upload from it."""

    _log = logging.getLogger("langfuse")
    _consumers: List[Consumer]
    _enabled: bool
    _threads: int
    _max_task_queue_size: int
    _queue: Queue
    _client: LangfuseClient
    _flush_at: int
    _flush_interval: float
    _max_retries: int
    _public_key: str
    _sdk_name: str
    _sdk_version: str
    _sdk_integration: str
    _sampler: Sampler

    def __init__(
        self,
        client: LangfuseClient,
        flush_at: int,
        flush_interval: float,
        max_retries: int,
        threads: int,
        public_key: str,
        sdk_name: str,
        sdk_version: str,
        sdk_integration: str,
        enabled: bool = True,
        max_task_queue_size: int = 100_000,
        sample_rate: float = 1,
    ):
        """Store the configuration, start the consumers and register cleanup."""
        self._max_task_queue_size = max_task_queue_size
        self._threads = threads
        self._queue = queue.Queue(self._max_task_queue_size)
        self._consumers = []
        self._client = client
        self._flush_at = flush_at
        self._flush_interval = flush_interval
        self._max_retries = max_retries
        self._public_key = public_key
        self._sdk_name = sdk_name
        self._sdk_version = sdk_version
        self._sdk_integration = sdk_integration
        self._enabled = enabled
        self._sampler = Sampler(sample_rate)

        self.init_resources()

        # cleans up when the python interpreter closes
        atexit.register(self.join)

    def init_resources(self):
        """Create and start ``threads`` consumers sharing the queue."""
        for i in range(self._threads):
            consumer = Consumer(
                queue=self._queue,
                identifier=i,
                client=self._client,
                flush_at=self._flush_at,
                flush_interval=self._flush_interval,
                max_retries=self._max_retries,
                public_key=self._public_key,
                sdk_name=self._sdk_name,
                sdk_version=self._sdk_version,
                sdk_integration=self._sdk_integration,
            )
            consumer.start()
            self._consumers.append(consumer)

    def add_task(self, event: dict):
        """Enqueue ``event`` for upload by the consumer threads.

        The event is skipped when the manager is disabled or the sampler
        rejects it. Otherwise it is checked for serializability, stamped with
        a ``timestamp`` and put on the queue without blocking.

        Returns:
            False if the event could not be enqueued (queue full or any other
            error), None otherwise.
        """
        if not self._enabled:
            return None

        try:
            if not self._sampler.sample_event(event):
                return None  # event was sampled out

            # fail before enqueueing if the event cannot be serialized
            json.dumps(event, cls=EventSerializer)
            event["timestamp"] = _get_timestamp()

            self._queue.put(event, block=False)
        except queue.Full:
            self._log.warning("Langfuse queue is full")
            return False
        except Exception as e:
            self._log.exception(f"Exception in adding task {e}")
            return False

        return None

    def flush(self):
        """Forces a flush from the internal queue to the server"""
        self._log.debug("flushing queue")
        pending = self._queue
        size = pending.qsize()
        pending.join()
        # Note that this message may not be precise, because of threading.
        self._log.debug("successfully flushed about %s items.", size)

    def join(self):
        """Stop all consumer threads and block until they have finished."""
        self._log.debug(f"joining {len(self._consumers)} consumer threads")

        # pause all consumers before joining them so we don't have to wait for multiple
        # flush intervals to join them all.
        for consumer in self._consumers:
            consumer.pause()

        for consumer in self._consumers:
            try:
                consumer.join()
            except RuntimeError:
                # consumer thread has not started
                pass

            self._log.debug(f"consumer thread {consumer._identifier} joined")

    def shutdown(self):
        """Flush all messages and cleanly shutdown the client"""
        self._log.debug("shutdown initiated")

        self.flush()
        self.join()

        self._log.debug("shutdown completed")
class LangfuseMetadata(pydantic.BaseModel):
    """SDK and batch metadata model; every field except ``batch_size`` is optional."""

    batch_size: int
    # The fields below default to None, so they are annotated as Optional.
    sdk_integration: typing.Optional[str] = None
    sdk_name: typing.Optional[str] = None
    sdk_version: typing.Optional[str] = None
    public_key: typing.Optional[str] = None
Inherited Members
- pydantic.v1.main.BaseModel
- BaseModel
- Config
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
class Consumer(threading.Thread):
    """Daemon thread that drains the shared queue and uploads batches.

    Each pass collects up to ``flush_at`` items (bounded by ``flush_interval``
    seconds and ``BATCH_SIZE_LIMIT`` bytes), truncates oversized items and
    posts the batch through the client.
    """

    _log = logging.getLogger("langfuse")
    _queue: Queue
    _identifier: int
    _client: LangfuseClient
    _flush_at: int
    _flush_interval: float
    _max_retries: int
    _public_key: str
    _sdk_name: str
    _sdk_version: str
    _sdk_integration: str

    def __init__(
        self,
        queue: Queue,
        identifier: int,
        client: LangfuseClient,
        flush_at: int,
        flush_interval: float,
        max_retries: int,
        public_key: str,
        sdk_name: str,
        sdk_version: str,
        sdk_integration: str,
    ):
        """Create a consumer thread."""
        threading.Thread.__init__(self)
        # Make consumer a daemon thread so that it doesn't block program exit
        self.daemon = True
        self._queue = queue
        # It's important to set running in the constructor: if we are asked to
        # pause immediately after construction, we might set running to True in
        # run() *after* we set it to False in pause... and keep running
        # forever.
        self.running = True
        self._identifier = identifier
        self._client = client
        self._flush_at = flush_at
        self._flush_interval = flush_interval
        self._max_retries = max_retries
        self._public_key = public_key
        self._sdk_name = sdk_name
        self._sdk_version = sdk_version
        self._sdk_integration = sdk_integration

    def _next(self):
        """Return the next batch of items to upload.

        Waits on the queue for at most ``flush_interval`` seconds in total and
        stops early once ``flush_at`` items were collected or the accumulated
        size reaches ``BATCH_SIZE_LIMIT``. Items dropped by
        ``_truncate_item_in_place`` are not part of the returned batch.
        """
        pending = self._queue
        items = []

        start_time = time.monotonic()
        total_size = 0

        while len(items) < self._flush_at:
            elapsed = time.monotonic() - start_time
            if elapsed >= self._flush_interval:
                break
            try:
                item = pending.get(block=True, timeout=self._flush_interval - elapsed)
            except Empty:
                break

            item_size = self._truncate_item_in_place(
                item=item,
                max_size=MAX_MSG_SIZE,
                log_message="<truncated due to size exceeding limit>",
            )

            if item_size == 0:
                # The item was dropped and already acknowledged with
                # task_done(); batching it would upload it anyway and
                # acknowledge it a second time in upload().
                continue

            items.append(item)
            total_size += item_size
            if total_size >= BATCH_SIZE_LIMIT:
                self._log.debug("hit batch size limit (size: %d)", total_size)
                break

        self._log.debug("~%d items in the Langfuse queue", self._queue.qsize())

        return items

    def _truncate_item_in_place(
        self,
        *,
        item: typing.Any,
        max_size: int,
        log_message: typing.Optional[str] = None,
    ) -> int:
        """Truncate the item in place to fit within the size limit.

        Args:
            item: The queued event; mutated when it is too large.
            max_size: Maximum serialized size in bytes.
            log_message: Placeholder stored in place of every dropped field.

        Returns:
            The serialized size of the (possibly truncated) item in bytes, or
            0 when the item was dropped. A dropped item has already been
            acknowledged on the queue with ``task_done()``.
        """
        item_size = self._get_item_size(item)
        self._log.debug(f"item size {item_size}")

        if item_size > max_size:
            self._log.warning(
                "Item exceeds size limit (size: %s), dropping input / output / metadata of item until it fits.",
                item_size,
            )

            if "body" in item:
                drop_candidates = ["input", "output", "metadata"]
                sorted_field_sizes = sorted(
                    [
                        (
                            field,
                            self._get_item_size((item["body"][field]))
                            if field in item["body"]
                            else 0,
                        )
                        for field in drop_candidates
                    ],
                    key=lambda x: x[1],
                )

                # drop the largest field until the item size is within the limit
                for field_to_drop, size_to_drop in reversed(sorted_field_sizes):
                    if field_to_drop not in item["body"]:
                        continue

                    item["body"][field_to_drop] = log_message
                    # running estimate only; the exact size is recomputed below
                    item_size -= size_to_drop

                    self._log.debug(
                        f"Dropped field {field_to_drop}, new item size {item_size}"
                    )

                    if item_size <= max_size:
                        break

            # if item does not have body or input/output fields, drop the event
            if "body" not in item or (
                "input" not in item["body"] and "output" not in item["body"]
            ):
                self._log.warning(
                    "Item does not have body or input/output fields, dropping item."
                )
                self._queue.task_done()
                return 0

        return self._get_item_size(item)

    def _get_item_size(self, item: typing.Any) -> int:
        """Return the size of the item in bytes."""
        return len(json.dumps(item, cls=EventSerializer).encode())

    def run(self):
        """Runs the consumer."""
        self._log.debug("consumer is running...")
        while self.running:
            self.upload()

    def upload(self):
        """Upload the next batch of items, return whether successful.

        Returns:
            True if a batch was uploaded without raising, False if there was
            nothing to upload or the upload failed.
        """
        batch = self._next()
        if not batch:
            return False

        success = False
        try:
            self._upload_batch(batch)
            success = True
        except Exception as e:
            handle_exception(e)
        finally:
            # mark items as acknowledged from queue, also after a failed
            # upload, so that Queue.join() does not wait for them forever
            for _ in batch:
                self._queue.task_done()

        return success

    def pause(self):
        """Pause the consumer."""
        self.running = False

    def _upload_batch(self, batch: List[Any]):
        """Post ``batch`` through the client, retrying with exponential backoff.

        API errors with a 4xx status other than 429 are not retried and are
        swallowed. Any other exception is retried by ``backoff`` (with
        ``max_tries=max_retries``) and finally propagates to the caller.
        """
        self._log.debug("uploading batch of %d items", len(batch))

        metadata = LangfuseMetadata(
            batch_size=len(batch),
            sdk_integration=self._sdk_integration,
            sdk_name=self._sdk_name,
            sdk_version=self._sdk_version,
            public_key=self._public_key,
        ).dict()

        @backoff.on_exception(
            backoff.expo, Exception, max_tries=self._max_retries, logger=None
        )
        def execute_task_with_backoff(batch: List[Any]):
            try:
                self._client.batch_post(batch=batch, metadata=metadata)
            except Exception as e:
                if (
                    isinstance(e, APIError)
                    and 400 <= int(e.status) < 500
                    and int(e.status) != 429  # retry if rate-limited
                ):
                    return

                raise

        execute_task_with_backoff(batch)
        self._log.debug("successfully uploaded batch of %d items", len(batch))
A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
def __init__(
    self,
    queue: Queue,
    identifier: int,
    client: LangfuseClient,
    flush_at: int,
    flush_interval: float,
    max_retries: int,
    public_key: str,
    sdk_name: str,
    sdk_version: str,
    sdk_integration: str,
):
    """Set up a consumer thread bound to ``queue`` and ``client``.

    The thread is only configured here; nothing in this method starts it.
    """
    # Daemon thread: it must not keep the interpreter alive at program exit.
    threading.Thread.__init__(self, daemon=True)

    # ``running`` has to be True before run() is ever entered. If run() set
    # it, a pause() issued right after construction could be overwritten
    # and the loop would never stop.
    self.running = True

    # Queue and client this consumer works with.
    self._queue = queue
    self._client = client
    self._identifier = identifier

    # Batching and retry configuration.
    self._flush_at = flush_at
    self._flush_interval = flush_interval
    self._max_retries = max_retries

    # Values copied into the metadata of every uploaded batch.
    self._public_key = public_key
    self._sdk_name = sdk_name
    self._sdk_version = sdk_version
    self._sdk_integration = sdk_integration
Create a consumer thread.
@property
def daemon(self):
    """Boolean flag telling whether this thread is a daemon thread.

    Set it before start() is called; setting it afterwards raises
    RuntimeError. The initial value is inherited from the creating thread.
    The main thread is not a daemon thread, so threads created in the main
    thread default to daemon = False.

    The entire Python program exits when only daemon threads are left.

    """
    assert self._initialized, "Thread.__init__() not called"
    is_daemonic = self._daemonic
    return is_daemonic
A boolean value indicating whether this thread is a daemon thread.
This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.
The entire Python program exits when only daemon threads are left.
def run(self):
    """Thread body: call upload() repeatedly until ``running`` is cleared."""
    self._log.debug("consumer is running...")
    while True:
        # ``running`` is re-read before every pass; pause() sets it to False.
        if not self.running:
            break
        self.upload()
Runs the consumer.
def upload(self):
    """Upload the next batch of items, return whether successful.

    Returns:
        True if a batch was uploaded without raising, False if there was
        nothing to upload or the upload failed.
    """
    batch = self._next()
    if not batch:
        return False

    success = False
    try:
        self._upload_batch(batch)
        success = True
    except Exception as e:
        handle_exception(e)
    finally:
        # mark items as acknowledged from queue, also after a failed upload,
        # so that Queue.join() does not wait for them forever
        for _ in batch:
            self._queue.task_done()

    return success
Upload the next batch of items, return whether successful.
Inherited Members
- threading.Thread
- start
- join
- name
- ident
- is_alive
- isDaemon
- setDaemon
- getName
- setName
- native_id
class TaskManager(object):
    """Owns the event queue and the consumer threads that upload from it."""

    _log = logging.getLogger("langfuse")
    _consumers: List[Consumer]
    _enabled: bool
    _threads: int
    _max_task_queue_size: int
    _queue: Queue
    _client: LangfuseClient
    _flush_at: int
    _flush_interval: float
    _max_retries: int
    _public_key: str
    _sdk_name: str
    _sdk_version: str
    _sdk_integration: str
    _sampler: Sampler

    def __init__(
        self,
        client: LangfuseClient,
        flush_at: int,
        flush_interval: float,
        max_retries: int,
        threads: int,
        public_key: str,
        sdk_name: str,
        sdk_version: str,
        sdk_integration: str,
        enabled: bool = True,
        max_task_queue_size: int = 100_000,
        sample_rate: float = 1,
    ):
        """Store the configuration, start the consumers and register cleanup."""
        self._max_task_queue_size = max_task_queue_size
        self._threads = threads
        self._queue = queue.Queue(self._max_task_queue_size)
        self._consumers = []
        self._client = client
        self._flush_at = flush_at
        self._flush_interval = flush_interval
        self._max_retries = max_retries
        self._public_key = public_key
        self._sdk_name = sdk_name
        self._sdk_version = sdk_version
        self._sdk_integration = sdk_integration
        self._enabled = enabled
        self._sampler = Sampler(sample_rate)

        self.init_resources()

        # cleans up when the python interpreter closes
        atexit.register(self.join)

    def init_resources(self):
        """Create and start ``threads`` consumers sharing the queue."""
        for i in range(self._threads):
            consumer = Consumer(
                queue=self._queue,
                identifier=i,
                client=self._client,
                flush_at=self._flush_at,
                flush_interval=self._flush_interval,
                max_retries=self._max_retries,
                public_key=self._public_key,
                sdk_name=self._sdk_name,
                sdk_version=self._sdk_version,
                sdk_integration=self._sdk_integration,
            )
            consumer.start()
            self._consumers.append(consumer)

    def add_task(self, event: dict):
        """Enqueue ``event`` for upload by the consumer threads.

        The event is skipped when the manager is disabled or the sampler
        rejects it. Otherwise it is checked for serializability, stamped with
        a ``timestamp`` and put on the queue without blocking.

        Returns:
            False if the event could not be enqueued (queue full or any other
            error), None otherwise.
        """
        if not self._enabled:
            return None

        try:
            if not self._sampler.sample_event(event):
                return None  # event was sampled out

            # fail before enqueueing if the event cannot be serialized
            json.dumps(event, cls=EventSerializer)
            event["timestamp"] = _get_timestamp()

            self._queue.put(event, block=False)
        except queue.Full:
            self._log.warning("Langfuse queue is full")
            return False
        except Exception as e:
            self._log.exception(f"Exception in adding task {e}")
            return False

        return None

    def flush(self):
        """Forces a flush from the internal queue to the server"""
        self._log.debug("flushing queue")
        pending = self._queue
        size = pending.qsize()
        pending.join()
        # Note that this message may not be precise, because of threading.
        self._log.debug("successfully flushed about %s items.", size)

    def join(self):
        """Stop all consumer threads and block until they have finished."""
        self._log.debug(f"joining {len(self._consumers)} consumer threads")

        # pause all consumers before joining them so we don't have to wait for multiple
        # flush intervals to join them all.
        for consumer in self._consumers:
            consumer.pause()

        for consumer in self._consumers:
            try:
                consumer.join()
            except RuntimeError:
                # consumer thread has not started
                pass

            self._log.debug(f"consumer thread {consumer._identifier} joined")

    def shutdown(self):
        """Flush all messages and cleanly shutdown the client"""
        self._log.debug("shutdown initiated")

        self.flush()
        self.join()

        self._log.debug("shutdown completed")
def __init__(
    self,
    client: LangfuseClient,
    flush_at: int,
    flush_interval: float,
    max_retries: int,
    threads: int,
    public_key: str,
    sdk_name: str,
    sdk_version: str,
    sdk_integration: str,
    enabled: bool = True,
    max_task_queue_size: int = 100_000,
    sample_rate: float = 1,
):
    """Store the configuration, start the consumer threads, register cleanup.

    The queue is bounded by ``max_task_queue_size`` and the sampler is
    built from ``sample_rate``.
    """
    # Switches and sampling.
    self._enabled = enabled
    self._sampler = Sampler(sample_rate)

    # Bounded queue; the consumer list is filled by init_resources().
    self._max_task_queue_size = max_task_queue_size
    self._queue = queue.Queue(max_task_queue_size)
    self._threads = threads
    self._consumers = []

    # Settings handed on to every consumer.
    self._client = client
    self._flush_at = flush_at
    self._flush_interval = flush_interval
    self._max_retries = max_retries
    self._public_key = public_key
    self._sdk_name = sdk_name
    self._sdk_version = sdk_version
    self._sdk_integration = sdk_integration

    # Needs all attributes above to be set.
    self.init_resources()

    # Run join() when the Python interpreter shuts down.
    atexit.register(self.join)
def init_resources(self):
    """Create, start and register one Consumer per configured thread."""
    # Every consumer gets the same settings; only the identifier differs.
    shared_settings = dict(
        queue=self._queue,
        client=self._client,
        flush_at=self._flush_at,
        flush_interval=self._flush_interval,
        max_retries=self._max_retries,
        public_key=self._public_key,
        sdk_name=self._sdk_name,
        sdk_version=self._sdk_version,
        sdk_integration=self._sdk_integration,
    )

    for index in range(self._threads):
        worker = Consumer(identifier=index, **shared_settings)
        worker.start()
        self._consumers.append(worker)
def add_task(self, event: dict):
    """Enqueue ``event`` for upload by the consumer threads.

    The event is skipped when the manager is disabled or the sampler rejects
    it. Otherwise it is checked for serializability, stamped with a
    ``timestamp`` and put on the queue without blocking.

    Returns:
        False if the event could not be enqueued (queue full or any other
        error), None otherwise.
    """
    if not self._enabled:
        return None

    try:
        if not self._sampler.sample_event(event):
            return None  # event was sampled out

        # fail before enqueueing if the event cannot be serialized
        json.dumps(event, cls=EventSerializer)
        event["timestamp"] = _get_timestamp()

        self._queue.put(event, block=False)
    except queue.Full:
        self._log.warning("Langfuse queue is full")
        return False
    except Exception as e:
        self._log.exception(f"Exception in adding task {e}")
        return False

    return None
def flush(self):
    """Block until every item put on the internal queue has been marked done."""
    self._log.debug("flushing queue")

    pending = self._queue
    # Snapshot taken before waiting; other threads may change the queue in
    # the meantime, so the number reported below is approximate.
    approx_count = pending.qsize()
    pending.join()

    self._log.debug("successfully flushed about %s items.", approx_count)
Forces a flush from the internal queue to the server
def join(self):
    """Tell every consumer to stop, then wait for each thread to finish.

    Blocks the caller until all started consumer threads have exited.
    """
    self._log.debug(f"joining {len(self._consumers)} consumer threads")

    # First pass: pause all consumers up front so their final flush
    # intervals elapse concurrently, not one after another.
    for worker in self._consumers:
        worker.pause()

    # Second pass: wait for each thread in turn.
    for consumer in self._consumers:
        try:
            consumer.join()
        except RuntimeError:
            # consumer thread has not started
            pass

        self._log.debug(f"consumer thread {consumer._identifier} joined")
Ends the consumer threads once the queue is empty. Blocks execution until finished