langfuse.openai
If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.
```diff
- import openai
+ from langfuse.openai import openai
```
Langfuse automatically tracks:
- All prompts/completions with support for streaming, async and functions
- Latencies
- API Errors
- Model usage (tokens) and cost (USD)
The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK.
See docs for more details: https://langfuse.com/docs/integrations/openai
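A minimal usage sketch, assuming `OPENAI_API_KEY` and the Langfuse credentials are set in the environment; the model name is only an example:

```python
from langfuse.openai import openai  # instead of `import openai`

client = openai.OpenAI()

completion = client.chat.completions.create(
    model="gpt-4o-mini",  # example model name
    messages=[{"role": "user", "content": "Hello!"}],
    # Optional Langfuse-specific kwargs; these are stripped out
    # before the request is forwarded to OpenAI.
    name="hello-generation",
    user_id="user-123",
)
print(completion.choices[0].message.content)
```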
1"""If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import. 2 3```diff 4- import openai 5+ from langfuse.openai import openai 6``` 7 8Langfuse automatically tracks: 9 10- All prompts/completions with support for streaming, async and functions 11- Latencies 12- API Errors 13- Model usage (tokens) and cost (USD) 14 15The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK. 16 17See docs for more details: https://langfuse.com/docs/integrations/openai 18""" 19 20import logging 21import types 22from collections import defaultdict 23from dataclasses import dataclass 24from inspect import isclass 25from typing import Optional 26 27 28import openai.resources 29from openai._types import NotGiven 30from packaging.version import Version 31from pydantic import BaseModel 32from wrapt import wrap_function_wrapper 33 34from langfuse import Langfuse 35from langfuse.client import StatefulGenerationClient 36from langfuse.decorators import langfuse_context 37from langfuse.media import LangfuseMedia 38from langfuse.utils import _get_timestamp 39from langfuse.utils.langfuse_singleton import LangfuseSingleton 40 41try: 42 import openai 43except ImportError: 44 raise ModuleNotFoundError( 45 "Please install OpenAI to use this feature: 'pip install openai'" 46 ) 47 48try: 49 from openai import AsyncAzureOpenAI, AsyncOpenAI, AzureOpenAI, OpenAI # noqa: F401 50except ImportError: 51 AsyncAzureOpenAI = None 52 AsyncOpenAI = None 53 AzureOpenAI = None 54 OpenAI = None 55 56log = logging.getLogger("langfuse") 57 58 59@dataclass 60class OpenAiDefinition: 61 module: str 62 object: str 63 method: str 64 type: str 65 sync: bool 66 min_version: Optional[str] = None 67 68 69OPENAI_METHODS_V0 = [ 70 OpenAiDefinition( 71 module="openai", 72 object="ChatCompletion", 73 method="create", 74 type="chat", 75 sync=True, 76 ), 77 OpenAiDefinition( 78 module="openai", 79 object="Completion", 80 method="create", 81 type="completion", 82 sync=True, 83 ), 84] 85 86 87OPENAI_METHODS_V1 = [ 88 OpenAiDefinition( 89 module="openai.resources.chat.completions", 90 object="Completions", 91 method="create", 92 type="chat", 93 sync=True, 94 ), 95 OpenAiDefinition( 96 module="openai.resources.completions", 97 object="Completions", 98 method="create", 99 type="completion", 100 sync=True, 101 ), 102 OpenAiDefinition( 103 module="openai.resources.chat.completions", 104 object="AsyncCompletions", 105 method="create", 106 type="chat", 107 sync=False, 108 ), 109 OpenAiDefinition( 110 module="openai.resources.completions", 111 object="AsyncCompletions", 112 method="create", 113 type="completion", 114 sync=False, 115 ), 116 OpenAiDefinition( 117 module="openai.resources.beta.chat.completions", 118 object="Completions", 119 method="parse", 120 type="chat", 121 sync=True, 122 min_version="1.50.0", 123 ), 124 OpenAiDefinition( 125 module="openai.resources.beta.chat.completions", 126 object="AsyncCompletions", 127 method="parse", 128 type="chat", 129 sync=False, 130 min_version="1.50.0", 131 ), 132] 133 134 135class OpenAiArgsExtractor: 136 def __init__( 137 self, 138 name=None, 139 metadata=None, 140 trace_id=None, 141 session_id=None, 142 user_id=None, 143 tags=None, 144 parent_observation_id=None, 145 langfuse_prompt=None, # we cannot use prompt because it's an argument of the old OpenAI completions API 146 **kwargs, 147 ): 148 self.args = {} 149 self.args["name"] = name 150 self.args["metadata"] = ( 151 metadata 152 if "response_format" not 
in kwargs 153 else { 154 **(metadata or {}), 155 "response_format": kwargs["response_format"].model_json_schema() 156 if isclass(kwargs["response_format"]) 157 and issubclass(kwargs["response_format"], BaseModel) 158 else kwargs["response_format"], 159 } 160 ) 161 self.args["trace_id"] = trace_id 162 self.args["session_id"] = session_id 163 self.args["user_id"] = user_id 164 self.args["tags"] = tags 165 self.args["parent_observation_id"] = parent_observation_id 166 self.args["langfuse_prompt"] = langfuse_prompt 167 self.kwargs = kwargs 168 169 def get_langfuse_args(self): 170 return {**self.args, **self.kwargs} 171 172 def get_openai_args(self): 173 return self.kwargs 174 175 176def _langfuse_wrapper(func): 177 def _with_langfuse(open_ai_definitions, initialize): 178 def wrapper(wrapped, instance, args, kwargs): 179 return func(open_ai_definitions, initialize, wrapped, args, kwargs) 180 181 return wrapper 182 183 return _with_langfuse 184 185 186def _extract_chat_prompt(kwargs: any): 187 """Extracts the user input from prompts. Returns an array of messages or dict with messages and functions""" 188 prompt = {} 189 190 if kwargs.get("functions") is not None: 191 prompt.update({"functions": kwargs["functions"]}) 192 193 if kwargs.get("function_call") is not None: 194 prompt.update({"function_call": kwargs["function_call"]}) 195 196 if kwargs.get("tools") is not None: 197 prompt.update({"tools": kwargs["tools"]}) 198 199 if prompt: 200 # uf user provided functions, we need to send these together with messages to langfuse 201 prompt.update( 202 { 203 "messages": [ 204 _process_message(message) for message in kwargs.get("messages", []) 205 ], 206 } 207 ) 208 return prompt 209 else: 210 # vanilla case, only send messages in openai format to langfuse 211 return [_process_message(message) for message in kwargs.get("messages", [])] 212 213 214def _process_message(message): 215 if not isinstance(message, dict): 216 return message 217 218 processed_message = {**message} 219 220 content = processed_message.get("content", None) 221 if not isinstance(content, list): 222 return processed_message 223 224 processed_content = [] 225 226 for content_part in content: 227 if content_part.get("type") == "input_audio": 228 audio_base64 = content_part.get("input_audio", {}).get("data", None) 229 format = content_part.get("input_audio", {}).get("format", "wav") 230 231 if audio_base64 is not None: 232 base64_data_uri = f"data:audio/{format};base64,{audio_base64}" 233 234 processed_content.append( 235 { 236 "type": "input_audio", 237 "input_audio": { 238 "data": LangfuseMedia(base64_data_uri=base64_data_uri), 239 "format": format, 240 }, 241 } 242 ) 243 else: 244 processed_content.append(content_part) 245 246 processed_message["content"] = processed_content 247 248 return processed_message 249 250 251def _extract_chat_response(kwargs: any): 252 """Extracts the llm output from the response.""" 253 response = { 254 "role": kwargs.get("role", None), 255 } 256 257 audio = None 258 259 if kwargs.get("function_call") is not None: 260 response.update({"function_call": kwargs["function_call"]}) 261 262 if kwargs.get("tool_calls") is not None: 263 response.update({"tool_calls": kwargs["tool_calls"]}) 264 265 if kwargs.get("audio") is not None: 266 audio = kwargs["audio"].__dict__ 267 268 if "data" in audio and audio["data"] is not None: 269 base64_data_uri = f"data:audio/{audio.get('format', 'wav')};base64,{audio.get('data', None)}" 270 audio["data"] = LangfuseMedia(base64_data_uri=base64_data_uri) 271 272 response.update( 
273 { 274 "content": kwargs.get("content", None), 275 } 276 ) 277 278 if audio is not None: 279 response.update({"audio": audio}) 280 281 return response 282 283 284def _get_langfuse_data_from_kwargs( 285 resource: OpenAiDefinition, langfuse: Langfuse, start_time, kwargs 286): 287 name = kwargs.get("name", "OpenAI-generation") 288 289 if name is None: 290 name = "OpenAI-generation" 291 292 if name is not None and not isinstance(name, str): 293 raise TypeError("name must be a string") 294 295 decorator_context_observation_id = langfuse_context.get_current_observation_id() 296 decorator_context_trace_id = langfuse_context.get_current_trace_id() 297 298 trace_id = kwargs.get("trace_id", None) or decorator_context_trace_id 299 if trace_id is not None and not isinstance(trace_id, str): 300 raise TypeError("trace_id must be a string") 301 302 session_id = kwargs.get("session_id", None) 303 if session_id is not None and not isinstance(session_id, str): 304 raise TypeError("session_id must be a string") 305 306 user_id = kwargs.get("user_id", None) 307 if user_id is not None and not isinstance(user_id, str): 308 raise TypeError("user_id must be a string") 309 310 tags = kwargs.get("tags", None) 311 if tags is not None and ( 312 not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags) 313 ): 314 raise TypeError("tags must be a list of strings") 315 316 # Update trace params in decorator context if specified in openai call 317 if decorator_context_trace_id: 318 langfuse_context.update_current_trace( 319 session_id=session_id, user_id=user_id, tags=tags 320 ) 321 322 parent_observation_id = kwargs.get("parent_observation_id", None) or ( 323 decorator_context_observation_id 324 if decorator_context_observation_id != decorator_context_trace_id 325 else None 326 ) 327 if parent_observation_id is not None and not isinstance(parent_observation_id, str): 328 raise TypeError("parent_observation_id must be a string") 329 if parent_observation_id is not None and trace_id is None: 330 raise ValueError("parent_observation_id requires trace_id to be set") 331 332 metadata = kwargs.get("metadata", {}) 333 334 if metadata is not None and not isinstance(metadata, dict): 335 raise TypeError("metadata must be a dictionary") 336 337 model = kwargs.get("model", None) or None 338 339 prompt = None 340 341 if resource.type == "completion": 342 prompt = kwargs.get("prompt", None) 343 elif resource.type == "chat": 344 prompt = _extract_chat_prompt(kwargs) 345 346 is_nested_trace = False 347 if trace_id: 348 is_nested_trace = True 349 langfuse.trace(id=trace_id, session_id=session_id, user_id=user_id, tags=tags) 350 else: 351 trace_id = ( 352 decorator_context_trace_id 353 or langfuse.trace( 354 session_id=session_id, 355 user_id=user_id, 356 tags=tags, 357 name=name, 358 input=prompt, 359 metadata=metadata, 360 ).id 361 ) 362 363 parsed_temperature = ( 364 kwargs.get("temperature", 1) 365 if not isinstance(kwargs.get("temperature", 1), NotGiven) 366 else 1 367 ) 368 369 parsed_max_tokens = ( 370 kwargs.get("max_tokens", float("inf")) 371 if not isinstance(kwargs.get("max_tokens", float("inf")), NotGiven) 372 else float("inf") 373 ) 374 375 parsed_top_p = ( 376 kwargs.get("top_p", 1) 377 if not isinstance(kwargs.get("top_p", 1), NotGiven) 378 else 1 379 ) 380 381 parsed_frequency_penalty = ( 382 kwargs.get("frequency_penalty", 0) 383 if not isinstance(kwargs.get("frequency_penalty", 0), NotGiven) 384 else 0 385 ) 386 387 parsed_presence_penalty = ( 388 kwargs.get("presence_penalty", 0) 389 if not 
isinstance(kwargs.get("presence_penalty", 0), NotGiven) 390 else 0 391 ) 392 393 parsed_seed = ( 394 kwargs.get("seed", None) 395 if not isinstance(kwargs.get("seed", None), NotGiven) 396 else None 397 ) 398 399 parsed_n = kwargs.get("n", 1) if not isinstance(kwargs.get("n", 1), NotGiven) else 1 400 401 modelParameters = { 402 "temperature": parsed_temperature, 403 "max_tokens": parsed_max_tokens, # casing? 404 "top_p": parsed_top_p, 405 "frequency_penalty": parsed_frequency_penalty, 406 "presence_penalty": parsed_presence_penalty, 407 } 408 if parsed_n is not None and parsed_n > 1: 409 modelParameters["n"] = parsed_n 410 411 if parsed_seed is not None: 412 modelParameters["seed"] = parsed_seed 413 414 langfuse_prompt = kwargs.get("langfuse_prompt", None) 415 416 return { 417 "name": name, 418 "metadata": metadata, 419 "trace_id": trace_id, 420 "parent_observation_id": parent_observation_id, 421 "user_id": user_id, 422 "start_time": start_time, 423 "input": prompt, 424 "model_parameters": modelParameters, 425 "model": model or None, 426 "prompt": langfuse_prompt, 427 }, is_nested_trace 428 429 430def _create_langfuse_update( 431 completion, 432 generation: StatefulGenerationClient, 433 completion_start_time, 434 model=None, 435 usage=None, 436): 437 update = { 438 "end_time": _get_timestamp(), 439 "output": completion, 440 "completion_start_time": completion_start_time, 441 } 442 if model is not None: 443 update["model"] = model 444 445 if usage is not None: 446 update["usage"] = usage 447 448 generation.update(**update) 449 450 451def _extract_streamed_openai_response(resource, chunks): 452 completion = defaultdict(str) if resource.type == "chat" else "" 453 model = None 454 455 for chunk in chunks: 456 if _is_openai_v1(): 457 chunk = chunk.__dict__ 458 459 model = model or chunk.get("model", None) or None 460 usage = chunk.get("usage", None) 461 462 choices = chunk.get("choices", []) 463 464 for choice in choices: 465 if _is_openai_v1(): 466 choice = choice.__dict__ 467 if resource.type == "chat": 468 delta = choice.get("delta", None) 469 470 if _is_openai_v1(): 471 delta = delta.__dict__ 472 473 if delta.get("role", None) is not None: 474 completion["role"] = delta["role"] 475 476 if delta.get("content", None) is not None: 477 completion["content"] = ( 478 delta.get("content", None) 479 if completion["content"] is None 480 else completion["content"] + delta.get("content", None) 481 ) 482 elif delta.get("function_call", None) is not None: 483 curr = completion["function_call"] 484 tool_call_chunk = delta.get("function_call", None) 485 486 if not curr: 487 completion["function_call"] = { 488 "name": getattr(tool_call_chunk, "name", ""), 489 "arguments": getattr(tool_call_chunk, "arguments", ""), 490 } 491 492 else: 493 curr["name"] = curr["name"] or getattr( 494 tool_call_chunk, "name", None 495 ) 496 curr["arguments"] += getattr(tool_call_chunk, "arguments", "") 497 498 elif delta.get("tool_calls", None) is not None: 499 curr = completion["tool_calls"] 500 tool_call_chunk = getattr( 501 delta.get("tool_calls", None)[0], "function", None 502 ) 503 504 if not curr: 505 completion["tool_calls"] = [ 506 { 507 "name": getattr(tool_call_chunk, "name", ""), 508 "arguments": getattr(tool_call_chunk, "arguments", ""), 509 } 510 ] 511 512 elif getattr(tool_call_chunk, "name", None) is not None: 513 curr.append( 514 { 515 "name": getattr(tool_call_chunk, "name", None), 516 "arguments": getattr( 517 tool_call_chunk, "arguments", None 518 ), 519 } 520 ) 521 522 else: 523 curr[-1]["name"] = 
curr[-1]["name"] or getattr( 524 tool_call_chunk, "name", None 525 ) 526 curr[-1]["arguments"] += getattr( 527 tool_call_chunk, "arguments", None 528 ) 529 530 if resource.type == "completion": 531 completion += choice.get("text", None) 532 533 def get_response_for_chat(): 534 return ( 535 completion["content"] 536 or ( 537 completion["function_call"] 538 and { 539 "role": "assistant", 540 "function_call": completion["function_call"], 541 } 542 ) 543 or ( 544 completion["tool_calls"] 545 and { 546 "role": "assistant", 547 # "tool_calls": [{"function": completion["tool_calls"]}], 548 "tool_calls": [ 549 {"function": data} for data in completion["tool_calls"] 550 ], 551 } 552 ) 553 or None 554 ) 555 556 return ( 557 model, 558 get_response_for_chat() if resource.type == "chat" else completion, 559 usage.__dict__ if _is_openai_v1() and usage is not None else usage, 560 ) 561 562 563def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response): 564 if response is None: 565 return None, "<NoneType response returned from OpenAI>", None 566 567 model = response.get("model", None) or None 568 569 completion = None 570 if resource.type == "completion": 571 choices = response.get("choices", []) 572 if len(choices) > 0: 573 choice = choices[-1] 574 575 completion = choice.text if _is_openai_v1() else choice.get("text", None) 576 elif resource.type == "chat": 577 choices = response.get("choices", []) 578 if len(choices) > 0: 579 # If multiple choices were generated, we'll show all of them in the UI as a list. 580 if len(choices) > 1: 581 completion = [ 582 _extract_chat_response(choice.message.__dict__) 583 if _is_openai_v1() 584 else choice.get("message", None) 585 for choice in choices 586 ] 587 else: 588 choice = choices[0] 589 completion = ( 590 _extract_chat_response(choice.message.__dict__) 591 if _is_openai_v1() 592 else choice.get("message", None) 593 ) 594 595 usage = response.get("usage", None) 596 597 return ( 598 model, 599 completion, 600 usage.__dict__ if _is_openai_v1() and usage is not None else usage, 601 ) 602 603 604def _is_openai_v1(): 605 return Version(openai.__version__) >= Version("1.0.0") 606 607 608def _is_streaming_response(response): 609 return ( 610 isinstance(response, types.GeneratorType) 611 or isinstance(response, types.AsyncGeneratorType) 612 or (_is_openai_v1() and isinstance(response, openai.Stream)) 613 or (_is_openai_v1() and isinstance(response, openai.AsyncStream)) 614 ) 615 616 617@_langfuse_wrapper 618def _wrap(open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs): 619 new_langfuse: Langfuse = initialize() 620 621 start_time = _get_timestamp() 622 arg_extractor = OpenAiArgsExtractor(*args, **kwargs) 623 624 generation, is_nested_trace = _get_langfuse_data_from_kwargs( 625 open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args() 626 ) 627 generation = new_langfuse.generation(**generation) 628 try: 629 openai_response = wrapped(**arg_extractor.get_openai_args()) 630 631 if _is_streaming_response(openai_response): 632 return LangfuseResponseGeneratorSync( 633 resource=open_ai_resource, 634 response=openai_response, 635 generation=generation, 636 langfuse=new_langfuse, 637 is_nested_trace=is_nested_trace, 638 ) 639 640 else: 641 model, completion, usage = _get_langfuse_data_from_default_response( 642 open_ai_resource, 643 (openai_response and openai_response.__dict__) 644 if _is_openai_v1() 645 else openai_response, 646 ) 647 generation.update( 648 model=model, output=completion, end_time=_get_timestamp(), 
usage=usage 649 ) 650 651 # Avoiding the trace-update if trace-id is provided by user. 652 if not is_nested_trace: 653 new_langfuse.trace(id=generation.trace_id, output=completion) 654 655 return openai_response 656 except Exception as ex: 657 log.warning(ex) 658 model = kwargs.get("model", None) or None 659 generation.update( 660 end_time=_get_timestamp(), 661 status_message=str(ex), 662 level="ERROR", 663 model=model, 664 usage={"input_cost": 0, "output_cost": 0, "total_cost": 0}, 665 ) 666 raise ex 667 668 669@_langfuse_wrapper 670async def _wrap_async( 671 open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs 672): 673 new_langfuse = initialize() 674 start_time = _get_timestamp() 675 arg_extractor = OpenAiArgsExtractor(*args, **kwargs) 676 677 generation, is_nested_trace = _get_langfuse_data_from_kwargs( 678 open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args() 679 ) 680 generation = new_langfuse.generation(**generation) 681 try: 682 openai_response = await wrapped(**arg_extractor.get_openai_args()) 683 684 if _is_streaming_response(openai_response): 685 return LangfuseResponseGeneratorAsync( 686 resource=open_ai_resource, 687 response=openai_response, 688 generation=generation, 689 langfuse=new_langfuse, 690 is_nested_trace=is_nested_trace, 691 ) 692 693 else: 694 model, completion, usage = _get_langfuse_data_from_default_response( 695 open_ai_resource, 696 (openai_response and openai_response.__dict__) 697 if _is_openai_v1() 698 else openai_response, 699 ) 700 generation.update( 701 model=model, 702 output=completion, 703 end_time=_get_timestamp(), 704 usage=usage, 705 ) 706 # Avoiding the trace-update if trace-id is provided by user. 707 if not is_nested_trace: 708 new_langfuse.trace(id=generation.trace_id, output=completion) 709 710 return openai_response 711 except Exception as ex: 712 model = kwargs.get("model", None) or None 713 generation.update( 714 end_time=_get_timestamp(), 715 status_message=str(ex), 716 level="ERROR", 717 model=model, 718 usage={"input_cost": 0, "output_cost": 0, "total_cost": 0}, 719 ) 720 raise ex 721 722 723class OpenAILangfuse: 724 _langfuse: Optional[Langfuse] = None 725 726 def initialize(self): 727 self._langfuse = LangfuseSingleton().get( 728 public_key=openai.langfuse_public_key, 729 secret_key=openai.langfuse_secret_key, 730 host=openai.langfuse_host, 731 debug=openai.langfuse_debug, 732 enabled=openai.langfuse_enabled, 733 sdk_integration="openai", 734 sample_rate=openai.langfuse_sample_rate, 735 ) 736 737 return self._langfuse 738 739 def flush(cls): 740 cls._langfuse.flush() 741 742 def langfuse_auth_check(self): 743 """Check if the provided Langfuse credentials (public and secret key) are valid. 744 745 Raises: 746 Exception: If no projects were found for the provided credentials. 747 748 Note: 749 This method is blocking. It is discouraged to use it in production code. 
750 """ 751 if self._langfuse is None: 752 self.initialize() 753 754 return self._langfuse.auth_check() 755 756 def register_tracing(self): 757 resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0 758 759 for resource in resources: 760 if resource.min_version is not None and Version( 761 openai.__version__ 762 ) < Version(resource.min_version): 763 continue 764 765 wrap_function_wrapper( 766 resource.module, 767 f"{resource.object}.{resource.method}", 768 _wrap(resource, self.initialize) 769 if resource.sync 770 else _wrap_async(resource, self.initialize), 771 ) 772 773 setattr(openai, "langfuse_public_key", None) 774 setattr(openai, "langfuse_secret_key", None) 775 setattr(openai, "langfuse_host", None) 776 setattr(openai, "langfuse_debug", None) 777 setattr(openai, "langfuse_enabled", True) 778 setattr(openai, "langfuse_sample_rate", None) 779 setattr(openai, "langfuse_mask", None) 780 setattr(openai, "langfuse_auth_check", self.langfuse_auth_check) 781 setattr(openai, "flush_langfuse", self.flush) 782 783 784modifier = OpenAILangfuse() 785modifier.register_tracing() 786 787 788# DEPRECATED: Use `openai.langfuse_auth_check()` instead 789def auth_check(): 790 if modifier._langfuse is None: 791 modifier.initialize() 792 793 return modifier._langfuse.auth_check() 794 795 796class LangfuseResponseGeneratorSync: 797 def __init__( 798 self, 799 *, 800 resource, 801 response, 802 generation, 803 langfuse, 804 is_nested_trace, 805 ): 806 self.items = [] 807 808 self.resource = resource 809 self.response = response 810 self.generation = generation 811 self.langfuse = langfuse 812 self.is_nested_trace = is_nested_trace 813 self.completion_start_time = None 814 815 def __iter__(self): 816 try: 817 for i in self.response: 818 self.items.append(i) 819 820 if self.completion_start_time is None: 821 self.completion_start_time = _get_timestamp() 822 823 yield i 824 finally: 825 self._finalize() 826 827 def __next__(self): 828 try: 829 item = self.response.__next__() 830 self.items.append(item) 831 832 if self.completion_start_time is None: 833 self.completion_start_time = _get_timestamp() 834 835 return item 836 837 except StopIteration: 838 self._finalize() 839 840 raise 841 842 def __enter__(self): 843 return self.__iter__() 844 845 def __exit__(self, exc_type, exc_value, traceback): 846 pass 847 848 def _finalize(self): 849 model, completion, usage = _extract_streamed_openai_response( 850 self.resource, self.items 851 ) 852 853 # Avoiding the trace-update if trace-id is provided by user. 
854 if not self.is_nested_trace: 855 self.langfuse.trace(id=self.generation.trace_id, output=completion) 856 857 _create_langfuse_update( 858 completion, 859 self.generation, 860 self.completion_start_time, 861 model=model, 862 usage=usage, 863 ) 864 865 866class LangfuseResponseGeneratorAsync: 867 def __init__( 868 self, 869 *, 870 resource, 871 response, 872 generation, 873 langfuse, 874 is_nested_trace, 875 ): 876 self.items = [] 877 878 self.resource = resource 879 self.response = response 880 self.generation = generation 881 self.langfuse = langfuse 882 self.is_nested_trace = is_nested_trace 883 self.completion_start_time = None 884 885 async def __aiter__(self): 886 try: 887 async for i in self.response: 888 self.items.append(i) 889 890 if self.completion_start_time is None: 891 self.completion_start_time = _get_timestamp() 892 893 yield i 894 finally: 895 await self._finalize() 896 897 async def __anext__(self): 898 try: 899 item = await self.response.__anext__() 900 self.items.append(item) 901 902 if self.completion_start_time is None: 903 self.completion_start_time = _get_timestamp() 904 905 return item 906 907 except StopAsyncIteration: 908 await self._finalize() 909 910 raise 911 912 async def __aenter__(self): 913 return self.__aiter__() 914 915 async def __aexit__(self, exc_type, exc_value, traceback): 916 pass 917 918 async def _finalize(self): 919 model, completion, usage = _extract_streamed_openai_response( 920 self.resource, self.items 921 ) 922 923 # Avoiding the trace-update if trace-id is provided by user. 924 if not self.is_nested_trace: 925 self.langfuse.trace(id=self.generation.trace_id, output=completion) 926 927 _create_langfuse_update( 928 completion, 929 self.generation, 930 self.completion_start_time, 931 model=model, 932 usage=usage, 933 ) 934 935 async def close(self) -> None: 936 """Close the response and release the connection. 937 938 Automatically called if the response body is read to completion. 939 """ 940 await self.response.close()
log = &lt;Logger langfuse (WARNING)&gt;
```python
@dataclass
class OpenAiDefinition:
    module: str
    object: str
    method: str
    type: str
    sync: bool
    min_version: Optional[str] = None
```
```python
OPENAI_METHODS_V0 = [
    OpenAiDefinition(module='openai', object='ChatCompletion', method='create', type='chat', sync=True, min_version=None),
    OpenAiDefinition(module='openai', object='Completion', method='create', type='completion', sync=True, min_version=None),
]
```
```python
OPENAI_METHODS_V1 = [
    OpenAiDefinition(module='openai.resources.chat.completions', object='Completions', method='create', type='chat', sync=True, min_version=None),
    OpenAiDefinition(module='openai.resources.completions', object='Completions', method='create', type='completion', sync=True, min_version=None),
    OpenAiDefinition(module='openai.resources.chat.completions', object='AsyncCompletions', method='create', type='chat', sync=False, min_version=None),
    OpenAiDefinition(module='openai.resources.completions', object='AsyncCompletions', method='create', type='completion', sync=False, min_version=None),
    OpenAiDefinition(module='openai.resources.beta.chat.completions', object='Completions', method='parse', type='chat', sync=True, min_version='1.50.0'),
    OpenAiDefinition(module='openai.resources.beta.chat.completions', object='AsyncCompletions', method='parse', type='chat', sync=False, min_version='1.50.0'),
]
```
class OpenAiArgsExtractor:
OpenAiArgsExtractor(name=None, metadata=None, trace_id=None, session_id=None, user_id=None, tags=None, parent_observation_id=None, langfuse_prompt=None, **kwargs)
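The extractor splits one kwargs dict into Langfuse-only arguments and the pass-through OpenAI arguments. A small sketch of that split; the argument values are illustrative:

```python
from langfuse.openai import OpenAiArgsExtractor

extractor = OpenAiArgsExtractor(
    name="support-answer",  # Langfuse-only argument
    user_id="user-123",     # Langfuse-only argument
    model="gpt-4o-mini",    # forwarded to OpenAI (example model name)
    messages=[{"role": "user", "content": "Hi"}],
)

# Everything, used to create the Langfuse generation:
extractor.get_langfuse_args()
# -> {'name': 'support-answer', ..., 'model': 'gpt-4o-mini', 'messages': [...]}

# Only the OpenAI kwargs, used for the real API call:
extractor.get_openai_args()
# -> {'model': 'gpt-4o-mini', 'messages': [...]}
```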
class OpenAILangfuse:
def initialize(self):
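`initialize()` reads its settings from the module-level attributes that `register_tracing()` installs on the patched `openai` module, so they can be set before the first traced call. A sketch with placeholder values (credentials are typically supplied via the `LANGFUSE_*` environment variables instead):

```python
from langfuse.openai import openai

openai.langfuse_public_key = "pk-lf-..."  # placeholder
openai.langfuse_secret_key = "sk-lf-..."  # placeholder
openai.langfuse_host = "https://cloud.langfuse.com"
openai.langfuse_enabled = True

# The Langfuse client itself is created lazily on the first patched OpenAI call.
```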
def langfuse_auth_check(self):
Check if the provided Langfuse credentials (public and secret key) are valid.
Raises:
- Exception: If no projects were found for the provided credentials.
Note:
This method is blocking. It is discouraged to use it in production code.
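The check is also exposed on the patched module as `openai.langfuse_auth_check()`; since it makes a blocking network request, it is best reserved for setup or debugging:

```python
from langfuse.openai import openai

# Blocking network call; avoid in production hot paths.
if openai.langfuse_auth_check():
    print("Langfuse credentials are valid")
```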
def register_tracing(self):
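Patching happens in place via `wrapt`, so clients created before or after the import are traced alike. A simplified, hypothetical sketch of what one loop iteration does for the sync chat-completions method:

```python
from wrapt import wrap_function_wrapper

def tracing_wrapper(wrapped, instance, args, kwargs):
    # A real wrapper would create a Langfuse generation here,
    # call the original method, then record the response.
    return wrapped(*args, **kwargs)

wrap_function_wrapper(
    "openai.resources.chat.completions",  # resource.module
    "Completions.create",                 # f"{resource.object}.{resource.method}"
    tracing_wrapper,
)
```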
modifier = &lt;OpenAILangfuse object&gt;
def auth_check():

Deprecated: use `openai.langfuse_auth_check()` instead.
class LangfuseResponseGeneratorSync:
LangfuseResponseGeneratorSync(*, resource, response, generation, langfuse, is_nested_trace)
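When a patched call is made with `stream=True`, the wrapper returns this class instead of the raw stream; it proxies iteration and finalizes the Langfuse generation once the stream is consumed. A usage sketch (the model name is an example):

```python
from langfuse.openai import openai

client = openai.OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o-mini",  # example model name
    messages=[{"role": "user", "content": "Write a haiku"}],
    stream=True,
)

# Iterating drives the wrapped stream; output, usage, and
# completion_start_time are logged when iteration ends.
for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
```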
class LangfuseResponseGeneratorAsync:
LangfuseResponseGeneratorAsync(*, resource, response, generation, langfuse, is_nested_trace)
async def close(self) -> None:
Close the response and release the connection.
Automatically called if the response body is read to completion.
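An async counterpart of the streaming example above; calling `close()` explicitly is mainly useful when the stream is abandoned early, since the connection is otherwise released when the iterator is exhausted:

```python
import asyncio

from langfuse.openai import openai

async def main() -> None:
    client = openai.AsyncOpenAI()

    stream = await client.chat.completions.create(
        model="gpt-4o-mini",  # example model name
        messages=[{"role": "user", "content": "Write a haiku"}],
        stream=True,
    )
    # LangfuseResponseGeneratorAsync proxies async iteration; the
    # generation is finalized when the stream is exhausted.
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")

asyncio.run(main())
```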