langfuse._task_manager.media_manager

  1import logging
  2import time
  3from queue import Empty, Queue
  4from typing import Any, Callable, Optional, TypeVar
  5
  6import backoff
  7import requests
  8from typing_extensions import ParamSpec
  9
 10from langfuse.api import GetMediaUploadUrlRequest, PatchMediaBody
 11from langfuse.api.client import FernLangfuse
 12from langfuse.api.core import ApiError
 13from langfuse.media import LangfuseMedia
 14from langfuse.utils import _get_timestamp
 15
 16from .media_upload_queue import UploadMediaJob
 17
 18T = TypeVar("T")
 19P = ParamSpec("P")
 20
 21
class MediaManager:
    """Finds media (data URIs, Anthropic/Vertex base64 payloads, LangfuseMedia
    objects) inside event bodies, obtains presigned upload URLs from the
    Langfuse API, and drains the media-upload queue.
    """

    # Class-level logger shared by all instances; logs under this module's name.
    _log = logging.getLogger(__name__)
 24
 25    def __init__(
 26        self,
 27        *,
 28        api_client: FernLangfuse,
 29        media_upload_queue: Queue,
 30        max_retries: Optional[int] = 3,
 31    ):
 32        self._api_client = api_client
 33        self._queue = media_upload_queue
 34        self._max_retries = max_retries
 35
 36    def process_next_media_upload(self):
 37        try:
 38            upload_job = self._queue.get(block=True, timeout=1)
 39            self._log.debug(f"Processing upload for {upload_job['media_id']}")
 40            self._process_upload_media_job(data=upload_job)
 41
 42            self._queue.task_done()
 43        except Empty:
 44            self._log.debug("Media upload queue is empty")
 45            pass
 46        except Exception as e:
 47            self._log.error(f"Error uploading media: {e}")
 48            self._queue.task_done()
 49
 50    def process_media_in_event(self, event: dict):
 51        try:
 52            if "body" not in event:
 53                return
 54
 55            body = event["body"]
 56            trace_id = body.get("traceId", None) or (
 57                body.get("id", None)
 58                if "type" in event and "trace" in event["type"]
 59                else None
 60            )
 61
 62            if trace_id is None:
 63                raise ValueError("trace_id is required for media upload")
 64
 65            observation_id = (
 66                body.get("id", None)
 67                if "type" in event
 68                and ("generation" in event["type"] or "span" in event["type"])
 69                else None
 70            )
 71
 72            multimodal_fields = ["input", "output", "metadata"]
 73
 74            for field in multimodal_fields:
 75                if field in body:
 76                    processed_data = self._find_and_process_media(
 77                        data=body[field],
 78                        trace_id=trace_id,
 79                        observation_id=observation_id,
 80                        field=field,
 81                    )
 82
 83                    body[field] = processed_data
 84
 85        except Exception as e:
 86            self._log.error(f"Error processing multimodal event: {e}")
 87
    def _find_and_process_media(
        self,
        *,
        data: Any,
        trace_id: str,
        observation_id: Optional[str],
        field: str,
    ):
        """Recursively walk ``data`` and process every media payload found.

        Recognized media forms:
        - ``LangfuseMedia`` instances (processed as-is),
        - ``data:`` base64 data-URI strings (replaced by a LangfuseMedia),
        - Anthropic-style dicts (``type == "base64"`` with ``media_type``/``data``),
        - Vertex-style dicts (``type == "media"`` with ``mime_type``/``data``),
          where only the ``data`` value is replaced by a LangfuseMedia.

        Lists and dicts are traversed element-by-element; everything else is
        returned unchanged. Returns the (possibly rewritten) structure.
        """
        # Objects already visited, tracked by id() to break cycles and avoid
        # reprocessing shared references within this single call.
        seen = set()
        # Recursion depth cap; anything nested deeper is returned untouched.
        max_levels = 10

        def _process_data_recursively(data: Any, level: int):
            if id(data) in seen or level > max_levels:
                return data

            seen.add(id(data))

            if isinstance(data, LangfuseMedia):
                self._process_media(
                    media=data,
                    trace_id=trace_id,
                    observation_id=observation_id,
                    field=field,
                )

                return data

            # Raw base64 data URI: wrap in LangfuseMedia and substitute it
            # (presumably serialized later as a media reference string —
            # NOTE(review): confirm against the serializer).
            if isinstance(data, str) and data.startswith("data:"):
                media = LangfuseMedia(
                    obj=data,
                    base64_data_uri=data,
                )

                self._process_media(
                    media=media,
                    trace_id=trace_id,
                    observation_id=observation_id,
                    field=field,
                )

                return media

            # Anthropic-style content block; only the "data" value is replaced,
            # the surrounding dict keeps its shape.
            if (
                isinstance(data, dict)
                and "type" in data
                and data["type"] == "base64"
                and "media_type" in data
                and "data" in data
            ):
                media = LangfuseMedia(
                    base64_data_uri=f"data:{data['media_type']};base64," + data["data"],
                )

                self._process_media(
                    media=media,
                    trace_id=trace_id,
                    observation_id=observation_id,
                    field=field,
                )

                data["data"] = media

                return data

            # Vertex-style content block; same in-place substitution as above.
            if (
                isinstance(data, dict)
                and "type" in data
                and data["type"] == "media"
                and "mime_type" in data
                and "data" in data
            ):
                media = LangfuseMedia(
                    base64_data_uri=f"data:{data['mime_type']};base64," + data["data"],
                )

                self._process_media(
                    media=media,
                    trace_id=trace_id,
                    observation_id=observation_id,
                    field=field,
                )

                data["data"] = media

                return data

            if isinstance(data, list):
                return [_process_data_recursively(item, level + 1) for item in data]

            if isinstance(data, dict):
                return {
                    key: _process_data_recursively(value, level + 1)
                    for key, value in data.items()
                }

            return data

        return _process_data_recursively(data, 1)
