langfuse._task_manager.ingestion_consumer

  1import json
  2import logging
  3import threading
  4import time
  5
  6from queue import Empty, Queue
  7from typing import Any, List, Optional
  8
  9import backoff
 10
 11try:
 12    import pydantic.v1 as pydantic
 13except ImportError:
 14    import pydantic
 15
 16from langfuse.parse_error import handle_exception
 17from langfuse.request import APIError, LangfuseClient
 18from langfuse.Sampler import Sampler
 19from langfuse.serializer import EventSerializer
 20from langfuse.types import MaskFunction
 21
 22from .media_manager import MediaManager
 23
# Size limits enforced client-side before upload. A single serialized event
# larger than MAX_EVENT_SIZE_BYTES gets its input/output/metadata truncated;
# a batch is cut off once its cumulative size reaches MAX_BATCH_SIZE_BYTES.
MAX_EVENT_SIZE_BYTES = 1_000_000
MAX_BATCH_SIZE_BYTES = 2_500_000
 26
 27
class IngestionMetadata(pydantic.BaseModel):
    """Metadata attached to every ingestion batch POST.

    Lets the server attribute the uploaded events to a specific SDK
    installation and project (see ``IngestionConsumer._upload_batch``).
    """

    # Number of events contained in the accompanying batch.
    batch_size: int
    # SDK integration identifier (e.g. framework integration name).
    sdk_integration: str
    # Name of the SDK producing the events.
    sdk_name: str
    # Version string of the SDK.
    sdk_version: str
    # Public API key of the Langfuse project the events belong to.
    public_key: str
 34
 35
 36class IngestionConsumer(threading.Thread):
 37    _log = logging.getLogger("langfuse")
 38    _ingestion_queue: Queue
 39    _identifier: int
 40    _client: LangfuseClient
 41    _flush_at: int
 42    _flush_interval: float
 43    _max_retries: int
 44    _public_key: str
 45    _sdk_name: str
 46    _sdk_version: str
 47    _sdk_integration: str
 48    _mask: Optional[MaskFunction]
 49    _sampler: Sampler
 50    _media_manager: MediaManager
 51
 52    def __init__(
 53        self,
 54        *,
 55        ingestion_queue: Queue,
 56        identifier: int,
 57        client: LangfuseClient,
 58        flush_at: int,
 59        flush_interval: float,
 60        max_retries: int,
 61        public_key: str,
 62        media_manager: MediaManager,
 63        sdk_name: str,
 64        sdk_version: str,
 65        sdk_integration: str,
 66        sample_rate: float,
 67        mask: Optional[MaskFunction] = None,
 68    ):
 69        """Create a consumer thread."""
 70        super().__init__()
 71        # It's important to set running in the constructor: if we are asked to
 72        # pause immediately after construction, we might set running to True in
 73        # run() *after* we set it to False in pause... and keep running
 74        # forever.
 75        self.running = True
 76        # Make consumer a daemon thread so that it doesn't block program exit
 77        self.daemon = True
 78        self._ingestion_queue = ingestion_queue
 79        self._identifier = identifier
 80        self._client = client
 81        self._flush_at = flush_at
 82        self._flush_interval = flush_interval
 83        self._max_retries = max_retries
 84        self._public_key = public_key
 85        self._sdk_name = sdk_name
 86        self._sdk_version = sdk_version
 87        self._sdk_integration = sdk_integration
 88        self._mask = mask
 89        self._sampler = Sampler(sample_rate)
 90        self._media_manager = media_manager
 91
 92    def _next(self):
 93        """Return the next batch of items to upload."""
 94        events = []
 95
 96        start_time = time.monotonic()
 97        total_size = 0
 98
 99        while len(events) < self._flush_at:
