langfuse.utils
@private
"""@private"""

import logging
import typing
from datetime import datetime, timezone

try:
    import pydantic.v1 as pydantic  # type: ignore
except ImportError:
    import pydantic  # type: ignore

from langfuse.model import ModelUsage, PromptClient

log = logging.getLogger("langfuse")


def _get_timestamp():
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.now(tz=timezone.utc)


def _create_prompt_context(
    prompt: typing.Optional[PromptClient] = None,
):
    """Build the prompt name/version mapping for a prompt client.

    Both values are ``None`` when no prompt is given or when the prompt is
    flagged as a fallback (``prompt.is_fallback`` is truthy).
    """
    if prompt is None or prompt.is_fallback:
        return {"prompt_version": None, "prompt_name": None}

    return {"prompt_version": prompt.version, "prompt_name": prompt.name}


T = typing.TypeVar("T")


def extract_by_priority(
    usage: dict, keys: typing.List[str], target_type: typing.Type[T]
) -> typing.Optional[T]:
    """Return the value of the first usable key from ``keys``, cast to ``target_type``.

    Keys are tried in the order given. A key that is present with a ``None``
    value ends the search and yields ``None``. A key whose value cannot be
    converted is skipped and the next key is tried. ``None`` is returned when
    no key yields a value.
    """
    for candidate in keys:
        if candidate not in usage:
            continue

        raw = usage[candidate]
        if raw is None:
            return None

        try:
            return target_type(raw)
        except Exception:
            # Best-effort conversion: fall through to the next candidate key.
            continue

    return None


def _convert_usage_input(usage: typing.Union[pydantic.BaseModel, ModelUsage]):
    """Normalize a usage payload into a Langfuse-style usage mapping.

    pydantic models are converted with ``.dict()``; any other object exposing
    ``__dict__`` is reduced to that attribute dict. The resulting mapping is
    then handled as follows:

    * if it contains any of ``input``, ``output``, ``total`` or ``unit`` it is
      returned unchanged;
    * otherwise, if it contains any OpenAI-style token or cost key (camelCase
      or snake_case), a new dict with the keys ``input``, ``output``,
      ``total``, ``unit`` (always ``"TOKENS"``), ``inputCost``, ``outputCost``
      and ``totalCost`` is returned.

    Raises:
        ValueError: if the payload contains neither kind of key.
    """
    if isinstance(usage, pydantic.BaseModel):
        usage = usage.dict()

    # Objects that are not recognised as pydantic models above may still carry
    # their fields as plain attributes; fall back to the attribute dict.
    if hasattr(usage, "__dict__"):
        usage = usage.__dict__

    # Any one of these keys marks the payload as already being in Langfuse form.
    langfuse_keys = ("input", "output", "total", "unit")
    if any(name in usage for name in langfuse_keys):
        return usage

    openai_keys = (
        "promptTokens",
        "prompt_tokens",
        "completionTokens",
        "completion_tokens",
        "totalTokens",
        "total_tokens",
        "inputCost",
        "input_cost",
        "outputCost",
        "output_cost",
        "totalCost",
        "total_cost",
    )
    if not any(name in usage for name in openai_keys):
        raise ValueError(
            "Usage object must have either {input, output, total, unit} or {promptTokens, completionTokens, totalTokens}"
        )

    # Translate the OpenAI-style payload; the camelCase spelling of each field
    # takes priority over the snake_case one.
    return {
        "input": extract_by_priority(usage, ["promptTokens", "prompt_tokens"], int),
        "output": extract_by_priority(
            usage, ["completionTokens", "completion_tokens"], int
        ),
        "total": extract_by_priority(usage, ["totalTokens", "total_tokens"], int),
        "unit": "TOKENS",
        "inputCost": extract_by_priority(usage, ["inputCost", "input_cost"], float),
        "outputCost": extract_by_priority(usage, ["outputCost", "output_cost"], float),
        "totalCost": extract_by_priority(usage, ["totalCost", "total_cost"], float),
    }
log =
<Logger langfuse (WARNING)>
def
extract_by_priority(usage: dict, keys: List[str], target_type: Type[~T]) -> Optional[~T]:
def extract_by_priority(
    usage: dict, keys: typing.List[str], target_type: typing.Type[T]
) -> typing.Optional[T]:
    """Return the value of the first usable key from ``keys``, cast to ``target_type``.

    Keys are tried in the order given. A key that is present with a ``None``
    value ends the search and yields ``None``. A key whose value cannot be
    converted is skipped and the next key is tried. ``None`` is returned when
    no key yields a value.
    """
    for candidate in keys:
        if candidate not in usage:
            continue

        raw = usage[candidate]
        if raw is None:
            return None

        try:
            return target_type(raw)
        except Exception:
            # Best-effort conversion: fall through to the next candidate key.
            continue

    return None
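Extracts the value of the first key in `keys` that exists in `usage` and converts it to `target_type`. Returns `None` if that value is `None` or if no key yields a value; a key whose value cannot be converted is skipped in favour of the next one.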