langfuse._task_manager.ingestion_consumer
"""Consumer thread that drains the ingestion queue and uploads event batches."""

import json
import logging
import threading
import time

from queue import Empty, Queue
from typing import Any, List, Optional

import backoff

try:
    # pydantic v2 ships the v1 API under pydantic.v1
    import pydantic.v1 as pydantic
except ImportError:
    import pydantic

from langfuse.parse_error import handle_exception
from langfuse.request import APIError, LangfuseClient
from langfuse.Sampler import Sampler
from langfuse.serializer import EventSerializer
from langfuse.types import MaskFunction

from .media_manager import MediaManager

# Size caps enforced before upload: a single event larger than
# MAX_EVENT_SIZE_BYTES gets its input/output/metadata truncated; a batch is
# cut off once its cumulative serialized size reaches MAX_BATCH_SIZE_BYTES.
MAX_EVENT_SIZE_BYTES = 1_000_000
MAX_BATCH_SIZE_BYTES = 2_500_000


class IngestionMetadata(pydantic.BaseModel):
    """Metadata posted alongside each ingestion batch (SDK telemetry)."""

    batch_size: int
    sdk_integration: str
    sdk_name: str
    sdk_version: str
    public_key: str


class IngestionConsumer(threading.Thread):
    """Background thread that batches queued events and posts them to Langfuse.

    Each event pulled from the shared queue is sampled, media-processed,
    size-truncated, masked, and serialization-checked before being uploaded
    in a batch. Every event taken from the queue is acknowledged exactly once
    via ``task_done()`` — whether it is uploaded, sampled out, or dropped.
    """

    _log = logging.getLogger("langfuse")
    _ingestion_queue: Queue
    _identifier: int
    _client: LangfuseClient
    _flush_at: int
    _flush_interval: float
    _max_retries: int
    _public_key: str
    _sdk_name: str
    _sdk_version: str
    _sdk_integration: str
    _mask: Optional[MaskFunction]
    _sampler: Sampler
    _media_manager: MediaManager

    def __init__(
        self,
        *,
        ingestion_queue: Queue,
        identifier: int,
        client: LangfuseClient,
        flush_at: int,
        flush_interval: float,
        max_retries: int,
        public_key: str,
        media_manager: MediaManager,
        sdk_name: str,
        sdk_version: str,
        sdk_integration: str,
        sample_rate: float,
        mask: Optional[MaskFunction] = None,
    ):
        """Create a consumer thread.

        Args:
            ingestion_queue: Shared queue of event dicts to drain.
            identifier: Numeric id of this consumer (for multi-consumer setups).
            client: HTTP client used to POST batches.
            flush_at: Maximum number of events per batch.
            flush_interval: Maximum seconds to wait while filling a batch.
            max_retries: Maximum upload attempts per batch (backoff).
            public_key: Langfuse public API key, echoed in batch metadata.
            media_manager: Handles multimodal payloads inside events.
            sdk_name: SDK name reported in batch metadata.
            sdk_version: SDK version reported in batch metadata.
            sdk_integration: Integration name reported in batch metadata.
            sample_rate: Fraction of events to keep (0.0 - 1.0).
            mask: Optional function applied to event input/output for redaction.
        """
        super().__init__()
        # It's important to set running in the constructor: if we are asked to
        # pause immediately after construction, we might set running to True in
        # run() *after* we set it to False in pause... and keep running
        # forever.
        self.running = True
        # Make consumer a daemon thread so that it doesn't block program exit
        self.daemon = True
        self._ingestion_queue = ingestion_queue
        self._identifier = identifier
        self._client = client
        self._flush_at = flush_at
        self._flush_interval = flush_interval
        self._max_retries = max_retries
        self._public_key = public_key
        self._sdk_name = sdk_name
        self._sdk_version = sdk_version
        self._sdk_integration = sdk_integration
        self._mask = mask
        self._sampler = Sampler(sample_rate)
        self._media_manager = media_manager

    def _next(self):
        """Return the next batch of items to upload.

        Blocks until either `_flush_at` events are collected, the batch byte
        limit is reached, or `_flush_interval` seconds have elapsed.
        """
        events = []

        start_time = time.monotonic()
        total_size = 0

        while len(events) < self._flush_at:
            elapsed = time.monotonic() - start_time
            if elapsed >= self._flush_interval:
                break
            try:
                event = self._ingestion_queue.get(
                    block=True, timeout=self._flush_interval - elapsed
                )

                # convert pydantic models to dicts
                if "body" in event and isinstance(event["body"], pydantic.BaseModel):
                    event["body"] = event["body"].dict(exclude_none=True)

                # sample event
                if not self._sampler.sample_event(event):
                    self._ingestion_queue.task_done()

                    continue

                # handle multimodal data
                self._media_manager.process_media_in_event(event)

                # truncate item if it exceeds size limit
                item_size = self._truncate_item_in_place(
                    event=event,
                    max_size=MAX_EVENT_SIZE_BYTES,
                    log_message="<truncated due to size exceeding limit>",
                )

                # A size of 0 means the event was dropped entirely inside
                # _truncate_item_in_place, which already acknowledged it via
                # task_done(). It must NOT be appended to the batch: doing so
                # would upload a dropped event and acknowledge it a second
                # time in upload(), raising "task_done() called too many
                # times" on the queue.
                if item_size == 0:
                    continue

                # apply mask
                self._apply_mask_in_place(event)

                # check for serialization errors
                try:
                    json.dumps(event, cls=EventSerializer)
                except Exception as e:
                    self._log.error(f"Error serializing item, skipping: {e}")
                    self._ingestion_queue.task_done()

                    continue

                events.append(event)

                total_size += item_size
                if total_size >= MAX_BATCH_SIZE_BYTES:
                    self._log.debug("hit batch size limit (size: %d)", total_size)
                    break

            except Empty:
                # Queue stayed empty for the remaining wait time; flush what
                # we have so far.
                break

            except Exception as e:
                self._log.warning(
                    "Failed to process event in IngestionConsumer, skipping",
                    exc_info=e,
                )
                self._ingestion_queue.task_done()

        self._log.debug(
            "~%d items in the Langfuse queue", self._ingestion_queue.qsize()
        )

        return events

    def _truncate_item_in_place(
        self,
        *,
        event: Any,
        max_size: int,
        log_message: Optional[str] = None,
    ) -> int:
        """Truncate the item in place to fit within the size limit.

        Oversized input/output/metadata fields are replaced (largest first)
        by `log_message`. Returns the resulting item size in bytes, or 0 if
        the event was dropped entirely — in the drop case the event is also
        acknowledged on the queue via task_done(), so callers must not keep
        processing it.
        """
        item_size = self._get_item_size(event)
        self._log.debug(f"item size {item_size}")

        if item_size > max_size:
            self._log.warning(
                "Item exceeds size limit (size: %s), dropping input / output / metadata of item until it fits.",
                item_size,
            )

            if "body" in event:
                drop_candidates = ["input", "output", "metadata"]
                # Pair each candidate with its serialized size (0 if absent)
                # so the largest fields are dropped first.
                sorted_field_sizes = sorted(
                    [
                        (
                            field,
                            self._get_item_size((event["body"][field]))
                            if field in event["body"]
                            else 0,
                        )
                        for field in drop_candidates
                    ],
                    key=lambda x: x[1],
                )

                # drop the largest field until the item size is within the limit
                for _ in range(len(sorted_field_sizes)):
                    field_to_drop, size_to_drop = sorted_field_sizes.pop()

                    if field_to_drop not in event["body"]:
                        continue

                    event["body"][field_to_drop] = log_message
                    item_size -= size_to_drop

                    self._log.debug(
                        f"Dropped field {field_to_drop}, new item size {item_size}"
                    )

                    if item_size <= max_size:
                        break

            # if item does not have body or input/output fields, drop the event
            if "body" not in event or (
                "input" not in event["body"] and "output" not in event["body"]
            ):
                self._log.warning(
                    "Item does not have body or input/output fields, dropping item."
                )
                self._ingestion_queue.task_done()
                return 0

        return self._get_item_size(event)

    def _get_item_size(self, item: Any) -> int:
        """Return the size of the item in bytes."""
        return len(json.dumps(item, cls=EventSerializer).encode())

    def _apply_mask_in_place(self, event: dict):
        """Apply the mask function to the event. This is done in place."""
        if not self._mask:
            return

        body = event["body"] if "body" in event else {}
        for key in ("input", "output"):
            if key in body:
                try:
                    body[key] = self._mask(data=body[key])
                except Exception as e:
                    # Fail closed: a broken mask must never leak the raw value.
                    self._log.error(f"Mask function failed with error: {e}")
                    body[key] = "<fully masked due to failed mask function>"

    def run(self):
        """Run the consumer until pause() clears the running flag."""
        self._log.debug("consumer is running...")
        while self.running:
            self.upload()

    def upload(self):
        """Collect the next batch of items and upload it."""
        batch = self._next()
        if len(batch) == 0:
            return

        try:
            self._upload_batch(batch)
        except Exception as e:
            handle_exception(e)
        finally:
            # mark items as acknowledged from queue
            for _ in batch:
                self._ingestion_queue.task_done()

    def pause(self):
        """Pause the consumer."""
        self.running = False

    def _upload_batch(self, batch: List[Any]):
        """POST a batch to the Langfuse API, retrying with exponential backoff."""
        self._log.debug("uploading batch of %d items", len(batch))

        metadata = IngestionMetadata(
            batch_size=len(batch),
            sdk_integration=self._sdk_integration,
            sdk_name=self._sdk_name,
            sdk_version=self._sdk_version,
            public_key=self._public_key,
        ).dict()

        @backoff.on_exception(
            backoff.expo, Exception, max_tries=self._max_retries, logger=None
        )
        def execute_task_with_backoff(batch: List[Any]):
            try:
                self._client.batch_post(batch=batch, metadata=metadata)
            except Exception as e:
                if (
                    isinstance(e, APIError)
                    and 400 <= int(e.status) < 500
                    and int(e.status) != 429  # 429 (rate limit) IS retried
                ):
                    # Non-429 client errors are not retriable: swallow the
                    # error so backoff does not re-attempt the upload.
                    return

                raise e

        execute_task_with_backoff(batch)
        self._log.debug("successfully uploaded batch of %d events", len(batch))
