langfuse.openai
If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.

```diff
- import openai
+ from langfuse.openai import openai
```

Langfuse automatically tracks:

- All prompts/completions with support for streaming, async and functions
- Latencies
- API Errors
- Model usage (tokens) and cost (USD)

The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK.

See docs for more details: https://langfuse.com/docs/integrations/openai
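For illustration, a minimal sketch of the drop-in usage; the model name and the Langfuse-specific keyword arguments (`name`, `tags`) are placeholders, not requirements:

```python
from langfuse.openai import openai  # instead of: import openai

client = openai.OpenAI()  # reads OPENAI_API_KEY from the environment as usual

completion = client.chat.completions.create(
    model="gpt-4o-mini",  # any chat model; placeholder for illustration
    messages=[{"role": "user", "content": "Say hello."}],
    # Optional Langfuse-specific kwargs; they are stripped out before the
    # request is forwarded to OpenAI (see OpenAiArgsExtractor below).
    name="hello-generation",
    tags=["example"],
)
print(completion.choices[0].message.content)
```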
1"""If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import. 2 3```diff 4- import openai 5+ from langfuse.openai import openai 6``` 7 8Langfuse automatically tracks: 9 10- All prompts/completions with support for streaming, async and functions 11- Latencies 12- API Errors 13- Model usage (tokens) and cost (USD) 14 15The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK. 16 17See docs for more details: https://langfuse.com/docs/integrations/openai 18""" 19 20import logging 21import types 22from collections import defaultdict 23from dataclasses import dataclass 24from inspect import isclass 25from typing import Optional 26 27from openai._types import NotGiven 28from packaging.version import Version 29from pydantic import BaseModel 30from wrapt import wrap_function_wrapper 31 32from langfuse import Langfuse 33from langfuse.client import StatefulGenerationClient 34from langfuse.decorators import langfuse_context 35from langfuse.media import LangfuseMedia 36from langfuse.utils import _get_timestamp 37from langfuse.utils.langfuse_singleton import LangfuseSingleton 38 39try: 40 import openai 41except ImportError: 42 raise ModuleNotFoundError( 43 "Please install OpenAI to use this feature: 'pip install openai'" 44 ) 45 46try: 47 from openai import AsyncAzureOpenAI, AsyncOpenAI, AzureOpenAI, OpenAI # noqa: F401 48except ImportError: 49 AsyncAzureOpenAI = None 50 AsyncOpenAI = None 51 AzureOpenAI = None 52 OpenAI = None 53 54log = logging.getLogger("langfuse") 55 56 57@dataclass 58class OpenAiDefinition: 59 module: str 60 object: str 61 method: str 62 type: str 63 sync: bool 64 min_version: Optional[str] = None 65 66 67OPENAI_METHODS_V0 = [ 68 OpenAiDefinition( 69 module="openai", 70 object="ChatCompletion", 71 method="create", 72 type="chat", 73 sync=True, 74 ), 75 OpenAiDefinition( 76 module="openai", 77 object="Completion", 78 method="create", 79 type="completion", 80 sync=True, 81 ), 82] 83 84 85OPENAI_METHODS_V1 = [ 86 OpenAiDefinition( 87 module="openai.resources.chat.completions", 88 object="Completions", 89 method="create", 90 type="chat", 91 sync=True, 92 ), 93 OpenAiDefinition( 94 module="openai.resources.completions", 95 object="Completions", 96 method="create", 97 type="completion", 98 sync=True, 99 ), 100 OpenAiDefinition( 101 module="openai.resources.chat.completions", 102 object="AsyncCompletions", 103 method="create", 104 type="chat", 105 sync=False, 106 ), 107 OpenAiDefinition( 108 module="openai.resources.completions", 109 object="AsyncCompletions", 110 method="create", 111 type="completion", 112 sync=False, 113 ), 114 OpenAiDefinition( 115 module="openai.resources.beta.chat.completions", 116 object="Completions", 117 method="parse", 118 type="chat", 119 sync=True, 120 min_version="1.50.0", 121 ), 122 OpenAiDefinition( 123 module="openai.resources.beta.chat.completions", 124 object="AsyncCompletions", 125 method="parse", 126 type="chat", 127 sync=False, 128 min_version="1.50.0", 129 ), 130 OpenAiDefinition( 131 module="openai.resources.responses", 132 object="Responses", 133 method="create", 134 type="chat", 135 sync=True, 136 min_version="1.66.0", 137 ), 138 OpenAiDefinition( 139 module="openai.resources.responses", 140 object="AsyncResponses", 141 method="create", 142 type="chat", 143 sync=False, 144 min_version="1.66.0", 145 ), 146] 147 148 149class OpenAiArgsExtractor: 150 def __init__( 151 self, 152 name=None, 153 metadata=None, 154 trace_id=None, 155 
session_id=None, 156 user_id=None, 157 tags=None, 158 parent_observation_id=None, 159 langfuse_prompt=None, # we cannot use prompt because it's an argument of the old OpenAI completions API 160 **kwargs, 161 ): 162 self.args = {} 163 self.args["name"] = name 164 self.args["metadata"] = ( 165 metadata 166 if "response_format" not in kwargs 167 else { 168 **(metadata or {}), 169 "response_format": kwargs["response_format"].model_json_schema() 170 if isclass(kwargs["response_format"]) 171 and issubclass(kwargs["response_format"], BaseModel) 172 else kwargs["response_format"], 173 } 174 ) 175 self.args["trace_id"] = trace_id 176 self.args["session_id"] = session_id 177 self.args["user_id"] = user_id 178 self.args["tags"] = tags 179 self.args["parent_observation_id"] = parent_observation_id 180 self.args["langfuse_prompt"] = langfuse_prompt 181 self.kwargs = kwargs 182 183 def get_langfuse_args(self): 184 return {**self.args, **self.kwargs} 185 186 def get_openai_args(self): 187 # If OpenAI model distillation is enabled, we need to add the metadata to the kwargs 188 # https://platform.openai.com/docs/guides/distillation 189 if self.kwargs.get("store", False): 190 self.kwargs["metadata"] = ( 191 {} if self.args.get("metadata", None) is None else self.args["metadata"] 192 ) 193 194 # OpenAI does not support non-string type values in metadata when using 195 # model distillation feature 196 self.kwargs["metadata"].pop("response_format", None) 197 198 return self.kwargs 199 200 201def _langfuse_wrapper(func): 202 def _with_langfuse(open_ai_definitions, initialize): 203 def wrapper(wrapped, instance, args, kwargs): 204 return func(open_ai_definitions, initialize, wrapped, args, kwargs) 205 206 return wrapper 207 208 return _with_langfuse 209 210 211def _extract_chat_prompt(kwargs: any): 212 """Extracts the user input from prompts. 
Returns an array of messages or dict with messages and functions""" 213 prompt = {} 214 215 if kwargs.get("functions") is not None: 216 prompt.update({"functions": kwargs["functions"]}) 217 218 if kwargs.get("function_call") is not None: 219 prompt.update({"function_call": kwargs["function_call"]}) 220 221 if kwargs.get("tools") is not None: 222 prompt.update({"tools": kwargs["tools"]}) 223 224 if prompt: 225 # uf user provided functions, we need to send these together with messages to langfuse 226 prompt.update( 227 { 228 "messages": [ 229 _process_message(message) for message in kwargs.get("messages", []) 230 ], 231 } 232 ) 233 return prompt 234 else: 235 # vanilla case, only send messages in openai format to langfuse 236 return [_process_message(message) for message in kwargs.get("messages", [])] 237 238 239def _process_message(message): 240 if not isinstance(message, dict): 241 return message 242 243 processed_message = {**message} 244 245 content = processed_message.get("content", None) 246 if not isinstance(content, list): 247 return processed_message 248 249 processed_content = [] 250 251 for content_part in content: 252 if content_part.get("type") == "input_audio": 253 audio_base64 = content_part.get("input_audio", {}).get("data", None) 254 format = content_part.get("input_audio", {}).get("format", "wav") 255 256 if audio_base64 is not None: 257 base64_data_uri = f"data:audio/{format};base64,{audio_base64}" 258 259 processed_content.append( 260 { 261 "type": "input_audio", 262 "input_audio": { 263 "data": LangfuseMedia(base64_data_uri=base64_data_uri), 264 "format": format, 265 }, 266 } 267 ) 268 else: 269 processed_content.append(content_part) 270 271 processed_message["content"] = processed_content 272 273 return processed_message 274 275 276def _extract_chat_response(kwargs: any): 277 """Extracts the llm output from the response.""" 278 response = { 279 "role": kwargs.get("role", None), 280 } 281 282 audio = None 283 284 if kwargs.get("function_call") is not None: 285 response.update({"function_call": kwargs["function_call"]}) 286 287 if kwargs.get("tool_calls") is not None: 288 response.update({"tool_calls": kwargs["tool_calls"]}) 289 290 if kwargs.get("audio") is not None: 291 audio = kwargs["audio"].__dict__ 292 293 if "data" in audio and audio["data"] is not None: 294 base64_data_uri = f"data:audio/{audio.get('format', 'wav')};base64,{audio.get('data', None)}" 295 audio["data"] = LangfuseMedia(base64_data_uri=base64_data_uri) 296 297 response.update( 298 { 299 "content": kwargs.get("content", None), 300 } 301 ) 302 303 if audio is not None: 304 response.update({"audio": audio}) 305 306 return response 307 308 309def _get_langfuse_data_from_kwargs( 310 resource: OpenAiDefinition, langfuse: Langfuse, start_time, kwargs 311): 312 name = kwargs.get("name", "OpenAI-generation") 313 314 if name is None: 315 name = "OpenAI-generation" 316 317 if name is not None and not isinstance(name, str): 318 raise TypeError("name must be a string") 319 320 decorator_context_observation_id = langfuse_context.get_current_observation_id() 321 decorator_context_trace_id = langfuse_context.get_current_trace_id() 322 323 trace_id = kwargs.get("trace_id", None) or decorator_context_trace_id 324 if trace_id is not None and not isinstance(trace_id, str): 325 raise TypeError("trace_id must be a string") 326 327 session_id = kwargs.get("session_id", None) 328 if session_id is not None and not isinstance(session_id, str): 329 raise TypeError("session_id must be a string") 330 331 user_id = 
kwargs.get("user_id", None) 332 if user_id is not None and not isinstance(user_id, str): 333 raise TypeError("user_id must be a string") 334 335 tags = kwargs.get("tags", None) 336 if tags is not None and ( 337 not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags) 338 ): 339 raise TypeError("tags must be a list of strings") 340 341 # Update trace params in decorator context if specified in openai call 342 if decorator_context_trace_id: 343 langfuse_context.update_current_trace( 344 session_id=session_id, user_id=user_id, tags=tags 345 ) 346 347 parent_observation_id = kwargs.get("parent_observation_id", None) or ( 348 decorator_context_observation_id 349 if decorator_context_observation_id != decorator_context_trace_id 350 else None 351 ) 352 if parent_observation_id is not None and not isinstance(parent_observation_id, str): 353 raise TypeError("parent_observation_id must be a string") 354 if parent_observation_id is not None and trace_id is None: 355 raise ValueError("parent_observation_id requires trace_id to be set") 356 357 metadata = kwargs.get("metadata", {}) 358 359 if metadata is not None and not isinstance(metadata, dict): 360 raise TypeError("metadata must be a dictionary") 361 362 model = kwargs.get("model", None) or None 363 364 prompt = None 365 366 if resource.type == "completion": 367 prompt = kwargs.get("prompt", None) 368 369 elif resource.object == "Responses": 370 prompt = kwargs.get("input", None) 371 372 elif resource.type == "chat": 373 prompt = _extract_chat_prompt(kwargs) 374 375 is_nested_trace = False 376 if trace_id: 377 is_nested_trace = True 378 langfuse.trace(id=trace_id, session_id=session_id, user_id=user_id, tags=tags) 379 else: 380 trace_id = ( 381 decorator_context_trace_id 382 or langfuse.trace( 383 session_id=session_id, 384 user_id=user_id, 385 tags=tags, 386 name=name, 387 input=prompt, 388 metadata=metadata, 389 ).id 390 ) 391 392 parsed_temperature = ( 393 kwargs.get("temperature", 1) 394 if not isinstance(kwargs.get("temperature", 1), NotGiven) 395 else 1 396 ) 397 398 parsed_max_tokens = ( 399 kwargs.get("max_tokens", float("inf")) 400 if not isinstance(kwargs.get("max_tokens", float("inf")), NotGiven) 401 else float("inf") 402 ) 403 404 parsed_top_p = ( 405 kwargs.get("top_p", 1) 406 if not isinstance(kwargs.get("top_p", 1), NotGiven) 407 else 1 408 ) 409 410 parsed_frequency_penalty = ( 411 kwargs.get("frequency_penalty", 0) 412 if not isinstance(kwargs.get("frequency_penalty", 0), NotGiven) 413 else 0 414 ) 415 416 parsed_presence_penalty = ( 417 kwargs.get("presence_penalty", 0) 418 if not isinstance(kwargs.get("presence_penalty", 0), NotGiven) 419 else 0 420 ) 421 422 parsed_seed = ( 423 kwargs.get("seed", None) 424 if not isinstance(kwargs.get("seed", None), NotGiven) 425 else None 426 ) 427 428 parsed_n = kwargs.get("n", 1) if not isinstance(kwargs.get("n", 1), NotGiven) else 1 429 430 modelParameters = { 431 "temperature": parsed_temperature, 432 "max_tokens": parsed_max_tokens, # casing? 
433 "top_p": parsed_top_p, 434 "frequency_penalty": parsed_frequency_penalty, 435 "presence_penalty": parsed_presence_penalty, 436 } 437 if parsed_n is not None and parsed_n > 1: 438 modelParameters["n"] = parsed_n 439 440 if parsed_seed is not None: 441 modelParameters["seed"] = parsed_seed 442 443 langfuse_prompt = kwargs.get("langfuse_prompt", None) 444 445 return { 446 "name": name, 447 "metadata": metadata, 448 "trace_id": trace_id, 449 "parent_observation_id": parent_observation_id, 450 "user_id": user_id, 451 "start_time": start_time, 452 "input": prompt, 453 "model_parameters": modelParameters, 454 "model": model or None, 455 "prompt": langfuse_prompt, 456 }, is_nested_trace 457 458 459def _create_langfuse_update( 460 completion, 461 generation: StatefulGenerationClient, 462 completion_start_time, 463 model=None, 464 usage=None, 465 metadata=None, 466): 467 update = { 468 "end_time": _get_timestamp(), 469 "output": completion, 470 "completion_start_time": completion_start_time, 471 } 472 if model is not None: 473 update["model"] = model 474 475 if metadata is not None: 476 update["metadata"] = metadata 477 478 if usage is not None: 479 update["usage"] = _parse_usage(usage) 480 481 generation.update(**update) 482 483 484def _parse_usage(usage=None): 485 if usage is None: 486 return 487 488 usage_dict = usage.copy() if isinstance(usage, dict) else usage.__dict__.copy() 489 490 for tokens_details in [ 491 "prompt_tokens_details", 492 "completion_tokens_details", 493 "input_token_details", 494 "output_token_details", 495 ]: 496 if tokens_details in usage_dict and usage_dict[tokens_details] is not None: 497 tokens_details_dict = ( 498 usage_dict[tokens_details] 499 if isinstance(usage_dict[tokens_details], dict) 500 else usage_dict[tokens_details].__dict__ 501 ) 502 usage_dict[tokens_details] = { 503 k: v for k, v in tokens_details_dict.items() if v is not None 504 } 505 506 return usage_dict 507 508 509def _extract_streamed_response_api_response(chunks): 510 completion, model, usage = None, None, None 511 metadata = {} 512 513 for raw_chunk in chunks: 514 chunk = raw_chunk.__dict__ 515 if raw_response := chunk.get("response", None): 516 usage = chunk.get("usage", None) 517 response = raw_response.__dict__ 518 model = response.get("model") 519 520 for key, val in response.items(): 521 if key not in ["created_at", "model", "output", "usage", "text"]: 522 metadata[key] = val 523 524 if key == "output": 525 output = val 526 if not isinstance(output, list): 527 completion = output 528 elif len(output) > 1: 529 completion = output 530 elif len(output) == 1: 531 completion = output[0] 532 533 return (model, completion, usage, metadata) 534 535 536def _extract_streamed_openai_response(resource, chunks): 537 completion = defaultdict(str) if resource.type == "chat" else "" 538 model, usage = None, None 539 540 for chunk in chunks: 541 if _is_openai_v1(): 542 chunk = chunk.__dict__ 543 544 model = model or chunk.get("model", None) or None 545 usage = chunk.get("usage", None) 546 547 choices = chunk.get("choices", []) 548 549 for choice in choices: 550 if _is_openai_v1(): 551 choice = choice.__dict__ 552 if resource.type == "chat": 553 delta = choice.get("delta", None) 554 555 if _is_openai_v1(): 556 delta = delta.__dict__ 557 558 if delta.get("role", None) is not None: 559 completion["role"] = delta["role"] 560 561 if delta.get("content", None) is not None: 562 completion["content"] = ( 563 delta.get("content", None) 564 if completion["content"] is None 565 else completion["content"] + 
delta.get("content", None) 566 ) 567 elif delta.get("function_call", None) is not None: 568 curr = completion["function_call"] 569 tool_call_chunk = delta.get("function_call", None) 570 571 if not curr: 572 completion["function_call"] = { 573 "name": getattr(tool_call_chunk, "name", ""), 574 "arguments": getattr(tool_call_chunk, "arguments", ""), 575 } 576 577 else: 578 curr["name"] = curr["name"] or getattr( 579 tool_call_chunk, "name", None 580 ) 581 curr["arguments"] += getattr(tool_call_chunk, "arguments", "") 582 583 elif delta.get("tool_calls", None) is not None: 584 curr = completion["tool_calls"] 585 tool_call_chunk = getattr( 586 delta.get("tool_calls", None)[0], "function", None 587 ) 588 589 if not curr: 590 completion["tool_calls"] = [ 591 { 592 "name": getattr(tool_call_chunk, "name", ""), 593 "arguments": getattr(tool_call_chunk, "arguments", ""), 594 } 595 ] 596 597 elif getattr(tool_call_chunk, "name", None) is not None: 598 curr.append( 599 { 600 "name": getattr(tool_call_chunk, "name", None), 601 "arguments": getattr( 602 tool_call_chunk, "arguments", None 603 ), 604 } 605 ) 606 607 else: 608 curr[-1]["name"] = curr[-1]["name"] or getattr( 609 tool_call_chunk, "name", None 610 ) 611 curr[-1]["arguments"] += getattr( 612 tool_call_chunk, "arguments", None 613 ) 614 615 if resource.type == "completion": 616 completion += choice.get("text", "") 617 618 def get_response_for_chat(): 619 return ( 620 completion["content"] 621 or ( 622 completion["function_call"] 623 and { 624 "role": "assistant", 625 "function_call": completion["function_call"], 626 } 627 ) 628 or ( 629 completion["tool_calls"] 630 and { 631 "role": "assistant", 632 # "tool_calls": [{"function": completion["tool_calls"]}], 633 "tool_calls": [ 634 {"function": data} for data in completion["tool_calls"] 635 ], 636 } 637 ) 638 or None 639 ) 640 641 return ( 642 model, 643 get_response_for_chat() if resource.type == "chat" else completion, 644 usage, 645 None, 646 ) 647 648 649def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response): 650 if response is None: 651 return None, "<NoneType response returned from OpenAI>", None 652 653 model = response.get("model", None) or None 654 655 completion = None 656 657 if resource.type == "completion": 658 choices = response.get("choices", []) 659 if len(choices) > 0: 660 choice = choices[-1] 661 662 completion = choice.text if _is_openai_v1() else choice.get("text", None) 663 664 elif resource.object == "Responses": 665 output = response.get("output", {}) 666 667 if not isinstance(output, list): 668 completion = output 669 elif len(output) > 1: 670 completion = output 671 elif len(output) == 1: 672 completion = output[0] 673 674 elif resource.type == "chat": 675 choices = response.get("choices", []) 676 if len(choices) > 0: 677 # If multiple choices were generated, we'll show all of them in the UI as a list. 
678 if len(choices) > 1: 679 completion = [ 680 _extract_chat_response(choice.message.__dict__) 681 if _is_openai_v1() 682 else choice.get("message", None) 683 for choice in choices 684 ] 685 else: 686 choice = choices[0] 687 completion = ( 688 _extract_chat_response(choice.message.__dict__) 689 if _is_openai_v1() 690 else choice.get("message", None) 691 ) 692 693 usage = _parse_usage(response.get("usage", None)) 694 695 return (model, completion, usage) 696 697 698def _is_openai_v1(): 699 return Version(openai.__version__) >= Version("1.0.0") 700 701 702def _is_streaming_response(response): 703 return ( 704 isinstance(response, types.GeneratorType) 705 or isinstance(response, types.AsyncGeneratorType) 706 or (_is_openai_v1() and isinstance(response, openai.Stream)) 707 or (_is_openai_v1() and isinstance(response, openai.AsyncStream)) 708 ) 709 710 711@_langfuse_wrapper 712def _wrap(open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs): 713 new_langfuse: Langfuse = initialize() 714 715 start_time = _get_timestamp() 716 arg_extractor = OpenAiArgsExtractor(*args, **kwargs) 717 718 generation, is_nested_trace = _get_langfuse_data_from_kwargs( 719 open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args() 720 ) 721 generation = new_langfuse.generation(**generation) 722 try: 723 openai_response = wrapped(**arg_extractor.get_openai_args()) 724 725 if _is_streaming_response(openai_response): 726 return LangfuseResponseGeneratorSync( 727 resource=open_ai_resource, 728 response=openai_response, 729 generation=generation, 730 langfuse=new_langfuse, 731 is_nested_trace=is_nested_trace, 732 ) 733 734 else: 735 model, completion, usage = _get_langfuse_data_from_default_response( 736 open_ai_resource, 737 (openai_response and openai_response.__dict__) 738 if _is_openai_v1() 739 else openai_response, 740 ) 741 generation.update( 742 model=model, 743 output=completion, 744 end_time=_get_timestamp(), 745 usage=usage, # backward compat for all V2 self hosters 746 usage_details=usage, 747 ) 748 749 # Avoiding the trace-update if trace-id is provided by user. 
750 if not is_nested_trace: 751 new_langfuse.trace(id=generation.trace_id, output=completion) 752 753 return openai_response 754 except Exception as ex: 755 log.warning(ex) 756 model = kwargs.get("model", None) or None 757 generation.update( 758 end_time=_get_timestamp(), 759 status_message=str(ex), 760 level="ERROR", 761 model=model, 762 usage={ 763 "input_cost": 0, 764 "output_cost": 0, 765 "total_cost": 0, 766 }, # backward compat for all V2 self hosters 767 cost_details={"input": 0, "output": 0, "total": 0}, 768 ) 769 raise ex 770 771 772@_langfuse_wrapper 773async def _wrap_async( 774 open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs 775): 776 new_langfuse = initialize() 777 start_time = _get_timestamp() 778 arg_extractor = OpenAiArgsExtractor(*args, **kwargs) 779 780 generation, is_nested_trace = _get_langfuse_data_from_kwargs( 781 open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args() 782 ) 783 generation = new_langfuse.generation(**generation) 784 try: 785 openai_response = await wrapped(**arg_extractor.get_openai_args()) 786 787 if _is_streaming_response(openai_response): 788 return LangfuseResponseGeneratorAsync( 789 resource=open_ai_resource, 790 response=openai_response, 791 generation=generation, 792 langfuse=new_langfuse, 793 is_nested_trace=is_nested_trace, 794 ) 795 796 else: 797 model, completion, usage = _get_langfuse_data_from_default_response( 798 open_ai_resource, 799 (openai_response and openai_response.__dict__) 800 if _is_openai_v1() 801 else openai_response, 802 ) 803 generation.update( 804 model=model, 805 output=completion, 806 end_time=_get_timestamp(), 807 usage=usage, # backward compat for all V2 self hosters 808 usage_details=usage, 809 ) 810 # Avoiding the trace-update if trace-id is provided by user. 811 if not is_nested_trace: 812 new_langfuse.trace(id=generation.trace_id, output=completion) 813 814 return openai_response 815 except Exception as ex: 816 model = kwargs.get("model", None) or None 817 generation.update( 818 end_time=_get_timestamp(), 819 status_message=str(ex), 820 level="ERROR", 821 model=model, 822 usage={ 823 "input_cost": 0, 824 "output_cost": 0, 825 "total_cost": 0, 826 }, # Backward compat for all V2 self hosters 827 cost_details={"input": 0, "output": 0, "total": 0}, 828 ) 829 raise ex 830 831 832class OpenAILangfuse: 833 _langfuse: Optional[Langfuse] = None 834 835 def initialize(self): 836 self._langfuse = LangfuseSingleton().get( 837 public_key=openai.langfuse_public_key, 838 secret_key=openai.langfuse_secret_key, 839 host=openai.langfuse_host, 840 debug=openai.langfuse_debug, 841 enabled=openai.langfuse_enabled, 842 sdk_integration="openai", 843 sample_rate=openai.langfuse_sample_rate, 844 environment=openai.langfuse_environment, 845 mask=openai.langfuse_mask, 846 ) 847 848 return self._langfuse 849 850 def flush(cls): 851 cls._langfuse.flush() 852 853 def langfuse_auth_check(self): 854 """Check if the provided Langfuse credentials (public and secret key) are valid. 855 856 Raises: 857 Exception: If no projects were found for the provided credentials. 858 859 Note: 860 This method is blocking. It is discouraged to use it in production code. 
861 """ 862 if self._langfuse is None: 863 self.initialize() 864 865 return self._langfuse.auth_check() 866 867 def register_tracing(self): 868 resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0 869 870 for resource in resources: 871 if resource.min_version is not None and Version( 872 openai.__version__ 873 ) < Version(resource.min_version): 874 continue 875 876 wrap_function_wrapper( 877 resource.module, 878 f"{resource.object}.{resource.method}", 879 _wrap(resource, self.initialize) 880 if resource.sync 881 else _wrap_async(resource, self.initialize), 882 ) 883 884 setattr(openai, "langfuse_public_key", None) 885 setattr(openai, "langfuse_secret_key", None) 886 setattr(openai, "langfuse_host", None) 887 setattr(openai, "langfuse_debug", None) 888 setattr(openai, "langfuse_enabled", True) 889 setattr(openai, "langfuse_sample_rate", None) 890 setattr(openai, "langfuse_environment", None) 891 setattr(openai, "langfuse_mask", None) 892 setattr(openai, "langfuse_auth_check", self.langfuse_auth_check) 893 setattr(openai, "flush_langfuse", self.flush) 894 895 896modifier = OpenAILangfuse() 897modifier.register_tracing() 898 899 900# DEPRECATED: Use `openai.langfuse_auth_check()` instead 901def auth_check(): 902 if modifier._langfuse is None: 903 modifier.initialize() 904 905 return modifier._langfuse.auth_check() 906 907 908class LangfuseResponseGeneratorSync: 909 def __init__( 910 self, 911 *, 912 resource, 913 response, 914 generation, 915 langfuse, 916 is_nested_trace, 917 ): 918 self.items = [] 919 920 self.resource = resource 921 self.response = response 922 self.generation = generation 923 self.langfuse = langfuse 924 self.is_nested_trace = is_nested_trace 925 self.completion_start_time = None 926 927 def __iter__(self): 928 try: 929 for i in self.response: 930 self.items.append(i) 931 932 if self.completion_start_time is None: 933 self.completion_start_time = _get_timestamp() 934 935 yield i 936 finally: 937 self._finalize() 938 939 def __next__(self): 940 try: 941 item = self.response.__next__() 942 self.items.append(item) 943 944 if self.completion_start_time is None: 945 self.completion_start_time = _get_timestamp() 946 947 return item 948 949 except StopIteration: 950 self._finalize() 951 952 raise 953 954 def __enter__(self): 955 return self.__iter__() 956 957 def __exit__(self, exc_type, exc_value, traceback): 958 pass 959 960 def _finalize(self): 961 model, completion, usage, metadata = ( 962 _extract_streamed_response_api_response(self.items) 963 if self.resource.object == "Responses" 964 else _extract_streamed_openai_response(self.resource, self.items) 965 ) 966 967 # Avoiding the trace-update if trace-id is provided by user. 
968 if not self.is_nested_trace: 969 self.langfuse.trace(id=self.generation.trace_id, output=completion) 970 971 _create_langfuse_update( 972 completion, 973 self.generation, 974 self.completion_start_time, 975 model=model, 976 usage=usage, 977 metadata=metadata, 978 ) 979 980 981class LangfuseResponseGeneratorAsync: 982 def __init__( 983 self, 984 *, 985 resource, 986 response, 987 generation, 988 langfuse, 989 is_nested_trace, 990 ): 991 self.items = [] 992 993 self.resource = resource 994 self.response = response 995 self.generation = generation 996 self.langfuse = langfuse 997 self.is_nested_trace = is_nested_trace 998 self.completion_start_time = None 999 1000 async def __aiter__(self): 1001 try: 1002 async for i in self.response: 1003 self.items.append(i) 1004 1005 if self.completion_start_time is None: 1006 self.completion_start_time = _get_timestamp() 1007 1008 yield i 1009 finally: 1010 await self._finalize() 1011 1012 async def __anext__(self): 1013 try: 1014 item = await self.response.__anext__() 1015 self.items.append(item) 1016 1017 if self.completion_start_time is None: 1018 self.completion_start_time = _get_timestamp() 1019 1020 return item 1021 1022 except StopAsyncIteration: 1023 await self._finalize() 1024 1025 raise 1026 1027 async def __aenter__(self): 1028 return self.__aiter__() 1029 1030 async def __aexit__(self, exc_type, exc_value, traceback): 1031 pass 1032 1033 async def _finalize(self): 1034 model, completion, usage, metadata = ( 1035 _extract_streamed_response_api_response(self.items) 1036 if self.resource.object == "Responses" 1037 else _extract_streamed_openai_response(self.resource, self.items) 1038 ) 1039 1040 # Avoiding the trace-update if trace-id is provided by user. 1041 if not self.is_nested_trace: 1042 self.langfuse.trace(id=self.generation.trace_id, output=completion) 1043 1044 _create_langfuse_update( 1045 completion, 1046 self.generation, 1047 self.completion_start_time, 1048 model=model, 1049 usage=usage, 1050 metadata=metadata, 1051 ) 1052 1053 async def close(self) -> None: 1054 """Close the response and release the connection. 1055 1056 Automatically called if the response body is read to completion. 1057 """ 1058 await self.response.close() 1059 1060 async def aclose(self) -> None: 1061 """Close the response and release the connection. 1062 1063 Automatically called if the response body is read to completion. 1064 """ 1065 await self.response.aclose()
```python
log = logging.getLogger("langfuse")
```
```python
@dataclass
class OpenAiDefinition:
    module: str
    object: str
    method: str
    type: str
    sync: bool
    min_version: Optional[str] = None
```
```python
OPENAI_METHODS_V0 = [
    OpenAiDefinition(module='openai', object='ChatCompletion', method='create', type='chat', sync=True, min_version=None),
    OpenAiDefinition(module='openai', object='Completion', method='create', type='completion', sync=True, min_version=None),
]
```
```python
OPENAI_METHODS_V1 = [
    OpenAiDefinition(module='openai.resources.chat.completions', object='Completions', method='create', type='chat', sync=True, min_version=None),
    OpenAiDefinition(module='openai.resources.completions', object='Completions', method='create', type='completion', sync=True, min_version=None),
    OpenAiDefinition(module='openai.resources.chat.completions', object='AsyncCompletions', method='create', type='chat', sync=False, min_version=None),
    OpenAiDefinition(module='openai.resources.completions', object='AsyncCompletions', method='create', type='completion', sync=False, min_version=None),
    OpenAiDefinition(module='openai.resources.beta.chat.completions', object='Completions', method='parse', type='chat', sync=True, min_version='1.50.0'),
    OpenAiDefinition(module='openai.resources.beta.chat.completions', object='AsyncCompletions', method='parse', type='chat', sync=False, min_version='1.50.0'),
    OpenAiDefinition(module='openai.resources.responses', object='Responses', method='create', type='chat', sync=True, min_version='1.66.0'),
    OpenAiDefinition(module='openai.resources.responses', object='AsyncResponses', method='create', type='chat', sync=False, min_version='1.66.0'),
]
```
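`OPENAI_METHODS_V0` covers the pre-1.0 OpenAI SDK, `OPENAI_METHODS_V1` the current one. Each definition names the SDK module path, class, and method that `register_tracing()` wraps; definitions carrying a `min_version` (the beta `parse` methods and the Responses API) are skipped when the installed `openai` package is older than that version.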
```python
class OpenAiArgsExtractor:
```

Separates the Langfuse-specific keyword arguments of a wrapped OpenAI call (`name`, `metadata`, `trace_id`, `session_id`, `user_id`, `tags`, `parent_observation_id`, `langfuse_prompt`) from the arguments that are forwarded to OpenAI.
```python
OpenAiArgsExtractor(
    name=None,
    metadata=None,
    trace_id=None,
    session_id=None,
    user_id=None,
    tags=None,
    parent_observation_id=None,
    langfuse_prompt=None,  # we cannot use prompt because it's an argument of the old OpenAI completions API
    **kwargs,
)
```
```python
def get_openai_args(self):
```

Returns the kwargs that are forwarded to OpenAI. If `store=True` (OpenAI model distillation), the Langfuse metadata is attached to the request, minus non-string values such as `response_format`, which OpenAI does not accept in metadata.
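A rough sketch of the split, using the argument names from this class (values are illustrative):

```python
extractor = OpenAiArgsExtractor(
    name="my-generation",  # Langfuse-only
    tags=["demo"],         # Langfuse-only
    model="gpt-4o-mini",   # forwarded to OpenAI (placeholder model)
    messages=[{"role": "user", "content": "Hi"}],
)

extractor.get_langfuse_args()  # Langfuse args plus the OpenAI kwargs
extractor.get_openai_args()    # only {'model': ..., 'messages': ...}
```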
```python
class OpenAILangfuse:
```

Registers the tracing wrappers and exposes the Langfuse configuration attributes and helpers on the `openai` module.
```python
def initialize(self):
```

Creates (or returns) the singleton Langfuse client, configured from the `openai.langfuse_*` module attributes.
```python
def langfuse_auth_check(self):
```
Check if the provided Langfuse credentials (public and secret key) are valid.
Raises:
- Exception: If no projects were found for the provided credentials.
Note:
This method is blocking; it is not recommended for production code.
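A sketch of an auth check; the keys are placeholders, and (assuming standard Langfuse client behavior) they may also be supplied via the `LANGFUSE_PUBLIC_KEY` / `LANGFUSE_SECRET_KEY` environment variables:

```python
from langfuse.openai import openai

openai.langfuse_public_key = "pk-lf-..."  # placeholder
openai.langfuse_secret_key = "sk-lf-..."  # placeholder

openai.langfuse_auth_check()  # blocking; raises if the credentials are invalid
```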
```python
def register_tracing(self):
```

Wraps the OpenAI SDK methods listed in `OPENAI_METHODS_V0`/`OPENAI_METHODS_V1` (skipping entries whose `min_version` exceeds the installed SDK version) and attaches the `langfuse_*` configuration attributes to the `openai` module. It runs automatically when `langfuse.openai` is imported.
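Because `register_tracing()` already ran at import time (see `modifier` below), user code never calls it directly; instead, the module attributes it attaches configure the lazily initialized client. A sketch with illustrative values:

```python
from langfuse.openai import openai  # importing already registered the wrappers

# Configure the Langfuse client before the first traced OpenAI call:
openai.langfuse_debug = True       # verbose SDK logging
openai.langfuse_sample_rate = 0.5  # trace roughly half of all calls
openai.langfuse_host = "https://cloud.langfuse.com"  # shown for illustration
```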
```python
modifier = OpenAILangfuse()
```
```python
def auth_check():
```

Deprecated: use `openai.langfuse_auth_check()` instead.
```python
class LangfuseResponseGeneratorSync:
```

Wraps a streaming OpenAI response. Chunks are passed through to the caller unchanged; once the stream is exhausted, the collected chunks are parsed and the Langfuse generation is finalized.
```python
LangfuseResponseGeneratorSync(*, resource, response, generation, langfuse, is_nested_trace)
```
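For example (model name is a placeholder), streamed calls return this wrapper instead of the raw `openai.Stream`, and iteration behaves exactly as with the vanilla SDK:

```python
from langfuse.openai import openai

client = openai.OpenAI()
stream = client.chat.completions.create(
    model="gpt-4o-mini",  # placeholder
    messages=[{"role": "user", "content": "Tell me a joke."}],
    stream=True,
)

for chunk in stream:  # actually a LangfuseResponseGeneratorSync
    print(chunk.choices[0].delta.content or "", end="")

# Once the stream is exhausted, _finalize() records output, usage and
# completion_start_time on the Langfuse generation.
```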
```python
class LangfuseResponseGeneratorAsync:
```

Async counterpart of `LangfuseResponseGeneratorSync`, wrapping `openai.AsyncStream` responses.
```python
LangfuseResponseGeneratorAsync(*, resource, response, generation, langfuse, is_nested_trace)
```
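A sketch of the async case (again with a placeholder model):

```python
import asyncio

from langfuse.openai import openai


async def main() -> None:
    client = openai.AsyncOpenAI()
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",  # placeholder
        messages=[{"role": "user", "content": "Hi"}],
        stream=True,
    )
    async for chunk in stream:  # actually a LangfuseResponseGeneratorAsync
        print(chunk.choices[0].delta.content or "", end="")


asyncio.run(main())
```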
```python
async def close(self) -> None:
```
Close the response and release the connection.
Automatically called if the response body is read to completion.
```python
async def aclose(self) -> None:
```
Close the response and release the connection.
Automatically called if the response body is read to completion.