langfuse.request

@private

  1"""@private"""
  2
  3import json
  4import logging
  5from base64 import b64encode
  6from typing import Any, List, Union
  7
  8import httpx
  9
 10from langfuse.serializer import EventSerializer
 11
 12
 13class LangfuseClient:
 14    _public_key: str
 15    _secret_key: str
 16    _base_url: str
 17    _version: str
 18    _timeout: int
 19    _session: httpx.Client
 20
 21    def __init__(
 22        self,
 23        public_key: str,
 24        secret_key: str,
 25        base_url: str,
 26        version: str,
 27        timeout: int,
 28        session: httpx.Client,
 29    ):
 30        self._public_key = public_key
 31        self._secret_key = secret_key
 32        self._base_url = base_url
 33        self._version = version
 34        self._timeout = timeout
 35        self._session = session
 36
 37    def generate_headers(self):
 38        return {
 39            "Authorization": "Basic "
 40            + b64encode(
 41                f"{self._public_key}:{self._secret_key}".encode("utf-8")
 42            ).decode("ascii"),
 43            "Content-Type": "application/json",
 44            "x_langfuse_sdk_name": "python",
 45            "x_langfuse_sdk_version": self._version,
 46            "x_langfuse_public_key": self._public_key,
 47        }
 48
 49    def batch_post(self, **kwargs) -> httpx.Response:
 50        """Post the `kwargs` to the batch API endpoint for events"""
 51        log = logging.getLogger("langfuse")
 52        log.debug("uploading data: %s", kwargs)
 53
 54        res = self.post(**kwargs)
 55        return self._process_response(
 56            res, success_message="data uploaded successfully", return_json=False
 57        )
 58
 59    def post(self, **kwargs) -> httpx.Response:
 60        """Post the `kwargs` to the API"""
 61        log = logging.getLogger("langfuse")
 62        url = self._remove_trailing_slash(self._base_url) + "/api/public/ingestion"
 63        data = json.dumps(kwargs, cls=EventSerializer)
 64        log.debug("making request: %s to %s", data, url)
 65        headers = self.generate_headers()
 66        res = self._session.post(
 67            url, content=data, headers=headers, timeout=self._timeout
 68        )
 69
 70        if res.status_code == 200:
 71            log.debug("data uploaded successfully")
 72
 73        return res
 74
 75    def _remove_trailing_slash(self, url: str) -> str:
 76        """Removes the trailing slash from a URL"""
 77        if url.endswith("/"):
 78            return url[:-1]
 79        return url
 80
 81    def _process_response(
 82        self, res: httpx.Response, success_message: str, *, return_json: bool = True
 83    ) -> Union[httpx.Response, Any]:
 84        log = logging.getLogger("langfuse")
 85        log.debug("received response: %s", res.text)
 86        if res.status_code in (200, 201):
 87            log.debug(success_message)
 88            if return_json:
 89                try:
 90                    return res.json()
 91                except json.JSONDecodeError:
 92                    raise APIError(res.status_code, "Invalid JSON response received")
 93            else:
 94                return res
 95        elif res.status_code == 207:
 96            try:
 97                payload = res.json()
 98                errors = payload.get("errors", [])
 99                if errors:
100                    raise APIErrors(
101                        [
102                            APIError(
103                                error.get("status"),
104                                error.get("message", "No message provided"),
105                                error.get("error", "No error details provided"),
106                            )
107                            for error in errors
108                        ]
109                    )
110                else:
111                    return res.json() if return_json else res
112            except json.JSONDecodeError:
113                raise APIError(res.status_code, "Invalid JSON response received")
114
115        try:
116            payload = res.json()
117            raise APIError(res.status_code, payload)
118        except (KeyError, ValueError):
119            raise APIError(res.status_code, res.text)
120
121
class APIError(Exception):
    """Error reported by the API for a single request or event.

    Attributes are set directly from the constructor arguments:
    ``status``, ``message`` and the optional ``details``.
    """

    def __init__(self, status: Union[int, str], message: str, details: Any = None):
        self.status = status
        self.message = message
        self.details = details

    def __str__(self):
        # Rendered as "<message> (<status>): <details>".
        return f"{self.message} ({self.status}): {self.details}"
131
132
class APIErrors(Exception):
    """Container exception holding several `APIError` instances."""

    def __init__(self, errors: List[APIError]):
        self.errors = errors

    def __str__(self):
        # Comma-separated rendering of every contained error.
        joined = ", ".join(map(str, self.errors))
        return f"[Langfuse] {joined}"
class LangfuseClient:
 14class LangfuseClient:
 15    _public_key: str
 16    _secret_key: str
 17    _base_url: str
 18    _version: str
 19    _timeout: int
 20    _session: httpx.Client
 21
 22    def __init__(
 23        self,
 24        public_key: str,
 25        secret_key: str,
 26        base_url: str,
 27        version: str,
 28        timeout: int,
 29        session: httpx.Client,
 30    ):
 31        self._public_key = public_key
 32        self._secret_key = secret_key
 33        self._base_url = base_url
 34        self._version = version
 35        self._timeout = timeout
 36        self._session = session
 37
 38    def generate_headers(self):
 39        return {
 40            "Authorization": "Basic "
 41            + b64encode(
 42                f"{self._public_key}:{self._secret_key}".encode("utf-8")
 43            ).decode("ascii"),
 44            "Content-Type": "application/json",
 45            "x_langfuse_sdk_name": "python",
 46            "x_langfuse_sdk_version": self._version,
 47            "x_langfuse_public_key": self._public_key,
 48        }
 49
 50    def batch_post(self, **kwargs) -> httpx.Response:
 51        """Post the `kwargs` to the batch API endpoint for events"""
 52        log = logging.getLogger("langfuse")
 53        log.debug("uploading data: %s", kwargs)
 54
 55        res = self.post(**kwargs)
 56        return self._process_response(
 57            res, success_message="data uploaded successfully", return_json=False
 58        )
 59
 60    def post(self, **kwargs) -> httpx.Response:
 61        """Post the `kwargs` to the API"""
 62        log = logging.getLogger("langfuse")
 63        url = self._remove_trailing_slash(self._base_url) + "/api/public/ingestion"
 64        data = json.dumps(kwargs, cls=EventSerializer)
 65        log.debug("making request: %s to %s", data, url)
 66        headers = self.generate_headers()
 67        res = self._session.post(
 68            url, content=data, headers=headers, timeout=self._timeout
 69        )
 70
 71        if res.status_code == 200:
 72            log.debug("data uploaded successfully")
 73
 74        return res
 75
 76    def _remove_trailing_slash(self, url: str) -> str:
 77        """Removes the trailing slash from a URL"""
 78        if url.endswith("/"):
 79            return url[:-1]
 80        return url
 81
 82    def _process_response(
 83        self, res: httpx.Response, success_message: str, *, return_json: bool = True
 84    ) -> Union[httpx.Response, Any]:
 85        log = logging.getLogger("langfuse")
 86        log.debug("received response: %s", res.text)
 87        if res.status_code in (200, 201):
 88            log.debug(success_message)
 89            if return_json:
 90                try:
 91                    return res.json()
 92                except json.JSONDecodeError:
 93                    raise APIError(res.status_code, "Invalid JSON response received")
 94            else:
 95                return res
 96        elif res.status_code == 207:
 97            try:
 98                payload = res.json()
 99                errors = payload.get("errors", [])
100                if errors:
101                    raise APIErrors(
102                        [
103                            APIError(
104                                error.get("status"),
105                                error.get("message", "No message provided"),
106                                error.get("error", "No error details provided"),
107                            )
108                            for error in errors
109                        ]
110                    )
111                else:
112                    return res.json() if return_json else res
113            except json.JSONDecodeError:
114                raise APIError(res.status_code, "Invalid JSON response received")
115
116        try:
117            payload = res.json()
118            raise APIError(res.status_code, payload)
119        except (KeyError, ValueError):
120            raise APIError(res.status_code, res.text)
LangfuseClient( public_key: str, secret_key: str, base_url: str, version: str, timeout: int, session: httpx.Client)
22    def __init__(
23        self,
24        public_key: str,
25        secret_key: str,
26        base_url: str,
27        version: str,
28        timeout: int,
29        session: httpx.Client,
30    ):
31        self._public_key = public_key
32        self._secret_key = secret_key
33        self._base_url = base_url
34        self._version = version
35        self._timeout = timeout
36        self._session = session
def generate_headers(self):
38    def generate_headers(self):
39        return {
40            "Authorization": "Basic "
41            + b64encode(
42                f"{self._public_key}:{self._secret_key}".encode("utf-8")
43            ).decode("ascii"),
44            "Content-Type": "application/json",
45            "x_langfuse_sdk_name": "python",
46            "x_langfuse_sdk_version": self._version,
47            "x_langfuse_public_key": self._public_key,
48        }
def batch_post(self, **kwargs) -> httpx.Response:
50    def batch_post(self, **kwargs) -> httpx.Response:
51        """Post the `kwargs` to the batch API endpoint for events"""
52        log = logging.getLogger("langfuse")
53        log.debug("uploading data: %s", kwargs)
54
55        res = self.post(**kwargs)
56        return self._process_response(
57            res, success_message="data uploaded successfully", return_json=False
58        )

Post the kwargs to the batch API endpoint for events

def post(self, **kwargs) -> httpx.Response:
60    def post(self, **kwargs) -> httpx.Response:
61        """Post the `kwargs` to the API"""
62        log = logging.getLogger("langfuse")
63        url = self._remove_trailing_slash(self._base_url) + "/api/public/ingestion"
64        data = json.dumps(kwargs, cls=EventSerializer)
65        log.debug("making request: %s to %s", data, url)
66        headers = self.generate_headers()
67        res = self._session.post(
68            url, content=data, headers=headers, timeout=self._timeout
69        )
70
71        if res.status_code == 200:
72            log.debug("data uploaded successfully")
73
74        return res

Post the kwargs to the API

class APIError(builtins.Exception):
class APIError(Exception):
    """Error reported by the API for a single request or event.

    Attributes are set directly from the constructor arguments:
    ``status``, ``message`` and the optional ``details``.
    """

    def __init__(self, status: Union[int, str], message: str, details: Any = None):
        self.status = status
        self.message = message
        self.details = details

    def __str__(self):
        # Rendered as "<message> (<status>): <details>".
        return f"{self.message} ({self.status}): {self.details}"

Common base class for all non-exit exceptions.

APIError(status: Union[int, str], message: str, details: Any = None)
124    def __init__(self, status: Union[int, str], message: str, details: Any = None):
125        self.message = message
126        self.status = status
127        self.details = details
message
status
details
Inherited Members
builtins.BaseException
with_traceback
add_note
args
class APIErrors(builtins.Exception):
class APIErrors(Exception):
    """Container exception holding several `APIError` instances."""

    def __init__(self, errors: List[APIError]):
        self.errors = errors

    def __str__(self):
        # Comma-separated rendering of every contained error.
        joined = ", ".join(map(str, self.errors))
        return f"[Langfuse] {joined}"

Common base class for all non-exit exceptions.

APIErrors(errors: List[APIError])
135    def __init__(self, errors: List[APIError]):
136        self.errors = errors
errors
Inherited Members
builtins.BaseException
with_traceback
add_note
args