100            elapsed = time.monotonic() - start_time
101            if elapsed >= self._flush_interval:
102                break
103            try:
104                event = self._ingestion_queue.get(
105                    block=True, timeout=self._flush_interval - elapsed
106                )
107
108                # convert pydantic models to dicts
109                if "body" in event and isinstance(event["body"], pydantic.BaseModel):
110                    event["body"] = event["body"].dict(exclude_none=True)
111
112                # sample event
113                if not self._sampler.sample_event(event):
114                    self._ingestion_queue.task_done()
115
116                    continue
117
118                # handle multimodal data
119                self._media_manager.process_media_in_event(event)
120
121                # truncate item if it exceeds size limit
122                item_size = self._truncate_item_in_place(
123                    event=event,
124                    max_size=MAX_EVENT_SIZE_BYTES,
125                    log_message="<truncated due to size exceeding limit>",
126                )
127
128                # apply mask
129                self._apply_mask_in_place(event)
130
131                # check for serialization errors
132                try:
133                    json.dumps(event, cls=EventSerializer)
134                except Exception as e:
135                    self._log.error(f"Error serializing item, skipping: {e}")
136                    self._ingestion_queue.task_done()
137
138                    continue
139
140                events.append(event)
141
142                total_size += item_size
143                if total_size >= MAX_BATCH_SIZE_BYTES:
144                    self._log.debug("hit batch size limit (size: %d)", total_size)
145                    break
146
147            except Empty:
148                break
149
150            except Exception as e:
151                self._log.warning(
152                    "Failed to process event in IngestionConsumer, skipping",
153                    exc_info=e,
154                )
155                self._ingestion_queue.task_done()
156
157        self._log.debug(
158            "~%d items in the Langfuse queue", self._ingestion_queue.qsize()
159        )
160
161        return events
162
163    def _truncate_item_in_place(
164        self,
165        *,
166        event: Any,
167        max_size: int,
168        log_message: Optional[str] = None,
169    ) -> int:
170        """Truncate the item in place to fit within the size limit."""
171        item_size = self._get_item_size(event)
172        self._log.debug(f"item size {item_size}")
173
174        if item_size > max_size:
175            self._log.warning(
176                "Item exceeds size limit (size: %s), dropping input / output / metadata of item until it fits.",
177                item_size,
178            )
179
180            if "body" in event:
181                drop_candidates = ["input", "output", "metadata"]
182                sorted_field_sizes = sorted(
183                    [
184                        (
185                            field,
186                            self._get_item_size((event["body"][field]))
187                            if field in event["body"]
188                            else 0,
189                        )
190                        for field in drop_candidates
191                    ],
192                    key=lambda x: x[1],
193                )
194
195                # drop the largest field until the item size is within the limit
196                for _ in range(len(sorted_field_sizes)):
197                    field_to_drop, size_to_drop = sorted_field_sizes.pop()
198
199                    if field_to_drop not in event["body"]:
200                        continue
201
202                    event["body"][field_to_drop] = log_message
203                    item_size -= size_to_drop
204
205                    self._log.debug(
206                        f"Dropped field {field_to_drop}, new item size {item_size}"
207                    )
208
209                    if item_size <= max_size:
210                        break
211
212            # if item does not have body or input/output fields, drop the event
213            if "body" not in event or (
214                "input" not in event["body"] and "output" not in event["body"]
215            ):
216                self._log.warning(
217                    "Item does not have body or input/output fields, dropping item."
218                )
219                self._ingestion_queue.task_done()
220                return 0
221
222        return self._get_item_size(event)
223
224    def _get_item_size(self, item: Any) -> int:
225        """Return the size of the item in bytes."""
226        return len(json.dumps(item, cls=EventSerializer).encode())
227
228    def _apply_mask_in_place(self, event: dict):
229        """Apply the mask function to the event. This is done in place."""
230        if not self._mask:
231            return
232
233        body = event["body"] if "body" in event else {}
234        for key in ("input", "output"):
235            if key in body:
236                try:
237                    body[key] = self._mask(data=body[key])
238                except Exception as e:
239                    self._log.error(f"Mask function failed with error: {e}")
240                    body[key] = "<fully masked due to failed mask function>"
241
242    def run(self):
243        """Run the consumer."""
244        self._log.debug("consumer is running...")
245        while self.running:
246            self.upload()
247
248    def upload(self):
249        """Upload the next batch of items, return whether successful."""
250        batch = self._next()
251        if len(batch) == 0:
252            return
253
254        try:
255            self._upload_batch(batch)
256        except Exception as e:
257            handle_exception(e)
258        finally:
259            # mark items as acknowledged from queue
260            for _ in batch:
261                self._ingestion_queue.task_done()
262
263    def pause(self):
264        """Pause the consumer."""
265        self.running = False
266
267    def _upload_batch(self, batch: List[Any]):
268        self._log.debug("uploading batch of %d items", len(batch))
269
270        metadata = IngestionMetadata(
271            batch_size=len(batch),
272            sdk_integration=self._sdk_integration,
273            sdk_name=self._sdk_name,
274            sdk_version=self._sdk_version,
275            public_key=self._public_key,
276        ).dict()
277
278        @backoff.on_exception(
279            backoff.expo, Exception, max_tries=self._max_retries, logger=None
280        )
281        def execute_task_with_backoff(batch: List[Any]):
282            try:
283                self._client.batch_post(batch=batch, metadata=metadata)
284            except Exception as e:
285                if (
286                    isinstance(e, APIError)
287                    and 400 <= int(e.status) < 500
288                    and int(e.status) != 429  # retry if rate-limited
289                ):
290                    return
291
292                raise e
293
294        execute_task_with_backoff(batch)
295        self._log.debug("successfully uploaded batch of %d events", len(batch))
MAX_EVENT_SIZE_BYTES = 1000000
MAX_BATCH_SIZE_BYTES = 2500000
class IngestionMetadata(pydantic.v1.main.BaseModel):
29class IngestionMetadata(pydantic.BaseModel):
30    batch_size: int
31    sdk_integration: str
32    sdk_name: str
33    sdk_version: str
34    public_key: str
batch_size: int
sdk_integration: str
sdk_name: str
sdk_version: str
public_key: str
Inherited Members
pydantic.v1.main.BaseModel
BaseModel
Config
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
class IngestionConsumer(threading.Thread):
 37class IngestionConsumer(threading.Thread):
 38    _log = logging.getLogger("langfuse")
 39    _ingestion_queue: Queue
 40    _identifier: int
 41    _client: LangfuseClient
 42    _flush_at: int
 43    _flush_interval: float
 44    _max_retries: int
 45    _public_key: str
 46    _sdk_name: str
 47    _sdk_version: str
 48    _sdk_integration: str
 49    _mask: Optional[MaskFunction]
 50    _sampler: Sampler
 51    _media_manager: MediaManager
 52
 53    def __init__(
 54        self,
 55        *,
 56        ingestion_queue: Queue,
 57        identifier: int,
 58        client: LangfuseClient,
 59        flush_at: int,
 60        flush_interval: float,
 61        max_retries: int,
 62        public_key: str,
 63        media_manager: MediaManager,
 64        sdk_name: str,
 65        sdk_version: str,
 66        sdk_integration: str,
 67        sample_rate: float,
 68        mask: Optional[MaskFunction] = None,
 69    ):
 70        """Create a consumer thread."""
 71        super().__init__()
 72        # It's important to set running in the constructor: if we are asked to
 73        # pause immediately after construction, we might set running to True in
 74        # run() *after* we set it to False in pause... and keep running
 75        # forever.
 76        self.running = True
 77        # Make consumer a daemon thread so that it doesn't block program exit
 78        self.daemon = True
 79        self._ingestion_queue = ingestion_queue
 80        self._identifier = identifier
 81        self._client = client
 82        self._flush_at = flush_at
 83        self._flush_interval = flush_interval
 84        self._max_retries = max_retries
 85        self._public_key = public_key
 86        self._sdk_name = sdk_name
 87        self._sdk_version = sdk_version
 88        self._sdk_integration = sdk_integration
 89        self._mask = mask
 90        self._sampler = Sampler(sample_rate)
 91        self._media_manager = media_manager
 92
 93    def _next(self):
 94        """Return the next batch of items to upload."""
 95        events = []
 96
 97        start_time = time.monotonic()
 98        total_size = 0
 99