188
189    def _process_media(
190        self,
191        *,
192        media: LangfuseMedia,
193        trace_id: str,
194        observation_id: Optional[str],
195        field: str,
196    ):
197        if (
198            media._content_length is None
199            or media._content_type is None
200            or media._content_sha256_hash is None
201            or media._content_bytes is None
202        ):
203            return
204
205        upload_url_response = self._request_with_backoff(
206            self._api_client.media.get_upload_url,
207            request=GetMediaUploadUrlRequest(
208                contentLength=media._content_length,
209                contentType=media._content_type,
210                sha256Hash=media._content_sha256_hash,
211                field=field,
212                traceId=trace_id,
213                observationId=observation_id,
214            ),
215        )
216
217        upload_url = upload_url_response.upload_url
218        media._media_id = upload_url_response.media_id  # Important as this is will be used in the media reference string in serializer
219
220        if upload_url is not None:
221            self._log.debug(f"Scheduling upload for {media._media_id}")
222            self._queue.put(
223                item={
224                    "upload_url": upload_url,
225                    "media_id": media._media_id,
226                    "content_bytes": media._content_bytes,
227                    "content_type": media._content_type,
228                    "content_sha256_hash": media._content_sha256_hash,
229                },
230                block=True,
231                timeout=1,
232            )
233
234        else:
235            self._log.debug(f"Media {media._media_id} already uploaded")
236
237    def _process_upload_media_job(
238        self,
239        *,
240        data: UploadMediaJob,
241    ):
242        upload_start_time = time.time()
243        upload_response = self._request_with_backoff(
244            requests.put,
245            data["upload_url"],
246            headers={
247                "Content-Type": data["content_type"],
248                "x-amz-checksum-sha256": data["content_sha256_hash"],
249                "x-ms-blob-type": "BlockBlob",
250            },
251            data=data["content_bytes"],
252        )
253        upload_time_ms = int((time.time() - upload_start_time) * 1000)
254
255        self._request_with_backoff(
256            self._api_client.media.patch,
257            media_id=data["media_id"],
258            request=PatchMediaBody(
259                uploadedAt=_get_timestamp(),
260                uploadHttpStatus=upload_response.status_code,
261                uploadHttpError=upload_response.text,
262                uploadTimeMs=upload_time_ms,
263            ),
264        )
265
266        self._log.debug(
267            f"Media upload completed for {data['media_id']} in {upload_time_ms}ms"
268        )
269
270    def _request_with_backoff(
271        self, func: Callable[P, T], *args: P.args, **kwargs: P.kwargs
272    ) -> T:
273        def _should_give_up(e: Exception) -> bool:
274            if isinstance(e, ApiError):
275                return (
276                    e.status_code is not None
277                    and 400 <= e.status_code < 500
278                    and e.status_code != 429
279                )
280            if isinstance(e, requests.exceptions.RequestException):
281                return (
282                    e.response is not None
283                    and e.response.status_code < 500
284                    and e.response.status_code != 429
285                )
286            return False
287
288        @backoff.on_exception(
289            backoff.expo,
290            Exception,
291            max_tries=self._max_retries,
292            giveup=_should_give_up,
293            logger=None,
294        )
295        def execute_task_with_backoff() -> T:
296            return func(*args, **kwargs)
297
298        return execute_task_with_backoff()
P = ~P
class MediaManager:
 23class MediaManager:
 24    _log = logging.getLogger(__name__)
 25
 26    def __init__(
 27        self,
 28        *,
 29        api_client: FernLangfuse,
 30        media_upload_queue: Queue,
 31        max_retries: Optional[int] = 3,
 32    ):
 33        self._api_client = api_client
 34        self._queue = media_upload_queue
 35        self._max_retries = max_retries
 36
 37    def process_next_media_upload(self):
 38        try:
 39            upload_job = self._queue.get(block=True, timeout=1)
 40            self._log.debug(f"Processing upload for {upload_job['media_id']}")
 41            self._process_upload_media_job(data=upload_job)
 42
 43            self._queue.task_done()
 44        except Empty:
 45            self._log.debug("Media upload queue is empty")
 46            pass
 47        except Exception as e:
 48            self._log.error(f"Error uploading media: {e}")
 49            self._queue.task_done()
 50
 51    def process_media_in_event(self, event: dict):
 52        try:
 53            if "body" not in event:
 54                return
 55
 56            body = event["body"]
 57            trace_id = body.get("traceId", None) or (
 58                body.get("id", None)
 59                if "type" in event and "trace" in event["type"]
 60                else None
 61            )
 62
 63            if trace_id is None:
 64                raise ValueError("trace_id is required for media upload")
 65
 66            observation_id = (
 67                body.get("id", None)
 68                if "type" in event
 69                and ("generation" in event["type"] or "span" in event["type"])
 70                else None
 71            )
 72
 73            multimodal_fields = ["input", "output", "metadata"]
 74
 75            for field in multimodal_fields:
 76                if field in body:
 77                    processed_data = self._find_and_process_media(
 78                        data=body[field],
 79                        trace_id=trace_id,
 80                        observation_id=observation_id,
 81                        field=field,
 82                    )
 83
 84                    body[field] = processed_data
 85
 86        except Exception as e:
 87            self._log.error(f"Error processing multimodal event: {e}")
 88
 89    def _find_and_process_media(
 90        self,
 91        *,
 92        data: Any,
 93        trace_id: str,
 94        observation_id: Optional[str],
 95        field: str,
 96    ):
 97        seen = set()
 98        max_levels = 10
 99
