langfuse.parse_error
"""Translate Langfuse API errors into user-facing messages and log them."""

import logging
from typing import Union

# our own api errors
from langfuse.request import APIErrors, APIError

# fern api errors
from langfuse.api.resources.commons.errors import (
    AccessDeniedError,
    Error,
    MethodNotAllowedError,
    NotFoundError,
    UnauthorizedError,
)
from langfuse.api.core import ApiError
from langfuse.api.resources.health.errors import ServiceUnavailableError


SUPPORT_URL = "https://langfuse.com/support"
API_DOCS_URL = "https://api.reference.langfuse.com"
RBAC_DOCS_URL = "https://langfuse.com/docs/rbac"
INSTALLATION_DOCS_URL = "https://langfuse.com/docs/sdk/typescript/guide"
RATE_LIMITS_URL = "https://langfuse.com/faq/all/api-limits"
PYPI_PACKAGE_URL = "https://pypi.org/project/langfuse/"

# Error messages
updatePromptResponse = (
    f"Make sure to keep your SDK updated, refer to {PYPI_PACKAGE_URL} for details."
)
defaultServerErrorPrompt = f"This is an unusual occurrence and we are monitoring it closely. For help, please contact support: {SUPPORT_URL}."
defaultErrorResponse = f"Unexpected error occurred. Please check your request and contact support: {SUPPORT_URL}."

# Error response map: HTTP status code -> message shown to the user.
# Codes without an entry fall back to defaultErrorResponse at lookup time.
errorResponseByCode = {
    500: f"Internal server error occurred. For help, please contact support: {SUPPORT_URL}",
    501: f"Not implemented. Please check your request and contact support for help: {SUPPORT_URL}.",
    502: f"Bad gateway. {defaultServerErrorPrompt}",
    503: f"Service unavailable. {defaultServerErrorPrompt}",
    504: f"Gateway timeout. {defaultServerErrorPrompt}",
    404: f"Internal error occurred. {defaultServerErrorPrompt}",
    400: f"Bad request. Please check your request for any missing or incorrect parameters. Refer to our API docs: {API_DOCS_URL} for details.",
    401: f"Unauthorized. Please check your public/private host settings. Refer to our installation and setup guide: {INSTALLATION_DOCS_URL} for details on SDK configuration.",
    403: f"Forbidden. Please check your access control settings. Refer to our RBAC docs: {RBAC_DOCS_URL} for details.",
    429: f"Rate limit exceeded. For more information on rate limits please see: {RATE_LIMITS_URL}",
}


def _status_to_int(status: Union[int, str, None]) -> Union[int, None]:
    """Normalize a status value for lookup in ``errorResponseByCode``.

    String statuses are parsed with ``int``. A string that is not a valid
    integer yields ``None`` instead of raising, so that building an error
    message can never itself fail with ``ValueError``. Non-string values are
    returned unchanged.
    """
    if isinstance(status, str):
        try:
            return int(status)
        except ValueError:
            return None
    return status


def generate_error_message_fern(error: Error) -> str:
    """Return the user-facing message for an error raised by the fern client.

    The specific error classes are checked before the generic ``ApiError``;
    anything unrecognized gets ``defaultErrorResponse``.
    """
    if isinstance(error, AccessDeniedError):
        return errorResponseByCode.get(403, defaultErrorResponse)
    if isinstance(error, MethodNotAllowedError):
        # The map has no 405 entry, so this resolves to the default message.
        return errorResponseByCode.get(405, defaultErrorResponse)
    if isinstance(error, NotFoundError):
        return errorResponseByCode.get(404, defaultErrorResponse)
    if isinstance(error, UnauthorizedError):
        return errorResponseByCode.get(401, defaultErrorResponse)
    if isinstance(error, ServiceUnavailableError):
        return errorResponseByCode.get(503, defaultErrorResponse)
    if isinstance(error, ApiError):
        return errorResponseByCode.get(
            _status_to_int(error.status_code), defaultErrorResponse
        )
    return defaultErrorResponse


def handle_fern_exception(exception: Error) -> None:
    """Log a fern client error on the ``langfuse`` logger.

    The raw exception is logged at DEBUG level and the user-facing message
    at ERROR level.
    """
    log = logging.getLogger("langfuse")
    log.debug(exception)
    log.error(generate_error_message_fern(exception))


def generate_error_message(exception: Union[APIError, APIErrors, Exception]) -> str:
    """Return the user-facing message for an error from the request layer.

    ``APIError`` yields a single prefixed message, ``APIErrors`` yields one
    message per contained error joined by newlines, and any other exception
    yields ``defaultErrorResponse``.
    """
    if isinstance(exception, APIError):
        message = errorResponseByCode.get(
            _status_to_int(exception.status), defaultErrorResponse
        )
        return f"API error occurred: {message}"
    if isinstance(exception, APIErrors):
        error_messages = [
            errorResponseByCode.get(_status_to_int(error.status), defaultErrorResponse)
            for error in exception.errors
        ]
        return "API errors occurred: " + "\n".join(error_messages)
    return defaultErrorResponse


