langfuse.parse_error

  1import logging
  2from typing import Union
  3
  4# our own api errors
  5from langfuse.request import APIErrors, APIError
  6
  7# fern api errors
  8from langfuse.api.resources.commons.errors import (
  9    AccessDeniedError,
 10    Error,
 11    MethodNotAllowedError,
 12    NotFoundError,
 13    UnauthorizedError,
 14)
 15from langfuse.api.core import ApiError
 16from langfuse.api.resources.health.errors import ServiceUnavailableError
 17
 18
 19SUPPORT_URL = "https://langfuse.com/support"
 20API_DOCS_URL = "https://api.reference.langfuse.com"
 21RBAC_DOCS_URL = "https://langfuse.com/docs/rbac"
 22INSTALLATION_DOCS_URL = "https://langfuse.com/docs/sdk/typescript/guide"
 23RATE_LIMITS_URL = "https://langfuse.com/faq/all/api-limits"
 24PYPI_PACKAGE_URL = "https://pypi.org/project/langfuse/"
 25
 26# Error messages
 27updatePromptResponse = (
 28    f"Make sure to keep your SDK updated, refer to {PYPI_PACKAGE_URL} for details."
 29)
 30defaultServerErrorPrompt = f"This is an unusual occurrence and we are monitoring it closely. For help, please contact support: {SUPPORT_URL}."
 31defaultErrorResponse = f"Unexpected error occurred. Please check your request and contact support: {SUPPORT_URL}."
 32
 33# Error response map
 34errorResponseByCode = {
 35    500: f"Internal server error occurred. For help, please contact support: {SUPPORT_URL}",
 36    501: f"Not implemented. Please check your request and contact support for help: {SUPPORT_URL}.",
 37    502: f"Bad gateway. {defaultServerErrorPrompt}",
 38    503: f"Service unavailable. {defaultServerErrorPrompt}",
 39    504: f"Gateway timeout. {defaultServerErrorPrompt}",
 40    404: f"Internal error occurred. {defaultServerErrorPrompt}",
 41    400: f"Bad request. Please check your request for any missing or incorrect parameters. Refer to our API docs: {API_DOCS_URL} for details.",
 42    401: f"Unauthorized. Please check your public/private host settings. Refer to our installation and setup guide: {INSTALLATION_DOCS_URL} for details on SDK configuration.",
 43    403: f"Forbidden. Please check your access control settings. Refer to our RBAC docs: {RBAC_DOCS_URL} for details.",
 44    429: f"Rate limit exceeded. For more information on rate limits please see: {RATE_LIMITS_URL}",
 45}
 46
 47
 48def generate_error_message_fern(error: Error) -> str:
 49    if isinstance(error, AccessDeniedError):
 50        return errorResponseByCode.get(403, defaultErrorResponse)
 51    elif isinstance(error, MethodNotAllowedError):
 52        return errorResponseByCode.get(405, defaultErrorResponse)
 53    elif isinstance(error, NotFoundError):
 54        return errorResponseByCode.get(404, defaultErrorResponse)
 55    elif isinstance(error, UnauthorizedError):
 56        return errorResponseByCode.get(401, defaultErrorResponse)
 57    elif isinstance(error, ServiceUnavailableError):
 58        return errorResponseByCode.get(503, defaultErrorResponse)
 59    elif isinstance(error, ApiError):
 60        status_code = (
 61            int(error.status_code)
 62            if isinstance(error.status_code, str)
 63            else error.status_code
 64        )
 65        return errorResponseByCode.get(status_code, defaultErrorResponse)
 66    else:
 67        return defaultErrorResponse
 68
 69
 70def handle_fern_exception(exception: Error) -> None:
 71    log = logging.getLogger("langfuse")
 72    log.debug(exception)
 73    error_message = generate_error_message_fern(exception)
 74    log.error(error_message)
 75
 76
 77def generate_error_message(exception: Union[APIError, APIErrors, Exception]) -> str:
 78    if isinstance(exception, APIError):
 79        status_code = (
 80            int(exception.status)
 81            if isinstance(exception.status, str)
 82            else exception.status
 83        )
 84        return f"API error occurred: {errorResponseByCode.get(status_code, defaultErrorResponse)}"
 85    elif isinstance(exception, APIErrors):
 86        error_messages = [
 87            errorResponseByCode.get(
 88                int(error.status) if isinstance(error.status, str) else error.status,
 89                defaultErrorResponse,
 90            )
 91            for error in exception.errors
 92        ]
 93        return "API errors occurred: " + "\n".join(error_messages)
 94    else:
 95        return defaultErrorResponse
 96
 97
 98def handle_exception(exception: Union[APIError, APIErrors, Exception]) -> None:
 99    log = logging.getLogger("langfuse")
