langfuse._task_manager.media_manager
import logging
import time
from queue import Empty, Queue
from typing import Any, Callable, Optional, TypeVar

import backoff
import requests
from typing_extensions import ParamSpec

from langfuse.api import GetMediaUploadUrlRequest, PatchMediaBody
from langfuse.api.client import FernLangfuse
from langfuse.api.core import ApiError
from langfuse.media import LangfuseMedia
from langfuse.utils import _get_timestamp

from .media_upload_queue import UploadMediaJob

T = TypeVar("T")
P = ParamSpec("P")


class MediaManager:
    _log = logging.getLogger(__name__)

    def __init__(
        self,
        *,
        api_client: FernLangfuse,
        media_upload_queue: Queue,
        max_retries: Optional[int] = 3,
    ):
        self._api_client = api_client
        self._queue = media_upload_queue
        self._max_retries = max_retries

    def process_next_media_upload(self):
        try:
            upload_job = self._queue.get(block=True, timeout=1)
            self._log.debug(f"Processing upload for {upload_job['media_id']}")
            self._process_upload_media_job(data=upload_job)

            self._queue.task_done()
        except Empty:
            self._log.debug("Media upload queue is empty")
        except Exception as e:
            self._log.error(f"Error uploading media: {e}")
            self._queue.task_done()

    def process_media_in_event(self, event: dict):
        try:
            if "body" not in event:
                return

            body = event["body"]
            trace_id = body.get("traceId", None) or (
                body.get("id", None)
                if "type" in event and "trace" in event["type"]
                else None
            )

            if trace_id is None:
                raise ValueError("trace_id is required for media upload")

            observation_id = (
                body.get("id", None)
                if "type" in event
                and ("generation" in event["type"] or "span" in event["type"])
                else None
            )

            multimodal_fields = ["input", "output", "metadata"]

            for field in multimodal_fields:
                if field in body:
                    processed_data = self._find_and_process_media(
                        data=body[field],
                        trace_id=trace_id,
                        observation_id=observation_id,
                        field=field,
                    )

                    body[field] = processed_data

        except Exception as e:
            self._log.error(f"Error processing multimodal event: {e}")

    def _find_and_process_media(
        self,
        *,
        data: Any,
        trace_id: str,
        observation_id: Optional[str],
        field: str,
    ):
        seen = set()
        max_levels = 10

        def _process_data_recursively(data: Any, level: int):
            if id(data) in seen or level > max_levels:
                return data

            seen.add(id(data))

            if isinstance(data, LangfuseMedia):
                self._process_media(
                    media=data,
                    trace_id=trace_id,
                    observation_id=observation_id,
                    field=field,
                )

                return data

            if isinstance(data, str) and data.startswith("data:"):
                media = LangfuseMedia(
                    obj=data,
                    base64_data_uri=data,
                )

                self._process_media(
                    media=media,
                    trace_id=trace_id,
                    observation_id=observation_id,
                    field=field,
                )

                return media

            # Anthropic
            if (
                isinstance(data, dict)
                and "type" in data
                and data["type"] == "base64"
                and "media_type" in data
                and "data" in data
            ):
                media = LangfuseMedia(
                    base64_data_uri=f"data:{data['media_type']};base64," + data["data"],
                )

                self._process_media(
                    media=media,
                    trace_id=trace_id,
                    observation_id=observation_id,
                    field=field,
                )

                data["data"] = media

                return data

            # Vertex
            if (
                isinstance(data, dict)
                and "type" in data
                and data["type"] == "media"
                and "mime_type" in data
                and "data" in data
            ):
                media = LangfuseMedia(
                    base64_data_uri=f"data:{data['mime_type']};base64," + data["data"],
                )

                self._process_media(
                    media=media,
                    trace_id=trace_id,
                    observation_id=observation_id,
                    field=field,
                )

                data["data"] = media

                return data

            if isinstance(data, list):
                return [_process_data_recursively(item, level + 1) for item in data]

            if isinstance(data, dict):
                return {
                    key: _process_data_recursively(value, level + 1)
                    for key, value in data.items()
                }

            return data

        return _process_data_recursively(data, 1)

    def _process_media(
        self,
        *,
        media: LangfuseMedia,
        trace_id: str,
        observation_id: Optional[str],
        field: str,
    ):
        if (
            media._content_length is None
            or media._content_type is None
            or media._content_sha256_hash is None
            or media._content_bytes is None
        ):
            return

        upload_url_response = self._request_with_backoff(
            self._api_client.media.get_upload_url,
            request=GetMediaUploadUrlRequest(
                contentLength=media._content_length,
                contentType=media._content_type,
                sha256Hash=media._content_sha256_hash,
                field=field,
                traceId=trace_id,
                observationId=observation_id,
            ),
        )

        upload_url = upload_url_response.upload_url
        media._media_id = upload_url_response.media_id  # Important as this will be used in the media reference string in the serializer

        if upload_url is not None:
            self._log.debug(f"Scheduling upload for {media._media_id}")
            self._queue.put(
                item={
                    "upload_url": upload_url,
                    "media_id": media._media_id,
                    "content_bytes": media._content_bytes,
                    "content_type": media._content_type,
                    "content_sha256_hash": media._content_sha256_hash,
                },
                block=True,
                timeout=1,
            )
        else:
            self._log.debug(f"Media {media._media_id} already uploaded")

    def _process_upload_media_job(
        self,
        *,
        data: UploadMediaJob,
    ):
        upload_start_time = time.time()
        upload_response = self._request_with_backoff(
            requests.put,
            data["upload_url"],
            headers={
                "Content-Type": data["content_type"],
                "x-amz-checksum-sha256": data["content_sha256_hash"],
                "x-ms-blob-type": "BlockBlob",
            },
            data=data["content_bytes"],
        )
        upload_time_ms = int((time.time() - upload_start_time) * 1000)

        self._request_with_backoff(
            self._api_client.media.patch,
            media_id=data["media_id"],
            request=PatchMediaBody(
                uploadedAt=_get_timestamp(),
                uploadHttpStatus=upload_response.status_code,
                uploadHttpError=upload_response.text,
                uploadTimeMs=upload_time_ms,
            ),
        )

        self._log.debug(
            f"Media upload completed for {data['media_id']} in {upload_time_ms}ms"
        )

    def _request_with_backoff(
        self, func: Callable[P, T], *args: P.args, **kwargs: P.kwargs
    ) -> T:
        def _should_give_up(e: Exception) -> bool:
            if isinstance(e, ApiError):
                return (
                    e.status_code is not None
                    and 400 <= e.status_code < 500
                    and e.status_code != 429
                )
            if isinstance(e, requests.exceptions.RequestException):
                return (
                    e.response is not None
                    and e.response.status_code < 500
                    and e.response.status_code != 429
                )
            return False

        @backoff.on_exception(
            backoff.expo,
            Exception,
            max_tries=self._max_retries,
            giveup=_should_give_up,
            logger=None,
        )
        def execute_task_with_backoff() -> T:
            return func(*args, **kwargs)

        return execute_task_with_backoff()
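Taken together, `_find_and_process_media` recognizes three in-band media encodings during its recursive walk: bare base64 data URIs, Anthropic-style `base64` content blocks, and Vertex-style `media` blocks. A sketch of payload shapes that would be picked up (the base64 content is a placeholder, not real media):

# Illustrative payload shapes only; the base64 content is a placeholder.
data_uri = "data:image/png;base64,AAAA"  # detected via str.startswith("data:")

anthropic_block = {
    "type": "base64",           # Anthropic-style content block
    "media_type": "image/png",
    "data": "AAAA",             # replaced in place with a LangfuseMedia object
}

vertex_block = {
    "type": "media",            # Vertex-style content block
    "mime_type": "image/png",
    "data": "AAAA",             # replaced in place with a LangfuseMedia object
}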
P = ParamSpec("P")
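The module-level `T`/`P` pair is what lets `_request_with_backoff` wrap arbitrary callables without losing their signatures. A minimal standalone sketch of the same pattern (the `with_retry_once` wrapper is hypothetical, for illustration only):

from typing import Callable, TypeVar

from typing_extensions import ParamSpec

T = TypeVar("T")
P = ParamSpec("P")

def with_retry_once(func: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
    # P ties *args/**kwargs to func's own parameters, so type checkers
    # validate call sites against the wrapped signature; T propagates
    # the wrapped return type.
    try:
        return func(*args, **kwargs)
    except Exception:
        return func(*args, **kwargs)  # hypothetical single retry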
class MediaManager:
MediaManager(*, api_client: langfuse.api.client.FernLangfuse, media_upload_queue: queue.Queue, max_retries: Optional[int] = 3)
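A minimal construction sketch, assuming an already-configured `FernLangfuse` instance named `api_client` (client setup itself is out of scope here):

from queue import Queue

upload_queue: Queue = Queue()
manager = MediaManager(
    api_client=api_client,           # existing FernLangfuse instance (assumed)
    media_upload_queue=upload_queue,
    max_retries=3,
)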
def process_next_media_upload(self):
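Pops one job from the upload queue and processes it. Since the queue read blocks for at most one second and `Empty` is swallowed, the method suits a plain worker loop. A sketch, reusing the `manager` from the construction example above and a hypothetical stop flag:

import threading

stop_event = threading.Event()  # hypothetical shutdown flag

def media_upload_worker(manager: MediaManager) -> None:
    # Each call blocks for at most ~1 second, so the loop re-checks
    # the stop flag regularly without busy-waiting.
    while not stop_event.is_set():
        manager.process_next_media_upload()

threading.Thread(target=media_upload_worker, args=(manager,), daemon=True).start()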
def process_media_in_event(self, event: dict):
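Scans the `input`, `output`, and `metadata` fields of an event body for media and mutates `event["body"]` in place. A sketch of an event that would be processed, following the checks above (a `trace`-typed event takes its trace id from `body["id"]`; the id and data URI are illustrative):

event = {
    "type": "trace-create",  # "trace" in type, so body["id"] is used as trace_id
    "body": {
        "id": "trace-123",                                 # illustrative id
        "input": {"image": "data:image/png;base64,AAAA"},  # placeholder data URI
    },
}

manager.process_media_in_event(event)
# event["body"]["input"]["image"] is now a LangfuseMedia instance.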