100        while len(events) < self._flush_at:
101            elapsed = time.monotonic() - start_time
102            if elapsed >= self._flush_interval:
103                break
104            try:
105                event = self._ingestion_queue.get(
106                    block=True, timeout=self._flush_interval - elapsed
107                )
108
109                # convert pydantic models to dicts
110                if "body" in event and isinstance(event["body"], pydantic.BaseModel):
111                    event["body"] = event["body"].dict(exclude_none=True)
112
113                # sample event
114                if not self._sampler.sample_event(event):
115                    self._ingestion_queue.task_done()
116
117                    continue
118
119                # handle multimodal data
120                self._media_manager.process_media_in_event(event)
121
122                # truncate item if it exceeds size limit
123                item_size = self._truncate_item_in_place(
124                    event=event,
125                    max_size=MAX_EVENT_SIZE_BYTES,
126                    log_message="<truncated due to size exceeding limit>",
127                )
128
129                # apply mask
130                self._apply_mask_in_place(event)
131
132                # check for serialization errors
133                try:
134                    json.dumps(event, cls=EventSerializer)
135                except Exception as e:
136                    self._log.error(f"Error serializing item, skipping: {e}")
137                    self._ingestion_queue.task_done()
138
139                    continue
140
141                events.append(event)
142
143                total_size += item_size
144                if total_size >= MAX_BATCH_SIZE_BYTES:
145                    self._log.debug("hit batch size limit (size: %d)", total_size)
146                    break
147
148            except Empty:
149                break
150
151            except Exception as e:
152                self._log.warning(
153                    "Failed to process event in IngestionConsumer, skipping",
154                    exc_info=e,
155                )
156                self._ingestion_queue.task_done()
157
158        self._log.debug(
159            "~%d items in the Langfuse queue", self._ingestion_queue.qsize()
160        )
161
162        return events
163
164    def _truncate_item_in_place(
165        self,
166        *,
167        event: Any,
168        max_size: int,
169        log_message: Optional[str] = None,
170    ) -> int:
171        """Truncate the item in place to fit within the size limit."""
172        item_size = self._get_item_size(event)
173        self._log.debug(f"item size {item_size}")
174
175        if item_size > max_size:
176            self._log.warning(
177                "Item exceeds size limit (size: %s), dropping input / output / metadata of item until it fits.",
178                item_size,
179            )
180
181            if "body" in event:
182                drop_candidates = ["input", "output", "metadata"]
183                sorted_field_sizes = sorted(
184                    [
185                        (
186                            field,
187                            self._get_item_size((event["body"][field]))
188                            if field in event["body"]
189                            else 0,
190                        )
191                        for field in drop_candidates
192                    ],
193                    key=lambda x: x[1],
194                )
195
196                # drop the largest field until the item size is within the limit
197                for _ in range(len(sorted_field_sizes)):
198                    field_to_drop, size_to_drop = sorted_field_sizes.pop()
199
200                    if field_to_drop not in event["body"]:
201                        continue
202
203                    event["body"][field_to_drop] = log_message
204                    item_size -= size_to_drop
205
206                    self._log.debug(
207                        f"Dropped field {field_to_drop}, new item size {item_size}"
208                    )
209
210                    if item_size <= max_size:
211                        break
212
213            # if item does not have body or input/output fields, drop the event
214            if "body" not in event or (
215                "input" not in event["body"] and "output" not in event["body"]
216            ):
217                self._log.warning(
218                    "Item does not have body or input/output fields, dropping item."
219                )
220                self._ingestion_queue.task_done()
221                return 0
222
223        return self._get_item_size(event)
224
225    def _get_item_size(self, item: Any) -> int:
226        """Return the size of the item in bytes."""
227        return len(json.dumps(item, cls=EventSerializer).encode())
228
229    def _apply_mask_in_place(self, event: dict):
230        """Apply the mask function to the event. This is done in place."""
231        if not self._mask:
232            return
233
234        body = event["body"] if "body" in event else {}
235        for key in ("input", "output"):
236            if key in body:
237                try:
238                    body[key] = self._mask(data=body[key])
239                except Exception as e:
240                    self._log.error(f"Mask function failed with error: {e}")
241                    body[key] = "<fully masked due to failed mask function>"
242
243    def run(self):
244        """Run the consumer."""
245        self._log.debug("consumer is running...")
246        while self.running:
247            self.upload()
248
249    def upload(self):
250        """Upload the next batch of items, return whether successful."""
251        batch = self._next()
252        if len(batch) == 0:
253            return
254
255        try:
256            self._upload_batch(batch)
257        except Exception as e:
258            handle_exception(e)
259        finally:
260            # mark items as acknowledged from queue
261            for _ in batch:
262                self._ingestion_queue.task_done()
263
264    def pause(self):
265        """Pause the consumer."""
266        self.running = False
267
268    def _upload_batch(self, batch: List[Any]):
269        self._log.debug("uploading batch of %d items", len(batch))
270
271        metadata = IngestionMetadata(
272            batch_size=len(batch),
273            sdk_integration=self._sdk_integration,
274            sdk_name=self._sdk_name,
275            sdk_version=self._sdk_version,
276            public_key=self._public_key,
277        ).dict()
278
279        @backoff.on_exception(
280            backoff.expo, Exception, max_tries=self._max_retries, logger=None
281        )
282        def execute_task_with_backoff(batch: List[Any]):
283            try:
284                self._client.batch_post(batch=batch, metadata=metadata)
285            except Exception as e:
286                if (
287                    isinstance(e, APIError)
288                    and 400 <= int(e.status) < 500
289                    and int(e.status) != 429  # retry if rate-limited
290                ):
291                    return
292
293                raise e
294
295        execute_task_with_backoff(batch)
296        self._log.debug("successfully uploaded batch of %d events", len(batch))

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