100    log.debug(exception)
101    error_message = generate_error_message(exception)
102    log.error(error_message)
SUPPORT_URL = 'https://langfuse.com/support'
API_DOCS_URL = 'https://api.reference.langfuse.com'
RBAC_DOCS_URL = 'https://langfuse.com/docs/rbac'
INSTALLATION_DOCS_URL = 'https://langfuse.com/docs/sdk/typescript/guide'
RATE_LIMITS_URL = 'https://langfuse.com/faq/all/api-limits'
PYPI_PACKAGE_URL = 'https://pypi.org/project/langfuse/'
updatePromptResponse = 'Make sure to keep your SDK updated, refer to https://pypi.org/project/langfuse/ for details.'
defaultServerErrorPrompt = 'This is an unusual occurrence and we are monitoring it closely. For help, please contact support: https://langfuse.com/support.'
defaultErrorResponse = 'Unexpected error occurred. Please check your request and contact support: https://langfuse.com/support.'
errorResponseByCode = {500: 'Internal server error occurred. For help, please contact support: https://langfuse.com/support', 501: 'Not implemented. Please check your request and contact support for help: https://langfuse.com/support.', 502: 'Bad gateway. This is an unusual occurrence and we are monitoring it closely. For help, please contact support: https://langfuse.com/support.', 503: 'Service unavailable. This is an unusual occurrence and we are monitoring it closely. For help, please contact support: https://langfuse.com/support.', 504: 'Gateway timeout. This is an unusual occurrence and we are monitoring it closely. For help, please contact support: https://langfuse.com/support.', 404: 'Internal error occurred. This is an unusual occurrence and we are monitoring it closely. For help, please contact support: https://langfuse.com/support.', 400: 'Bad request. Please check your request for any missing or incorrect parameters. Refer to our API docs: https://api.reference.langfuse.com for details.', 401: 'Unauthorized. Please check your public/private host settings. Refer to our installation and setup guide: https://langfuse.com/docs/sdk/typescript/guide for details on SDK configuration.', 403: 'Forbidden. Please check your access control settings. Refer to our RBAC docs: https://langfuse.com/docs/rbac for details.', 429: 'Rate limit exceeded. For more information on rate limits please see: https://langfuse.com/faq/all/api-limits'}
def generate_error_message_fern(error: langfuse.api.Error) -> str:
49def generate_error_message_fern(error: Error) -> str:
50    if isinstance(error, AccessDeniedError):
51        return errorResponseByCode.get(403, defaultErrorResponse)
52    elif isinstance(error, MethodNotAllowedError):
53        return errorResponseByCode.get(405, defaultErrorResponse)
54    elif isinstance(error, NotFoundError):
55        return errorResponseByCode.get(404, defaultErrorResponse)
56    elif isinstance(error, UnauthorizedError):
57        return errorResponseByCode.get(401, defaultErrorResponse)
58    elif isinstance(error, ServiceUnavailableError):
59        return errorResponseByCode.get(503, defaultErrorResponse)
60    elif isinstance(error, ApiError):
61        status_code = (
62            int(error.status_code)
63            if isinstance(error.status_code, str)
64            else error.status_code
65        )
66        return errorResponseByCode.get(status_code, defaultErrorResponse)
67    else:
68        return defaultErrorResponse
def handle_fern_exception(exception: langfuse.api.Error) -> None:
71def handle_fern_exception(exception: Error) -> None:
72    log = logging.getLogger("langfuse")
73    log.debug(exception)
74    error_message = generate_error_message_fern(exception)
75    log.error(error_message)
def generate_error_message( exception: Union[langfuse.request.APIError, langfuse.request.APIErrors, Exception]) -> str:
78def generate_error_message(exception: Union[APIError, APIErrors, Exception]) -> str:
79    if isinstance(exception, APIError):
80        status_code = (
81            int(exception.status)
82            if isinstance(exception.status, str)
83            else exception.status
84        )
85        return f"API error occurred: {errorResponseByCode.get(status_code, defaultErrorResponse)}"
86    elif isinstance(exception, APIErrors):
87        error_messages = [
88            errorResponseByCode.get(
89                int(error.status) if isinstance(error.status, str) else error.status,
90                defaultErrorResponse,
91            )
92            for error in exception.errors
93        ]
94        return "API errors occurred: " + "\n".join(error_messages)
95    else:
96        return defaultErrorResponse
def handle_exception( exception: Union[langfuse.request.APIError, langfuse.request.APIErrors, Exception]) -> None:
 99def handle_exception(exception: Union[APIError, APIErrors, Exception]) -> None:
100    log = logging.getLogger("langfuse")
101    log.debug(exception)
102    error_message = generate_error_message(exception)
103    log.error(error_message)