langfuse.media
This module contains the LangfuseMedia class, which is used to wrap media objects for upload to Langfuse.
"""This module contains the LangfuseMedia class, which is used to wrap media objects for upload to Langfuse."""

import base64
import hashlib
import logging
import os
import re
from typing import Optional, cast, Tuple, Any, TypeVar, Literal

import requests

from langfuse.api import MediaContentType
from langfuse.types import ParsedMediaReference

T = TypeVar("T")


class LangfuseMedia:
    """A class for wrapping media objects for upload to Langfuse.

    This class handles the preparation and formatting of media content for Langfuse,
    supporting base64 data URIs, raw content bytes, and files on disk.

    Args:
        obj (Optional[object]): The source object to be wrapped. Can be accessed via the `obj` attribute.
        base64_data_uri (Optional[str]): A base64-encoded data URI containing the media content
            and content type (e.g., "data:image/jpeg;base64,/9j/4AAQ...").
        content_type (Optional[str]): The MIME type of the media content when providing raw bytes
            or a file path.
        content_bytes (Optional[bytes]): Raw bytes of the media content.
        file_path (Optional[str]): The path to the file containing the media content. For relative paths,
            the current working directory is used.

    Note:
        The constructor does not raise on missing or invalid input. If no usable
        combination of arguments is provided, or parsing/reading the content fails,
        an error is logged and the instance is left without a content type and source.
    """

    obj: object

    _log = logging.getLogger(__name__)
    _content_bytes: Optional[bytes]
    _content_type: Optional[MediaContentType]
    _source: Optional[str]
    _media_id: Optional[str]

    def __init__(
        self,
        *,
        obj: Optional[object] = None,
        base64_data_uri: Optional[str] = None,
        content_type: Optional[MediaContentType] = None,
        content_bytes: Optional[bytes] = None,
        file_path: Optional[str] = None,
    ):
        """Initialize a LangfuseMedia object.

        The first usable input wins, in this order: `base64_data_uri`, then
        `content_bytes` with `content_type`, then `file_path` with `content_type`.

        Args:
            obj: The object to wrap.

            base64_data_uri: A base64-encoded data URI containing the media content
                and content type (e.g., "data:image/jpeg;base64,/9j/4AAQ...").
            content_type: The MIME type of the media content when providing raw bytes or reading from a file.
            content_bytes: Raw bytes of the media content.
            file_path: The path to the file containing the media content. For relative paths,
                the current working directory is used.
        """
        self.obj = obj
        self._media_id = None

        # Start from the "no media" state; the branches below only fill in what
        # they could actually load, so a failure never leaves a half-set instance.
        self._content_bytes = None
        self._content_type = None
        self._source = None

        if base64_data_uri is not None:
            parsed_bytes, parsed_type = self._parse_base64_data_uri(base64_data_uri)
            self._content_bytes = parsed_bytes
            self._content_type = parsed_type
            # The parser returns (None, None) on failure; only claim a source
            # when content was actually decoded.
            if parsed_bytes is not None and parsed_type is not None:
                self._source = "base64_data_uri"

        elif content_bytes is not None and content_type is not None:
            self._content_type = content_type
            self._content_bytes = content_bytes
            self._source = "bytes"

        elif file_path is not None and content_type is not None:
            if os.path.exists(file_path):
                self._content_bytes = self._read_file(file_path)
                # An unreadable (None) or empty (b"") file yields no usable
                # media, so content type and source stay unset in that case.
                if self._content_bytes:
                    self._content_type = content_type
                    self._source = "file"
            else:
                self._log.error("File at path %s does not exist", file_path)

        else:
            self._log.error(
                "base64_data_uri, or content_bytes and content_type, or file_path must be provided to LangfuseMedia"
            )

    def _read_file(self, file_path: str) -> Optional[bytes]:
        """Return the raw bytes of the file at `file_path`, or None if it cannot be read.

        Read errors are logged rather than raised.
        """
        try:
            with open(file_path, "rb") as file:
                return file.read()
        except Exception as e:
            self._log.error("Error reading file at path %s", file_path, exc_info=e)

            return None

    @property
    def _content_length(self) -> Optional[int]:
        """Number of content bytes, or None when there is no content.

        Empty content (b"") is also reported as None.
        """
        return len(self._content_bytes) if self._content_bytes else None

    @property
    def _content_sha256_hash(self) -> Optional[str]:
        """Base64-encoded SHA-256 digest of the content bytes, or None if no bytes are set."""
        if self._content_bytes is None:
            return None

        sha256_hash_bytes = hashlib.sha256(self._content_bytes).digest()

        return base64.b64encode(sha256_hash_bytes).decode("utf-8")

    @property
    def _reference_string(self) -> Optional[str]:
        """Media reference string for this object, or None until type, source and media id are all set."""
        if self._content_type is None or self._source is None or self._media_id is None:
            return None

        return f"@@@langfuseMedia:type={self._content_type}|id={self._media_id}|source={self._source}@@@"

    @staticmethod
    def parse_reference_string(reference_string: str) -> ParsedMediaReference:
        """Parse a media reference string into a ParsedMediaReference.

        Example reference string:
            "@@@langfuseMedia:type=image/jpeg|id=some-uuid|source=base64_data_uri@@@"

        Args:
            reference_string: The reference string to parse.

        Returns:
            A TypedDict with the media_id, source, and content_type.

        Raises:
            ValueError: If the reference string is empty or not a string.
            ValueError: If the reference string does not start with "@@@langfuseMedia:type=".
            ValueError: If the reference string does not end with "@@@".
            ValueError: If a field is not of the form "key=value".
            ValueError: If the reference string is missing required fields.
        """
        prefix = "@@@langfuseMedia:"
        suffix = "@@@"

        if not reference_string:
            raise ValueError("Reference string is empty")

        if not isinstance(reference_string, str):
            raise ValueError("Reference string is not a string")

        if not reference_string.startswith("@@@langfuseMedia:type="):
            raise ValueError(
                "Reference string does not start with '@@@langfuseMedia:type='"
            )

        if not reference_string.endswith(suffix):
            raise ValueError("Reference string does not end with '@@@'")

        # Slice off exactly the prefix and suffix. str.rstrip("@@@") would strip
        # every trailing "@" and could eat into the last field's value.
        content = reference_string[len(prefix) : -len(suffix)]

        # Split into key-value pairs
        parsed_data = {}
        for pair in content.split("|"):
            key, separator, value = pair.partition("=")
            if not separator:
                raise ValueError(
                    f"Invalid key-value pair in reference string: {pair!r}"
                )
            parsed_data[key] = value

        # Verify all required fields are present
        if not all(key in parsed_data for key in ("type", "id", "source")):
            raise ValueError("Missing required fields in reference string")

        return ParsedMediaReference(
            media_id=parsed_data["id"],
            source=parsed_data["source"],
            content_type=parsed_data["type"],
        )

    def _parse_base64_data_uri(
        self, data: str
    ) -> Tuple[Optional[bytes], Optional[MediaContentType]]:
        """Decode a base64 data URI into (content bytes, content type).

        Any parsing or decoding problem is logged and reported as (None, None)
        instead of being raised.
        """
        # Example data URI: data:image/jpeg;base64,/9j/4AAQ...
        try:
            if not data or not isinstance(data, str):
                raise ValueError("Data URI is not a string")

            if not data.startswith("data:"):
                raise ValueError("Data URI does not start with 'data:'")

            header, separator, actual_data = data[5:].partition(",")
            if not separator or not header or not actual_data:
                raise ValueError("Invalid URI")

            # Split header into parts and check for base64
            header_parts = header.split(";")
            if "base64" not in header_parts:
                raise ValueError("Data is not base64 encoded")

            # Content type is the first part
            content_type = header_parts[0]
            if not content_type:
                raise ValueError("Content type is empty")

            return base64.b64decode(actual_data), cast(MediaContentType, content_type)

        except Exception as e:
            self._log.error("Error parsing base64 data URI", exc_info=e)

            return None, None

    @staticmethod
    def resolve_media_references(
        *,
        obj: T,
        langfuse_client: Any,
        resolve_with: Literal["base64_data_uri"],
        max_depth: int = 10,
        content_fetch_timeout_seconds: int = 10,
    ) -> T:
        """Replace media reference strings in an object with base64 data URIs.

        This method recursively traverses an object (up to max_depth) looking for media reference strings
        in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using
        the provided Langfuse client and replaces the reference string with a base64 data URI.

        If fetching media content fails for a reference string, a warning is logged and the reference
        string is left unchanged.

        Args:
            obj: The object to process. Can be a primitive value, array, or nested object.
                If the object has a __dict__ attribute, a dict will be returned instead of the original object type.
            langfuse_client: Langfuse client instance used to fetch media content.
            resolve_with: The representation of the media content to replace the media reference string with.
                Currently only "base64_data_uri" is supported.
            max_depth: Optional. Default is 10. The maximum depth to traverse the object.
            content_fetch_timeout_seconds: Optional. Default is 10. Timeout in seconds for
                downloading the content of each media reference.

        Returns:
            The input structure with lists and dicts rebuilt and all media references replaced with
            base64 data URIs where possible. Values that are not strings, lists, dicts, or objects with
            a __dict__ (and anything nested deeper than max_depth) are returned as-is.
            If the input object has a __dict__ attribute, a dict will be returned instead of the original object type.

        Example:
            obj = {
                "image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
                "nested": {
                    "pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@"
                }
            }

            result = LangfuseMedia.resolve_media_references(
                obj=obj,
                langfuse_client=langfuse_client,
                resolve_with="base64_data_uri",
            )

            # Result:
            # {
            #     "image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...",
            #     "nested": {
            #         "pdf": "data:application/pdf;base64,JVBERi0xLjcK..."
            #     }
            # }
        """
        # Compiled once per call instead of once per visited string.
        reference_pattern = re.compile(r"@@@langfuseMedia:.+?@@@")

        def fetch_data_uri(reference_string: str) -> Optional[str]:
            # Resolve one reference to a data URI; None means "leave it unchanged".
            try:
                parsed_media_reference = LangfuseMedia.parse_reference_string(
                    reference_string
                )
                media_data = langfuse_client.fetch_media(
                    parsed_media_reference["media_id"]
                ).data
                media_content = requests.get(
                    media_data.url, timeout=content_fetch_timeout_seconds
                )
                if not media_content.ok:
                    raise Exception("Failed to fetch media content")

                base64_media_content = base64.b64encode(
                    media_content.content
                ).decode()

                return f"data:{media_data.content_type};base64,{base64_media_content}"
            except Exception as e:
                LangfuseMedia._log.warning(
                    "Error fetching media content for reference string %s: %s",
                    reference_string,
                    e,
                )

                return None

        def traverse(value: Any, depth: int) -> Any:
            if depth > max_depth:
                return value

            # Handle string
            if isinstance(value, str):
                # dict.fromkeys de-duplicates while keeping first-seen order, so a
                # reference repeated within one string is fetched only once.
                reference_strings = dict.fromkeys(reference_pattern.findall(value))

                result = value
                for reference_string in reference_strings:
                    data_uri = fetch_data_uri(reference_string)
                    if data_uri is not None:
                        result = result.replace(reference_string, data_uri)

                return result

            # Handle arrays
            if isinstance(value, list):
                return [traverse(item, depth + 1) for item in value]

            # Handle dictionaries
            if isinstance(value, dict):
                return {key: traverse(item, depth + 1) for key, item in value.items()}

            # Handle objects: their attributes are returned as a plain dict
            if hasattr(value, "__dict__"):
                return {
                    key: traverse(item, depth + 1)
                    for key, item in value.__dict__.items()
                }

            return value

        return traverse(obj, 0)
class LangfuseMedia:
    """A class for wrapping media objects for upload to Langfuse.

    This class handles the preparation and formatting of media content for Langfuse,
    supporting base64 data URIs, raw content bytes, and files on disk.

    Args:
        obj (Optional[object]): The source object to be wrapped. Can be accessed via the `obj` attribute.
        base64_data_uri (Optional[str]): A base64-encoded data URI containing the media content
            and content type (e.g., "data:image/jpeg;base64,/9j/4AAQ...").
        content_type (Optional[str]): The MIME type of the media content when providing raw bytes
            or a file path.
        content_bytes (Optional[bytes]): Raw bytes of the media content.
        file_path (Optional[str]): The path to the file containing the media content. For relative paths,
            the current working directory is used.

    Note:
        The constructor does not raise on missing or invalid input. If no usable
        combination of arguments is provided, or parsing/reading the content fails,
        an error is logged and the instance is left without a content type and source.
    """

    obj: object

    _log = logging.getLogger(__name__)
    _content_bytes: Optional[bytes]
    _content_type: Optional[MediaContentType]
    _source: Optional[str]
    _media_id: Optional[str]

    def __init__(
        self,
        *,
        obj: Optional[object] = None,
        base64_data_uri: Optional[str] = None,
        content_type: Optional[MediaContentType] = None,
        content_bytes: Optional[bytes] = None,
        file_path: Optional[str] = None,
    ):
        """Initialize a LangfuseMedia object.

        The first usable input wins, in this order: `base64_data_uri`, then
        `content_bytes` with `content_type`, then `file_path` with `content_type`.

        Args:
            obj: The object to wrap.

            base64_data_uri: A base64-encoded data URI containing the media content
                and content type (e.g., "data:image/jpeg;base64,/9j/4AAQ...").
            content_type: The MIME type of the media content when providing raw bytes or reading from a file.
            content_bytes: Raw bytes of the media content.
            file_path: The path to the file containing the media content. For relative paths,
                the current working directory is used.
        """
        self.obj = obj
        self._media_id = None

        # Start from the "no media" state; the branches below only fill in what
        # they could actually load, so a failure never leaves a half-set instance.
        self._content_bytes = None
        self._content_type = None
        self._source = None

        if base64_data_uri is not None:
            parsed_bytes, parsed_type = self._parse_base64_data_uri(base64_data_uri)
            self._content_bytes = parsed_bytes
            self._content_type = parsed_type
            # The parser returns (None, None) on failure; only claim a source
            # when content was actually decoded.
            if parsed_bytes is not None and parsed_type is not None:
                self._source = "base64_data_uri"

        elif content_bytes is not None and content_type is not None:
            self._content_type = content_type
            self._content_bytes = content_bytes
            self._source = "bytes"

        elif file_path is not None and content_type is not None:
            if os.path.exists(file_path):
                self._content_bytes = self._read_file(file_path)
                # An unreadable (None) or empty (b"") file yields no usable
                # media, so content type and source stay unset in that case.
                if self._content_bytes:
                    self._content_type = content_type
                    self._source = "file"
            else:
                self._log.error("File at path %s does not exist", file_path)

        else:
            self._log.error(
                "base64_data_uri, or content_bytes and content_type, or file_path must be provided to LangfuseMedia"
            )

    def _read_file(self, file_path: str) -> Optional[bytes]:
        """Return the raw bytes of the file at `file_path`, or None if it cannot be read.

        Read errors are logged rather than raised.
        """
        try:
            with open(file_path, "rb") as file:
                return file.read()
        except Exception as e:
            self._log.error("Error reading file at path %s", file_path, exc_info=e)

            return None

    @property
    def _content_length(self) -> Optional[int]:
        """Number of content bytes, or None when there is no content.

        Empty content (b"") is also reported as None.
        """
        return len(self._content_bytes) if self._content_bytes else None

    @property
    def _content_sha256_hash(self) -> Optional[str]:
        """Base64-encoded SHA-256 digest of the content bytes, or None if no bytes are set."""
        if self._content_bytes is None:
            return None

        sha256_hash_bytes = hashlib.sha256(self._content_bytes).digest()

        return base64.b64encode(sha256_hash_bytes).decode("utf-8")

    @property
    def _reference_string(self) -> Optional[str]:
        """Media reference string for this object, or None until type, source and media id are all set."""
        if self._content_type is None or self._source is None or self._media_id is None:
            return None

        return f"@@@langfuseMedia:type={self._content_type}|id={self._media_id}|source={self._source}@@@"

    @staticmethod
    def parse_reference_string(reference_string: str) -> ParsedMediaReference:
        """Parse a media reference string into a ParsedMediaReference.

        Example reference string:
            "@@@langfuseMedia:type=image/jpeg|id=some-uuid|source=base64_data_uri@@@"

        Args:
            reference_string: The reference string to parse.

        Returns:
            A TypedDict with the media_id, source, and content_type.

        Raises:
            ValueError: If the reference string is empty or not a string.
            ValueError: If the reference string does not start with "@@@langfuseMedia:type=".
            ValueError: If the reference string does not end with "@@@".
            ValueError: If a field is not of the form "key=value".
            ValueError: If the reference string is missing required fields.
        """
        prefix = "@@@langfuseMedia:"
        suffix = "@@@"

        if not reference_string:
            raise ValueError("Reference string is empty")

        if not isinstance(reference_string, str):
            raise ValueError("Reference string is not a string")

        if not reference_string.startswith("@@@langfuseMedia:type="):
            raise ValueError(
                "Reference string does not start with '@@@langfuseMedia:type='"
            )

        if not reference_string.endswith(suffix):
            raise ValueError("Reference string does not end with '@@@'")

        # Slice off exactly the prefix and suffix. str.rstrip("@@@") would strip
        # every trailing "@" and could eat into the last field's value.
        content = reference_string[len(prefix) : -len(suffix)]

        # Split into key-value pairs
        parsed_data = {}
        for pair in content.split("|"):
            key, separator, value = pair.partition("=")
            if not separator:
                raise ValueError(
                    f"Invalid key-value pair in reference string: {pair!r}"
                )
            parsed_data[key] = value

        # Verify all required fields are present
        if not all(key in parsed_data for key in ("type", "id", "source")):
            raise ValueError("Missing required fields in reference string")

        return ParsedMediaReference(
            media_id=parsed_data["id"],
            source=parsed_data["source"],
            content_type=parsed_data["type"],
        )

    def _parse_base64_data_uri(
        self, data: str
    ) -> Tuple[Optional[bytes], Optional[MediaContentType]]:
        """Decode a base64 data URI into (content bytes, content type).

        Any parsing or decoding problem is logged and reported as (None, None)
        instead of being raised.
        """
        # Example data URI: data:image/jpeg;base64,/9j/4AAQ...
        try:
            if not data or not isinstance(data, str):
                raise ValueError("Data URI is not a string")

            if not data.startswith("data:"):
                raise ValueError("Data URI does not start with 'data:'")

            header, separator, actual_data = data[5:].partition(",")
            if not separator or not header or not actual_data:
                raise ValueError("Invalid URI")

            # Split header into parts and check for base64
            header_parts = header.split(";")
            if "base64" not in header_parts:
                raise ValueError("Data is not base64 encoded")

            # Content type is the first part
            content_type = header_parts[0]
            if not content_type:
                raise ValueError("Content type is empty")

            return base64.b64decode(actual_data), cast(MediaContentType, content_type)

        except Exception as e:
            self._log.error("Error parsing base64 data URI", exc_info=e)

            return None, None

    @staticmethod
    def resolve_media_references(
        *,
        obj: T,
        langfuse_client: Any,
        resolve_with: Literal["base64_data_uri"],
        max_depth: int = 10,
        content_fetch_timeout_seconds: int = 10,
    ) -> T:
        """Replace media reference strings in an object with base64 data URIs.

        This method recursively traverses an object (up to max_depth) looking for media reference strings
        in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using
        the provided Langfuse client and replaces the reference string with a base64 data URI.

        If fetching media content fails for a reference string, a warning is logged and the reference
        string is left unchanged.

        Args:
            obj: The object to process. Can be a primitive value, array, or nested object.
                If the object has a __dict__ attribute, a dict will be returned instead of the original object type.
            langfuse_client: Langfuse client instance used to fetch media content.
            resolve_with: The representation of the media content to replace the media reference string with.
                Currently only "base64_data_uri" is supported.
            max_depth: Optional. Default is 10. The maximum depth to traverse the object.
            content_fetch_timeout_seconds: Optional. Default is 10. Timeout in seconds for
                downloading the content of each media reference.

        Returns:
            The input structure with lists and dicts rebuilt and all media references replaced with
            base64 data URIs where possible. Values that are not strings, lists, dicts, or objects with
            a __dict__ (and anything nested deeper than max_depth) are returned as-is.
            If the input object has a __dict__ attribute, a dict will be returned instead of the original object type.

        Example:
            obj = {
                "image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
                "nested": {
                    "pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@"
                }
            }

            result = LangfuseMedia.resolve_media_references(
                obj=obj,
                langfuse_client=langfuse_client,
                resolve_with="base64_data_uri",
            )

            # Result:
            # {
            #     "image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...",
            #     "nested": {
            #         "pdf": "data:application/pdf;base64,JVBERi0xLjcK..."
            #     }
            # }
        """
        # Compiled once per call instead of once per visited string.
        reference_pattern = re.compile(r"@@@langfuseMedia:.+?@@@")

        def fetch_data_uri(reference_string: str) -> Optional[str]:
            # Resolve one reference to a data URI; None means "leave it unchanged".
            try:
                parsed_media_reference = LangfuseMedia.parse_reference_string(
                    reference_string
                )
                media_data = langfuse_client.fetch_media(
                    parsed_media_reference["media_id"]
                ).data
                media_content = requests.get(
                    media_data.url, timeout=content_fetch_timeout_seconds
                )
                if not media_content.ok:
                    raise Exception("Failed to fetch media content")

                base64_media_content = base64.b64encode(
                    media_content.content
                ).decode()

                return f"data:{media_data.content_type};base64,{base64_media_content}"
            except Exception as e:
                LangfuseMedia._log.warning(
                    "Error fetching media content for reference string %s: %s",
                    reference_string,
                    e,
                )

                return None

        def traverse(value: Any, depth: int) -> Any:
            if depth > max_depth:
                return value

            # Handle string
            if isinstance(value, str):
                # dict.fromkeys de-duplicates while keeping first-seen order, so a
                # reference repeated within one string is fetched only once.
                reference_strings = dict.fromkeys(reference_pattern.findall(value))

                result = value
                for reference_string in reference_strings:
                    data_uri = fetch_data_uri(reference_string)
                    if data_uri is not None:
                        result = result.replace(reference_string, data_uri)

                return result

            # Handle arrays
            if isinstance(value, list):
                return [traverse(item, depth + 1) for item in value]

            # Handle dictionaries
            if isinstance(value, dict):
                return {key: traverse(item, depth + 1) for key, item in value.items()}

            # Handle objects: their attributes are returned as a plain dict
            if hasattr(value, "__dict__"):
                return {
                    key: traverse(item, depth + 1)
                    for key, item in value.__dict__.items()
                }

            return value

        return traverse(obj, 0)
A class for wrapping media objects for upload to Langfuse.
This class handles the preparation and formatting of media content for Langfuse, supporting both base64 data URIs and raw content bytes.
Arguments:
- obj (Optional[object]): The source object to be wrapped. Can be accessed via the `obj` attribute.
- base64_data_uri (Optional[str]): A base64-encoded data URI containing the media content and content type (e.g., "data:image/jpeg;base64,/9j/4AAQ...").
- content_type (Optional[str]): The MIME type of the media content when providing raw bytes or reading from a file.
- content_bytes (Optional[bytes]): Raw bytes of the media content.
- file_path (Optional[str]): The path to the file containing the media content. For relative paths, the current working directory is used.
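Note:
- No exception is raised for missing or invalid input. If none of base64_data_uri, content_bytes with content_type, or an existing file_path with content_type is provided, an error is logged and the content type and source are left unset.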
def __init__(
    self,
    *,
    obj: Optional[object] = None,
    base64_data_uri: Optional[str] = None,
    content_type: Optional[MediaContentType] = None,
    content_bytes: Optional[bytes] = None,
    file_path: Optional[str] = None,
):
    """Initialize a LangfuseMedia object.

    The first usable input wins, in this order: `base64_data_uri`, then
    `content_bytes` with `content_type`, then `file_path` with `content_type`.
    Nothing is raised for missing or invalid input; an error is logged and
    the content type and source stay unset.

    Args:
        obj: The object to wrap.

        base64_data_uri: A base64-encoded data URI containing the media content
            and content type (e.g., "data:image/jpeg;base64,/9j/4AAQ...").
        content_type: The MIME type of the media content when providing raw bytes or reading from a file.
        content_bytes: Raw bytes of the media content.
        file_path: The path to the file containing the media content. For relative paths,
            the current working directory is used.
    """
    self.obj = obj
    self._media_id = None

    # Start from the "no media" state; the branches below only fill in what
    # they could actually load, so a failure never leaves a half-set instance.
    self._content_bytes = None
    self._content_type = None
    self._source = None

    if base64_data_uri is not None:
        parsed_bytes, parsed_type = self._parse_base64_data_uri(base64_data_uri)
        self._content_bytes = parsed_bytes
        self._content_type = parsed_type
        # The parser returns (None, None) on failure; only claim a source
        # when content was actually decoded.
        if parsed_bytes is not None and parsed_type is not None:
            self._source = "base64_data_uri"

    elif content_bytes is not None and content_type is not None:
        self._content_type = content_type
        self._content_bytes = content_bytes
        self._source = "bytes"

    elif file_path is not None and content_type is not None:
        if os.path.exists(file_path):
            self._content_bytes = self._read_file(file_path)
            # An unreadable (None) or empty (b"") file yields no usable
            # media, so content type and source stay unset in that case.
            if self._content_bytes:
                self._content_type = content_type
                self._source = "file"
        else:
            self._log.error("File at path %s does not exist", file_path)

    else:
        self._log.error(
            "base64_data_uri, or content_bytes and content_type, or file_path must be provided to LangfuseMedia"
        )
Initialize a LangfuseMedia object.
Arguments:
- obj: The object to wrap.
- base64_data_uri: A base64-encoded data URI containing the media content and content type (e.g., "data:image/jpeg;base64,/9j/4AAQ...").
- content_type: The MIME type of the media content when providing raw bytes or reading from a file.
- content_bytes: Raw bytes of the media content.
- file_path: The path to the file containing the media content. For relative paths, the current working directory is used.
@staticmethod
def parse_reference_string(reference_string: str) -> ParsedMediaReference:
    """Parse a media reference string into a ParsedMediaReference.

    Example reference string:
        "@@@langfuseMedia:type=image/jpeg|id=some-uuid|source=base64_data_uri@@@"

    Args:
        reference_string: The reference string to parse.

    Returns:
        A TypedDict with the media_id, source, and content_type.

    Raises:
        ValueError: If the reference string is empty or not a string.
        ValueError: If the reference string does not start with "@@@langfuseMedia:type=".
        ValueError: If the reference string does not end with "@@@".
        ValueError: If a field is not of the form "key=value".
        ValueError: If the reference string is missing required fields.
    """
    prefix = "@@@langfuseMedia:"
    suffix = "@@@"

    if not reference_string:
        raise ValueError("Reference string is empty")

    if not isinstance(reference_string, str):
        raise ValueError("Reference string is not a string")

    if not reference_string.startswith("@@@langfuseMedia:type="):
        raise ValueError(
            "Reference string does not start with '@@@langfuseMedia:type='"
        )

    if not reference_string.endswith(suffix):
        raise ValueError("Reference string does not end with '@@@'")

    # Slice off exactly the prefix and suffix. str.rstrip("@@@") would strip
    # every trailing "@" and could eat into the last field's value.
    content = reference_string[len(prefix) : -len(suffix)]

    # Split into key-value pairs
    parsed_data = {}
    for pair in content.split("|"):
        key, separator, value = pair.partition("=")
        if not separator:
            raise ValueError(
                f"Invalid key-value pair in reference string: {pair!r}"
            )
        parsed_data[key] = value

    # Verify all required fields are present
    if not all(key in parsed_data for key in ("type", "id", "source")):
        raise ValueError("Missing required fields in reference string")

    return ParsedMediaReference(
        media_id=parsed_data["id"],
        source=parsed_data["source"],
        content_type=parsed_data["type"],
    )
Parse a media reference string into a ParsedMediaReference.
Example reference string:
"@@@langfuseMedia:type=image/jpeg|id=some-uuid|source=base64_data_uri@@@"
Arguments:
- reference_string: The reference string to parse.
Returns:
A TypedDict with the media_id, source, and content_type.
Raises:
- ValueError: If the reference string is empty or not a string.
- ValueError: If the reference string does not start with "@@@langfuseMedia:type=".
- ValueError: If the reference string does not end with "@@@".
- ValueError: If the reference string is missing required fields.
@staticmethod
def resolve_media_references(
    *,
    obj: T,
    langfuse_client: Any,
    resolve_with: Literal["base64_data_uri"],
    max_depth: int = 10,
    content_fetch_timeout_seconds: int = 10,
) -> T:
    """Replace media reference strings in an object with base64 data URIs.

    This method recursively traverses an object (up to max_depth) looking for media reference strings
    in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using
    the provided Langfuse client and replaces the reference string with a base64 data URI.

    If fetching media content fails for a reference string, a warning is logged and the reference
    string is left unchanged.

    Args:
        obj: The object to process. Can be a primitive value, array, or nested object.
            If the object has a __dict__ attribute, a dict will be returned instead of the original object type.
        langfuse_client: Langfuse client instance used to fetch media content.
        resolve_with: The representation of the media content to replace the media reference string with.
            Currently only "base64_data_uri" is supported.
        max_depth: Optional. Default is 10. The maximum depth to traverse the object.
        content_fetch_timeout_seconds: Optional. Default is 10. Timeout in seconds for
            downloading the content of each media reference.

    Returns:
        The input structure with lists and dicts rebuilt and all media references replaced with
        base64 data URIs where possible. Values that are not strings, lists, dicts, or objects with
        a __dict__ (and anything nested deeper than max_depth) are returned as-is.
        If the input object has a __dict__ attribute, a dict will be returned instead of the original object type.

    Example:
        obj = {
            "image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
            "nested": {
                "pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@"
            }
        }

        result = LangfuseMedia.resolve_media_references(
            obj=obj,
            langfuse_client=langfuse_client,
            resolve_with="base64_data_uri",
        )

        # Result:
        # {
        #     "image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...",
        #     "nested": {
        #         "pdf": "data:application/pdf;base64,JVBERi0xLjcK..."
        #     }
        # }
    """
    # Compiled once per call instead of once per visited string.
    reference_pattern = re.compile(r"@@@langfuseMedia:.+?@@@")

    def fetch_data_uri(reference_string: str) -> Optional[str]:
        # Resolve one reference to a data URI; None means "leave it unchanged".
        try:
            parsed_media_reference = LangfuseMedia.parse_reference_string(
                reference_string
            )
            media_data = langfuse_client.fetch_media(
                parsed_media_reference["media_id"]
            ).data
            media_content = requests.get(
                media_data.url, timeout=content_fetch_timeout_seconds
            )
            if not media_content.ok:
                raise Exception("Failed to fetch media content")

            base64_media_content = base64.b64encode(
                media_content.content
            ).decode()

            return f"data:{media_data.content_type};base64,{base64_media_content}"
        except Exception as e:
            LangfuseMedia._log.warning(
                "Error fetching media content for reference string %s: %s",
                reference_string,
                e,
            )

            return None

    def traverse(value: Any, depth: int) -> Any:
        if depth > max_depth:
            return value

        # Handle string
        if isinstance(value, str):
            # dict.fromkeys de-duplicates while keeping first-seen order, so a
            # reference repeated within one string is fetched only once.
            reference_strings = dict.fromkeys(reference_pattern.findall(value))

            result = value
            for reference_string in reference_strings:
                data_uri = fetch_data_uri(reference_string)
                if data_uri is not None:
                    result = result.replace(reference_string, data_uri)

            return result

        # Handle arrays
        if isinstance(value, list):
            return [traverse(item, depth + 1) for item in value]

        # Handle dictionaries
        if isinstance(value, dict):
            return {key: traverse(item, depth + 1) for key, item in value.items()}

        # Handle objects: their attributes are returned as a plain dict
        if hasattr(value, "__dict__"):
            return {
                key: traverse(item, depth + 1)
                for key, item in value.__dict__.items()
            }

        return value

    return traverse(obj, 0)
Replace media reference strings in an object with base64 data URIs.
This method recursively traverses an object (up to max_depth) looking for media reference strings in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using the provided Langfuse client and replaces the reference string with a base64 data URI.
If fetching media content fails for a reference string, a warning is logged and the reference string is left unchanged.
Arguments:
- obj: The object to process. Can be a primitive value, array, or nested object. If the object has a __dict__ attribute, a dict will be returned instead of the original object type.
- langfuse_client: Langfuse client instance used to fetch media content.
- resolve_with: The representation of the media content to replace the media reference string with. Currently only "base64_data_uri" is supported.
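- max_depth: Optional. Default is 10. The maximum depth to traverse the object.
- content_fetch_timeout_seconds: Optional. Default is 10. The timeout in seconds for downloading the content of each media reference.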
Returns:
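A copy of the input object in which lists and dicts are rebuilt and all media references are replaced with base64 data URIs where possible; other values, and anything nested deeper than max_depth, are returned as-is. If the input object has a __dict__ attribute, a dict will be returned instead of the original object type.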
Example:
obj = { "image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@", "nested": { "pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@" } }
result = LangfuseMedia.resolve_media_references(obj=obj, langfuse_client=langfuse_client, resolve_with="base64_data_uri")
Result:
{
"image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...",
"nested": {
"pdf": "data:application/pdf;base64,JVBERi0xLjcK..."
}
}