langfuse.openai
If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.
- import openai
+ from langfuse.openai import openai
Langfuse automatically tracks:
- All prompts/completions with support for streaming, async and functions
- Latencies
- API Errors
- Model usage (tokens) and cost (USD)
The integration is fully interoperable with the observe()
decorator and the low-level tracing SDK.
See docs for more details: https://langfuse.com/docs/integrations/openai
1"""If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import. 2 3```diff 4- import openai 5+ from langfuse.openai import openai 6``` 7 8Langfuse automatically tracks: 9 10- All prompts/completions with support for streaming, async and functions 11- Latencies 12- API Errors 13- Model usage (tokens) and cost (USD) 14 15The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK. 16 17See docs for more details: https://langfuse.com/docs/integrations/openai 18""" 19 20import copy 21import logging 22import types 23 24from collections import defaultdict 25from typing import List, Optional 26 27from packaging.version import Version 28from wrapt import wrap_function_wrapper 29 30from langfuse import Langfuse 31from langfuse.client import StatefulGenerationClient 32from langfuse.decorators import langfuse_context 33from langfuse.utils import _get_timestamp 34from langfuse.utils.langfuse_singleton import LangfuseSingleton 35 36try: 37 import openai 38except ImportError: 39 raise ModuleNotFoundError( 40 "Please install OpenAI to use this feature: 'pip install openai'" 41 ) 42 43try: 44 from openai import AsyncAzureOpenAI, AsyncOpenAI, AzureOpenAI, OpenAI # noqa: F401 45except ImportError: 46 AsyncAzureOpenAI = None 47 AsyncOpenAI = None 48 AzureOpenAI = None 49 OpenAI = None 50 51log = logging.getLogger("langfuse") 52 53 54class OpenAiDefinition: 55 module: str 56 object: str 57 method: str 58 type: str 59 sync: bool 60 61 def __init__(self, module: str, object: str, method: str, type: str, sync: bool): 62 self.module = module 63 self.object = object 64 self.method = method 65 self.type = type 66 self.sync = sync 67 68 69OPENAI_METHODS_V0 = [ 70 OpenAiDefinition( 71 module="openai", 72 object="ChatCompletion", 73 method="create", 74 type="chat", 75 sync=True, 76 ), 77 OpenAiDefinition( 78 module="openai", 79 object="Completion", 80 method="create", 81 type="completion", 82 sync=True, 83 ), 84] 85 86 87OPENAI_METHODS_V1 = [ 88 OpenAiDefinition( 89 module="openai.resources.chat.completions", 90 object="Completions", 91 method="create", 92 type="chat", 93 sync=True, 94 ), 95 OpenAiDefinition( 96 module="openai.resources.completions", 97 object="Completions", 98 method="create", 99 type="completion", 100 sync=True, 101 ), 102 OpenAiDefinition( 103 module="openai.resources.chat.completions", 104 object="AsyncCompletions", 105 method="create", 106 type="chat", 107 sync=False, 108 ), 109 OpenAiDefinition( 110 module="openai.resources.completions", 111 object="AsyncCompletions", 112 method="create", 113 type="completion", 114 sync=False, 115 ), 116] 117 118 119class OpenAiArgsExtractor: 120 def __init__( 121 self, 122 name=None, 123 metadata=None, 124 trace_id=None, 125 session_id=None, 126 user_id=None, 127 tags=None, 128 parent_observation_id=None, 129 langfuse_prompt=None, # we cannot use prompt because it's an argument of the old OpenAI completions API 130 **kwargs, 131 ): 132 self.args = {} 133 self.args["name"] = name 134 self.args["metadata"] = metadata 135 self.args["trace_id"] = trace_id 136 self.args["session_id"] = session_id 137 self.args["user_id"] = user_id 138 self.args["tags"] = tags 139 self.args["parent_observation_id"] = parent_observation_id 140 self.args["langfuse_prompt"] = langfuse_prompt 141 self.kwargs = kwargs 142 143 def get_langfuse_args(self): 144 return {**self.args, **self.kwargs} 145 146 def get_openai_args(self): 147 return self.kwargs 148 149 150def _langfuse_wrapper(func): 151 def _with_langfuse(open_ai_definitions, initialize): 152 def wrapper(wrapped, instance, args, kwargs): 153 return func(open_ai_definitions, initialize, wrapped, args, kwargs) 154 155 return wrapper 156 157 return _with_langfuse 158 159 160def _extract_chat_prompt(kwargs: any): 161 """Extracts the user input from prompts. Returns an array of messages or dict with messages and functions""" 162 prompt = {} 163 164 if kwargs.get("functions") is not None: 165 prompt.update({"functions": kwargs["functions"]}) 166 167 if kwargs.get("function_call") is not None: 168 prompt.update({"function_call": kwargs["function_call"]}) 169 170 if kwargs.get("tools") is not None: 171 prompt.update({"tools": kwargs["tools"]}) 172 173 if prompt: 174 # uf user provided functions, we need to send these together with messages to langfuse 175 prompt.update( 176 { 177 "messages": _filter_image_data(kwargs.get("messages", [])), 178 } 179 ) 180 return prompt 181 else: 182 # vanilla case, only send messages in openai format to langfuse 183 return _filter_image_data(kwargs.get("messages", [])) 184 185 186def _extract_chat_response(kwargs: any): 187 """Extracts the llm output from the response.""" 188 response = { 189 "role": kwargs.get("role", None), 190 } 191 192 if kwargs.get("function_call") is not None: 193 response.update({"function_call": kwargs["function_call"]}) 194 195 if kwargs.get("tool_calls") is not None: 196 response.update({"tool_calls": kwargs["tool_calls"]}) 197 198 response.update( 199 { 200 "content": kwargs.get("content", None), 201 } 202 ) 203 return response 204 205 206def _get_langfuse_data_from_kwargs( 207 resource: OpenAiDefinition, langfuse: Langfuse, start_time, kwargs 208): 209 name = kwargs.get("name", "OpenAI-generation") 210 211 if name is None: 212 name = "OpenAI-generation" 213 214 if name is not None and not isinstance(name, str): 215 raise TypeError("name must be a string") 216 217 decorator_context_observation_id = langfuse_context.get_current_observation_id() 218 decorator_context_trace_id = langfuse_context.get_current_trace_id() 219 220 trace_id = kwargs.get("trace_id", None) or decorator_context_trace_id 221 if trace_id is not None and not isinstance(trace_id, str): 222 raise TypeError("trace_id must be a string") 223 224 session_id = kwargs.get("session_id", None) 225 if session_id is not None and not isinstance(session_id, str): 226 raise TypeError("session_id must be a string") 227 228 user_id = kwargs.get("user_id", None) 229 if user_id is not None and not isinstance(user_id, str): 230 raise TypeError("user_id must be a string") 231 232 tags = kwargs.get("tags", None) 233 if tags is not None and ( 234 not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags) 235 ): 236 raise TypeError("tags must be a list of strings") 237 238 # Update trace params in decorator context if specified in openai call 239 if decorator_context_trace_id: 240 langfuse_context.update_current_trace( 241 session_id=session_id, user_id=user_id, tags=tags 242 ) 243 244 parent_observation_id = kwargs.get("parent_observation_id", None) or ( 245 decorator_context_observation_id 246 if decorator_context_observation_id != decorator_context_trace_id 247 else None 248 ) 249 if parent_observation_id is not None and not isinstance(parent_observation_id, str): 250 raise TypeError("parent_observation_id must be a string") 251 if parent_observation_id is not None and trace_id is None: 252 raise ValueError("parent_observation_id requires trace_id to be set") 253 254 metadata = kwargs.get("metadata", {}) 255 256 if metadata is not None and not isinstance(metadata, dict): 257 raise TypeError("metadata must be a dictionary") 258 259 model = kwargs.get("model", None) or None 260 261 prompt = None 262 263 if resource.type == "completion": 264 prompt = kwargs.get("prompt", None) 265 elif resource.type == "chat": 266 prompt = _extract_chat_prompt(kwargs) 267 268 is_nested_trace = False 269 if trace_id: 270 is_nested_trace = True 271 langfuse.trace(id=trace_id, session_id=session_id, user_id=user_id, tags=tags) 272 else: 273 trace_id = ( 274 decorator_context_trace_id 275 or langfuse.trace( 276 session_id=session_id, 277 user_id=user_id, 278 tags=tags, 279 name=name, 280 input=prompt, 281 metadata=metadata, 282 ).id 283 ) 284 285 modelParameters = { 286 "temperature": kwargs.get("temperature", 1), 287 "max_tokens": kwargs.get("max_tokens", float("inf")), # casing? 288 "top_p": kwargs.get("top_p", 1), 289 "frequency_penalty": kwargs.get("frequency_penalty", 0), 290 "presence_penalty": kwargs.get("presence_penalty", 0), 291 } 292 293 langfuse_prompt = kwargs.get("langfuse_prompt", None) 294 295 return { 296 "name": name, 297 "metadata": metadata, 298 "trace_id": trace_id, 299 "parent_observation_id": parent_observation_id, 300 "user_id": user_id, 301 "start_time": start_time, 302 "input": prompt, 303 "model_parameters": modelParameters, 304 "model": model or None, 305 "prompt": langfuse_prompt, 306 }, is_nested_trace 307 308 309def _create_langfuse_update( 310 completion, generation: StatefulGenerationClient, completion_start_time, model=None 311): 312 update = { 313 "end_time": _get_timestamp(), 314 "output": completion, 315 "completion_start_time": completion_start_time, 316 } 317 if model is not None: 318 update["model"] = model 319 320 generation.update(**update) 321 322 323def _extract_streamed_openai_response(resource, chunks): 324 completion = defaultdict(str) if resource.type == "chat" else "" 325 model = None 326 completion_start_time = None 327 328 for index, i in enumerate(chunks): 329 if index == 0: 330 completion_start_time = _get_timestamp() 331 332 if _is_openai_v1(): 333 i = i.__dict__ 334 335 model = model or i.get("model", None) or None 336 337 choices = i.get("choices", []) 338 339 for choice in choices: 340 if _is_openai_v1(): 341 choice = choice.__dict__ 342 if resource.type == "chat": 343 delta = choice.get("delta", None) 344 345 if _is_openai_v1(): 346 delta = delta.__dict__ 347 348 if delta.get("role", None) is not None: 349 completion["role"] = delta["role"] 350 351 if delta.get("content", None) is not None: 352 completion["content"] = ( 353 delta.get("content", None) 354 if completion["content"] is None 355 else completion["content"] + delta.get("content", None) 356 ) 357 elif delta.get("function_call", None) is not None: 358 curr = completion["function_call"] 359 tool_call_chunk = delta.get("function_call", None) 360 361 if not curr: 362 completion["function_call"] = { 363 "name": getattr(tool_call_chunk, "name", ""), 364 "arguments": getattr(tool_call_chunk, "arguments", ""), 365 } 366 367 else: 368 curr["name"] = curr["name"] or getattr( 369 tool_call_chunk, "name", None 370 ) 371 curr["arguments"] += getattr(tool_call_chunk, "arguments", "") 372 373 elif delta.get("tool_calls", None) is not None: 374 curr = completion["tool_calls"] 375 tool_call_chunk = getattr( 376 delta.get("tool_calls", None)[0], "function", None 377 ) 378 379 if not curr: 380 completion["tool_calls"] = { 381 "name": getattr(tool_call_chunk, "name", ""), 382 "arguments": getattr(tool_call_chunk, "arguments", ""), 383 } 384 385 else: 386 curr["name"] = curr["name"] or getattr( 387 tool_call_chunk, "name", None 388 ) 389 curr["arguments"] += getattr(tool_call_chunk, "arguments", None) 390 391 if resource.type == "completion": 392 completion += choice.get("text", None) 393 394 def get_response_for_chat(): 395 return ( 396 completion["content"] 397 or ( 398 completion["function_call"] 399 and { 400 "role": "assistant", 401 "function_call": completion["function_call"], 402 } 403 ) 404 or ( 405 completion["tool_calls"] 406 and { 407 "role": "assistant", 408 "tool_calls": [{"function": completion["tool_calls"]}], 409 } 410 ) 411 or None 412 ) 413 414 return ( 415 model, 416 completion_start_time, 417 get_response_for_chat() if resource.type == "chat" else completion, 418 ) 419 420 421def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response): 422 model = response.get("model", None) or None 423 424 completion = None 425 if resource.type == "completion": 426 choices = response.get("choices", []) 427 if len(choices) > 0: 428 choice = choices[-1] 429 430 completion = choice.text if _is_openai_v1() else choice.get("text", None) 431 elif resource.type == "chat": 432 choices = response.get("choices", []) 433 if len(choices) > 0: 434 choice = choices[-1] 435 completion = ( 436 _extract_chat_response(choice.message.__dict__) 437 if _is_openai_v1() 438 else choice.get("message", None) 439 ) 440 441 usage = response.get("usage", None) 442 443 return model, completion, usage.__dict__ if _is_openai_v1() else usage 444 445 446def _is_openai_v1(): 447 return Version(openai.__version__) >= Version("1.0.0") 448 449 450def _is_streaming_response(response): 451 return ( 452 isinstance(response, types.GeneratorType) 453 or isinstance(response, types.AsyncGeneratorType) 454 or (_is_openai_v1() and isinstance(response, openai.Stream)) 455 or (_is_openai_v1() and isinstance(response, openai.AsyncStream)) 456 ) 457 458 459@_langfuse_wrapper 460def _wrap(open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs): 461 new_langfuse: Langfuse = initialize() 462 463 start_time = _get_timestamp() 464 arg_extractor = OpenAiArgsExtractor(*args, **kwargs) 465 466 generation, is_nested_trace = _get_langfuse_data_from_kwargs( 467 open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args() 468 ) 469 generation = new_langfuse.generation(**generation) 470 try: 471 openai_response = wrapped(**arg_extractor.get_openai_args()) 472 473 if _is_streaming_response(openai_response): 474 return LangfuseResponseGeneratorSync( 475 resource=open_ai_resource, 476 response=openai_response, 477 generation=generation, 478 langfuse=new_langfuse, 479 is_nested_trace=is_nested_trace, 480 ) 481 482 else: 483 model, completion, usage = _get_langfuse_data_from_default_response( 484 open_ai_resource, 485 openai_response.__dict__ if _is_openai_v1() else openai_response, 486 ) 487 generation.update( 488 model=model, output=completion, end_time=_get_timestamp(), usage=usage 489 ) 490 491 # Avoiding the trace-update if trace-id is provided by user. 492 if not is_nested_trace: 493 new_langfuse.trace(id=generation.trace_id, output=completion) 494 495 return openai_response 496 except Exception as ex: 497 log.warning(ex) 498 model = kwargs.get("model", None) or None 499 generation.update( 500 end_time=_get_timestamp(), 501 status_message=str(ex), 502 level="ERROR", 503 model=model, 504 usage={"input_cost": 0, "output_cost": 0, "total_cost": 0}, 505 ) 506 raise ex 507 508 509@_langfuse_wrapper 510async def _wrap_async( 511 open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs 512): 513 new_langfuse = initialize() 514 start_time = _get_timestamp() 515 arg_extractor = OpenAiArgsExtractor(*args, **kwargs) 516 517 generation, is_nested_trace = _get_langfuse_data_from_kwargs( 518 open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args() 519 ) 520 generation = new_langfuse.generation(**generation) 521 try: 522 openai_response = await wrapped(**arg_extractor.get_openai_args()) 523 524 if _is_streaming_response(openai_response): 525 return LangfuseResponseGeneratorAsync( 526 resource=open_ai_resource, 527 response=openai_response, 528 generation=generation, 529 langfuse=new_langfuse, 530 is_nested_trace=is_nested_trace, 531 ) 532 533 else: 534 model, completion, usage = _get_langfuse_data_from_default_response( 535 open_ai_resource, 536 openai_response.__dict__ if _is_openai_v1() else openai_response, 537 ) 538 generation.update( 539 model=model, 540 output=completion, 541 end_time=_get_timestamp(), 542 usage=usage, 543 ) 544 # Avoiding the trace-update if trace-id is provided by user. 545 if not is_nested_trace: 546 new_langfuse.trace(id=generation.trace_id, output=completion) 547 548 return openai_response 549 except Exception as ex: 550 model = kwargs.get("model", None) or None 551 generation.update( 552 end_time=_get_timestamp(), 553 status_message=str(ex), 554 level="ERROR", 555 model=model, 556 usage={"input_cost": 0, "output_cost": 0, "total_cost": 0}, 557 ) 558 raise ex 559 560 561class OpenAILangfuse: 562 _langfuse: Optional[Langfuse] = None 563 564 def initialize(self): 565 self._langfuse = LangfuseSingleton().get( 566 public_key=openai.langfuse_public_key, 567 secret_key=openai.langfuse_secret_key, 568 host=openai.langfuse_host, 569 debug=openai.langfuse_debug, 570 enabled=openai.langfuse_enabled, 571 sdk_integration="openai", 572 ) 573 574 return self._langfuse 575 576 def flush(cls): 577 cls._langfuse.flush() 578 579 def langfuse_auth_check(self): 580 """Check if the provided Langfuse credentials (public and secret key) are valid. 581 582 Raises: 583 Exception: If no projects were found for the provided credentials. 584 585 Note: 586 This method is blocking. It is discouraged to use it in production code. 587 """ 588 if self._langfuse is None: 589 self.initialize() 590 591 return self._langfuse.auth_check() 592 593 def register_tracing(self): 594 resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0 595 596 for resource in resources: 597 wrap_function_wrapper( 598 resource.module, 599 f"{resource.object}.{resource.method}", 600 _wrap(resource, self.initialize) 601 if resource.sync 602 else _wrap_async(resource, self.initialize), 603 ) 604 605 setattr(openai, "langfuse_public_key", None) 606 setattr(openai, "langfuse_secret_key", None) 607 setattr(openai, "langfuse_host", None) 608 setattr(openai, "langfuse_debug", None) 609 setattr(openai, "langfuse_enabled", True) 610 setattr(openai, "langfuse_auth_check", self.langfuse_auth_check) 611 setattr(openai, "flush_langfuse", self.flush) 612 613 614modifier = OpenAILangfuse() 615modifier.register_tracing() 616 617 618# DEPRECATED: Use `openai.langfuse_auth_check()` instead 619def auth_check(): 620 if modifier._langfuse is None: 621 modifier.initialize() 622 623 return modifier._langfuse.auth_check() 624 625 626def _filter_image_data(messages: List[dict]): 627 """https://platform.openai.com/docs/guides/vision?lang=python 628 629 The messages array remains the same, but the 'image_url' is removed from the 'content' array. 630 It should only be removed if the value starts with 'data:image/jpeg;base64,' 631 632 """ 633 output_messages = copy.deepcopy(messages) 634 635 for message in output_messages: 636 content = ( 637 message.get("content", None) 638 if isinstance(message, dict) 639 else getattr(message, "content", None) 640 ) 641 642 if content is not None: 643 for index, item in enumerate(content): 644 if isinstance(item, dict) and item.get("image_url", None) is not None: 645 url = item["image_url"]["url"] 646 if url.startswith("data:image/"): 647 del content[index]["image_url"] 648 649 return output_messages 650 651 652class LangfuseResponseGeneratorSync: 653 def __init__( 654 self, 655 *, 656 resource, 657 response, 658 generation, 659 langfuse, 660 is_nested_trace, 661 ): 662 self.items = [] 663 664 self.resource = resource 665 self.response = response 666 self.generation = generation 667 self.langfuse = langfuse 668 self.is_nested_trace = is_nested_trace 669 670 def __iter__(self): 671 try: 672 for i in self.response: 673 self.items.append(i) 674 675 yield i 676 finally: 677 self._finalize() 678 679 def __enter__(self): 680 return self.__iter__() 681 682 def __exit__(self, exc_type, exc_value, traceback): 683 pass 684 685 def _finalize(self): 686 model, completion_start_time, completion = _extract_streamed_openai_response( 687 self.resource, self.items 688 ) 689 690 # Avoiding the trace-update if trace-id is provided by user. 691 if not self.is_nested_trace: 692 self.langfuse.trace(id=self.generation.trace_id, output=completion) 693 694 _create_langfuse_update( 695 completion, self.generation, completion_start_time, model=model 696 ) 697 698 699class LangfuseResponseGeneratorAsync: 700 def __init__( 701 self, 702 *, 703 resource, 704 response, 705 generation, 706 langfuse, 707 is_nested_trace, 708 ): 709 self.items = [] 710 711 self.resource = resource 712 self.response = response 713 self.generation = generation 714 self.langfuse = langfuse 715 self.is_nested_trace = is_nested_trace 716 717 async def __aiter__(self): 718 try: 719 async for i in self.response: 720 self.items.append(i) 721 722 yield i 723 finally: 724 await self._finalize() 725 726 async def __aenter__(self): 727 return self.__aiter__() 728 729 async def __aexit__(self, exc_type, exc_value, traceback): 730 pass 731 732 async def _finalize(self): 733 model, completion_start_time, completion = _extract_streamed_openai_response( 734 self.resource, self.items 735 ) 736 737 # Avoiding the trace-update if trace-id is provided by user. 738 if not self.is_nested_trace: 739 self.langfuse.trace(id=self.generation.trace_id, output=completion) 740 741 _create_langfuse_update( 742 completion, self.generation, completion_start_time, model=model 743 )
log =
<Logger langfuse (WARNING)>
class
OpenAiDefinition:
55class OpenAiDefinition: 56 module: str 57 object: str 58 method: str 59 type: str 60 sync: bool 61 62 def __init__(self, module: str, object: str, method: str, type: str, sync: bool): 63 self.module = module 64 self.object = object 65 self.method = method 66 self.type = type 67 self.sync = sync
OPENAI_METHODS_V0 =
[<OpenAiDefinition object>, <OpenAiDefinition object>]
OPENAI_METHODS_V1 =
[<OpenAiDefinition object>, <OpenAiDefinition object>, <OpenAiDefinition object>, <OpenAiDefinition object>]
class
OpenAiArgsExtractor:
120class OpenAiArgsExtractor: 121 def __init__( 122 self, 123 name=None, 124 metadata=None, 125 trace_id=None, 126 session_id=None, 127 user_id=None, 128 tags=None, 129 parent_observation_id=None, 130 langfuse_prompt=None, # we cannot use prompt because it's an argument of the old OpenAI completions API 131 **kwargs, 132 ): 133 self.args = {} 134 self.args["name"] = name 135 self.args["metadata"] = metadata 136 self.args["trace_id"] = trace_id 137 self.args["session_id"] = session_id 138 self.args["user_id"] = user_id 139 self.args["tags"] = tags 140 self.args["parent_observation_id"] = parent_observation_id 141 self.args["langfuse_prompt"] = langfuse_prompt 142 self.kwargs = kwargs 143 144 def get_langfuse_args(self): 145 return {**self.args, **self.kwargs} 146 147 def get_openai_args(self): 148 return self.kwargs
OpenAiArgsExtractor( name=None, metadata=None, trace_id=None, session_id=None, user_id=None, tags=None, parent_observation_id=None, langfuse_prompt=None, **kwargs)
121 def __init__( 122 self, 123 name=None, 124 metadata=None, 125 trace_id=None, 126 session_id=None, 127 user_id=None, 128 tags=None, 129 parent_observation_id=None, 130 langfuse_prompt=None, # we cannot use prompt because it's an argument of the old OpenAI completions API 131 **kwargs, 132 ): 133 self.args = {} 134 self.args["name"] = name 135 self.args["metadata"] = metadata 136 self.args["trace_id"] = trace_id 137 self.args["session_id"] = session_id 138 self.args["user_id"] = user_id 139 self.args["tags"] = tags 140 self.args["parent_observation_id"] = parent_observation_id 141 self.args["langfuse_prompt"] = langfuse_prompt 142 self.kwargs = kwargs
class
OpenAILangfuse:
562class OpenAILangfuse: 563 _langfuse: Optional[Langfuse] = None 564 565 def initialize(self): 566 self._langfuse = LangfuseSingleton().get( 567 public_key=openai.langfuse_public_key, 568 secret_key=openai.langfuse_secret_key, 569 host=openai.langfuse_host, 570 debug=openai.langfuse_debug, 571 enabled=openai.langfuse_enabled, 572 sdk_integration="openai", 573 ) 574 575 return self._langfuse 576 577 def flush(cls): 578 cls._langfuse.flush() 579 580 def langfuse_auth_check(self): 581 """Check if the provided Langfuse credentials (public and secret key) are valid. 582 583 Raises: 584 Exception: If no projects were found for the provided credentials. 585 586 Note: 587 This method is blocking. It is discouraged to use it in production code. 588 """ 589 if self._langfuse is None: 590 self.initialize() 591 592 return self._langfuse.auth_check() 593 594 def register_tracing(self): 595 resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0 596 597 for resource in resources: 598 wrap_function_wrapper( 599 resource.module, 600 f"{resource.object}.{resource.method}", 601 _wrap(resource, self.initialize) 602 if resource.sync 603 else _wrap_async(resource, self.initialize), 604 ) 605 606 setattr(openai, "langfuse_public_key", None) 607 setattr(openai, "langfuse_secret_key", None) 608 setattr(openai, "langfuse_host", None) 609 setattr(openai, "langfuse_debug", None) 610 setattr(openai, "langfuse_enabled", True) 611 setattr(openai, "langfuse_auth_check", self.langfuse_auth_check) 612 setattr(openai, "flush_langfuse", self.flush)
def
initialize(self):
565 def initialize(self): 566 self._langfuse = LangfuseSingleton().get( 567 public_key=openai.langfuse_public_key, 568 secret_key=openai.langfuse_secret_key, 569 host=openai.langfuse_host, 570 debug=openai.langfuse_debug, 571 enabled=openai.langfuse_enabled, 572 sdk_integration="openai", 573 ) 574 575 return self._langfuse
def
langfuse_auth_check(self):
580 def langfuse_auth_check(self): 581 """Check if the provided Langfuse credentials (public and secret key) are valid. 582 583 Raises: 584 Exception: If no projects were found for the provided credentials. 585 586 Note: 587 This method is blocking. It is discouraged to use it in production code. 588 """ 589 if self._langfuse is None: 590 self.initialize() 591 592 return self._langfuse.auth_check()
Check if the provided Langfuse credentials (public and secret key) are valid.
Raises:
- Exception: If no projects were found for the provided credentials.
Note:
This method is blocking. It is discouraged to use it in production code.
def
register_tracing(self):
594 def register_tracing(self): 595 resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0 596 597 for resource in resources: 598 wrap_function_wrapper( 599 resource.module, 600 f"{resource.object}.{resource.method}", 601 _wrap(resource, self.initialize) 602 if resource.sync 603 else _wrap_async(resource, self.initialize), 604 ) 605 606 setattr(openai, "langfuse_public_key", None) 607 setattr(openai, "langfuse_secret_key", None) 608 setattr(openai, "langfuse_host", None) 609 setattr(openai, "langfuse_debug", None) 610 setattr(openai, "langfuse_enabled", True) 611 setattr(openai, "langfuse_auth_check", self.langfuse_auth_check) 612 setattr(openai, "flush_langfuse", self.flush)
modifier =
<OpenAILangfuse object>
def
auth_check():
class
LangfuseResponseGeneratorSync:
653class LangfuseResponseGeneratorSync: 654 def __init__( 655 self, 656 *, 657 resource, 658 response, 659 generation, 660 langfuse, 661 is_nested_trace, 662 ): 663 self.items = [] 664 665 self.resource = resource 666 self.response = response 667 self.generation = generation 668 self.langfuse = langfuse 669 self.is_nested_trace = is_nested_trace 670 671 def __iter__(self): 672 try: 673 for i in self.response: 674 self.items.append(i) 675 676 yield i 677 finally: 678 self._finalize() 679 680 def __enter__(self): 681 return self.__iter__() 682 683 def __exit__(self, exc_type, exc_value, traceback): 684 pass 685 686 def _finalize(self): 687 model, completion_start_time, completion = _extract_streamed_openai_response( 688 self.resource, self.items 689 ) 690 691 # Avoiding the trace-update if trace-id is provided by user. 692 if not self.is_nested_trace: 693 self.langfuse.trace(id=self.generation.trace_id, output=completion) 694 695 _create_langfuse_update( 696 completion, self.generation, completion_start_time, model=model 697 )
LangfuseResponseGeneratorSync(*, resource, response, generation, langfuse, is_nested_trace)
654 def __init__( 655 self, 656 *, 657 resource, 658 response, 659 generation, 660 langfuse, 661 is_nested_trace, 662 ): 663 self.items = [] 664 665 self.resource = resource 666 self.response = response 667 self.generation = generation 668 self.langfuse = langfuse 669 self.is_nested_trace = is_nested_trace
class
LangfuseResponseGeneratorAsync:
700class LangfuseResponseGeneratorAsync: 701 def __init__( 702 self, 703 *, 704 resource, 705 response, 706 generation, 707 langfuse, 708 is_nested_trace, 709 ): 710 self.items = [] 711 712 self.resource = resource 713 self.response = response 714 self.generation = generation 715 self.langfuse = langfuse 716 self.is_nested_trace = is_nested_trace 717 718 async def __aiter__(self): 719 try: 720 async for i in self.response: 721 self.items.append(i) 722 723 yield i 724 finally: 725 await self._finalize() 726 727 async def __aenter__(self): 728 return self.__aiter__() 729 730 async def __aexit__(self, exc_type, exc_value, traceback): 731 pass 732 733 async def _finalize(self): 734 model, completion_start_time, completion = _extract_streamed_openai_response( 735 self.resource, self.items 736 ) 737 738 # Avoiding the trace-update if trace-id is provided by user. 739 if not self.is_nested_trace: 740 self.langfuse.trace(id=self.generation.trace_id, output=completion) 741 742 _create_langfuse_update( 743 completion, self.generation, completion_start_time, model=model 744 )
LangfuseResponseGeneratorAsync(*, resource, response, generation, langfuse, is_nested_trace)
701 def __init__( 702 self, 703 *, 704 resource, 705 response, 706 generation, 707 langfuse, 708 is_nested_trace, 709 ): 710 self.items = [] 711 712 self.resource = resource 713 self.response = response 714 self.generation = generation 715 self.langfuse = langfuse 716 self.is_nested_trace = is_nested_trace