langfuse.openai

If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.

```diff
- import openai
+ from langfuse.openai import openai
```

Langfuse automatically tracks:

  • All prompts/completions with support for streaming, async and functions
  • Latencies
  • API Errors
  • Model usage (tokens) and cost (USD)

The integration is fully interoperable with the observe() decorator and the low-level tracing SDK.

See docs for more details: https://langfuse.com/docs/integrations/openai
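
For illustration, here is a minimal usage sketch (the model name, keys, and IDs are placeholders; credentials can also come from the standard LANGFUSE_* environment variables instead of the module-level attributes):

```python
from langfuse.openai import openai  # drop-in replacement for `import openai`

# Optional: configure the integration via the module-level attributes that
# register_tracing() defines; environment variables work as well.
openai.langfuse_public_key = "pk-lf-..."  # placeholder key
openai.langfuse_secret_key = "sk-lf-..."  # placeholder key

client = openai.OpenAI()

completion = client.chat.completions.create(
    model="gpt-4o",  # placeholder model name
    messages=[{"role": "user", "content": "Hello!"}],
    # Langfuse-specific kwargs; stripped before the call is forwarded to OpenAI
    name="hello-generation",
    session_id="session-123",
    user_id="user-456",
    tags=["example"],
)

openai.flush_langfuse()  # flush queued events, useful in short-lived scripts
```

Because the wrapper reads the current trace context, calls made inside a function decorated with observe() are nested under that trace automatically.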

  1"""If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.
  2
  3```diff
  4- import openai
  5+ from langfuse.openai import openai
  6```
  7
  8Langfuse automatically tracks:
  9
 10- All prompts/completions with support for streaming, async and functions
 11- Latencies
 12- API Errors
 13- Model usage (tokens) and cost (USD)
 14
 15The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK.
 16
 17See docs for more details: https://langfuse.com/docs/integrations/openai
 18"""

import logging
import types
from collections import defaultdict
from dataclasses import dataclass
from inspect import isclass
from typing import Any, Optional

import openai.resources
from openai._types import NotGiven
from packaging.version import Version
from pydantic import BaseModel
from wrapt import wrap_function_wrapper

from langfuse import Langfuse
from langfuse.client import StatefulGenerationClient
from langfuse.decorators import langfuse_context
from langfuse.media import LangfuseMedia
from langfuse.utils import _get_timestamp
from langfuse.utils.langfuse_singleton import LangfuseSingleton

try:
    import openai
except ImportError:
    raise ModuleNotFoundError(
        "Please install OpenAI to use this feature: 'pip install openai'"
    )

try:
    from openai import AsyncAzureOpenAI, AsyncOpenAI, AzureOpenAI, OpenAI  # noqa: F401
except ImportError:
    AsyncAzureOpenAI = None
    AsyncOpenAI = None
    AzureOpenAI = None
    OpenAI = None

log = logging.getLogger("langfuse")


@dataclass
class OpenAiDefinition:
    module: str
    object: str
    method: str
    type: str
    sync: bool
    min_version: Optional[str] = None


OPENAI_METHODS_V0 = [
    OpenAiDefinition(
        module="openai",
        object="ChatCompletion",
        method="create",
        type="chat",
        sync=True,
    ),
    OpenAiDefinition(
        module="openai",
        object="Completion",
        method="create",
        type="completion",
        sync=True,
    ),
]


OPENAI_METHODS_V1 = [
    OpenAiDefinition(
        module="openai.resources.chat.completions",
        object="Completions",
        method="create",
        type="chat",
        sync=True,
    ),
    OpenAiDefinition(
        module="openai.resources.completions",
        object="Completions",
        method="create",
        type="completion",
        sync=True,
    ),
    OpenAiDefinition(
        module="openai.resources.chat.completions",
        object="AsyncCompletions",
        method="create",
        type="chat",
        sync=False,
    ),
    OpenAiDefinition(
        module="openai.resources.completions",
        object="AsyncCompletions",
        method="create",
        type="completion",
        sync=False,
    ),
    OpenAiDefinition(
        module="openai.resources.beta.chat.completions",
        object="Completions",
        method="parse",
        type="chat",
        sync=True,
        min_version="1.50.0",
    ),
    OpenAiDefinition(
        module="openai.resources.beta.chat.completions",
        object="AsyncCompletions",
        method="parse",
        type="chat",
        sync=False,
        min_version="1.50.0",
    ),
]


class OpenAiArgsExtractor:
    def __init__(
        self,
        name=None,
        metadata=None,
        trace_id=None,
        session_id=None,
        user_id=None,
        tags=None,
        parent_observation_id=None,
        langfuse_prompt=None,  # we cannot use prompt because it's an argument of the old OpenAI completions API
        **kwargs,
    ):
        self.args = {}
        self.args["name"] = name
        self.args["metadata"] = (
            metadata
            if "response_format" not in kwargs
            else {
                **(metadata or {}),
                "response_format": kwargs["response_format"].model_json_schema()
                if isclass(kwargs["response_format"])
                and issubclass(kwargs["response_format"], BaseModel)
                else kwargs["response_format"],
            }
        )
        self.args["trace_id"] = trace_id
        self.args["session_id"] = session_id
        self.args["user_id"] = user_id
        self.args["tags"] = tags
        self.args["parent_observation_id"] = parent_observation_id
        self.args["langfuse_prompt"] = langfuse_prompt
        self.kwargs = kwargs

    def get_langfuse_args(self):
        return {**self.args, **self.kwargs}

    def get_openai_args(self):
        # If OpenAI model distillation is enabled, we need to add the metadata to the kwargs
        # https://platform.openai.com/docs/guides/distillation
        if self.kwargs.get("store", False):
            self.kwargs["metadata"] = self.args.get("metadata", {})

            # OpenAI does not support non-string type values in metadata when using
            # model distillation feature
            self.kwargs["metadata"].pop("response_format", None)

        return self.kwargs


def _langfuse_wrapper(func):
    def _with_langfuse(open_ai_definitions, initialize):
        def wrapper(wrapped, instance, args, kwargs):
            return func(open_ai_definitions, initialize, wrapped, args, kwargs)

        return wrapper

    return _with_langfuse


def _extract_chat_prompt(kwargs: Any):
    """Extracts the user input from prompts. Returns an array of messages or a dict with messages and functions."""
    prompt = {}

    if kwargs.get("functions") is not None:
        prompt.update({"functions": kwargs["functions"]})

    if kwargs.get("function_call") is not None:
        prompt.update({"function_call": kwargs["function_call"]})

    if kwargs.get("tools") is not None:
        prompt.update({"tools": kwargs["tools"]})

    if prompt:
        # if the user provided functions, we need to send these together with messages to Langfuse
        prompt.update(
            {
                "messages": [
                    _process_message(message) for message in kwargs.get("messages", [])
                ],
            }
        )
        return prompt
    else:
        # vanilla case, only send messages in openai format to langfuse
        return [_process_message(message) for message in kwargs.get("messages", [])]


def _process_message(message):
    if not isinstance(message, dict):
        return message

    processed_message = {**message}

    content = processed_message.get("content", None)
    if not isinstance(content, list):
        return processed_message

    processed_content = []

    for content_part in content:
        if content_part.get("type") == "input_audio":
            audio_base64 = content_part.get("input_audio", {}).get("data", None)
            format = content_part.get("input_audio", {}).get("format", "wav")

            if audio_base64 is not None:
                base64_data_uri = f"data:audio/{format};base64,{audio_base64}"

                processed_content.append(
                    {
                        "type": "input_audio",
                        "input_audio": {
                            "data": LangfuseMedia(base64_data_uri=base64_data_uri),
                            "format": format,
                        },
                    }
                )
        else:
            processed_content.append(content_part)

    processed_message["content"] = processed_content

    return processed_message


def _extract_chat_response(kwargs: Any):
    """Extracts the LLM output from the response."""
    response = {
        "role": kwargs.get("role", None),
    }

    audio = None

    if kwargs.get("function_call") is not None:
        response.update({"function_call": kwargs["function_call"]})

    if kwargs.get("tool_calls") is not None:
        response.update({"tool_calls": kwargs["tool_calls"]})

    if kwargs.get("audio") is not None:
        audio = kwargs["audio"].__dict__

        if "data" in audio and audio["data"] is not None:
            base64_data_uri = f"data:audio/{audio.get('format', 'wav')};base64,{audio.get('data', None)}"
            audio["data"] = LangfuseMedia(base64_data_uri=base64_data_uri)

    response.update(
        {
            "content": kwargs.get("content", None),
        }
    )

    if audio is not None:
        response.update({"audio": audio})

    return response


def _get_langfuse_data_from_kwargs(
    resource: OpenAiDefinition, langfuse: Langfuse, start_time, kwargs
):
    name = kwargs.get("name", "OpenAI-generation")

    if name is None:
        name = "OpenAI-generation"

    if name is not None and not isinstance(name, str):
        raise TypeError("name must be a string")

    decorator_context_observation_id = langfuse_context.get_current_observation_id()
    decorator_context_trace_id = langfuse_context.get_current_trace_id()

    trace_id = kwargs.get("trace_id", None) or decorator_context_trace_id
    if trace_id is not None and not isinstance(trace_id, str):
        raise TypeError("trace_id must be a string")

    session_id = kwargs.get("session_id", None)
    if session_id is not None and not isinstance(session_id, str):
        raise TypeError("session_id must be a string")

    user_id = kwargs.get("user_id", None)
    if user_id is not None and not isinstance(user_id, str):
        raise TypeError("user_id must be a string")

    tags = kwargs.get("tags", None)
    if tags is not None and (
        not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags)
    ):
        raise TypeError("tags must be a list of strings")

    # Update trace params in decorator context if specified in openai call
    if decorator_context_trace_id:
        langfuse_context.update_current_trace(
            session_id=session_id, user_id=user_id, tags=tags
        )

    parent_observation_id = kwargs.get("parent_observation_id", None) or (
        decorator_context_observation_id
        if decorator_context_observation_id != decorator_context_trace_id
        else None
    )
    if parent_observation_id is not None and not isinstance(parent_observation_id, str):
        raise TypeError("parent_observation_id must be a string")
    if parent_observation_id is not None and trace_id is None:
        raise ValueError("parent_observation_id requires trace_id to be set")

    metadata = kwargs.get("metadata", {})

    if metadata is not None and not isinstance(metadata, dict):
        raise TypeError("metadata must be a dictionary")

    model = kwargs.get("model", None) or None

    prompt = None

    if resource.type == "completion":
        prompt = kwargs.get("prompt", None)
    elif resource.type == "chat":
        prompt = _extract_chat_prompt(kwargs)

    is_nested_trace = False
    if trace_id:
        is_nested_trace = True
        langfuse.trace(id=trace_id, session_id=session_id, user_id=user_id, tags=tags)
    else:
        trace_id = (
            decorator_context_trace_id
            or langfuse.trace(
                session_id=session_id,
                user_id=user_id,
                tags=tags,
                name=name,
                input=prompt,
                metadata=metadata,
            ).id
        )

    parsed_temperature = (
        kwargs.get("temperature", 1)
        if not isinstance(kwargs.get("temperature", 1), NotGiven)
        else 1
    )

    parsed_max_tokens = (
        kwargs.get("max_tokens", float("inf"))
        if not isinstance(kwargs.get("max_tokens", float("inf")), NotGiven)
        else float("inf")
    )

    parsed_top_p = (
        kwargs.get("top_p", 1)
        if not isinstance(kwargs.get("top_p", 1), NotGiven)
        else 1
    )

    parsed_frequency_penalty = (
        kwargs.get("frequency_penalty", 0)
        if not isinstance(kwargs.get("frequency_penalty", 0), NotGiven)
        else 0
    )

    parsed_presence_penalty = (
        kwargs.get("presence_penalty", 0)
        if not isinstance(kwargs.get("presence_penalty", 0), NotGiven)
        else 0
    )

    parsed_seed = (
        kwargs.get("seed", None)
        if not isinstance(kwargs.get("seed", None), NotGiven)
        else None
    )

    parsed_n = kwargs.get("n", 1) if not isinstance(kwargs.get("n", 1), NotGiven) else 1

    modelParameters = {
        "temperature": parsed_temperature,
        "max_tokens": parsed_max_tokens,
        "top_p": parsed_top_p,
        "frequency_penalty": parsed_frequency_penalty,
        "presence_penalty": parsed_presence_penalty,
    }
    if parsed_n is not None and parsed_n > 1:
        modelParameters["n"] = parsed_n

    if parsed_seed is not None:
        modelParameters["seed"] = parsed_seed

    langfuse_prompt = kwargs.get("langfuse_prompt", None)

    return {
        "name": name,
        "metadata": metadata,
        "trace_id": trace_id,
        "parent_observation_id": parent_observation_id,
        "user_id": user_id,
        "start_time": start_time,
        "input": prompt,
        "model_parameters": modelParameters,
        "model": model or None,
        "prompt": langfuse_prompt,
    }, is_nested_trace


def _create_langfuse_update(
    completion,
    generation: StatefulGenerationClient,
    completion_start_time,
    model=None,
    usage=None,
):
    update = {
        "end_time": _get_timestamp(),
        "output": completion,
        "completion_start_time": completion_start_time,
    }
    if model is not None:
        update["model"] = model

    if usage is not None:
        update["usage"] = _parse_usage(usage)

    generation.update(**update)


def _parse_usage(usage=None):
    if usage is None:
        return

    usage_dict = usage.copy() if isinstance(usage, dict) else usage.__dict__.copy()

    for tokens_details in ["prompt_tokens_details", "completion_tokens_details"]:
        if tokens_details in usage_dict and usage_dict[tokens_details] is not None:
            tokens_details_dict = (
                usage_dict[tokens_details]
                if isinstance(usage_dict[tokens_details], dict)
                else usage_dict[tokens_details].__dict__
            )
            usage_dict[tokens_details] = {
                k: v for k, v in tokens_details_dict.items() if v is not None
            }

    return usage_dict


def _extract_streamed_openai_response(resource, chunks):
    completion = defaultdict(str) if resource.type == "chat" else ""
    model, usage = None, None

    for chunk in chunks:
        if _is_openai_v1():
            chunk = chunk.__dict__

        model = model or chunk.get("model", None) or None
        usage = chunk.get("usage", None)

        choices = chunk.get("choices", [])

        for choice in choices:
            if _is_openai_v1():
                choice = choice.__dict__
            if resource.type == "chat":
                delta = choice.get("delta", None)

                if _is_openai_v1():
                    delta = delta.__dict__

                if delta.get("role", None) is not None:
                    completion["role"] = delta["role"]

                if delta.get("content", None) is not None:
                    completion["content"] = (
                        delta.get("content", None)
                        if completion["content"] is None
                        else completion["content"] + delta.get("content", None)
                    )
                elif delta.get("function_call", None) is not None:
                    curr = completion["function_call"]
                    tool_call_chunk = delta.get("function_call", None)

                    if not curr:
                        completion["function_call"] = {
                            "name": getattr(tool_call_chunk, "name", ""),
                            "arguments": getattr(tool_call_chunk, "arguments", ""),
                        }

                    else:
                        curr["name"] = curr["name"] or getattr(
                            tool_call_chunk, "name", None
                        )
                        curr["arguments"] += getattr(tool_call_chunk, "arguments", "")

                elif delta.get("tool_calls", None) is not None:
                    curr = completion["tool_calls"]
                    tool_call_chunk = getattr(
                        delta.get("tool_calls", None)[0], "function", None
                    )

                    if not curr:
                        completion["tool_calls"] = [
                            {
                                "name": getattr(tool_call_chunk, "name", ""),
                                "arguments": getattr(tool_call_chunk, "arguments", ""),
                            }
                        ]

                    elif getattr(tool_call_chunk, "name", None) is not None:
                        curr.append(
                            {
                                "name": getattr(tool_call_chunk, "name", None),
                                "arguments": getattr(
                                    tool_call_chunk, "arguments", None
                                ),
                            }
                        )

                    else:
                        curr[-1]["name"] = curr[-1]["name"] or getattr(
                            tool_call_chunk, "name", None
                        )
                        curr[-1]["arguments"] += getattr(
                            tool_call_chunk, "arguments", None
                        )

            if resource.type == "completion":
                completion += choice.get("text", None)

    def get_response_for_chat():
        return (
            completion["content"]
            or (
                completion["function_call"]
                and {
                    "role": "assistant",
                    "function_call": completion["function_call"],
                }
            )
            or (
                completion["tool_calls"]
                and {
                    "role": "assistant",
                    # "tool_calls": [{"function": completion["tool_calls"]}],
                    "tool_calls": [
                        {"function": data} for data in completion["tool_calls"]
                    ],
                }
            )
            or None
        )

    return (
        model,
        get_response_for_chat() if resource.type == "chat" else completion,
        usage,
    )


def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response):
    if response is None:
        return None, "<NoneType response returned from OpenAI>", None

    model = response.get("model", None) or None

    completion = None
    if resource.type == "completion":
        choices = response.get("choices", [])
        if len(choices) > 0:
            choice = choices[-1]

            completion = choice.text if _is_openai_v1() else choice.get("text", None)
    elif resource.type == "chat":
        choices = response.get("choices", [])
        if len(choices) > 0:
            # If multiple choices were generated, we'll show all of them in the UI as a list.
            if len(choices) > 1:
                completion = [
                    _extract_chat_response(choice.message.__dict__)
                    if _is_openai_v1()
                    else choice.get("message", None)
                    for choice in choices
                ]
            else:
                choice = choices[0]
                completion = (
                    _extract_chat_response(choice.message.__dict__)
                    if _is_openai_v1()
                    else choice.get("message", None)
                )

    usage = _parse_usage(response.get("usage", None))

    return (model, completion, usage)


def _is_openai_v1():
    return Version(openai.__version__) >= Version("1.0.0")


def _is_streaming_response(response):
    return (
        isinstance(response, types.GeneratorType)
        or isinstance(response, types.AsyncGeneratorType)
        or (_is_openai_v1() and isinstance(response, openai.Stream))
        or (_is_openai_v1() and isinstance(response, openai.AsyncStream))
    )


@_langfuse_wrapper
def _wrap(open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs):
    new_langfuse: Langfuse = initialize()

    start_time = _get_timestamp()
    arg_extractor = OpenAiArgsExtractor(*args, **kwargs)

    generation, is_nested_trace = _get_langfuse_data_from_kwargs(
        open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()
    )
    generation = new_langfuse.generation(**generation)
    try:
        openai_response = wrapped(**arg_extractor.get_openai_args())

        if _is_streaming_response(openai_response):
            return LangfuseResponseGeneratorSync(
                resource=open_ai_resource,
                response=openai_response,
                generation=generation,
                langfuse=new_langfuse,
                is_nested_trace=is_nested_trace,
            )

        else:
            model, completion, usage = _get_langfuse_data_from_default_response(
                open_ai_resource,
                (openai_response and openai_response.__dict__)
                if _is_openai_v1()
                else openai_response,
            )
            generation.update(
                model=model,
                output=completion,
                end_time=_get_timestamp(),
                usage=usage,  # backward compat for all V2 self-hosters
                usage_details=usage,
            )

            # Avoid updating the trace if the trace ID was provided by the user.
            if not is_nested_trace:
                new_langfuse.trace(id=generation.trace_id, output=completion)

        return openai_response
    except Exception as ex:
        log.warning(ex)
        model = kwargs.get("model", None) or None
        generation.update(
            end_time=_get_timestamp(),
            status_message=str(ex),
            level="ERROR",
            model=model,
            usage={
                "input_cost": 0,
                "output_cost": 0,
                "total_cost": 0,
            },  # backward compat for all V2 self-hosters
            cost_details={"input": 0, "output": 0, "total": 0},
        )
        raise ex


@_langfuse_wrapper
async def _wrap_async(
    open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs
):
    new_langfuse = initialize()
    start_time = _get_timestamp()
    arg_extractor = OpenAiArgsExtractor(*args, **kwargs)

    generation, is_nested_trace = _get_langfuse_data_from_kwargs(
        open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()
    )
    generation = new_langfuse.generation(**generation)
    try:
        openai_response = await wrapped(**arg_extractor.get_openai_args())

        if _is_streaming_response(openai_response):
            return LangfuseResponseGeneratorAsync(
                resource=open_ai_resource,
                response=openai_response,
                generation=generation,
                langfuse=new_langfuse,
                is_nested_trace=is_nested_trace,
            )

        else:
            model, completion, usage = _get_langfuse_data_from_default_response(
                open_ai_resource,
                (openai_response and openai_response.__dict__)
                if _is_openai_v1()
                else openai_response,
            )
            generation.update(
                model=model,
                output=completion,
                end_time=_get_timestamp(),
                usage=usage,  # backward compat for all V2 self-hosters
                usage_details=usage,
            )
            # Avoid updating the trace if the trace ID was provided by the user.
            if not is_nested_trace:
                new_langfuse.trace(id=generation.trace_id, output=completion)

        return openai_response
    except Exception as ex:
        model = kwargs.get("model", None) or None
        generation.update(
            end_time=_get_timestamp(),
            status_message=str(ex),
            level="ERROR",
            model=model,
            usage={
                "input_cost": 0,
                "output_cost": 0,
                "total_cost": 0,
            },  # backward compat for all V2 self-hosters
            cost_details={"input": 0, "output": 0, "total": 0},
        )
        raise ex


class OpenAILangfuse:
    _langfuse: Optional[Langfuse] = None

    def initialize(self):
        self._langfuse = LangfuseSingleton().get(
            public_key=openai.langfuse_public_key,
            secret_key=openai.langfuse_secret_key,
            host=openai.langfuse_host,
            debug=openai.langfuse_debug,
            enabled=openai.langfuse_enabled,
            sdk_integration="openai",
            sample_rate=openai.langfuse_sample_rate,
        )

        return self._langfuse

    def flush(cls):
        cls._langfuse.flush()

    def langfuse_auth_check(self):
        """Check if the provided Langfuse credentials (public and secret key) are valid.

        Raises:
            Exception: If no projects were found for the provided credentials.

        Note:
            This method is blocking. Avoid using it in production code.
        """
        if self._langfuse is None:
            self.initialize()

        return self._langfuse.auth_check()

    def register_tracing(self):
        resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0

        for resource in resources:
            if resource.min_version is not None and Version(
                openai.__version__
            ) < Version(resource.min_version):
                continue

            wrap_function_wrapper(
                resource.module,
                f"{resource.object}.{resource.method}",
                _wrap(resource, self.initialize)
                if resource.sync
                else _wrap_async(resource, self.initialize),
            )

        setattr(openai, "langfuse_public_key", None)
        setattr(openai, "langfuse_secret_key", None)
        setattr(openai, "langfuse_host", None)
        setattr(openai, "langfuse_debug", None)
        setattr(openai, "langfuse_enabled", True)
        setattr(openai, "langfuse_sample_rate", None)
        setattr(openai, "langfuse_mask", None)
        setattr(openai, "langfuse_auth_check", self.langfuse_auth_check)
        setattr(openai, "flush_langfuse", self.flush)


modifier = OpenAILangfuse()
modifier.register_tracing()


# DEPRECATED: Use `openai.langfuse_auth_check()` instead
def auth_check():
    if modifier._langfuse is None:
        modifier.initialize()

    return modifier._langfuse.auth_check()


class LangfuseResponseGeneratorSync:
    def __init__(
        self,
        *,
        resource,
        response,
        generation,
        langfuse,
        is_nested_trace,
    ):
        self.items = []

        self.resource = resource
        self.response = response
        self.generation = generation
        self.langfuse = langfuse
        self.is_nested_trace = is_nested_trace
        self.completion_start_time = None

    def __iter__(self):
        try:
            for i in self.response:
                self.items.append(i)

                if self.completion_start_time is None:
                    self.completion_start_time = _get_timestamp()

                yield i
        finally:
            self._finalize()

    def __next__(self):
        try:
            item = self.response.__next__()
            self.items.append(item)

            if self.completion_start_time is None:
                self.completion_start_time = _get_timestamp()

            return item

        except StopIteration:
            self._finalize()

            raise

    def __enter__(self):
        return self.__iter__()

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _finalize(self):
        model, completion, usage = _extract_streamed_openai_response(
            self.resource, self.items
        )

        # Avoid updating the trace if the trace ID was provided by the user.
        if not self.is_nested_trace:
            self.langfuse.trace(id=self.generation.trace_id, output=completion)

        _create_langfuse_update(
            completion,
            self.generation,
            self.completion_start_time,
            model=model,
            usage=usage,
        )


class LangfuseResponseGeneratorAsync:
    def __init__(
        self,
        *,
        resource,
        response,
        generation,
        langfuse,
        is_nested_trace,
    ):
        self.items = []

        self.resource = resource
        self.response = response
        self.generation = generation
        self.langfuse = langfuse
        self.is_nested_trace = is_nested_trace
        self.completion_start_time = None

    async def __aiter__(self):
        try:
            async for i in self.response:
                self.items.append(i)

                if self.completion_start_time is None:
                    self.completion_start_time = _get_timestamp()

                yield i
        finally:
            await self._finalize()

    async def __anext__(self):
        try:
            item = await self.response.__anext__()
            self.items.append(item)

            if self.completion_start_time is None:
                self.completion_start_time = _get_timestamp()

            return item

        except StopAsyncIteration:
            await self._finalize()

            raise

    async def __aenter__(self):
        return self.__aiter__()

    async def __aexit__(self, exc_type, exc_value, traceback):
        pass

    async def _finalize(self):
        model, completion, usage = _extract_streamed_openai_response(
            self.resource, self.items
        )

        # Avoid updating the trace if the trace ID was provided by the user.
        if not self.is_nested_trace:
            self.langfuse.trace(id=self.generation.trace_id, output=completion)

        _create_langfuse_update(
            completion,
            self.generation,
            self.completion_start_time,
            model=model,
            usage=usage,
        )

    async def close(self) -> None:
        """Close the response and release the connection.

        Automatically called if the response body is read to completion.
        """
        await self.response.close()

    async def aclose(self) -> None:
        """Close the response and release the connection.

        Automatically called if the response body is read to completion.
        """
        await self.response.aclose()
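
Streaming calls are handled by the generator classes above: chunks are buffered in `items`, and `_finalize()` records model, output and usage on the generation once the stream is exhausted. A minimal sketch of consuming a streamed response (model name and prompt are placeholders):

```python
from langfuse.openai import openai

client = openai.OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o",  # placeholder model name
    messages=[{"role": "user", "content": "Write a haiku"}],
    stream=True,
)

# Iterating drives LangfuseResponseGeneratorSync.__iter__; the generation is
# finalized (output, usage, completion_start_time) when the stream ends.
for chunk in stream:
    if chunk.choices:
        print(chunk.choices[0].delta.content or "", end="")

openai.flush_langfuse()
```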