100        def _process_data_recursively(data: Any, level: int):
101            if id(data) in seen or level > max_levels:
102                return data
103
104            seen.add(id(data))
105
106            if isinstance(data, LangfuseMedia):
107                self._process_media(
108                    media=data,
109                    trace_id=trace_id,
110                    observation_id=observation_id,
111                    field=field,
112                )
113
114                return data
115
116            if isinstance(data, str) and data.startswith("data:"):
117                media = LangfuseMedia(
118                    obj=data,
119                    base64_data_uri=data,
120                )
121
122                self._process_media(
123                    media=media,
124                    trace_id=trace_id,
125                    observation_id=observation_id,
126                    field=field,
127                )
128
129                return media
130
131            # Anthropic
132            if (
133                isinstance(data, dict)
134                and "type" in data
135                and data["type"] == "base64"
136                and "media_type" in data
137                and "data" in data
138            ):
139                media = LangfuseMedia(
140                    base64_data_uri=f"data:{data['media_type']};base64," + data["data"],
141                )
142
143                self._process_media(
144                    media=media,
145                    trace_id=trace_id,
146                    observation_id=observation_id,
147                    field=field,
148                )
149
150                data["data"] = media
151
152                return data
153
154            # Vertex
155            if (
156                isinstance(data, dict)
157                and "type" in data
158                and data["type"] == "media"
159                and "mime_type" in data
160                and "data" in data
161            ):
162                media = LangfuseMedia(
163                    base64_data_uri=f"data:{data['mime_type']};base64," + data["data"],
164                )
165
166                self._process_media(
167                    media=media,
168                    trace_id=trace_id,
169                    observation_id=observation_id,
170                    field=field,
171                )
172
173                data["data"] = media
174
175                return data
176
177            if isinstance(data, list):
178                return [_process_data_recursively(item, level + 1) for item in data]
179
180            if isinstance(data, dict):
181                return {
182                    key: _process_data_recursively(value, level + 1)
183                    for key, value in data.items()
184                }
185
186            return data
187
188        return _process_data_recursively(data, 1)
189
190    def _process_media(
191        self,
192        *,
193        media: LangfuseMedia,
194        trace_id: str,
195        observation_id: Optional[str],
196        field: str,
197    ):
198        if (
199            media._content_length is None
200            or media._content_type is None
201            or media._content_sha256_hash is None
202            or media._content_bytes is None
203        ):
204            return
205
206        upload_url_response = self._request_with_backoff(
207            self._api_client.media.get_upload_url,
208            request=GetMediaUploadUrlRequest(
209                contentLength=media._content_length,
210                contentType=media._content_type,
211                sha256Hash=media._content_sha256_hash,
212                field=field,
213                traceId=trace_id,
214                observationId=observation_id,
215            ),
216        )
217
218        upload_url = upload_url_response.upload_url
219        media._media_id = upload_url_response.media_id  # Important as this is will be used in the media reference string in serializer
220
221        if upload_url is not None:
222            self._log.debug(f"Scheduling upload for {media._media_id}")
223            self._queue.put(
224                item={
225                    "upload_url": upload_url,
226                    "media_id": media._media_id,
227                    "content_bytes": media._content_bytes,
228                    "content_type": media._content_type,
229                    "content_sha256_hash": media._content_sha256_hash,
230                },
231                block=True,
232                timeout=1,
233            )
234
235        else:
236            self._log.debug(f"Media {media._media_id} already uploaded")
237
238    def _process_upload_media_job(
239        self,
240        *,
241        data: UploadMediaJob,
242    ):
243        upload_start_time = time.time()
244        upload_response = self._request_with_backoff(
245            requests.put,
246            data["upload_url"],
247            headers={
248                "Content-Type": data["content_type"],
249                "x-amz-checksum-sha256": data["content_sha256_hash"],
250                "x-ms-blob-type": "BlockBlob",
251            },
252            data=data["content_bytes"],
253        )
254        upload_time_ms = int((time.time() - upload_start_time) * 1000)
255
256        self._request_with_backoff(
257            self._api_client.media.patch,
258            media_id=data["media_id"],
259            request=PatchMediaBody(
260                uploadedAt=_get_timestamp(),
261                uploadHttpStatus=upload_response.status_code,
262                uploadHttpError=upload_response.text,
263                uploadTimeMs=upload_time_ms,
264            ),
265        )
266
267        self._log.debug(
268            f"Media upload completed for {data['media_id']} in {upload_time_ms}ms"
269        )
270
271    def _request_with_backoff(
272        self, func: Callable[P, T], *args: P.args, **kwargs: P.kwargs
273    ) -> T:
274        def _should_give_up(e: Exception) -> bool:
275            if isinstance(e, ApiError):
276                return (
277                    e.status_code is not None
278                    and 400 <= e.status_code < 500
279                    and e.status_code != 429
280                )
281            if isinstance(e, requests.exceptions.RequestException):
282                return (
283                    e.response is not None
284                    and e.response.status_code < 500
285                    and e.response.status_code != 429
286                )
287            return False
288
289        @backoff.on_exception(
290            backoff.expo,
291            Exception,
292            max_tries=self._max_retries,
293            giveup=_should_give_up,
294            logger=None,
295        )
296        def execute_task_with_backoff() -> T:
297            return func(*args, **kwargs)
298
299        return execute_task_with_backoff()
MediaManager( *, api_client: langfuse.api.client.FernLangfuse, media_upload_queue: queue.Queue, max_retries: Optional[int] = 3)
26    def __init__(
27        self,
28        *,
29        api_client: FernLangfuse,
30        media_upload_queue: Queue,
31        max_retries: Optional[int] = 3,
32    ):
33        self._api_client = api_client
34        self._queue = media_upload_queue
35        self._max_retries = max_retries
def process_next_media_upload(self):
37    def process_next_media_upload(self):
38        try:
39            upload_job = self._queue.get(block=True, timeout=1)
40            self._log.debug(f"Processing upload for {upload_job['media_id']}")
41            self._process_upload_media_job(data=upload_job)
42
43            self._queue.task_done()
44        except Empty:
45            self._log.debug("Media upload queue is empty")
46            pass
47        except Exception as e:
48            self._log.error(f"Error uploading media: {e}")
49            self._queue.task_done()
def process_media_in_event(self, event: dict):
51    def process_media_in_event(self, event: dict):
52        try:
53            if "body" not in event:
54                return
55
56            body = event["body"]
57            trace_id = body.get("traceId", None) or (
58                body.get("id", None)
59                if "type" in event and "trace" in event["type"]
60                else None
61            )
62
63            if trace_id is None:
64                raise ValueError("trace_id is required for media upload")
65
66            observation_id = (
67                body.get("id", None)
68                if "type" in event
69                and ("generation" in event["type"] or "span" in event["type"])
70                else None
71            )
72
73            multimodal_fields = ["input", "output", "metadata"]
74
75            for field in multimodal_fields:
76                if field in body:
77                    processed_data = self._find_and_process_media(
78                        data=body[field],
79                        trace_id=trace_id,
80                        observation_id=observation_id,
81                        field=field,
82                    )
83
84                    body[field] = processed_data
85
86        except Exception as e:
87            self._log.error(f"Error processing multimodal event: {e}")