IngestionConsumer( *, ingestion_queue: queue.Queue, identifier: int, client: langfuse.request.LangfuseClient, flush_at: int, flush_interval: float, max_retries: int, public_key: str, media_manager: langfuse._task_manager.media_manager.MediaManager, sdk_name: str, sdk_version: str, sdk_integration: str, sample_rate: float, mask: Optional[langfuse.types.MaskFunction] = None)
53    def __init__(
54        self,
55        *,
56        ingestion_queue: Queue,
57        identifier: int,
58        client: LangfuseClient,
59        flush_at: int,
60        flush_interval: float,
61        max_retries: int,
62        public_key: str,
63        media_manager: MediaManager,
64        sdk_name: str,
65        sdk_version: str,
66        sdk_integration: str,
67        sample_rate: float,
68        mask: Optional[MaskFunction] = None,
69    ):
70        """Create a consumer thread."""
71        super().__init__()
72        # It's important to set running in the constructor: if we are asked to
73        # pause immediately after construction, we might set running to True in
74        # run() *after* we set it to False in pause... and keep running
75        # forever.
76        self.running = True
77        # Make consumer a daemon thread so that it doesn't block program exit
78        self.daemon = True
79        self._ingestion_queue = ingestion_queue
80        self._identifier = identifier
81        self._client = client
82        self._flush_at = flush_at
83        self._flush_interval = flush_interval
84        self._max_retries = max_retries
85        self._public_key = public_key
86        self._sdk_name = sdk_name
87        self._sdk_version = sdk_version
88        self._sdk_integration = sdk_integration
89        self._mask = mask
90        self._sampler = Sampler(sample_rate)
91        self._media_manager = media_manager

