langfuse.openai

If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.

```diff
- import openai
+ from langfuse.openai import openai
```
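
After the import swap, your OpenAI calls work unchanged and are logged automatically. A minimal sketch, assuming `OPENAI_API_KEY` and the Langfuse credentials (`LANGFUSE_PUBLIC_KEY`, `LANGFUSE_SECRET_KEY`, optionally `LANGFUSE_HOST`) are set as environment variables:

```python
from langfuse.openai import openai

# Regular OpenAI v1 client and call; the wrapper records the prompt,
# completion, latency, token usage and cost in Langfuse.
client = openai.OpenAI()
completion = client.chat.completions.create(
    model="gpt-4o",  # illustrative model name
    messages=[{"role": "user", "content": "Hello!"}],
)
print(completion.choices[0].message.content)
```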

Langfuse automatically tracks:

  • All prompts/completions with support for streaming, async and functions
  • Latencies
  • API errors
  • Model usage (tokens) and cost (USD)

The integration is fully interoperable with the observe() decorator and the low-level tracing SDK.

See docs for more details: https://langfuse.com/docs/integrations/openai
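
For example, a function wrapped with `observe()` opens a trace, and the OpenAI call inside it is recorded as a nested generation on that trace. A minimal sketch (the function, model and prompt are illustrative):

```python
from langfuse.decorators import observe
from langfuse.openai import openai

@observe()  # creates a trace for each call of this function
def summarize(text: str) -> str:
    # This generation is nested under the trace created by @observe()
    response = openai.OpenAI().chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": f"Summarize: {text}"}],
    )
    return response.choices[0].message.content
```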

  1"""If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.
  2
  3```diff
  4- import openai
  5+ from langfuse.openai import openai
  6```
  7
  8Langfuse automatically tracks:
  9
 10- All prompts/completions with support for streaming, async and functions
 11- Latencies
 12- API Errors
 13- Model usage (tokens) and cost (USD)
 14
 15The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK.
 16
 17See docs for more details: https://langfuse.com/docs/integrations/openai
 18"""

import logging
import types
from collections import defaultdict
from dataclasses import dataclass
from inspect import isclass
from typing import Any, Optional


import openai.resources
from openai._types import NotGiven
from packaging.version import Version
from pydantic import BaseModel
from wrapt import wrap_function_wrapper

from langfuse import Langfuse
from langfuse.client import StatefulGenerationClient
from langfuse.decorators import langfuse_context
from langfuse.media import LangfuseMedia
from langfuse.utils import _get_timestamp
from langfuse.utils.langfuse_singleton import LangfuseSingleton

try:
    import openai
except ImportError:
    raise ModuleNotFoundError(
        "Please install OpenAI to use this feature: 'pip install openai'"
    )

try:
    from openai import AsyncAzureOpenAI, AsyncOpenAI, AzureOpenAI, OpenAI  # noqa: F401
except ImportError:
    AsyncAzureOpenAI = None
    AsyncOpenAI = None
    AzureOpenAI = None
    OpenAI = None

log = logging.getLogger("langfuse")


@dataclass
class OpenAiDefinition:
    module: str
    object: str
    method: str
    type: str
    sync: bool
    min_version: Optional[str] = None


OPENAI_METHODS_V0 = [
    OpenAiDefinition(
        module="openai",
        object="ChatCompletion",
        method="create",
        type="chat",
        sync=True,
    ),
    OpenAiDefinition(
        module="openai",
        object="Completion",
        method="create",
        type="completion",
        sync=True,
    ),
]


OPENAI_METHODS_V1 = [
    OpenAiDefinition(
        module="openai.resources.chat.completions",
        object="Completions",
        method="create",
        type="chat",
        sync=True,
    ),
    OpenAiDefinition(
        module="openai.resources.completions",
        object="Completions",
        method="create",
        type="completion",
        sync=True,
    ),
    OpenAiDefinition(
        module="openai.resources.chat.completions",
        object="AsyncCompletions",
        method="create",
        type="chat",
        sync=False,
    ),
    OpenAiDefinition(
        module="openai.resources.completions",
        object="AsyncCompletions",
        method="create",
        type="completion",
        sync=False,
    ),
    OpenAiDefinition(
        module="openai.resources.beta.chat.completions",
        object="Completions",
        method="parse",
        type="chat",
        sync=True,
        min_version="1.50.0",
    ),
    OpenAiDefinition(
        module="openai.resources.beta.chat.completions",
        object="AsyncCompletions",
        method="parse",
        type="chat",
        sync=False,
        min_version="1.50.0",
    ),
]


class OpenAiArgsExtractor:
    def __init__(
        self,
        name=None,
        metadata=None,
        trace_id=None,
        session_id=None,
        user_id=None,
        tags=None,
        parent_observation_id=None,
        langfuse_prompt=None,  # we cannot use prompt because it's an argument of the old OpenAI completions API
        **kwargs,
    ):
        self.args = {}
        self.args["name"] = name
        self.args["metadata"] = (
            metadata
            if "response_format" not in kwargs
            else {
                **(metadata or {}),
                "response_format": kwargs["response_format"].model_json_schema()
                if isclass(kwargs["response_format"])
                and issubclass(kwargs["response_format"], BaseModel)
                else kwargs["response_format"],
            }
        )
        self.args["trace_id"] = trace_id
        self.args["session_id"] = session_id
        self.args["user_id"] = user_id
        self.args["tags"] = tags
        self.args["parent_observation_id"] = parent_observation_id
        self.args["langfuse_prompt"] = langfuse_prompt
        self.kwargs = kwargs

    def get_langfuse_args(self):
        return {**self.args, **self.kwargs}

    def get_openai_args(self):
        return self.kwargs


def _langfuse_wrapper(func):
    def _with_langfuse(open_ai_definitions, initialize):
        def wrapper(wrapped, instance, args, kwargs):
            return func(open_ai_definitions, initialize, wrapped, args, kwargs)

        return wrapper

    return _with_langfuse


def _extract_chat_prompt(kwargs: Any):
    """Extracts the user input from prompts. Returns an array of messages or dict with messages and functions."""
    prompt = {}

    if kwargs.get("functions") is not None:
        prompt.update({"functions": kwargs["functions"]})

    if kwargs.get("function_call") is not None:
        prompt.update({"function_call": kwargs["function_call"]})

    if kwargs.get("tools") is not None:
        prompt.update({"tools": kwargs["tools"]})

    if prompt:
        # if the user provided functions, we need to send these together with messages to langfuse
        prompt.update(
            {
                "messages": [
                    _process_message(message) for message in kwargs.get("messages", [])
                ],
            }
        )
        return prompt
    else:
        # vanilla case, only send messages in openai format to langfuse
        return [_process_message(message) for message in kwargs.get("messages", [])]


def _process_message(message):
    if not isinstance(message, dict):
        return message

    processed_message = {**message}

    content = processed_message.get("content", None)
    if not isinstance(content, list):
        return processed_message

    processed_content = []

    for content_part in content:
        if content_part.get("type") == "input_audio":
            audio_base64 = content_part.get("input_audio", {}).get("data", None)
            format = content_part.get("input_audio", {}).get("format", "wav")

            if audio_base64 is not None:
                base64_data_uri = f"data:audio/{format};base64,{audio_base64}"

                processed_content.append(
                    {
                        "type": "input_audio",
                        "input_audio": {
                            "data": LangfuseMedia(base64_data_uri=base64_data_uri),
                            "format": format,
                        },
                    }
                )
        else:
            processed_content.append(content_part)

    processed_message["content"] = processed_content

    return processed_message


def _extract_chat_response(kwargs: Any):
    """Extracts the llm output from the response."""
    response = {
        "role": kwargs.get("role", None),
    }

    audio = None

    if kwargs.get("function_call") is not None:
        response.update({"function_call": kwargs["function_call"]})

    if kwargs.get("tool_calls") is not None:
        response.update({"tool_calls": kwargs["tool_calls"]})

    if kwargs.get("audio") is not None:
        audio = kwargs["audio"].__dict__

        if "data" in audio and audio["data"] is not None:
            base64_data_uri = f"data:audio/{audio.get('format', 'wav')};base64,{audio.get('data', None)}"
            audio["data"] = LangfuseMedia(base64_data_uri=base64_data_uri)

    response.update(
        {
            "content": kwargs.get("content", None),
        }
    )

    if audio is not None:
        response.update({"audio": audio})

    return response


def _get_langfuse_data_from_kwargs(
    resource: OpenAiDefinition, langfuse: Langfuse, start_time, kwargs
):
    name = kwargs.get("name", "OpenAI-generation")

    if name is None:
        name = "OpenAI-generation"

    if not isinstance(name, str):
        raise TypeError("name must be a string")

    decorator_context_observation_id = langfuse_context.get_current_observation_id()
    decorator_context_trace_id = langfuse_context.get_current_trace_id()

    trace_id = kwargs.get("trace_id", None) or decorator_context_trace_id
    if trace_id is not None and not isinstance(trace_id, str):
        raise TypeError("trace_id must be a string")

    session_id = kwargs.get("session_id", None)
    if session_id is not None and not isinstance(session_id, str):
        raise TypeError("session_id must be a string")

    user_id = kwargs.get("user_id", None)
    if user_id is not None and not isinstance(user_id, str):
        raise TypeError("user_id must be a string")

    tags = kwargs.get("tags", None)
    if tags is not None and (
        not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags)
    ):
        raise TypeError("tags must be a list of strings")

    # Update trace params in decorator context if specified in openai call
    if decorator_context_trace_id:
        langfuse_context.update_current_trace(
            session_id=session_id, user_id=user_id, tags=tags
        )

    parent_observation_id = kwargs.get("parent_observation_id", None) or (
        decorator_context_observation_id
        if decorator_context_observation_id != decorator_context_trace_id
        else None
    )
    if parent_observation_id is not None and not isinstance(parent_observation_id, str):
        raise TypeError("parent_observation_id must be a string")
    if parent_observation_id is not None and trace_id is None:
        raise ValueError("parent_observation_id requires trace_id to be set")

    metadata = kwargs.get("metadata", {})

    if metadata is not None and not isinstance(metadata, dict):
        raise TypeError("metadata must be a dictionary")

    model = kwargs.get("model", None) or None

    prompt = None

    if resource.type == "completion":
        prompt = kwargs.get("prompt", None)
    elif resource.type == "chat":
        prompt = _extract_chat_prompt(kwargs)

    is_nested_trace = False
    if trace_id:
        is_nested_trace = True
        langfuse.trace(id=trace_id, session_id=session_id, user_id=user_id, tags=tags)
    else:
        trace_id = (
            decorator_context_trace_id
            or langfuse.trace(
                session_id=session_id,
                user_id=user_id,
                tags=tags,
                name=name,
                input=prompt,
                metadata=metadata,
            ).id
        )

    # OpenAI's SDK passes the NotGiven sentinel for omitted parameters; fall
    # back to the documented defaults in that case.
    parsed_temperature = (
        kwargs.get("temperature", 1)
        if not isinstance(kwargs.get("temperature", 1), NotGiven)
        else 1
    )

    parsed_max_tokens = (
        kwargs.get("max_tokens", float("inf"))
        if not isinstance(kwargs.get("max_tokens", float("inf")), NotGiven)
        else float("inf")
    )

    parsed_top_p = (
        kwargs.get("top_p", 1)
        if not isinstance(kwargs.get("top_p", 1), NotGiven)
        else 1
    )

    parsed_frequency_penalty = (
        kwargs.get("frequency_penalty", 0)
        if not isinstance(kwargs.get("frequency_penalty", 0), NotGiven)
        else 0
    )

    parsed_presence_penalty = (
        kwargs.get("presence_penalty", 0)
        if not isinstance(kwargs.get("presence_penalty", 0), NotGiven)
        else 0
    )

    parsed_seed = (
        kwargs.get("seed", None)
        if not isinstance(kwargs.get("seed", None), NotGiven)
        else None
    )

    parsed_n = kwargs.get("n", 1) if not isinstance(kwargs.get("n", 1), NotGiven) else 1

    model_parameters = {
        "temperature": parsed_temperature,
        "max_tokens": parsed_max_tokens,  # snake_case to match the OpenAI parameter name
        "top_p": parsed_top_p,
        "frequency_penalty": parsed_frequency_penalty,
        "presence_penalty": parsed_presence_penalty,
    }
    if parsed_n is not None and parsed_n > 1:
        model_parameters["n"] = parsed_n

    if parsed_seed is not None:
        model_parameters["seed"] = parsed_seed

    langfuse_prompt = kwargs.get("langfuse_prompt", None)

    return {
        "name": name,
        "metadata": metadata,
        "trace_id": trace_id,
        "parent_observation_id": parent_observation_id,
        "user_id": user_id,
        "start_time": start_time,
        "input": prompt,
        "model_parameters": model_parameters,
        "model": model,
        "prompt": langfuse_prompt,
    }, is_nested_trace


def _create_langfuse_update(
    completion,
    generation: StatefulGenerationClient,
    completion_start_time,
    model=None,
    usage=None,
):
    update = {
        "end_time": _get_timestamp(),
        "output": completion,
        "completion_start_time": completion_start_time,
    }
    if model is not None:
        update["model"] = model

    if usage is not None:
        update["usage"] = usage

    generation.update(**update)


def _extract_streamed_openai_response(resource, chunks):
    completion = defaultdict(str) if resource.type == "chat" else ""
    model = None
    usage = None  # initialize so that an empty stream does not raise a NameError

    for chunk in chunks:
        if _is_openai_v1():
            chunk = chunk.__dict__

        model = model or chunk.get("model", None) or None
        # usage is typically only present on the final chunk, so keep the last
        # non-empty value instead of overwriting it with None
        usage = chunk.get("usage", None) or usage

        choices = chunk.get("choices", [])

        for choice in choices:
            if _is_openai_v1():
                choice = choice.__dict__
            if resource.type == "chat":
                delta = choice.get("delta", None)

                if _is_openai_v1():
                    delta = delta.__dict__

                if delta.get("role", None) is not None:
                    completion["role"] = delta["role"]

                if delta.get("content", None) is not None:
                    completion["content"] = (
                        delta.get("content", None)
                        if completion["content"] is None
                        else completion["content"] + delta.get("content", None)
                    )
                elif delta.get("function_call", None) is not None:
                    curr = completion["function_call"]
                    tool_call_chunk = delta.get("function_call", None)

                    if not curr:
                        completion["function_call"] = {
                            "name": getattr(tool_call_chunk, "name", ""),
                            "arguments": getattr(tool_call_chunk, "arguments", ""),
                        }

                    else:
                        curr["name"] = curr["name"] or getattr(
                            tool_call_chunk, "name", None
                        )
                        curr["arguments"] += getattr(tool_call_chunk, "arguments", "")

                elif delta.get("tool_calls", None) is not None:
                    curr = completion["tool_calls"]
                    tool_call_chunk = getattr(
                        delta.get("tool_calls", None)[0], "function", None
                    )

                    if not curr:
                        completion["tool_calls"] = [
                            {
                                "name": getattr(tool_call_chunk, "name", ""),
                                "arguments": getattr(tool_call_chunk, "arguments", ""),
                            }
                        ]

                    elif getattr(tool_call_chunk, "name", None) is not None:
                        curr.append(
                            {
                                "name": getattr(tool_call_chunk, "name", None),
                                "arguments": getattr(
                                    tool_call_chunk, "arguments", ""
                                ),
                            }
                        )

                    else:
                        curr[-1]["name"] = curr[-1]["name"] or getattr(
                            tool_call_chunk, "name", None
                        )
                        # default to "" so that concatenation never fails on a
                        # chunk without arguments
                        curr[-1]["arguments"] += getattr(
                            tool_call_chunk, "arguments", ""
                        )

            if resource.type == "completion":
                completion += choice.get("text") or ""

    def get_response_for_chat():
        return (
            completion["content"]
            or (
                completion["function_call"]
                and {
                    "role": "assistant",
                    "function_call": completion["function_call"],
                }
            )
            or (
                completion["tool_calls"]
                and {
                    "role": "assistant",
                    "tool_calls": [
                        {"function": data} for data in completion["tool_calls"]
                    ],
                }
            )
            or None
        )

    return (
        model,
        get_response_for_chat() if resource.type == "chat" else completion,
        usage.__dict__ if _is_openai_v1() and usage is not None else usage,
    )


def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response):
    if response is None:
        return None, "<NoneType response returned from OpenAI>", None

    model = response.get("model", None) or None

    completion = None
    if resource.type == "completion":
        choices = response.get("choices", [])
        if len(choices) > 0:
            choice = choices[-1]

            completion = choice.text if _is_openai_v1() else choice.get("text", None)
    elif resource.type == "chat":
        choices = response.get("choices", [])
        if len(choices) > 0:
            # If multiple choices were generated, we'll show all of them in the UI as a list.
            if len(choices) > 1:
                completion = [
                    _extract_chat_response(choice.message.__dict__)
                    if _is_openai_v1()
                    else choice.get("message", None)
                    for choice in choices
                ]
            else:
                choice = choices[0]
                completion = (
                    _extract_chat_response(choice.message.__dict__)
                    if _is_openai_v1()
                    else choice.get("message", None)
                )

    usage = response.get("usage", None)

    return (
        model,
        completion,
        usage.__dict__ if _is_openai_v1() and usage is not None else usage,
    )


def _is_openai_v1():
    return Version(openai.__version__) >= Version("1.0.0")


def _is_streaming_response(response):
    return (
        isinstance(response, types.GeneratorType)
        or isinstance(response, types.AsyncGeneratorType)
        or (_is_openai_v1() and isinstance(response, openai.Stream))
        or (_is_openai_v1() and isinstance(response, openai.AsyncStream))
    )


@_langfuse_wrapper
def _wrap(open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs):
    new_langfuse: Langfuse = initialize()

    start_time = _get_timestamp()
    arg_extractor = OpenAiArgsExtractor(*args, **kwargs)

    generation, is_nested_trace = _get_langfuse_data_from_kwargs(
        open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()
    )
    generation = new_langfuse.generation(**generation)
    try:
        openai_response = wrapped(**arg_extractor.get_openai_args())

        if _is_streaming_response(openai_response):
            return LangfuseResponseGeneratorSync(
                resource=open_ai_resource,
                response=openai_response,
                generation=generation,
                langfuse=new_langfuse,
                is_nested_trace=is_nested_trace,
            )

        else:
            model, completion, usage = _get_langfuse_data_from_default_response(
                open_ai_resource,
                (openai_response and openai_response.__dict__)
                if _is_openai_v1()
                else openai_response,
            )
            generation.update(
                model=model, output=completion, end_time=_get_timestamp(), usage=usage
            )

            # Avoid updating the trace if the trace ID was provided by the user.
            if not is_nested_trace:
                new_langfuse.trace(id=generation.trace_id, output=completion)

        return openai_response
    except Exception as ex:
        log.warning(ex)
        model = kwargs.get("model", None) or None
        generation.update(
            end_time=_get_timestamp(),
            status_message=str(ex),
            level="ERROR",
            model=model,
            usage={"input_cost": 0, "output_cost": 0, "total_cost": 0},
        )
        raise


@_langfuse_wrapper
async def _wrap_async(
    open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs
):
    new_langfuse = initialize()
    start_time = _get_timestamp()
    arg_extractor = OpenAiArgsExtractor(*args, **kwargs)

    generation, is_nested_trace = _get_langfuse_data_from_kwargs(
        open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()
    )
    generation = new_langfuse.generation(**generation)
    try:
        openai_response = await wrapped(**arg_extractor.get_openai_args())

        if _is_streaming_response(openai_response):
            return LangfuseResponseGeneratorAsync(
                resource=open_ai_resource,
                response=openai_response,
                generation=generation,
                langfuse=new_langfuse,
                is_nested_trace=is_nested_trace,
            )

        else:
            model, completion, usage = _get_langfuse_data_from_default_response(
                open_ai_resource,
                (openai_response and openai_response.__dict__)
                if _is_openai_v1()
                else openai_response,
            )
            generation.update(
                model=model,
                output=completion,
                end_time=_get_timestamp(),
                usage=usage,
            )
            # Avoid updating the trace if the trace ID was provided by the user.
            if not is_nested_trace:
                new_langfuse.trace(id=generation.trace_id, output=completion)

        return openai_response
    except Exception as ex:
        model = kwargs.get("model", None) or None
        generation.update(
            end_time=_get_timestamp(),
            status_message=str(ex),
            level="ERROR",
            model=model,
            usage={"input_cost": 0, "output_cost": 0, "total_cost": 0},
        )
        raise


class OpenAILangfuse:
    _langfuse: Optional[Langfuse] = None

    def initialize(self):
        self._langfuse = LangfuseSingleton().get(
            public_key=openai.langfuse_public_key,
            secret_key=openai.langfuse_secret_key,
            host=openai.langfuse_host,
            debug=openai.langfuse_debug,
            enabled=openai.langfuse_enabled,
            sdk_integration="openai",
            sample_rate=openai.langfuse_sample_rate,
        )

        return self._langfuse

    def flush(self):
        self._langfuse.flush()

    def langfuse_auth_check(self):
        """Check if the provided Langfuse credentials (public and secret key) are valid.

        Raises:
            Exception: If no projects were found for the provided credentials.

        Note:
            This method is blocking. It is discouraged to use it in production code.
        """
        if self._langfuse is None:
            self.initialize()

        return self._langfuse.auth_check()

    def register_tracing(self):
        resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0

        for resource in resources:
            # Skip methods that only exist in newer OpenAI SDK versions
            if resource.min_version is not None and Version(
                openai.__version__
            ) < Version(resource.min_version):
                continue

            wrap_function_wrapper(
                resource.module,
                f"{resource.object}.{resource.method}",
                _wrap(resource, self.initialize)
                if resource.sync
                else _wrap_async(resource, self.initialize),
            )

        setattr(openai, "langfuse_public_key", None)
        setattr(openai, "langfuse_secret_key", None)
        setattr(openai, "langfuse_host", None)
        setattr(openai, "langfuse_debug", None)
        setattr(openai, "langfuse_enabled", True)
        setattr(openai, "langfuse_sample_rate", None)
        setattr(openai, "langfuse_mask", None)
        setattr(openai, "langfuse_auth_check", self.langfuse_auth_check)
        setattr(openai, "flush_langfuse", self.flush)


modifier = OpenAILangfuse()
modifier.register_tracing()
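
# register_tracing() exposes configuration attributes on the `openai` module,
# so callers can configure Langfuse without extra imports. A minimal sketch of
# how a caller might set them before the first OpenAI call (the "pk-lf-..." /
# "sk-lf-..." values are hypothetical placeholders, not real keys):
#
#   from langfuse.openai import openai
#   openai.langfuse_public_key = "pk-lf-..."
#   openai.langfuse_secret_key = "sk-lf-..."
#   openai.langfuse_host = "https://cloud.langfuse.com"
#   openai.langfuse_debug = True
#   openai.langfuse_auth_check()  # optional, blocking credential check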


# DEPRECATED: Use `openai.langfuse_auth_check()` instead
def auth_check():
    if modifier._langfuse is None:
        modifier.initialize()

    return modifier._langfuse.auth_check()


class LangfuseResponseGeneratorSync:
    def __init__(
        self,
        *,
        resource,
        response,
        generation,
        langfuse,
        is_nested_trace,
    ):
        self.items = []

        self.resource = resource
        self.response = response
        self.generation = generation
        self.langfuse = langfuse
        self.is_nested_trace = is_nested_trace
        self.completion_start_time = None

    def __iter__(self):
        try:
            for i in self.response:
                self.items.append(i)

                if self.completion_start_time is None:
                    self.completion_start_time = _get_timestamp()

                yield i
        finally:
            self._finalize()

    def __next__(self):
        try:
            item = self.response.__next__()
            self.items.append(item)

            if self.completion_start_time is None:
                self.completion_start_time = _get_timestamp()

            return item

        except StopIteration:
            self._finalize()

            raise

    def __enter__(self):
        return self.__iter__()

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _finalize(self):
        model, completion, usage = _extract_streamed_openai_response(
            self.resource, self.items
        )

        # Avoid updating the trace if the trace ID was provided by the user.
        if not self.is_nested_trace:
            self.langfuse.trace(id=self.generation.trace_id, output=completion)

        _create_langfuse_update(
            completion,
            self.generation,
            self.completion_start_time,
            model=model,
            usage=usage,
        )


class LangfuseResponseGeneratorAsync:
    def __init__(
        self,
        *,
        resource,
        response,
        generation,
        langfuse,
        is_nested_trace,
    ):
        self.items = []

        self.resource = resource
        self.response = response
        self.generation = generation
        self.langfuse = langfuse
        self.is_nested_trace = is_nested_trace
        self.completion_start_time = None

    async def __aiter__(self):
        try:
            async for i in self.response:
                self.items.append(i)

                if self.completion_start_time is None:
                    self.completion_start_time = _get_timestamp()

                yield i
        finally:
            await self._finalize()

    async def __anext__(self):
        try:
            item = await self.response.__anext__()
            self.items.append(item)

            if self.completion_start_time is None:
                self.completion_start_time = _get_timestamp()

            return item

        except StopAsyncIteration:
            await self._finalize()

            raise

    async def __aenter__(self):
        return self.__aiter__()

    async def __aexit__(self, exc_type, exc_value, traceback):
        pass

    async def _finalize(self):
        model, completion, usage = _extract_streamed_openai_response(
            self.resource, self.items
        )

        # Avoid updating the trace if the trace ID was provided by the user.
        if not self.is_nested_trace:
            self.langfuse.trace(id=self.generation.trace_id, output=completion)

        _create_langfuse_update(
            completion,
            self.generation,
            self.completion_start_time,
            model=model,
            usage=usage,
        )

    async def close(self) -> None:
        """Close the response and release the connection.

        Automatically called if the response body is read to completion.
        """
        await self.response.close()