langfuse.openai

If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.

```diff
- import openai
+ from langfuse.openai import openai
```

Langfuse automatically tracks:

  • All prompts/completions with support for streaming, async and functions
  • Latencies
  • API Errors
  • Model usage (tokens) and cost (USD)

The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK.

See docs for more details: https://langfuse.com/docs/integrations/openai
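For example, a minimal traced call could look like the sketch below (the model, prompt, and the Langfuse-specific `name` kwarg are illustrative; OpenAI and Langfuse credentials are read from the usual environment variables):

```python
from langfuse.openai import openai

client = openai.OpenAI()

completion = client.chat.completions.create(
    model="gpt-4o-mini",  # illustrative; any chat model works
    messages=[{"role": "user", "content": "Hello!"}],
    name="hello-world-generation",  # Langfuse-specific kwarg, stripped before the request is sent
)
print(completion.choices[0].message.content)
```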

   1"""If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.
   2
   3```diff
   4- import openai
   5+ from langfuse.openai import openai
   6```
   7
   8Langfuse automatically tracks:
   9
  10- All prompts/completions with support for streaming, async and functions
  11- Latencies
  12- API Errors
  13- Model usage (tokens) and cost (USD)
  14
  15The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK.
  16
  17See docs for more details: https://langfuse.com/docs/integrations/openai
  18"""
  19
  20import logging
  21import types
  22from collections import defaultdict
  23from dataclasses import dataclass
  24from inspect import isclass
  25from typing import Optional
  26
  27from openai._types import NotGiven
  28from packaging.version import Version
  29from pydantic import BaseModel
  30from wrapt import wrap_function_wrapper
  31
  32from langfuse import Langfuse
  33from langfuse.client import StatefulGenerationClient
  34from langfuse.decorators import langfuse_context
  35from langfuse.media import LangfuseMedia
  36from langfuse.utils import _get_timestamp
  37from langfuse.utils.langfuse_singleton import LangfuseSingleton
  38
  39try:
  40    import openai
  41except ImportError:
  42    raise ModuleNotFoundError(
  43        "Please install OpenAI to use this feature: 'pip install openai'"
  44    )
  45
  46try:
  47    from openai import AsyncAzureOpenAI, AsyncOpenAI, AzureOpenAI, OpenAI  # noqa: F401
  48except ImportError:
  49    AsyncAzureOpenAI = None
  50    AsyncOpenAI = None
  51    AzureOpenAI = None
  52    OpenAI = None
  53
  54log = logging.getLogger("langfuse")
  55
  56
  57@dataclass
  58class OpenAiDefinition:
  59    module: str
  60    object: str
  61    method: str
  62    type: str
  63    sync: bool
  64    min_version: Optional[str] = None
  65
  66
  67OPENAI_METHODS_V0 = [
  68    OpenAiDefinition(
  69        module="openai",
  70        object="ChatCompletion",
  71        method="create",
  72        type="chat",
  73        sync=True,
  74    ),
  75    OpenAiDefinition(
  76        module="openai",
  77        object="Completion",
  78        method="create",
  79        type="completion",
  80        sync=True,
  81    ),
  82]
  83
  84
  85OPENAI_METHODS_V1 = [
  86    OpenAiDefinition(
  87        module="openai.resources.chat.completions",
  88        object="Completions",
  89        method="create",
  90        type="chat",
  91        sync=True,
  92    ),
  93    OpenAiDefinition(
  94        module="openai.resources.completions",
  95        object="Completions",
  96        method="create",
  97        type="completion",
  98        sync=True,
  99    ),
 100    OpenAiDefinition(
 101        module="openai.resources.chat.completions",
 102        object="AsyncCompletions",
 103        method="create",
 104        type="chat",
 105        sync=False,
 106    ),
 107    OpenAiDefinition(
 108        module="openai.resources.completions",
 109        object="AsyncCompletions",
 110        method="create",
 111        type="completion",
 112        sync=False,
 113    ),
 114    OpenAiDefinition(
 115        module="openai.resources.beta.chat.completions",
 116        object="Completions",
 117        method="parse",
 118        type="chat",
 119        sync=True,
 120        min_version="1.50.0",
 121    ),
 122    OpenAiDefinition(
 123        module="openai.resources.beta.chat.completions",
 124        object="AsyncCompletions",
 125        method="parse",
 126        type="chat",
 127        sync=False,
 128        min_version="1.50.0",
 129    ),
 130    OpenAiDefinition(
 131        module="openai.resources.responses",
 132        object="Responses",
 133        method="create",
 134        type="chat",
 135        sync=True,
 136        min_version="1.66.0",
 137    ),
 138    OpenAiDefinition(
 139        module="openai.resources.responses",
 140        object="AsyncResponses",
 141        method="create",
 142        type="chat",
 143        sync=False,
 144        min_version="1.66.0",
 145    ),
 146]
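The `min_version` field gates wrapping by the installed SDK version. A small sketch of the check that `register_tracing()` applies further down (`_is_supported` is a hypothetical helper, not part of this module):

```python
from packaging.version import Version
import openai

def _is_supported(definition: OpenAiDefinition) -> bool:
    # Skip definitions whose min_version is newer than the installed openai package.
    return definition.min_version is None or Version(openai.__version__) >= Version(
        definition.min_version
    )

supported = [d for d in OPENAI_METHODS_V1 if _is_supported(d)]
```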


class OpenAiArgsExtractor:
    def __init__(
        self,
        name=None,
        metadata=None,
        trace_id=None,
        session_id=None,
        user_id=None,
        tags=None,
        parent_observation_id=None,
        langfuse_prompt=None,  # we cannot use prompt because it's an argument of the old OpenAI completions API
        **kwargs,
    ):
        self.args = {}
        self.args["name"] = name
        self.args["metadata"] = (
            metadata
            if "response_format" not in kwargs
            else {
                **(metadata or {}),
                "response_format": kwargs["response_format"].model_json_schema()
                if isclass(kwargs["response_format"])
                and issubclass(kwargs["response_format"], BaseModel)
                else kwargs["response_format"],
            }
        )
        self.args["trace_id"] = trace_id
        self.args["session_id"] = session_id
        self.args["user_id"] = user_id
        self.args["tags"] = tags
        self.args["parent_observation_id"] = parent_observation_id
        self.args["langfuse_prompt"] = langfuse_prompt
        self.kwargs = kwargs

    def get_langfuse_args(self):
        return {**self.args, **self.kwargs}

    def get_openai_args(self):
        # If OpenAI model distillation is enabled, we need to add the metadata to the kwargs
        # https://platform.openai.com/docs/guides/distillation
        if self.kwargs.get("store", False):
            self.kwargs["metadata"] = (
                {} if self.args.get("metadata", None) is None else self.args["metadata"]
            )

            # OpenAI does not support non-string type values in metadata when using
            # the model distillation feature
            self.kwargs["metadata"].pop("response_format", None)

        return self.kwargs

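A sketch of how the extractor separates Langfuse-only arguments from the kwargs that are forwarded to OpenAI (all values illustrative):

```python
extractor = OpenAiArgsExtractor(
    name="my-generation",  # consumed by Langfuse only
    trace_id="trace-123",  # consumed by Langfuse only
    model="gpt-4o-mini",  # forwarded to OpenAI
    messages=[{"role": "user", "content": "Hi"}],
)

langfuse_args = extractor.get_langfuse_args()  # Langfuse args merged with the OpenAI kwargs
openai_args = extractor.get_openai_args()  # only model and messages remain
```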

def _langfuse_wrapper(func):
    def _with_langfuse(open_ai_definitions, initialize):
        def wrapper(wrapped, instance, args, kwargs):
            return func(open_ai_definitions, initialize, wrapped, args, kwargs)

        return wrapper

    return _with_langfuse


def _extract_chat_prompt(kwargs: any):
    """Extracts the user input from prompts. Returns an array of messages, or a dict with messages and functions."""
    prompt = {}

    if kwargs.get("functions") is not None:
        prompt.update({"functions": kwargs["functions"]})

    if kwargs.get("function_call") is not None:
        prompt.update({"function_call": kwargs["function_call"]})

    if kwargs.get("tools") is not None:
        prompt.update({"tools": kwargs["tools"]})

    if prompt:
        # if the user provided functions, we need to send these together with messages to langfuse
        prompt.update(
            {
                "messages": [
                    _process_message(message) for message in kwargs.get("messages", [])
                ],
            }
        )
        return prompt
    else:
        # vanilla case, only send messages in openai format to langfuse
        return [_process_message(message) for message in kwargs.get("messages", [])]

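The two return shapes, sketched with illustrative inputs:

```python
# Vanilla case: only messages, returned as a plain list.
_extract_chat_prompt({"messages": [{"role": "user", "content": "Hi"}]})
# -> [{"role": "user", "content": "Hi"}]

# With tools: a dict bundling tools and messages.
_extract_chat_prompt(
    {
        "messages": [{"role": "user", "content": "Hi"}],
        "tools": [{"type": "function", "function": {"name": "lookup"}}],
    }
)
# -> {"tools": [...], "messages": [{"role": "user", "content": "Hi"}]}
```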

def _process_message(message):
    if not isinstance(message, dict):
        return message

    processed_message = {**message}

    content = processed_message.get("content", None)
    if not isinstance(content, list):
        return processed_message

    processed_content = []

    for content_part in content:
        if content_part.get("type") == "input_audio":
            audio_base64 = content_part.get("input_audio", {}).get("data", None)
            format = content_part.get("input_audio", {}).get("format", "wav")

            if audio_base64 is not None:
                base64_data_uri = f"data:audio/{format};base64,{audio_base64}"

                processed_content.append(
                    {
                        "type": "input_audio",
                        "input_audio": {
                            "data": LangfuseMedia(base64_data_uri=base64_data_uri),
                            "format": format,
                        },
                    }
                )
        else:
            processed_content.append(content_part)

    processed_message["content"] = processed_content

    return processed_message

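A sketch of the audio rewrite (the base64 payload is a placeholder):

```python
message = {
    "role": "user",
    "content": [
        {"type": "input_audio", "input_audio": {"data": "<base64>", "format": "wav"}},
        {"type": "text", "text": "Transcribe this."},
    ],
}

processed = _process_message(message)
# processed["content"][0]["input_audio"]["data"] is now a LangfuseMedia object;
# the text part passes through unchanged.
```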

def _extract_chat_response(kwargs: any):
    """Extracts the LLM output from the response."""
    response = {
        "role": kwargs.get("role", None),
    }

    audio = None

    if kwargs.get("function_call") is not None:
        response.update({"function_call": kwargs["function_call"]})

    if kwargs.get("tool_calls") is not None:
        response.update({"tool_calls": kwargs["tool_calls"]})

    if kwargs.get("audio") is not None:
        audio = kwargs["audio"].__dict__

        if "data" in audio and audio["data"] is not None:
            base64_data_uri = f"data:audio/{audio.get('format', 'wav')};base64,{audio.get('data', None)}"
            audio["data"] = LangfuseMedia(base64_data_uri=base64_data_uri)

    response.update(
        {
            "content": kwargs.get("content", None),
        }
    )

    if audio is not None:
        response.update({"audio": audio})

    return response


def _get_langfuse_data_from_kwargs(
    resource: OpenAiDefinition, langfuse: Langfuse, start_time, kwargs
):
    name = kwargs.get("name", "OpenAI-generation")

    if name is None:
        name = "OpenAI-generation"

    if name is not None and not isinstance(name, str):
        raise TypeError("name must be a string")

    decorator_context_observation_id = langfuse_context.get_current_observation_id()
    decorator_context_trace_id = langfuse_context.get_current_trace_id()

    trace_id = kwargs.get("trace_id", None) or decorator_context_trace_id
    if trace_id is not None and not isinstance(trace_id, str):
        raise TypeError("trace_id must be a string")

    session_id = kwargs.get("session_id", None)
    if session_id is not None and not isinstance(session_id, str):
        raise TypeError("session_id must be a string")

    user_id = kwargs.get("user_id", None)
    if user_id is not None and not isinstance(user_id, str):
        raise TypeError("user_id must be a string")

    tags = kwargs.get("tags", None)
    if tags is not None and (
        not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags)
    ):
        raise TypeError("tags must be a list of strings")

    # Update trace params in decorator context if specified in openai call
    if decorator_context_trace_id:
        langfuse_context.update_current_trace(
            session_id=session_id, user_id=user_id, tags=tags
        )

    parent_observation_id = kwargs.get("parent_observation_id", None) or (
        decorator_context_observation_id
        if decorator_context_observation_id != decorator_context_trace_id
        else None
    )
    if parent_observation_id is not None and not isinstance(parent_observation_id, str):
        raise TypeError("parent_observation_id must be a string")
    if parent_observation_id is not None and trace_id is None:
        raise ValueError("parent_observation_id requires trace_id to be set")

    metadata = kwargs.get("metadata", {})

    if metadata is not None and not isinstance(metadata, dict):
        raise TypeError("metadata must be a dictionary")

    model = kwargs.get("model", None) or None

    prompt = None

    if resource.type == "completion":
        prompt = kwargs.get("prompt", None)

    elif resource.object == "Responses":
        prompt = kwargs.get("input", None)

    elif resource.type == "chat":
        prompt = _extract_chat_prompt(kwargs)

    is_nested_trace = False
    if trace_id:
        is_nested_trace = True
        langfuse.trace(id=trace_id, session_id=session_id, user_id=user_id, tags=tags)
    else:
        trace_id = (
            decorator_context_trace_id
            or langfuse.trace(
                session_id=session_id,
                user_id=user_id,
                tags=tags,
                name=name,
                input=prompt,
                metadata=metadata,
            ).id
        )

    parsed_temperature = (
        kwargs.get("temperature", 1)
        if not isinstance(kwargs.get("temperature", 1), NotGiven)
        else 1
    )

    parsed_max_tokens = (
        kwargs.get("max_tokens", float("inf"))
        if not isinstance(kwargs.get("max_tokens", float("inf")), NotGiven)
        else float("inf")
    )

    parsed_top_p = (
        kwargs.get("top_p", 1)
        if not isinstance(kwargs.get("top_p", 1), NotGiven)
        else 1
    )

    parsed_frequency_penalty = (
        kwargs.get("frequency_penalty", 0)
        if not isinstance(kwargs.get("frequency_penalty", 0), NotGiven)
        else 0
    )

    parsed_presence_penalty = (
        kwargs.get("presence_penalty", 0)
        if not isinstance(kwargs.get("presence_penalty", 0), NotGiven)
        else 0
    )

    parsed_seed = (
        kwargs.get("seed", None)
        if not isinstance(kwargs.get("seed", None), NotGiven)
        else None
    )

    parsed_n = kwargs.get("n", 1) if not isinstance(kwargs.get("n", 1), NotGiven) else 1

    modelParameters = {
        "temperature": parsed_temperature,
        "max_tokens": parsed_max_tokens,
        "top_p": parsed_top_p,
        "frequency_penalty": parsed_frequency_penalty,
        "presence_penalty": parsed_presence_penalty,
    }
    if parsed_n is not None and parsed_n > 1:
        modelParameters["n"] = parsed_n

    if parsed_seed is not None:
        modelParameters["seed"] = parsed_seed

    langfuse_prompt = kwargs.get("langfuse_prompt", None)

    return {
        "name": name,
        "metadata": metadata,
        "trace_id": trace_id,
        "parent_observation_id": parent_observation_id,
        "user_id": user_id,
        "start_time": start_time,
        "input": prompt,
        "model_parameters": modelParameters,
        "model": model or None,
        "prompt": langfuse_prompt,
    }, is_nested_trace

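The repeated `NotGiven` checks above all follow one pattern; a hypothetical helper (not part of this module) makes it explicit:

```python
def _param_or_default(kwargs: dict, key: str, default):
    # NotGiven is the OpenAI SDK's sentinel for "parameter omitted".
    value = kwargs.get(key, default)
    return default if isinstance(value, NotGiven) else value

# e.g. parsed_temperature = _param_or_default(kwargs, "temperature", 1)
```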

def _create_langfuse_update(
    completion,
    generation: StatefulGenerationClient,
    completion_start_time,
    model=None,
    usage=None,
    metadata=None,
):
    update = {
        "end_time": _get_timestamp(),
        "output": completion,
        "completion_start_time": completion_start_time,
    }
    if model is not None:
        update["model"] = model

    if metadata is not None:
        update["metadata"] = metadata

    if usage is not None:
        update["usage"] = _parse_usage(usage)

    generation.update(**update)


def _parse_usage(usage=None):
    if usage is None:
        return

    usage_dict = usage.copy() if isinstance(usage, dict) else usage.__dict__.copy()

    for tokens_details in [
        "prompt_tokens_details",
        "completion_tokens_details",
        "input_token_details",
        "output_token_details",
    ]:
        if tokens_details in usage_dict and usage_dict[tokens_details] is not None:
            tokens_details_dict = (
                usage_dict[tokens_details]
                if isinstance(usage_dict[tokens_details], dict)
                else usage_dict[tokens_details].__dict__
            )
            usage_dict[tokens_details] = {
                k: v for k, v in tokens_details_dict.items() if v is not None
            }

    return usage_dict

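For example, nested token-detail objects are converted to plain dicts and `None`-valued entries are dropped:

```python
usage = {
    "prompt_tokens": 10,
    "completion_tokens": 5,
    "prompt_tokens_details": {"cached_tokens": 0, "audio_tokens": None},
}

_parse_usage(usage)
# -> {"prompt_tokens": 10, "completion_tokens": 5,
#     "prompt_tokens_details": {"cached_tokens": 0}}
```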

def _extract_streamed_response_api_response(chunks):
    completion, model, usage = None, None, None
    metadata = {}

    for raw_chunk in chunks:
        chunk = raw_chunk.__dict__
        if raw_response := chunk.get("response", None):
            usage = chunk.get("usage", None)
            response = raw_response.__dict__
            model = response.get("model")

            for key, val in response.items():
                if key not in ["created_at", "model", "output", "usage", "text"]:
                    metadata[key] = val

                if key == "output":
                    output = val
                    if not isinstance(output, list):
                        completion = output
                    elif len(output) > 1:
                        completion = output
                    elif len(output) == 1:
                        completion = output[0]

    return (model, completion, usage, metadata)


def _extract_streamed_openai_response(resource, chunks):
    completion = defaultdict(str) if resource.type == "chat" else ""
    model, usage = None, None

    for chunk in chunks:
        if _is_openai_v1():
            chunk = chunk.__dict__

        model = model or chunk.get("model", None) or None
        usage = chunk.get("usage", None)

        choices = chunk.get("choices", [])

        for choice in choices:
            if _is_openai_v1():
                choice = choice.__dict__
            if resource.type == "chat":
                delta = choice.get("delta", None)

                if _is_openai_v1():
                    delta = delta.__dict__

                if delta.get("role", None) is not None:
                    completion["role"] = delta["role"]

                if delta.get("content", None) is not None:
                    completion["content"] = (
                        delta.get("content", None)
                        if completion["content"] is None
                        else completion["content"] + delta.get("content", None)
                    )
                elif delta.get("function_call", None) is not None:
                    curr = completion["function_call"]
                    tool_call_chunk = delta.get("function_call", None)

                    if not curr:
                        completion["function_call"] = {
                            "name": getattr(tool_call_chunk, "name", ""),
                            "arguments": getattr(tool_call_chunk, "arguments", ""),
                        }

                    else:
                        curr["name"] = curr["name"] or getattr(
                            tool_call_chunk, "name", None
                        )
                        curr["arguments"] += getattr(tool_call_chunk, "arguments", "")

                elif delta.get("tool_calls", None) is not None:
                    curr = completion["tool_calls"]
                    tool_call_chunk = getattr(
                        delta.get("tool_calls", None)[0], "function", None
                    )

                    if not curr:
                        completion["tool_calls"] = [
                            {
                                "name": getattr(tool_call_chunk, "name", ""),
                                "arguments": getattr(tool_call_chunk, "arguments", ""),
                            }
                        ]

                    elif getattr(tool_call_chunk, "name", None) is not None:
                        curr.append(
                            {
                                "name": getattr(tool_call_chunk, "name", None),
                                "arguments": getattr(
                                    tool_call_chunk, "arguments", ""
                                ),
                            }
                        )

                    else:
                        curr[-1]["name"] = curr[-1]["name"] or getattr(
                            tool_call_chunk, "name", None
                        )
                        # default to "" so string concatenation cannot fail on None
                        curr[-1]["arguments"] += getattr(
                            tool_call_chunk, "arguments", ""
                        )

            if resource.type == "completion":
                completion += choice.get("text", "")

    def get_response_for_chat():
        return (
            completion["content"]
            or (
                completion["function_call"]
                and {
                    "role": "assistant",
                    "function_call": completion["function_call"],
                }
            )
            or (
                completion["tool_calls"]
                and {
                    "role": "assistant",
                    "tool_calls": [
                        {"function": data} for data in completion["tool_calls"]
                    ],
                }
            )
            or None
        )

    return (
        model,
        get_response_for_chat() if resource.type == "chat" else completion,
        usage,
        None,
    )

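A sketch of the chunk folding, using hypothetical minimal stand-ins for the SDK's streaming objects (assumes openai>=1.0, so the `__dict__` unwrapping above applies):

```python
from types import SimpleNamespace

def _chunk(content, role=None):
    # Hypothetical stand-in for a ChatCompletionChunk with a single choice.
    delta = SimpleNamespace(role=role, content=content, function_call=None, tool_calls=None)
    return SimpleNamespace(model="gpt-4o-mini", usage=None, choices=[SimpleNamespace(delta=delta)])

resource = OPENAI_METHODS_V1[0]  # the sync chat-completions "create" definition
model, output, usage, _ = _extract_streamed_openai_response(
    resource, [_chunk("Hel", role="assistant"), _chunk("lo")]
)
# model == "gpt-4o-mini", output == "Hello", usage is None
```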

def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response):
    if response is None:
        return None, "<NoneType response returned from OpenAI>", None

    model = response.get("model", None) or None

    completion = None

    if resource.type == "completion":
        choices = response.get("choices", [])
        if len(choices) > 0:
            choice = choices[-1]

            completion = choice.text if _is_openai_v1() else choice.get("text", None)

    elif resource.object == "Responses":
        output = response.get("output", {})

        if not isinstance(output, list):
            completion = output
        elif len(output) > 1:
            completion = output
        elif len(output) == 1:
            completion = output[0]

    elif resource.type == "chat":
        choices = response.get("choices", [])
        if len(choices) > 0:
            # If multiple choices were generated, we'll show all of them in the UI as a list.
            if len(choices) > 1:
                completion = [
                    _extract_chat_response(choice.message.__dict__)
                    if _is_openai_v1()
                    else choice.get("message", None)
                    for choice in choices
                ]
            else:
                choice = choices[0]
                completion = (
                    _extract_chat_response(choice.message.__dict__)
                    if _is_openai_v1()
                    else choice.get("message", None)
                )

    usage = _parse_usage(response.get("usage", None))

    return (model, completion, usage)


def _is_openai_v1():
    return Version(openai.__version__) >= Version("1.0.0")


def _is_streaming_response(response):
    return (
        isinstance(response, types.GeneratorType)
        or isinstance(response, types.AsyncGeneratorType)
        or (_is_openai_v1() and isinstance(response, openai.Stream))
        or (_is_openai_v1() and isinstance(response, openai.AsyncStream))
    )


@_langfuse_wrapper
def _wrap(open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs):
    new_langfuse: Langfuse = initialize()

    start_time = _get_timestamp()
    arg_extractor = OpenAiArgsExtractor(*args, **kwargs)

    generation, is_nested_trace = _get_langfuse_data_from_kwargs(
        open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()
    )
    generation = new_langfuse.generation(**generation)
    try:
        openai_response = wrapped(**arg_extractor.get_openai_args())

        if _is_streaming_response(openai_response):
            return LangfuseResponseGeneratorSync(
                resource=open_ai_resource,
                response=openai_response,
                generation=generation,
                langfuse=new_langfuse,
                is_nested_trace=is_nested_trace,
            )

        else:
            model, completion, usage = _get_langfuse_data_from_default_response(
                open_ai_resource,
                (openai_response and openai_response.__dict__)
                if _is_openai_v1()
                else openai_response,
            )
            generation.update(
                model=model,
                output=completion,
                end_time=_get_timestamp(),
                usage=usage,  # backward compat for all V2 self hosters
                usage_details=usage,
            )

            # Avoid updating the trace if a trace ID was provided by the user.
            if not is_nested_trace:
                new_langfuse.trace(id=generation.trace_id, output=completion)

        return openai_response
    except Exception as ex:
        log.warning(ex)
        model = kwargs.get("model", None) or None
        generation.update(
            end_time=_get_timestamp(),
            status_message=str(ex),
            level="ERROR",
            model=model,
            usage={
                "input_cost": 0,
                "output_cost": 0,
                "total_cost": 0,
            },  # backward compat for all V2 self hosters
            cost_details={"input": 0, "output": 0, "total": 0},
        )
        raise ex


@_langfuse_wrapper
async def _wrap_async(
    open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs
):
    new_langfuse = initialize()
    start_time = _get_timestamp()
    arg_extractor = OpenAiArgsExtractor(*args, **kwargs)

    generation, is_nested_trace = _get_langfuse_data_from_kwargs(
        open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()
    )
    generation = new_langfuse.generation(**generation)
    try:
        openai_response = await wrapped(**arg_extractor.get_openai_args())

        if _is_streaming_response(openai_response):
            return LangfuseResponseGeneratorAsync(
                resource=open_ai_resource,
                response=openai_response,
                generation=generation,
                langfuse=new_langfuse,
                is_nested_trace=is_nested_trace,
            )

        else:
            model, completion, usage = _get_langfuse_data_from_default_response(
                open_ai_resource,
                (openai_response and openai_response.__dict__)
                if _is_openai_v1()
                else openai_response,
            )
            generation.update(
                model=model,
                output=completion,
                end_time=_get_timestamp(),
                usage=usage,  # backward compat for all V2 self hosters
                usage_details=usage,
            )
            # Avoid updating the trace if a trace ID was provided by the user.
            if not is_nested_trace:
                new_langfuse.trace(id=generation.trace_id, output=completion)

        return openai_response
    except Exception as ex:
        model = kwargs.get("model", None) or None
        generation.update(
            end_time=_get_timestamp(),
            status_message=str(ex),
            level="ERROR",
            model=model,
            usage={
                "input_cost": 0,
                "output_cost": 0,
                "total_cost": 0,
            },  # backward compat for all V2 self hosters
            cost_details={"input": 0, "output": 0, "total": 0},
        )
        raise ex


class OpenAILangfuse:
    _langfuse: Optional[Langfuse] = None

    def initialize(self):
        self._langfuse = LangfuseSingleton().get(
            public_key=openai.langfuse_public_key,
            secret_key=openai.langfuse_secret_key,
            host=openai.langfuse_host,
            debug=openai.langfuse_debug,
            enabled=openai.langfuse_enabled,
            sdk_integration="openai",
            sample_rate=openai.langfuse_sample_rate,
            environment=openai.langfuse_environment,
            mask=openai.langfuse_mask,
        )

        return self._langfuse

    def flush(self):
        self._langfuse.flush()

    def langfuse_auth_check(self):
        """Check if the provided Langfuse credentials (public and secret key) are valid.

        Raises:
            Exception: If no projects were found for the provided credentials.

        Note:
            This method is blocking. It is discouraged to use it in production code.
        """
        if self._langfuse is None:
            self.initialize()

        return self._langfuse.auth_check()

    def register_tracing(self):
        resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0

        for resource in resources:
            if resource.min_version is not None and Version(
                openai.__version__
            ) < Version(resource.min_version):
                continue

            wrap_function_wrapper(
                resource.module,
                f"{resource.object}.{resource.method}",
                _wrap(resource, self.initialize)
                if resource.sync
                else _wrap_async(resource, self.initialize),
            )

        setattr(openai, "langfuse_public_key", None)
        setattr(openai, "langfuse_secret_key", None)
        setattr(openai, "langfuse_host", None)
        setattr(openai, "langfuse_debug", None)
        setattr(openai, "langfuse_enabled", True)
        setattr(openai, "langfuse_sample_rate", None)
        setattr(openai, "langfuse_environment", None)
        setattr(openai, "langfuse_mask", None)
        setattr(openai, "langfuse_auth_check", self.langfuse_auth_check)
        setattr(openai, "flush_langfuse", self.flush)


modifier = OpenAILangfuse()
modifier.register_tracing()

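Since `register_tracing()` runs at import time, Langfuse can be configured through these module-level attributes before the first OpenAI call; a sketch with placeholder keys:

```python
from langfuse.openai import openai  # importing runs register_tracing()

openai.langfuse_public_key = "pk-lf-..."  # placeholder
openai.langfuse_secret_key = "sk-lf-..."  # placeholder
openai.langfuse_host = "https://cloud.langfuse.com"

openai.langfuse_auth_check()  # blocking credential check; avoid in production paths
```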

# DEPRECATED: Use `openai.langfuse_auth_check()` instead
def auth_check():
    if modifier._langfuse is None:
        modifier.initialize()

    return modifier._langfuse.auth_check()


class LangfuseResponseGeneratorSync:
    def __init__(
        self,
        *,
        resource,
        response,
        generation,
        langfuse,
        is_nested_trace,
    ):
        self.items = []

        self.resource = resource
        self.response = response
        self.generation = generation
        self.langfuse = langfuse
        self.is_nested_trace = is_nested_trace
        self.completion_start_time = None

    def __iter__(self):
        try:
            for i in self.response:
                self.items.append(i)

                if self.completion_start_time is None:
                    self.completion_start_time = _get_timestamp()

                yield i
        finally:
            self._finalize()

    def __next__(self):
        try:
            item = self.response.__next__()
            self.items.append(item)

            if self.completion_start_time is None:
                self.completion_start_time = _get_timestamp()

            return item

        except StopIteration:
            self._finalize()

            raise

    def __enter__(self):
        return self.__iter__()

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _finalize(self):
        model, completion, usage, metadata = (
            _extract_streamed_response_api_response(self.items)
            if self.resource.object == "Responses"
            else _extract_streamed_openai_response(self.resource, self.items)
        )

        # Avoid updating the trace if a trace ID was provided by the user.
        if not self.is_nested_trace:
            self.langfuse.trace(id=self.generation.trace_id, output=completion)

        _create_langfuse_update(
            completion,
            self.generation,
            self.completion_start_time,
            model=model,
            usage=usage,
            metadata=metadata,
        )


class LangfuseResponseGeneratorAsync:
    def __init__(
        self,
        *,
        resource,
        response,
        generation,
        langfuse,
        is_nested_trace,
    ):
        self.items = []

        self.resource = resource
        self.response = response
        self.generation = generation
        self.langfuse = langfuse
        self.is_nested_trace = is_nested_trace
        self.completion_start_time = None

    async def __aiter__(self):
        try:
            async for i in self.response:
                self.items.append(i)

                if self.completion_start_time is None:
                    self.completion_start_time = _get_timestamp()

                yield i
        finally:
            await self._finalize()

    async def __anext__(self):
        try:
            item = await self.response.__anext__()
            self.items.append(item)

            if self.completion_start_time is None:
                self.completion_start_time = _get_timestamp()

            return item

        except StopAsyncIteration:
            await self._finalize()

            raise

    async def __aenter__(self):
        return self.__aiter__()

    async def __aexit__(self, exc_type, exc_value, traceback):
        pass

    async def _finalize(self):
        model, completion, usage, metadata = (
            _extract_streamed_response_api_response(self.items)
            if self.resource.object == "Responses"
            else _extract_streamed_openai_response(self.resource, self.items)
        )

        # Avoid updating the trace if a trace ID was provided by the user.
        if not self.is_nested_trace:
            self.langfuse.trace(id=self.generation.trace_id, output=completion)

        _create_langfuse_update(
            completion,
            self.generation,
            self.completion_start_time,
            model=model,
            usage=usage,
            metadata=metadata,
        )

    async def close(self) -> None:
        """Close the response and release the connection.

        Automatically called if the response body is read to completion.
        """
        await self.response.close()

    async def aclose(self) -> None:
        """Close the response and release the connection.

        Automatically called if the response body is read to completion.
        """
        await self.response.aclose()