def handle_exception(exception: Union[APIError, APIErrors, Exception]) -> None:
    """Log a request-layer error on the ``langfuse`` logger.

    The raw exception is logged at DEBUG level and the user-facing message
    at ERROR level.
    """
    log = logging.getLogger("langfuse")
    log.debug(exception)
    log.error(generate_error_message(exception))
SUPPORT_URL =
'https://langfuse.com/support'
API_DOCS_URL =
'https://api.reference.langfuse.com'
RBAC_DOCS_URL =
'https://langfuse.com/docs/rbac'
INSTALLATION_DOCS_URL =
'https://langfuse.com/docs/sdk/typescript/guide'
RATE_LIMITS_URL =
'https://langfuse.com/faq/all/api-limits'
PYPI_PACKAGE_URL =
'https://pypi.org/project/langfuse/'
updatePromptResponse =
'Make sure to keep your SDK updated, refer to https://pypi.org/project/langfuse/ for details.'
defaultServerErrorPrompt =
'This is an unusual occurrence and we are monitoring it closely. For help, please contact support: https://langfuse.com/support.'
defaultErrorResponse =
'Unexpected error occurred. Please check your request and contact support: https://langfuse.com/support.'
errorResponseByCode =
{500: 'Internal server error occurred. For help, please contact support: https://langfuse.com/support', 501: 'Not implemented. Please check your request and contact support for help: https://langfuse.com/support.', 502: 'Bad gateway. This is an unusual occurrence and we are monitoring it closely. For help, please contact support: https://langfuse.com/support.', 503: 'Service unavailable. This is an unusual occurrence and we are monitoring it closely. For help, please contact support: https://langfuse.com/support.', 504: 'Gateway timeout. This is an unusual occurrence and we are monitoring it closely. For help, please contact support: https://langfuse.com/support.', 404: 'Internal error occurred. This is an unusual occurrence and we are monitoring it closely. For help, please contact support: https://langfuse.com/support.', 400: 'Bad request. Please check your request for any missing or incorrect parameters. Refer to our API docs: https://api.reference.langfuse.com for details.', 401: 'Unauthorized. Please check your public/private host settings. Refer to our installation and setup guide: https://langfuse.com/docs/sdk/typescript/guide for details on SDK configuration.', 403: 'Forbidden. Please check your access control settings. Refer to our RBAC docs: https://langfuse.com/docs/rbac for details.', 429: 'Rate limit exceeded. For more information on rate limits please see: https://langfuse.com/faq/all/api-limits'}
def _status_to_int(status):
    """Normalize a status value for lookup in ``errorResponseByCode``.

    String statuses are parsed with ``int``; a string that is not a valid
    integer yields ``None`` instead of raising ``ValueError``. Non-string
    values are returned unchanged.
    """
    if isinstance(status, str):
        try:
            return int(status)
        except ValueError:
            return None
    return status


def generate_error_message_fern(error: Error) -> str:
    """Return the user-facing message for an error raised by the fern client.

    The specific error classes are checked before the generic ``ApiError``;
    anything unrecognized gets ``defaultErrorResponse``.
    """
    if isinstance(error, AccessDeniedError):
        return errorResponseByCode.get(403, defaultErrorResponse)
    if isinstance(error, MethodNotAllowedError):
        # errorResponseByCode has no 405 entry, so this resolves to the
        # default message.
        return errorResponseByCode.get(405, defaultErrorResponse)
    if isinstance(error, NotFoundError):
        return errorResponseByCode.get(404, defaultErrorResponse)
    if isinstance(error, UnauthorizedError):
        return errorResponseByCode.get(401, defaultErrorResponse)
    if isinstance(error, ServiceUnavailableError):
        return errorResponseByCode.get(503, defaultErrorResponse)
    if isinstance(error, ApiError):
        # A malformed status string must not make message generation raise.
        return errorResponseByCode.get(
            _status_to_int(error.status_code), defaultErrorResponse
        )
    return defaultErrorResponse
def
generate_error_message( exception: Union[langfuse.request.APIError, langfuse.request.APIErrors, Exception]) -> str:
def _status_to_int(status):
    """Normalize a status value for lookup in ``errorResponseByCode``.

    String statuses are parsed with ``int``; a string that is not a valid
    integer yields ``None`` instead of raising ``ValueError``. Non-string
    values are returned unchanged.
    """
    if isinstance(status, str):
        try:
            return int(status)
        except ValueError:
            return None
    return status


def generate_error_message(exception: Union[APIError, APIErrors, Exception]) -> str:
    """Return the user-facing message for an error from the request layer.

    ``APIError`` yields a single prefixed message, ``APIErrors`` yields one
    message per contained error joined by newlines, and any other exception
    yields ``defaultErrorResponse``.
    """
    if isinstance(exception, APIError):
        message = errorResponseByCode.get(
            _status_to_int(exception.status), defaultErrorResponse
        )
        return f"API error occurred: {message}"
    if isinstance(exception, APIErrors):
        # An unparsable status falls back to the default message for that
        # entry rather than aborting the whole batch.
        error_messages = [
            errorResponseByCode.get(_status_to_int(error.status), defaultErrorResponse)
            for error in exception.errors
        ]
        return "API errors occurred: " + "\n".join(error_messages)
    return defaultErrorResponse
def
handle_exception( exception: Union[langfuse.request.APIError, langfuse.request.APIErrors, Exception]) -> None: