langfuse.openai
If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.
- import openai
+ from langfuse.openai import openai
Langfuse automatically tracks:
- All prompts/completions with support for streaming, async and functions
- Latencies
- API Errors
- Model usage (tokens) and cost (USD)
The integration is fully interoperable with the observe()
decorator and the low-level tracing SDK.
See docs for more details: https://langfuse.com/docs/integrations/openai
1"""If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import. 2 3```diff 4- import openai 5+ from langfuse.openai import openai 6``` 7 8Langfuse automatically tracks: 9 10- All prompts/completions with support for streaming, async and functions 11- Latencies 12- API Errors 13- Model usage (tokens) and cost (USD) 14 15The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK. 16 17See docs for more details: https://langfuse.com/docs/integrations/openai 18""" 19 20import logging 21import types 22from collections import defaultdict 23from dataclasses import dataclass 24from inspect import isclass 25from typing import Optional 26 27import openai.resources 28from openai._types import NotGiven 29from packaging.version import Version 30from pydantic import BaseModel 31from wrapt import wrap_function_wrapper 32 33from langfuse import Langfuse 34from langfuse.client import StatefulGenerationClient 35from langfuse.decorators import langfuse_context 36from langfuse.media import LangfuseMedia 37from langfuse.utils import _get_timestamp 38from langfuse.utils.langfuse_singleton import LangfuseSingleton 39 40try: 41 import openai 42except ImportError: 43 raise ModuleNotFoundError( 44 "Please install OpenAI to use this feature: 'pip install openai'" 45 ) 46 47try: 48 from openai import AsyncAzureOpenAI, AsyncOpenAI, AzureOpenAI, OpenAI # noqa: F401 49except ImportError: 50 AsyncAzureOpenAI = None 51 AsyncOpenAI = None 52 AzureOpenAI = None 53 OpenAI = None 54 55log = logging.getLogger("langfuse") 56 57 58@dataclass 59class OpenAiDefinition: 60 module: str 61 object: str 62 method: str 63 type: str 64 sync: bool 65 min_version: Optional[str] = None 66 67 68OPENAI_METHODS_V0 = [ 69 OpenAiDefinition( 70 module="openai", 71 object="ChatCompletion", 72 method="create", 73 type="chat", 74 sync=True, 75 ), 76 OpenAiDefinition( 77 module="openai", 78 object="Completion", 79 
method="create", 80 type="completion", 81 sync=True, 82 ), 83] 84 85 86OPENAI_METHODS_V1 = [ 87 OpenAiDefinition( 88 module="openai.resources.chat.completions", 89 object="Completions", 90 method="create", 91 type="chat", 92 sync=True, 93 ), 94 OpenAiDefinition( 95 module="openai.resources.completions", 96 object="Completions", 97 method="create", 98 type="completion", 99 sync=True, 100 ), 101 OpenAiDefinition( 102 module="openai.resources.chat.completions", 103 object="AsyncCompletions", 104 method="create", 105 type="chat", 106 sync=False, 107 ), 108 OpenAiDefinition( 109 module="openai.resources.completions", 110 object="AsyncCompletions", 111 method="create", 112 type="completion", 113 sync=False, 114 ), 115 OpenAiDefinition( 116 module="openai.resources.beta.chat.completions", 117 object="Completions", 118 method="parse", 119 type="chat", 120 sync=True, 121 min_version="1.50.0", 122 ), 123 OpenAiDefinition( 124 module="openai.resources.beta.chat.completions", 125 object="AsyncCompletions", 126 method="parse", 127 type="chat", 128 sync=False, 129 min_version="1.50.0", 130 ), 131] 132 133 134class OpenAiArgsExtractor: 135 def __init__( 136 self, 137 name=None, 138 metadata=None, 139 trace_id=None, 140 session_id=None, 141 user_id=None, 142 tags=None, 143 parent_observation_id=None, 144 langfuse_prompt=None, # we cannot use prompt because it's an argument of the old OpenAI completions API 145 **kwargs, 146 ): 147 self.args = {} 148 self.args["name"] = name 149 self.args["metadata"] = ( 150 metadata 151 if "response_format" not in kwargs 152 else { 153 **(metadata or {}), 154 "response_format": kwargs["response_format"].model_json_schema() 155 if isclass(kwargs["response_format"]) 156 and issubclass(kwargs["response_format"], BaseModel) 157 else kwargs["response_format"], 158 } 159 ) 160 self.args["trace_id"] = trace_id 161 self.args["session_id"] = session_id 162 self.args["user_id"] = user_id 163 self.args["tags"] = tags 164 self.args["parent_observation_id"] = 
parent_observation_id 165 self.args["langfuse_prompt"] = langfuse_prompt 166 self.kwargs = kwargs 167 168 def get_langfuse_args(self): 169 return {**self.args, **self.kwargs} 170 171 def get_openai_args(self): 172 # If OpenAI model distillation is enabled, we need to add the metadata to the kwargs 173 # https://platform.openai.com/docs/guides/distillation 174 if self.kwargs.get("store", False): 175 self.kwargs["metadata"] = self.args.get("metadata", {}) 176 177 # OpenAI does not support non-string type values in metadata when using 178 # model distillation feature 179 self.kwargs["metadata"].pop("response_format", None) 180 181 return self.kwargs 182 183 184def _langfuse_wrapper(func): 185 def _with_langfuse(open_ai_definitions, initialize): 186 def wrapper(wrapped, instance, args, kwargs): 187 return func(open_ai_definitions, initialize, wrapped, args, kwargs) 188 189 return wrapper 190 191 return _with_langfuse 192 193 194def _extract_chat_prompt(kwargs: any): 195 """Extracts the user input from prompts. 
Returns an array of messages or dict with messages and functions""" 196 prompt = {} 197 198 if kwargs.get("functions") is not None: 199 prompt.update({"functions": kwargs["functions"]}) 200 201 if kwargs.get("function_call") is not None: 202 prompt.update({"function_call": kwargs["function_call"]}) 203 204 if kwargs.get("tools") is not None: 205 prompt.update({"tools": kwargs["tools"]}) 206 207 if prompt: 208 # uf user provided functions, we need to send these together with messages to langfuse 209 prompt.update( 210 { 211 "messages": [ 212 _process_message(message) for message in kwargs.get("messages", []) 213 ], 214 } 215 ) 216 return prompt 217 else: 218 # vanilla case, only send messages in openai format to langfuse 219 return [_process_message(message) for message in kwargs.get("messages", [])] 220 221 222def _process_message(message): 223 if not isinstance(message, dict): 224 return message 225 226 processed_message = {**message} 227 228 content = processed_message.get("content", None) 229 if not isinstance(content, list): 230 return processed_message 231 232 processed_content = [] 233 234 for content_part in content: 235 if content_part.get("type") == "input_audio": 236 audio_base64 = content_part.get("input_audio", {}).get("data", None) 237 format = content_part.get("input_audio", {}).get("format", "wav") 238 239 if audio_base64 is not None: 240 base64_data_uri = f"data:audio/{format};base64,{audio_base64}" 241 242 processed_content.append( 243 { 244 "type": "input_audio", 245 "input_audio": { 246 "data": LangfuseMedia(base64_data_uri=base64_data_uri), 247 "format": format, 248 }, 249 } 250 ) 251 else: 252 processed_content.append(content_part) 253 254 processed_message["content"] = processed_content 255 256 return processed_message 257 258 259def _extract_chat_response(kwargs: any): 260 """Extracts the llm output from the response.""" 261 response = { 262 "role": kwargs.get("role", None), 263 } 264 265 audio = None 266 267 if kwargs.get("function_call") is 
not None: 268 response.update({"function_call": kwargs["function_call"]}) 269 270 if kwargs.get("tool_calls") is not None: 271 response.update({"tool_calls": kwargs["tool_calls"]}) 272 273 if kwargs.get("audio") is not None: 274 audio = kwargs["audio"].__dict__ 275 276 if "data" in audio and audio["data"] is not None: 277 base64_data_uri = f"data:audio/{audio.get('format', 'wav')};base64,{audio.get('data', None)}" 278 audio["data"] = LangfuseMedia(base64_data_uri=base64_data_uri) 279 280 response.update( 281 { 282 "content": kwargs.get("content", None), 283 } 284 ) 285 286 if audio is not None: 287 response.update({"audio": audio}) 288 289 return response 290 291 292def _get_langfuse_data_from_kwargs( 293 resource: OpenAiDefinition, langfuse: Langfuse, start_time, kwargs 294): 295 name = kwargs.get("name", "OpenAI-generation") 296 297 if name is None: 298 name = "OpenAI-generation" 299 300 if name is not None and not isinstance(name, str): 301 raise TypeError("name must be a string") 302 303 decorator_context_observation_id = langfuse_context.get_current_observation_id() 304 decorator_context_trace_id = langfuse_context.get_current_trace_id() 305 306 trace_id = kwargs.get("trace_id", None) or decorator_context_trace_id 307 if trace_id is not None and not isinstance(trace_id, str): 308 raise TypeError("trace_id must be a string") 309 310 session_id = kwargs.get("session_id", None) 311 if session_id is not None and not isinstance(session_id, str): 312 raise TypeError("session_id must be a string") 313 314 user_id = kwargs.get("user_id", None) 315 if user_id is not None and not isinstance(user_id, str): 316 raise TypeError("user_id must be a string") 317 318 tags = kwargs.get("tags", None) 319 if tags is not None and ( 320 not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags) 321 ): 322 raise TypeError("tags must be a list of strings") 323 324 # Update trace params in decorator context if specified in openai call 325 if 
def _get_langfuse_data_from_kwargs(
    resource: OpenAiDefinition, langfuse: Langfuse, start_time, kwargs
):
    """Build the kwargs for a Langfuse generation from an intercepted OpenAI call.

    Validates the Langfuse-specific arguments, resolves trace/observation ids
    (explicit kwargs win over the observe() decorator context), creates or
    reuses a trace, and normalizes the model parameters.

    Returns a (generation_kwargs, is_nested_trace) tuple; is_nested_trace is
    True when the caller supplied an explicit trace_id.
    """
    name = kwargs.get("name", "OpenAI-generation")

    if name is None:
        name = "OpenAI-generation"

    if name is not None and not isinstance(name, str):
        raise TypeError("name must be a string")

    # Context set by the observe() decorator, if the call happens inside one.
    decorator_context_observation_id = langfuse_context.get_current_observation_id()
    decorator_context_trace_id = langfuse_context.get_current_trace_id()

    trace_id = kwargs.get("trace_id", None) or decorator_context_trace_id
    if trace_id is not None and not isinstance(trace_id, str):
        raise TypeError("trace_id must be a string")

    session_id = kwargs.get("session_id", None)
    if session_id is not None and not isinstance(session_id, str):
        raise TypeError("session_id must be a string")

    user_id = kwargs.get("user_id", None)
    if user_id is not None and not isinstance(user_id, str):
        raise TypeError("user_id must be a string")

    tags = kwargs.get("tags", None)
    if tags is not None and (
        not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags)
    ):
        raise TypeError("tags must be a list of strings")

    # Update trace params in decorator context if specified in openai call
    if decorator_context_trace_id:
        langfuse_context.update_current_trace(
            session_id=session_id, user_id=user_id, tags=tags
        )

    # Use the decorator's current observation as parent unless it IS the trace
    # root (observation id == trace id), in which case there is no parent.
    parent_observation_id = kwargs.get("parent_observation_id", None) or (
        decorator_context_observation_id
        if decorator_context_observation_id != decorator_context_trace_id
        else None
    )
    if parent_observation_id is not None and not isinstance(parent_observation_id, str):
        raise TypeError("parent_observation_id must be a string")
    if parent_observation_id is not None and trace_id is None:
        raise ValueError("parent_observation_id requires trace_id to be set")

    metadata = kwargs.get("metadata", {})

    if metadata is not None and not isinstance(metadata, dict):
        raise TypeError("metadata must be a dictionary")

    model = kwargs.get("model", None) or None

    prompt = None

    if resource.type == "completion":
        prompt = kwargs.get("prompt", None)
    elif resource.type == "chat":
        prompt = _extract_chat_prompt(kwargs)

    is_nested_trace = False
    if trace_id:
        # Caller controls the trace: only upsert its attributes.
        is_nested_trace = True
        langfuse.trace(id=trace_id, session_id=session_id, user_id=user_id, tags=tags)
    else:
        # No trace yet: create one (unless the decorator already did).
        trace_id = (
            decorator_context_trace_id
            or langfuse.trace(
                session_id=session_id,
                user_id=user_id,
                tags=tags,
                name=name,
                input=prompt,
                metadata=metadata,
            ).id
        )

    # OpenAI accepts NotGiven sentinels; replace them with the documented
    # API defaults so Langfuse records concrete values.
    parsed_temperature = (
        kwargs.get("temperature", 1)
        if not isinstance(kwargs.get("temperature", 1), NotGiven)
        else 1
    )

    parsed_max_tokens = (
        kwargs.get("max_tokens", float("inf"))
        if not isinstance(kwargs.get("max_tokens", float("inf")), NotGiven)
        else float("inf")
    )

    parsed_top_p = (
        kwargs.get("top_p", 1)
        if not isinstance(kwargs.get("top_p", 1), NotGiven)
        else 1
    )

    parsed_frequency_penalty = (
        kwargs.get("frequency_penalty", 0)
        if not isinstance(kwargs.get("frequency_penalty", 0), NotGiven)
        else 0
    )

    parsed_presence_penalty = (
        kwargs.get("presence_penalty", 0)
        if not isinstance(kwargs.get("presence_penalty", 0), NotGiven)
        else 0
    )

    parsed_seed = (
        kwargs.get("seed", None)
        if not isinstance(kwargs.get("seed", None), NotGiven)
        else None
    )

    parsed_n = kwargs.get("n", 1) if not isinstance(kwargs.get("n", 1), NotGiven) else 1

    modelParameters = {
        "temperature": parsed_temperature,
        "max_tokens": parsed_max_tokens,  # casing?
        "top_p": parsed_top_p,
        "frequency_penalty": parsed_frequency_penalty,
        "presence_penalty": parsed_presence_penalty,
    }
    if parsed_n is not None and parsed_n > 1:
        modelParameters["n"] = parsed_n

    if parsed_seed is not None:
        modelParameters["seed"] = parsed_seed

    langfuse_prompt = kwargs.get("langfuse_prompt", None)

    return {
        "name": name,
        "metadata": metadata,
        "trace_id": trace_id,
        "parent_observation_id": parent_observation_id,
        "user_id": user_id,
        "start_time": start_time,
        "input": prompt,
        "model_parameters": modelParameters,
        "model": model or None,
        "prompt": langfuse_prompt,
    }, is_nested_trace
def _create_langfuse_update(
    completion,
    generation: StatefulGenerationClient,
    completion_start_time,
    model=None,
    usage=None,
):
    """Finalize a generation after a streamed response completed.

    Records end time, the accumulated output, and the time of the first
    received chunk; model and usage are added only if the stream reported them.
    """
    update = {
        "end_time": _get_timestamp(),
        "output": completion,
        "completion_start_time": completion_start_time,
    }
    if model is not None:
        update["model"] = model

    if usage is not None:
        update["usage"] = _parse_usage(usage)

    generation.update(**update)
( 468 usage_dict[tokens_details] 469 if isinstance(usage_dict[tokens_details], dict) 470 else usage_dict[tokens_details].__dict__ 471 ) 472 usage_dict[tokens_details] = { 473 k: v for k, v in tokens_details_dict.items() if v is not None 474 } 475 476 return usage_dict 477 478 479def _extract_streamed_openai_response(resource, chunks): 480 completion = defaultdict(str) if resource.type == "chat" else "" 481 model, usage = None, None 482 483 for chunk in chunks: 484 if _is_openai_v1(): 485 chunk = chunk.__dict__ 486 487 model = model or chunk.get("model", None) or None 488 usage = chunk.get("usage", None) 489 490 choices = chunk.get("choices", []) 491 492 for choice in choices: 493 if _is_openai_v1(): 494 choice = choice.__dict__ 495 if resource.type == "chat": 496 delta = choice.get("delta", None) 497 498 if _is_openai_v1(): 499 delta = delta.__dict__ 500 501 if delta.get("role", None) is not None: 502 completion["role"] = delta["role"] 503 504 if delta.get("content", None) is not None: 505 completion["content"] = ( 506 delta.get("content", None) 507 if completion["content"] is None 508 else completion["content"] + delta.get("content", None) 509 ) 510 elif delta.get("function_call", None) is not None: 511 curr = completion["function_call"] 512 tool_call_chunk = delta.get("function_call", None) 513 514 if not curr: 515 completion["function_call"] = { 516 "name": getattr(tool_call_chunk, "name", ""), 517 "arguments": getattr(tool_call_chunk, "arguments", ""), 518 } 519 520 else: 521 curr["name"] = curr["name"] or getattr( 522 tool_call_chunk, "name", None 523 ) 524 curr["arguments"] += getattr(tool_call_chunk, "arguments", "") 525 526 elif delta.get("tool_calls", None) is not None: 527 curr = completion["tool_calls"] 528 tool_call_chunk = getattr( 529 delta.get("tool_calls", None)[0], "function", None 530 ) 531 532 if not curr: 533 completion["tool_calls"] = [ 534 { 535 "name": getattr(tool_call_chunk, "name", ""), 536 "arguments": getattr(tool_call_chunk, 
"arguments", ""), 537 } 538 ] 539 540 elif getattr(tool_call_chunk, "name", None) is not None: 541 curr.append( 542 { 543 "name": getattr(tool_call_chunk, "name", None), 544 "arguments": getattr( 545 tool_call_chunk, "arguments", None 546 ), 547 } 548 ) 549 550 else: 551 curr[-1]["name"] = curr[-1]["name"] or getattr( 552 tool_call_chunk, "name", None 553 ) 554 curr[-1]["arguments"] += getattr( 555 tool_call_chunk, "arguments", None 556 ) 557 558 if resource.type == "completion": 559 completion += choice.get("text", None) 560 561 def get_response_for_chat(): 562 return ( 563 completion["content"] 564 or ( 565 completion["function_call"] 566 and { 567 "role": "assistant", 568 "function_call": completion["function_call"], 569 } 570 ) 571 or ( 572 completion["tool_calls"] 573 and { 574 "role": "assistant", 575 # "tool_calls": [{"function": completion["tool_calls"]}], 576 "tool_calls": [ 577 {"function": data} for data in completion["tool_calls"] 578 ], 579 } 580 ) 581 or None 582 ) 583 584 return ( 585 model, 586 get_response_for_chat() if resource.type == "chat" else completion, 587 usage, 588 ) 589 590 591def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response): 592 if response is None: 593 return None, "<NoneType response returned from OpenAI>", None 594 595 model = response.get("model", None) or None 596 597 completion = None 598 if resource.type == "completion": 599 choices = response.get("choices", []) 600 if len(choices) > 0: 601 choice = choices[-1] 602 603 completion = choice.text if _is_openai_v1() else choice.get("text", None) 604 elif resource.type == "chat": 605 choices = response.get("choices", []) 606 if len(choices) > 0: 607 # If multiple choices were generated, we'll show all of them in the UI as a list. 
def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response):
    """Extract (model, completion, usage) from a non-streaming OpenAI response.

    `response` is a dict: for openai>=1.0 callers pass `response.__dict__`,
    for v0 the SDK already returns dict-like objects.
    """
    if response is None:
        return None, "<NoneType response returned from OpenAI>", None

    model = response.get("model", None) or None

    completion = None
    if resource.type == "completion":
        choices = response.get("choices", [])
        if len(choices) > 0:
            choice = choices[-1]

            completion = choice.text if _is_openai_v1() else choice.get("text", None)
    elif resource.type == "chat":
        choices = response.get("choices", [])
        if len(choices) > 0:
            # If multiple choices were generated, we'll show all of them in the UI as a list.
            if len(choices) > 1:
                completion = [
                    _extract_chat_response(choice.message.__dict__)
                    if _is_openai_v1()
                    else choice.get("message", None)
                    for choice in choices
                ]
            else:
                choice = choices[0]
                completion = (
                    _extract_chat_response(choice.message.__dict__)
                    if _is_openai_v1()
                    else choice.get("message", None)
                )

    usage = _parse_usage(response.get("usage", None))

    return (model, completion, usage)


def _is_openai_v1():
    """True when the installed openai SDK is >= 1.0.0 (resource-based API)."""
    return Version(openai.__version__) >= Version("1.0.0")


def _is_streaming_response(response):
    """True for generator-based (v0) or Stream/AsyncStream (v1) responses."""
    return (
        isinstance(response, types.GeneratorType)
        or isinstance(response, types.AsyncGeneratorType)
        or (_is_openai_v1() and isinstance(response, openai.Stream))
        or (_is_openai_v1() and isinstance(response, openai.AsyncStream))
    )
@_langfuse_wrapper
def _wrap(open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs):
    """Sync interception wrapper: trace the OpenAI call in Langfuse.

    Creates a generation before the call; on success either wraps a streaming
    response (finalized when the stream is consumed) or records the output
    immediately. On failure the generation is marked as an ERROR and the
    original exception is re-raised.
    """
    new_langfuse: Langfuse = initialize()

    start_time = _get_timestamp()
    arg_extractor = OpenAiArgsExtractor(*args, **kwargs)

    generation, is_nested_trace = _get_langfuse_data_from_kwargs(
        open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()
    )
    generation = new_langfuse.generation(**generation)
    try:
        # Forward only the OpenAI-native kwargs to the SDK.
        openai_response = wrapped(**arg_extractor.get_openai_args())

        if _is_streaming_response(openai_response):
            # Defer finalization until the caller consumes the stream.
            return LangfuseResponseGeneratorSync(
                resource=open_ai_resource,
                response=openai_response,
                generation=generation,
                langfuse=new_langfuse,
                is_nested_trace=is_nested_trace,
            )

        else:
            model, completion, usage = _get_langfuse_data_from_default_response(
                open_ai_resource,
                (openai_response and openai_response.__dict__)
                if _is_openai_v1()
                else openai_response,
            )
            generation.update(
                model=model,
                output=completion,
                end_time=_get_timestamp(),
                usage=usage,  # backward compat for all V2 self hosters
                usage_details=usage,
            )

            # Avoiding the trace-update if trace-id is provided by user.
            if not is_nested_trace:
                new_langfuse.trace(id=generation.trace_id, output=completion)

        return openai_response
    except Exception as ex:
        log.warning(ex)
        model = kwargs.get("model", None) or None
        generation.update(
            end_time=_get_timestamp(),
            status_message=str(ex),
            level="ERROR",
            model=model,
            usage={
                "input_cost": 0,
                "output_cost": 0,
                "total_cost": 0,
            },  # backward compat for all V2 self hosters
            cost_details={"input": 0, "output": 0, "total": 0},
        )
        raise ex
@_langfuse_wrapper
async def _wrap_async(
    open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs
):
    """Async interception wrapper: trace the OpenAI call in Langfuse.

    Mirror of the sync `_wrap`: creates a generation before the awaited call;
    on success either wraps a streaming response (finalized when the stream is
    consumed) or records the output immediately. On failure the generation is
    marked as an ERROR and the original exception is re-raised.
    """
    new_langfuse = initialize()
    start_time = _get_timestamp()
    arg_extractor = OpenAiArgsExtractor(*args, **kwargs)

    generation, is_nested_trace = _get_langfuse_data_from_kwargs(
        open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()
    )
    generation = new_langfuse.generation(**generation)
    try:
        # Forward only the OpenAI-native kwargs to the SDK.
        openai_response = await wrapped(**arg_extractor.get_openai_args())

        if _is_streaming_response(openai_response):
            # Defer finalization until the caller consumes the stream.
            return LangfuseResponseGeneratorAsync(
                resource=open_ai_resource,
                response=openai_response,
                generation=generation,
                langfuse=new_langfuse,
                is_nested_trace=is_nested_trace,
            )

        else:
            model, completion, usage = _get_langfuse_data_from_default_response(
                open_ai_resource,
                (openai_response and openai_response.__dict__)
                if _is_openai_v1()
                else openai_response,
            )
            generation.update(
                model=model,
                output=completion,
                end_time=_get_timestamp(),
                usage=usage,  # backward compat for all V2 self hosters
                usage_details=usage,
            )
            # Avoiding the trace-update if trace-id is provided by user.
            if not is_nested_trace:
                new_langfuse.trace(id=generation.trace_id, output=completion)

        return openai_response
    except Exception as ex:
        # CONSISTENCY FIX: the sync wrapper logs the exception before
        # recording it; the async path previously swallowed it silently.
        log.warning(ex)
        model = kwargs.get("model", None) or None
        generation.update(
            end_time=_get_timestamp(),
            status_message=str(ex),
            level="ERROR",
            model=model,
            usage={
                "input_cost": 0,
                "output_cost": 0,
                "total_cost": 0,
            },  # Backward compat for all V2 self hosters
            cost_details={"input": 0, "output": 0, "total": 0},
        )
        raise ex
789 """ 790 if self._langfuse is None: 791 self.initialize() 792 793 return self._langfuse.auth_check() 794 795 def register_tracing(self): 796 resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0 797 798 for resource in resources: 799 if resource.min_version is not None and Version( 800 openai.__version__ 801 ) < Version(resource.min_version): 802 continue 803 804 wrap_function_wrapper( 805 resource.module, 806 f"{resource.object}.{resource.method}", 807 _wrap(resource, self.initialize) 808 if resource.sync 809 else _wrap_async(resource, self.initialize), 810 ) 811 812 setattr(openai, "langfuse_public_key", None) 813 setattr(openai, "langfuse_secret_key", None) 814 setattr(openai, "langfuse_host", None) 815 setattr(openai, "langfuse_debug", None) 816 setattr(openai, "langfuse_enabled", True) 817 setattr(openai, "langfuse_sample_rate", None) 818 setattr(openai, "langfuse_mask", None) 819 setattr(openai, "langfuse_auth_check", self.langfuse_auth_check) 820 setattr(openai, "flush_langfuse", self.flush) 821 822 823modifier = OpenAILangfuse() 824modifier.register_tracing() 825 826 827# DEPRECATED: Use `openai.langfuse_auth_check()` instead 828def auth_check(): 829 if modifier._langfuse is None: 830 modifier.initialize() 831 832 return modifier._langfuse.auth_check() 833 834 835class LangfuseResponseGeneratorSync: 836 def __init__( 837 self, 838 *, 839 resource, 840 response, 841 generation, 842 langfuse, 843 is_nested_trace, 844 ): 845 self.items = [] 846 847 self.resource = resource 848 self.response = response 849 self.generation = generation 850 self.langfuse = langfuse 851 self.is_nested_trace = is_nested_trace 852 self.completion_start_time = None 853 854 def __iter__(self): 855 try: 856 for i in self.response: 857 self.items.append(i) 858 859 if self.completion_start_time is None: 860 self.completion_start_time = _get_timestamp() 861 862 yield i 863 finally: 864 self._finalize() 865 866 def __next__(self): 867 try: 868 item = 
class LangfuseResponseGeneratorSync:
    """Wraps a synchronous OpenAI stream so every chunk is recorded and the
    Langfuse generation is finalized once the stream is exhausted."""

    def __init__(
        self,
        *,
        resource,
        response,
        generation,
        langfuse,
        is_nested_trace,
    ):
        self.items = []
        self.resource = resource
        self.response = response
        self.generation = generation
        self.langfuse = langfuse
        self.is_nested_trace = is_nested_trace
        self.completion_start_time = None

    def _record(self, chunk):
        # Keep the chunk for later reassembly and stamp time-to-first-token.
        self.items.append(chunk)
        if self.completion_start_time is None:
            self.completion_start_time = _get_timestamp()

    def __iter__(self):
        try:
            for chunk in self.response:
                self._record(chunk)
                yield chunk
        finally:
            self._finalize()

    def __next__(self):
        try:
            chunk = self.response.__next__()
        except StopIteration:
            self._finalize()
            raise
        self._record(chunk)
        return chunk

    def __enter__(self):
        return self.__iter__()

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _finalize(self):
        model, completion, usage = _extract_streamed_openai_response(
            self.resource, self.items
        )

        # Avoiding the trace-update if trace-id is provided by user.
        if not self.is_nested_trace:
            self.langfuse.trace(id=self.generation.trace_id, output=completion)

        _create_langfuse_update(
            completion,
            self.generation,
            self.completion_start_time,
            model=model,
            usage=usage,
        )
class LangfuseResponseGeneratorAsync:
    """Async counterpart of LangfuseResponseGeneratorSync: records every
    streamed chunk and finalizes the Langfuse generation on exhaustion."""

    def __init__(
        self,
        *,
        resource,
        response,
        generation,
        langfuse,
        is_nested_trace,
    ):
        self.items = []
        self.resource = resource
        self.response = response
        self.generation = generation
        self.langfuse = langfuse
        self.is_nested_trace = is_nested_trace
        self.completion_start_time = None

    def _record(self, chunk):
        # Keep the chunk for later reassembly and stamp time-to-first-token.
        self.items.append(chunk)
        if self.completion_start_time is None:
            self.completion_start_time = _get_timestamp()

    async def __aiter__(self):
        try:
            async for chunk in self.response:
                self._record(chunk)
                yield chunk
        finally:
            await self._finalize()

    async def __anext__(self):
        try:
            chunk = await self.response.__anext__()
        except StopAsyncIteration:
            await self._finalize()
            raise
        self._record(chunk)
        return chunk

    async def __aenter__(self):
        return self.__aiter__()

    async def __aexit__(self, exc_type, exc_value, traceback):
        pass

    async def _finalize(self):
        model, completion, usage = _extract_streamed_openai_response(
            self.resource, self.items
        )

        # Avoiding the trace-update if trace-id is provided by user.
        if not self.is_nested_trace:
            self.langfuse.trace(id=self.generation.trace_id, output=completion)

        _create_langfuse_update(
            completion,
            self.generation,
            self.completion_start_time,
            model=model,
            usage=usage,
        )

    async def close(self) -> None:
        """Close the response and release the connection.

        Automatically called if the response body is read to completion.
        """
        await self.response.close()

    async def aclose(self) -> None:
        """Close the response and release the connection.

        Automatically called if the response body is read to completion.
        """
        await self.response.aclose()
log =
<Logger langfuse (WARNING)>
@dataclass
class
OpenAiDefinition:
59@dataclass 60class OpenAiDefinition: 61 module: str 62 object: str 63 method: str 64 type: str 65 sync: bool 66 min_version: Optional[str] = None
OPENAI_METHODS_V0 =
[OpenAiDefinition(module='openai', object='ChatCompletion', method='create', type='chat', sync=True, min_version=None), OpenAiDefinition(module='openai', object='Completion', method='create', type='completion', sync=True, min_version=None)]
OPENAI_METHODS_V1 =
[OpenAiDefinition(module='openai.resources.chat.completions', object='Completions', method='create', type='chat', sync=True, min_version=None), OpenAiDefinition(module='openai.resources.completions', object='Completions', method='create', type='completion', sync=True, min_version=None), OpenAiDefinition(module='openai.resources.chat.completions', object='AsyncCompletions', method='create', type='chat', sync=False, min_version=None), OpenAiDefinition(module='openai.resources.completions', object='AsyncCompletions', method='create', type='completion', sync=False, min_version=None), OpenAiDefinition(module='openai.resources.beta.chat.completions', object='Completions', method='parse', type='chat', sync=True, min_version='1.50.0'), OpenAiDefinition(module='openai.resources.beta.chat.completions', object='AsyncCompletions', method='parse', type='chat', sync=False, min_version='1.50.0')]
class
OpenAiArgsExtractor:
135class OpenAiArgsExtractor: 136 def __init__( 137 self, 138 name=None, 139 metadata=None, 140 trace_id=None, 141 session_id=None, 142 user_id=None, 143 tags=None, 144 parent_observation_id=None, 145 langfuse_prompt=None, # we cannot use prompt because it's an argument of the old OpenAI completions API 146 **kwargs, 147 ): 148 self.args = {} 149 self.args["name"] = name 150 self.args["metadata"] = ( 151 metadata 152 if "response_format" not in kwargs 153 else { 154 **(metadata or {}), 155 "response_format": kwargs["response_format"].model_json_schema() 156 if isclass(kwargs["response_format"]) 157 and issubclass(kwargs["response_format"], BaseModel) 158 else kwargs["response_format"], 159 } 160 ) 161 self.args["trace_id"] = trace_id 162 self.args["session_id"] = session_id 163 self.args["user_id"] = user_id 164 self.args["tags"] = tags 165 self.args["parent_observation_id"] = parent_observation_id 166 self.args["langfuse_prompt"] = langfuse_prompt 167 self.kwargs = kwargs 168 169 def get_langfuse_args(self): 170 return {**self.args, **self.kwargs} 171 172 def get_openai_args(self): 173 # If OpenAI model distillation is enabled, we need to add the metadata to the kwargs 174 # https://platform.openai.com/docs/guides/distillation 175 if self.kwargs.get("store", False): 176 self.kwargs["metadata"] = self.args.get("metadata", {}) 177 178 # OpenAI does not support non-string type values in metadata when using 179 # model distillation feature 180 self.kwargs["metadata"].pop("response_format", None) 181 182 return self.kwargs
OpenAiArgsExtractor( name=None, metadata=None, trace_id=None, session_id=None, user_id=None, tags=None, parent_observation_id=None, langfuse_prompt=None, **kwargs)
136 def __init__( 137 self, 138 name=None, 139 metadata=None, 140 trace_id=None, 141 session_id=None, 142 user_id=None, 143 tags=None, 144 parent_observation_id=None, 145 langfuse_prompt=None, # we cannot use prompt because it's an argument of the old OpenAI completions API 146 **kwargs, 147 ): 148 self.args = {} 149 self.args["name"] = name 150 self.args["metadata"] = ( 151 metadata 152 if "response_format" not in kwargs 153 else { 154 **(metadata or {}), 155 "response_format": kwargs["response_format"].model_json_schema() 156 if isclass(kwargs["response_format"]) 157 and issubclass(kwargs["response_format"], BaseModel) 158 else kwargs["response_format"], 159 } 160 ) 161 self.args["trace_id"] = trace_id 162 self.args["session_id"] = session_id 163 self.args["user_id"] = user_id 164 self.args["tags"] = tags 165 self.args["parent_observation_id"] = parent_observation_id 166 self.args["langfuse_prompt"] = langfuse_prompt 167 self.kwargs = kwargs
def
get_openai_args(self):
172 def get_openai_args(self): 173 # If OpenAI model distillation is enabled, we need to add the metadata to the kwargs 174 # https://platform.openai.com/docs/guides/distillation 175 if self.kwargs.get("store", False): 176 self.kwargs["metadata"] = self.args.get("metadata", {}) 177 178 # OpenAI does not support non-string type values in metadata when using 179 # model distillation feature 180 self.kwargs["metadata"].pop("response_format", None) 181 182 return self.kwargs
class
OpenAILangfuse:
763class OpenAILangfuse: 764 _langfuse: Optional[Langfuse] = None 765 766 def initialize(self): 767 self._langfuse = LangfuseSingleton().get( 768 public_key=openai.langfuse_public_key, 769 secret_key=openai.langfuse_secret_key, 770 host=openai.langfuse_host, 771 debug=openai.langfuse_debug, 772 enabled=openai.langfuse_enabled, 773 sdk_integration="openai", 774 sample_rate=openai.langfuse_sample_rate, 775 ) 776 777 return self._langfuse 778 779 def flush(cls): 780 cls._langfuse.flush() 781 782 def langfuse_auth_check(self): 783 """Check if the provided Langfuse credentials (public and secret key) are valid. 784 785 Raises: 786 Exception: If no projects were found for the provided credentials. 787 788 Note: 789 This method is blocking. It is discouraged to use it in production code. 790 """ 791 if self._langfuse is None: 792 self.initialize() 793 794 return self._langfuse.auth_check() 795 796 def register_tracing(self): 797 resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0 798 799 for resource in resources: 800 if resource.min_version is not None and Version( 801 openai.__version__ 802 ) < Version(resource.min_version): 803 continue 804 805 wrap_function_wrapper( 806 resource.module, 807 f"{resource.object}.{resource.method}", 808 _wrap(resource, self.initialize) 809 if resource.sync 810 else _wrap_async(resource, self.initialize), 811 ) 812 813 setattr(openai, "langfuse_public_key", None) 814 setattr(openai, "langfuse_secret_key", None) 815 setattr(openai, "langfuse_host", None) 816 setattr(openai, "langfuse_debug", None) 817 setattr(openai, "langfuse_enabled", True) 818 setattr(openai, "langfuse_sample_rate", None) 819 setattr(openai, "langfuse_mask", None) 820 setattr(openai, "langfuse_auth_check", self.langfuse_auth_check) 821 setattr(openai, "flush_langfuse", self.flush)
def initialize(self):
    """Instantiate (or fetch) the Langfuse client.

    All configuration is read from the ``openai.langfuse_*`` module
    attributes that ``register_tracing`` published.
    """
    config = {
        "public_key": openai.langfuse_public_key,
        "secret_key": openai.langfuse_secret_key,
        "host": openai.langfuse_host,
        "debug": openai.langfuse_debug,
        "enabled": openai.langfuse_enabled,
        "sample_rate": openai.langfuse_sample_rate,
    }
    self._langfuse = LangfuseSingleton().get(sdk_integration="openai", **config)
    return self._langfuse
def langfuse_auth_check(self):
    """Check if the provided Langfuse credentials (public and secret key) are valid.

    Lazily initializes the client first when none exists yet.

    Raises:
        Exception: If no projects were found for the provided credentials.

    Note:
        This method is blocking. It is discouraged to use it in production code.
    """
    client = self._langfuse if self._langfuse is not None else self.initialize()
    return client.auth_check()
def register_tracing(self):
    """Patch every supported OpenAI SDK method with a Langfuse tracing wrapper.

    Also publishes the ``langfuse_*`` configuration attributes (and the
    auth-check / flush entry points) on the ``openai`` module itself.
    """
    installed = Version(openai.__version__)
    resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0

    for definition in resources:
        # Skip wrappers whose minimum SDK version exceeds the installed one.
        if definition.min_version is not None and installed < Version(
            definition.min_version
        ):
            continue

        wrapper = (
            _wrap(definition, self.initialize)
            if definition.sync
            else _wrap_async(definition, self.initialize)
        )
        wrap_function_wrapper(
            definition.module,
            f"{definition.object}.{definition.method}",
            wrapper,
        )

    # Public configuration surface exposed on the openai module.
    module_attributes = {
        "langfuse_public_key": None,
        "langfuse_secret_key": None,
        "langfuse_host": None,
        "langfuse_debug": None,
        "langfuse_enabled": True,
        "langfuse_sample_rate": None,
        "langfuse_mask": None,
        "langfuse_auth_check": self.langfuse_auth_check,
        "flush_langfuse": self.flush,
    }
    for attribute, value in module_attributes.items():
        setattr(openai, attribute, value)
# Module-level singleton; its `register_tracing()` installs the wrappers
# when this module is imported.
modifier = OpenAILangfuse()
def
auth_check():
class LangfuseResponseGeneratorSync:
    """Synchronous wrapper around a streamed OpenAI response.

    Records every chunk it yields and, once the stream is exhausted,
    finalizes the Langfuse generation (and the trace, when the trace was
    created by this integration rather than supplied by the user).
    """

    def __init__(
        self,
        *,
        resource,
        response,
        generation,
        langfuse,
        is_nested_trace,
    ):
        self.items = []  # chunks observed so far
        self.resource = resource
        self.response = response
        self.generation = generation
        self.langfuse = langfuse
        self.is_nested_trace = is_nested_trace
        self.completion_start_time = None  # timestamp of the first chunk

    def __iter__(self):
        try:
            for chunk in self.response:
                self.items.append(chunk)
                if self.completion_start_time is None:
                    self.completion_start_time = _get_timestamp()
                yield chunk
        finally:
            # Runs on exhaustion, break, or generator close.
            self._finalize()

    def __next__(self):
        try:
            chunk = next(self.response)
        except StopIteration:
            self._finalize()
            raise

        self.items.append(chunk)
        if self.completion_start_time is None:
            self.completion_start_time = _get_timestamp()
        return chunk

    def __enter__(self):
        return self.__iter__()

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _finalize(self):
        model, completion, usage = _extract_streamed_openai_response(
            self.resource, self.items
        )

        # Leave the trace untouched when the trace id was provided by the
        # user; only integration-created traces get their output updated.
        if not self.is_nested_trace:
            self.langfuse.trace(id=self.generation.trace_id, output=completion)

        _create_langfuse_update(
            completion,
            self.generation,
            self.completion_start_time,
            model=model,
            usage=usage,
        )
def __init__(
    self,
    *,
    resource,
    response,
    generation,
    langfuse,
    is_nested_trace,
):
    """Capture the wrapped stream plus the Langfuse bookkeeping handles."""
    self.completion_start_time = None  # set when the first chunk arrives
    self.items = []  # chunks collected so far
    self.resource = resource
    self.response = response
    self.generation = generation
    self.langfuse = langfuse
    self.is_nested_trace = is_nested_trace
class LangfuseResponseGeneratorAsync:
    """Asynchronous wrapper around a streamed OpenAI response.

    Records every chunk it yields and, once the stream is exhausted,
    finalizes the Langfuse generation (and the trace, when the trace was
    created by this integration rather than supplied by the user).
    """

    def __init__(
        self,
        *,
        resource,
        response,
        generation,
        langfuse,
        is_nested_trace,
    ):
        self.items = []  # chunks observed so far
        self.resource = resource
        self.response = response
        self.generation = generation
        self.langfuse = langfuse
        self.is_nested_trace = is_nested_trace
        self.completion_start_time = None  # timestamp of the first chunk

    async def __aiter__(self):
        try:
            async for chunk in self.response:
                self.items.append(chunk)
                if self.completion_start_time is None:
                    self.completion_start_time = _get_timestamp()
                yield chunk
        finally:
            # Runs on exhaustion, break, or generator close.
            await self._finalize()

    async def __anext__(self):
        try:
            chunk = await self.response.__anext__()
        except StopAsyncIteration:
            await self._finalize()
            raise

        self.items.append(chunk)
        if self.completion_start_time is None:
            self.completion_start_time = _get_timestamp()
        return chunk

    async def __aenter__(self):
        return self.__aiter__()

    async def __aexit__(self, exc_type, exc_value, traceback):
        pass

    async def _finalize(self):
        model, completion, usage = _extract_streamed_openai_response(
            self.resource, self.items
        )

        # Leave the trace untouched when the trace id was provided by the
        # user; only integration-created traces get their output updated.
        if not self.is_nested_trace:
            self.langfuse.trace(id=self.generation.trace_id, output=completion)

        _create_langfuse_update(
            completion,
            self.generation,
            self.completion_start_time,
            model=model,
            usage=usage,
        )

    async def close(self) -> None:
        """Close the response and release the connection.

        Automatically called if the response body is read to completion.
        """
        await self.response.close()

    async def aclose(self) -> None:
        """Close the response and release the connection.

        Automatically called if the response body is read to completion.
        """
        await self.response.aclose()
def __init__(
    self,
    *,
    resource,
    response,
    generation,
    langfuse,
    is_nested_trace,
):
    """Capture the wrapped async stream plus the Langfuse bookkeeping handles."""
    self.completion_start_time = None  # set when the first chunk arrives
    self.items = []  # chunks collected so far
    self.resource = resource
    self.response = response
    self.generation = generation
    self.langfuse = langfuse
    self.is_nested_trace = is_nested_trace
async def close(self) -> None:
    """Close the response and release the connection.

    Automatically called if the response body is read to completion.
    Delegates to ``close()`` on the wrapped response object.
    """
    await self.response.close()
async def aclose(self) -> None:
    """Close the response and release the connection.

    Automatically called if the response body is read to completion.
    Delegates to ``aclose()`` on the wrapped response object.
    """
    await self.response.aclose()