MAX_EVENT_SIZE_BYTES = 1_000_000
MAX_BATCH_SIZE_BYTES = 2_500_000
class IngestionMetadata(pydantic.v1.main.BaseModel):
29class IngestionMetadata(pydantic.BaseModel): 30 batch_size: int 31 sdk_integration: str 32 sdk_name: str 33 sdk_version: str 34 public_key: str
Inherited Members
- pydantic.v1.main.BaseModel
- BaseModel
- Config
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
class IngestionConsumer(threading.Thread):
37class IngestionConsumer(threading.Thread): 38 _log = logging.getLogger("langfuse") 39 _ingestion_queue: Queue 40 _identifier: int 41 _client: LangfuseClient 42 _flush_at: int 43 _flush_interval: float 44 _max_retries: int 45 _public_key: str 46 _sdk_name: str 47 _sdk_version: str 48 _sdk_integration: str 49 _mask: Optional[MaskFunction] 50 _sampler: Sampler 51 _media_manager: MediaManager 52 53 def __init__( 54 self, 55 *, 56 ingestion_queue: Queue, 57 identifier: int, 58 client: LangfuseClient, 59 flush_at: int, 60 flush_interval: float, 61 max_retries: int, 62 public_key: str, 63 media_manager: MediaManager, 64 sdk_name: str, 65 sdk_version: str, 66 sdk_integration: str, 67 sample_rate: float, 68 mask: Optional[MaskFunction] = None, 69 ): 70 """Create a consumer thread.""" 71 super().__init__() 72 # It's important to set running in the constructor: if we are asked to 73 # pause immediately after construction, we might set running to True in 74 # run() *after* we set it to False in pause... and keep running 75 # forever. 
76 self.running = True 77 # Make consumer a daemon thread so that it doesn't block program exit 78 self.daemon = True 79 self._ingestion_queue = ingestion_queue 80 self._identifier = identifier 81 self._client = client 82 self._flush_at = flush_at 83 self._flush_interval = flush_interval 84 self._max_retries = max_retries 85 self._public_key = public_key 86 self._sdk_name = sdk_name 87 self._sdk_version = sdk_version 88 self._sdk_integration = sdk_integration 89 self._mask = mask 90 self._sampler = Sampler(sample_rate) 91 self._media_manager = media_manager 92 93 def _next(self): 94 """Return the next batch of items to upload.""" 95 events = [] 96 97 start_time = time.monotonic() 98 total_size = 0 99 100 while len(events) < self._flush_at: 101 elapsed = time.monotonic() - start_time 102 if elapsed >= self._flush_interval: 103 break 104 try: 105 event = self._ingestion_queue.get( 106 block=True, timeout=self._flush_interval - elapsed 107 ) 108 109 # convert pydantic models to dicts 110 if "body" in event and isinstance(event["body"], pydantic.BaseModel): 111 event["body"] = event["body"].dict(exclude_none=True) 112 113 # sample event 114 if not self._sampler.sample_event(event): 115 self._ingestion_queue.task_done() 116 117 continue 118 119 # handle multimodal data 120 self._media_manager.process_media_in_event(event) 121 122 # truncate item if it exceeds size limit 123 item_size = self._truncate_item_in_place( 124 event=event, 125 max_size=MAX_EVENT_SIZE_BYTES, 126 log_message="<truncated due to size exceeding limit>", 127 ) 128 129 # apply mask 130 self._apply_mask_in_place(event) 131 132 # check for serialization errors 133 try: 134 json.dumps(event, cls=EventSerializer) 135 except Exception as e: 136 self._log.error(f"Error serializing item, skipping: {e}") 137 self._ingestion_queue.task_done() 138 139 continue 140 141 events.append(event) 142 143 total_size += item_size 144 if total_size >= MAX_BATCH_SIZE_BYTES: 145 self._log.debug("hit batch size limit (size: 
%d)", total_size) 146 break 147 148 except Empty: 149 break 150 151 except Exception as e: 152 self._log.warning( 153 "Failed to process event in IngestionConsumer, skipping", 154 exc_info=e, 155 ) 156 self._ingestion_queue.task_done() 157 158 self._log.debug( 159 "~%d items in the Langfuse queue", self._ingestion_queue.qsize() 160 ) 161 162 return events 163 164 def _truncate_item_in_place( 165 self, 166 *, 167 event: Any, 168 max_size: int, 169 log_message: Optional[str] = None, 170 ) -> int: 171 """Truncate the item in place to fit within the size limit.""" 172 item_size = self._get_item_size(event) 173 self._log.debug(f"item size {item_size}") 174 175 if item_size > max_size: 176 self._log.warning( 177 "Item exceeds size limit (size: %s), dropping input / output / metadata of item until it fits.", 178 item_size, 179 ) 180 181 if "body" in event: 182 drop_candidates = ["input", "output", "metadata"] 183 sorted_field_sizes = sorted( 184 [ 185 ( 186 field, 187 self._get_item_size((event["body"][field])) 188 if field in event["body"] 189 else 0, 190 ) 191 for field in drop_candidates 192 ], 193 key=lambda x: x[1], 194 ) 195 196 # drop the largest field until the item size is within the limit 197 for _ in range(len(sorted_field_sizes)): 198 field_to_drop, size_to_drop = sorted_field_sizes.pop() 199 200 if field_to_drop not in event["body"]: 201 continue 202 203 event["body"][field_to_drop] = log_message 204 item_size -= size_to_drop 205 206 self._log.debug( 207 f"Dropped field {field_to_drop}, new item size {item_size}" 208 ) 209 210 if item_size <= max_size: 211 break 212 213 # if item does not have body or input/output fields, drop the event 214 if "body" not in event or ( 215 "input" not in event["body"] and "output" not in event["body"] 216 ): 217 self._log.warning( 218 "Item does not have body or input/output fields, dropping item." 
219 ) 220 self._ingestion_queue.task_done() 221 return 0 222 223 return self._get_item_size(event) 224 225 def _get_item_size(self, item: Any) -> int: 226 """Return the size of the item in bytes.""" 227 return len(json.dumps(item, cls=EventSerializer).encode()) 228 229 def _apply_mask_in_place(self, event: dict): 230 """Apply the mask function to the event. This is done in place.""" 231 if not self._mask: 232 return 233 234 body = event["body"] if "body" in event else {} 235 for key in ("input", "output"): 236 if key in body: 237 try: 238 body[key] = self._mask(data=body[key]) 239 except Exception as e: 240 self._log.error(f"Mask function failed with error: {e}") 241 body[key] = "<fully masked due to failed mask function>" 242 243 def run(self): 244 """Run the consumer.""" 245 self._log.debug("consumer is running...") 246 while self.running: 247 self.upload() 248 249 def upload(self): 250 """Upload the next batch of items, return whether successful.""" 251 batch = self._next() 252 if len(batch) == 0: 253 return 254 255 try: 256 self._upload_batch(batch) 257 except Exception as e: 258 handle_exception(e) 259 finally: 260 # mark items as acknowledged from queue 261 for _ in batch: 262 self._ingestion_queue.task_done() 263 264 def pause(self): 265 """Pause the consumer.""" 266 self.running = False 267 268 def _upload_batch(self, batch: List[Any]): 269 self._log.debug("uploading batch of %d items", len(batch)) 270 271 metadata = IngestionMetadata( 272 batch_size=len(batch), 273 sdk_integration=self._sdk_integration, 274 sdk_name=self._sdk_name, 275 sdk_version=self._sdk_version, 276 public_key=self._public_key, 277 ).dict() 278 279 @backoff.on_exception( 280 backoff.expo, Exception, max_tries=self._max_retries, logger=None 281 ) 282 def execute_task_with_backoff(batch: List[Any]): 283 try: 284 self._client.batch_post(batch=batch, metadata=metadata) 285 except Exception as e: 286 if ( 287 isinstance(e, APIError) 288 and 400 <= int(e.status) < 500 289 and int(e.status) 
!= 429 # retry if rate-limited 290 ): 291 return 292 293 raise e 294 295 execute_task_with_backoff(batch) 296 self._log.debug("successfully uploaded batch of %d events", len(batch))
A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
IngestionConsumer( *, ingestion_queue: queue.Queue, identifier: int, client: langfuse.request.LangfuseClient, flush_at: int, flush_interval: float, max_retries: int, public_key: str, media_manager: langfuse._task_manager.media_manager.MediaManager, sdk_name: str, sdk_version: str, sdk_integration: str, sample_rate: float, mask: Optional[langfuse.types.MaskFunction] = None)
53 def __init__( 54 self, 55 *, 56 ingestion_queue: Queue, 57 identifier: int, 58 client: LangfuseClient, 59 flush_at: int, 60 flush_interval: float, 61 max_retries: int, 62 public_key: str, 63 media_manager: MediaManager, 64 sdk_name: str, 65 sdk_version: str, 66 sdk_integration: str, 67 sample_rate: float, 68 mask: Optional[MaskFunction] = None, 69 ): 70 """Create a consumer thread.""" 71 super().__init__() 72 # It's important to set running in the constructor: if we are asked to 73 # pause immediately after construction, we might set running to True in 74 # run() *after* we set it to False in pause... and keep running 75 # forever. 76 self.running = True 77 # Make consumer a daemon thread so that it doesn't block program exit 78 self.daemon = True 79 self._ingestion_queue = ingestion_queue 80 self._identifier = identifier 81 self._client = client 82 self._flush_at = flush_at 83 self._flush_interval = flush_interval 84 self._max_retries = max_retries 85 self._public_key = public_key 86 self._sdk_name = sdk_name 87 self._sdk_version = sdk_version 88 self._sdk_integration = sdk_integration 89 self._mask = mask 90 self._sampler = Sampler(sample_rate) 91 self._media_manager = media_manager
Create a consumer thread.
daemon
1200 @property 1201 def daemon(self): 1202 """A boolean value indicating whether this thread is a daemon thread. 1203 1204 This must be set before start() is called, otherwise RuntimeError is 1205 raised. Its initial value is inherited from the creating thread; the 1206 main thread is not a daemon thread and therefore all threads created in 1207 the main thread default to daemon = False. 1208 1209 The entire Python program exits when only daemon threads are left. 1210 1211 """ 1212 assert self._initialized, "Thread.__init__() not called" 1213 return self._daemonic
A boolean value indicating whether this thread is a daemon thread.
This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.
The entire Python program exits when only daemon threads are left.
def
run(self):
243 def run(self): 244 """Run the consumer.""" 245 self._log.debug("consumer is running...") 246 while self.running: 247 self.upload()
Run the consumer.
def
upload(self):
249 def upload(self): 250 """Upload the next batch of items, return whether successful.""" 251 batch = self._next() 252 if len(batch) == 0: 253 return 254 255 try: 256 self._upload_batch(batch) 257 except Exception as e: 258 handle_exception(e) 259 finally: 260 # mark items as acknowledged from queue 261 for _ in batch: 262 self._ingestion_queue.task_done()
Upload the next batch of items, return whether successful.
Inherited Members
- threading.Thread
- start
- join
- name
- ident
- is_alive
- isDaemon
- setDaemon
- getName
- setName
- native_id