Create a consumer thread.

running
daemon
1200    @property
1201    def daemon(self):
1202        """A boolean value indicating whether this thread is a daemon thread.
1203
1204        This must be set before start() is called, otherwise RuntimeError is
1205        raised. Its initial value is inherited from the creating thread; the
1206        main thread is not a daemon thread and therefore all threads created in
1207        the main thread default to daemon = False.
1208
1209        The entire Python program exits when only daemon threads are left.
1210
1211        """
1212        assert self._initialized, "Thread.__init__() not called"
1213        return self._daemonic

A boolean value indicating whether this thread is a daemon thread.

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when only daemon threads are left.

def run(self):
243    def run(self):
244        """Run the consumer."""
245        self._log.debug("consumer is running...")
246        while self.running:
247            self.upload()

Run the consumer.

def upload(self):
249    def upload(self):
250        """Upload the next batch of items, return whether successful."""
251        batch = self._next()
252        if len(batch) == 0:
253            return
254
255        try:
256            self._upload_batch(batch)
257        except Exception as e:
258            handle_exception(e)
259        finally:
260            # mark items as acknowledged from queue
261            for _ in batch:
262                self._ingestion_queue.task_done()

Upload the next batch of items, return whether successful.

def pause(self):
264    def pause(self):
265        """Pause the consumer."""
266        self.running = False

Pause the consumer.

Inherited Members
threading.Thread
start
join
name
ident
is_alive
isDaemon
setDaemon
getName
setName
native_id