langfuse.request
@private
1"""@private""" 2 3import json 4import logging 5from base64 import b64encode 6from typing import Any, List, Union 7 8import httpx 9 10from langfuse.serializer import EventSerializer 11 12 13class LangfuseClient: 14 _public_key: str 15 _secret_key: str 16 _base_url: str 17 _version: str 18 _timeout: int 19 _session: httpx.Client 20 21 def __init__( 22 self, 23 public_key: str, 24 secret_key: str, 25 base_url: str, 26 version: str, 27 timeout: int, 28 session: httpx.Client, 29 ): 30 self._public_key = public_key 31 self._secret_key = secret_key 32 self._base_url = base_url 33 self._version = version 34 self._timeout = timeout 35 self._session = session 36 37 def generate_headers(self): 38 return { 39 "Authorization": "Basic " 40 + b64encode( 41 f"{self._public_key}:{self._secret_key}".encode("utf-8") 42 ).decode("ascii"), 43 "Content-Type": "application/json", 44 "x_langfuse_sdk_name": "python", 45 "x_langfuse_sdk_version": self._version, 46 "x_langfuse_public_key": self._public_key, 47 } 48 49 def batch_post(self, **kwargs) -> httpx.Response: 50 """Post the `kwargs` to the batch API endpoint for events""" 51 log = logging.getLogger("langfuse") 52 log.debug("uploading data: %s", kwargs) 53 54 res = self.post(**kwargs) 55 return self._process_response( 56 res, success_message="data uploaded successfully", return_json=False 57 ) 58 59 def post(self, **kwargs) -> httpx.Response: 60 """Post the `kwargs` to the API""" 61 log = logging.getLogger("langfuse") 62 url = self._remove_trailing_slash(self._base_url) + "/api/public/ingestion" 63 data = json.dumps(kwargs, cls=EventSerializer) 64 log.debug("making request: %s to %s", data, url) 65 headers = self.generate_headers() 66 res = self._session.post( 67 url, content=data, headers=headers, timeout=self._timeout 68 ) 69 70 if res.status_code == 200: 71 log.debug("data uploaded successfully") 72 73 return res 74 75 def _remove_trailing_slash(self, url: str) -> str: 76 """Removes the trailing slash from a URL""" 77 if url.endswith("/"): 78 return url[:-1] 79 return url 80 81 def _process_response( 82 self, res: httpx.Response, success_message: str, *, return_json: bool = True 83 ) -> Union[httpx.Response, Any]: 84 log = logging.getLogger("langfuse") 85 log.debug("received response: %s", res.text) 86 if res.status_code in (200, 201): 87 log.debug(success_message) 88 if return_json: 89 try: 90 return res.json() 91 except json.JSONDecodeError: 92 raise APIError(res.status_code, "Invalid JSON response received") 93 else: 94 return res 95 elif res.status_code == 207: 96 try: 97 payload = res.json() 98 errors = payload.get("errors", []) 99 if errors: 100 raise APIErrors( 101 [ 102 APIError( 103 error.get("status"), 104 error.get("message", "No message provided"), 105 error.get("error", "No error details provided"), 106 ) 107 for error in errors 108 ] 109 ) 110 else: 111 return res.json() if return_json else res 112 except json.JSONDecodeError: 113 raise APIError(res.status_code, "Invalid JSON response received") 114 115 try: 116 payload = res.json() 117 raise APIError(res.status_code, payload) 118 except (KeyError, ValueError): 119 raise APIError(res.status_code, res.text) 120 121 122class APIError(Exception): 123 def __init__(self, status: Union[int, str], message: str, details: Any = None): 124 self.message = message 125 self.status = status 126 self.details = details 127 128 def __str__(self): 129 msg = "{0} ({1}): {2}" 130 return msg.format(self.message, self.status, self.details) 131 132 133class APIErrors(Exception): 134 def __init__(self, errors: List[APIError]): 135 self.errors = errors 136 137 def __str__(self): 138 errors = ", ".join(str(error) for error in self.errors) 139 140 return f"[Langfuse] {errors}"
class
LangfuseClient:
14class LangfuseClient: 15 _public_key: str 16 _secret_key: str 17 _base_url: str 18 _version: str 19 _timeout: int 20 _session: httpx.Client 21 22 def __init__( 23 self, 24 public_key: str, 25 secret_key: str, 26 base_url: str, 27 version: str, 28 timeout: int, 29 session: httpx.Client, 30 ): 31 self._public_key = public_key 32 self._secret_key = secret_key 33 self._base_url = base_url 34 self._version = version 35 self._timeout = timeout 36 self._session = session 37 38 def generate_headers(self): 39 return { 40 "Authorization": "Basic " 41 + b64encode( 42 f"{self._public_key}:{self._secret_key}".encode("utf-8") 43 ).decode("ascii"), 44 "Content-Type": "application/json", 45 "x_langfuse_sdk_name": "python", 46 "x_langfuse_sdk_version": self._version, 47 "x_langfuse_public_key": self._public_key, 48 } 49 50 def batch_post(self, **kwargs) -> httpx.Response: 51 """Post the `kwargs` to the batch API endpoint for events""" 52 log = logging.getLogger("langfuse") 53 log.debug("uploading data: %s", kwargs) 54 55 res = self.post(**kwargs) 56 return self._process_response( 57 res, success_message="data uploaded successfully", return_json=False 58 ) 59 60 def post(self, **kwargs) -> httpx.Response: 61 """Post the `kwargs` to the API""" 62 log = logging.getLogger("langfuse") 63 url = self._remove_trailing_slash(self._base_url) + "/api/public/ingestion" 64 data = json.dumps(kwargs, cls=EventSerializer) 65 log.debug("making request: %s to %s", data, url) 66 headers = self.generate_headers() 67 res = self._session.post( 68 url, content=data, headers=headers, timeout=self._timeout 69 ) 70 71 if res.status_code == 200: 72 log.debug("data uploaded successfully") 73 74 return res 75 76 def _remove_trailing_slash(self, url: str) -> str: 77 """Removes the trailing slash from a URL""" 78 if url.endswith("/"): 79 return url[:-1] 80 return url 81 82 def _process_response( 83 self, res: httpx.Response, success_message: str, *, return_json: bool = True 84 ) -> Union[httpx.Response, Any]: 85 log = logging.getLogger("langfuse") 86 log.debug("received response: %s", res.text) 87 if res.status_code in (200, 201): 88 log.debug(success_message) 89 if return_json: 90 try: 91 return res.json() 92 except json.JSONDecodeError: 93 raise APIError(res.status_code, "Invalid JSON response received") 94 else: 95 return res 96 elif res.status_code == 207: 97 try: 98 payload = res.json() 99 errors = payload.get("errors", []) 100 if errors: 101 raise APIErrors( 102 [ 103 APIError( 104 error.get("status"), 105 error.get("message", "No message provided"), 106 error.get("error", "No error details provided"), 107 ) 108 for error in errors 109 ] 110 ) 111 else: 112 return res.json() if return_json else res 113 except json.JSONDecodeError: 114 raise APIError(res.status_code, "Invalid JSON response received") 115 116 try: 117 payload = res.json() 118 raise APIError(res.status_code, payload) 119 except (KeyError, ValueError): 120 raise APIError(res.status_code, res.text)
LangfuseClient( public_key: str, secret_key: str, base_url: str, version: str, timeout: int, session: httpx.Client)
22 def __init__( 23 self, 24 public_key: str, 25 secret_key: str, 26 base_url: str, 27 version: str, 28 timeout: int, 29 session: httpx.Client, 30 ): 31 self._public_key = public_key 32 self._secret_key = secret_key 33 self._base_url = base_url 34 self._version = version 35 self._timeout = timeout 36 self._session = session
def
generate_headers(self):
38 def generate_headers(self): 39 return { 40 "Authorization": "Basic " 41 + b64encode( 42 f"{self._public_key}:{self._secret_key}".encode("utf-8") 43 ).decode("ascii"), 44 "Content-Type": "application/json", 45 "x_langfuse_sdk_name": "python", 46 "x_langfuse_sdk_version": self._version, 47 "x_langfuse_public_key": self._public_key, 48 }
def
batch_post(self, **kwargs) -> httpx.Response:
50 def batch_post(self, **kwargs) -> httpx.Response: 51 """Post the `kwargs` to the batch API endpoint for events""" 52 log = logging.getLogger("langfuse") 53 log.debug("uploading data: %s", kwargs) 54 55 res = self.post(**kwargs) 56 return self._process_response( 57 res, success_message="data uploaded successfully", return_json=False 58 )
Post the kwargs
to the batch API endpoint for events
def
post(self, **kwargs) -> httpx.Response:
60 def post(self, **kwargs) -> httpx.Response: 61 """Post the `kwargs` to the API""" 62 log = logging.getLogger("langfuse") 63 url = self._remove_trailing_slash(self._base_url) + "/api/public/ingestion" 64 data = json.dumps(kwargs, cls=EventSerializer) 65 log.debug("making request: %s to %s", data, url) 66 headers = self.generate_headers() 67 res = self._session.post( 68 url, content=data, headers=headers, timeout=self._timeout 69 ) 70 71 if res.status_code == 200: 72 log.debug("data uploaded successfully") 73 74 return res
Post the kwargs
to the API
class
APIError(builtins.Exception):
123class APIError(Exception): 124 def __init__(self, status: Union[int, str], message: str, details: Any = None): 125 self.message = message 126 self.status = status 127 self.details = details 128 129 def __str__(self): 130 msg = "{0} ({1}): {2}" 131 return msg.format(self.message, self.status, self.details)
Common base class for all non-exit exceptions.
Inherited Members
- builtins.BaseException
- with_traceback
- add_note
- args
class
APIErrors(builtins.Exception):
134class APIErrors(Exception): 135 def __init__(self, errors: List[APIError]): 136 self.errors = errors 137 138 def __str__(self): 139 errors = ", ".join(str(error) for error in self.errors) 140 141 return f"[Langfuse] {errors}"
Common base class for all non-exit exceptions.
APIErrors(errors: List[APIError])
Inherited Members
- builtins.BaseException
- with_traceback
- add_note
- args