langfuse.openai

If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.

- import openai
+ from langfuse.openai import openai

Langfuse automatically tracks:

  • All prompts/completions with support for streaming, async and functions
  • Latencies
  • API Errors
  • Model usage (tokens) and cost (USD)

The integration is fully interoperable with the observe() decorator and the low-level tracing SDK.

See docs for more details: https://langfuse.com/docs/integrations/openai
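A minimal end-to-end sketch (key values, model name, and metadata below are illustrative placeholders, not defaults):

```python
# Drop-in replacement: all OpenAI calls below are traced automatically.
from langfuse.openai import openai

# Credentials can also come from the LANGFUSE_* environment variables; the
# attributes below are installed on the openai module by register_tracing().
openai.langfuse_public_key = "pk-lf-..."  # illustrative placeholder
openai.langfuse_secret_key = "sk-lf-..."  # illustrative placeholder

completion = openai.chat.completions.create(
    model="gpt-4o-mini",  # illustrative model choice
    messages=[{"role": "user", "content": "Hello!"}],
    name="hello-generation",  # Langfuse-specific kwarg, stripped before the OpenAI call
    metadata={"env": "dev"},  # Langfuse-specific kwarg
)

# Langfuse events are sent in the background; flush before the process exits.
openai.flush_langfuse()
```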

  1"""If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.
  2
  3```diff
  4- import openai
  5+ from langfuse.openai import openai
  6```
  7
  8Langfuse automatically tracks:
  9
 10- All prompts/completions with support for streaming, async and functions
 11- Latencies
 12- API Errors
 13- Model usage (tokens) and cost (USD)
 14
 15The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK.
 16
 17See docs for more details: https://langfuse.com/docs/integrations/openai
 18"""
 19
import copy
import logging
import types

from collections import defaultdict
from typing import Any, List, Optional

import openai.resources
from openai._types import NotGiven
from packaging.version import Version
from wrapt import wrap_function_wrapper

from langfuse import Langfuse
from langfuse.client import StatefulGenerationClient
from langfuse.decorators import langfuse_context
from langfuse.utils import _get_timestamp
from langfuse.utils.langfuse_singleton import LangfuseSingleton

try:
    import openai
except ImportError:
    raise ModuleNotFoundError(
        "Please install OpenAI to use this feature: 'pip install openai'"
    )

try:
    from openai import AsyncAzureOpenAI, AsyncOpenAI, AzureOpenAI, OpenAI  # noqa: F401
except ImportError:
    AsyncAzureOpenAI = None
    AsyncOpenAI = None
    AzureOpenAI = None
    OpenAI = None

log = logging.getLogger("langfuse")


class OpenAiDefinition:
    module: str
    object: str
    method: str
    type: str
    sync: bool

    def __init__(self, module: str, object: str, method: str, type: str, sync: bool):
        self.module = module
        self.object = object
        self.method = method
        self.type = type
        self.sync = sync


OPENAI_METHODS_V0 = [
    OpenAiDefinition(
        module="openai",
        object="ChatCompletion",
        method="create",
        type="chat",
        sync=True,
    ),
    OpenAiDefinition(
        module="openai",
        object="Completion",
        method="create",
        type="completion",
        sync=True,
    ),
]


OPENAI_METHODS_V1 = [
    OpenAiDefinition(
        module="openai.resources.chat.completions",
        object="Completions",
        method="create",
        type="chat",
        sync=True,
    ),
    OpenAiDefinition(
        module="openai.resources.completions",
        object="Completions",
        method="create",
        type="completion",
        sync=True,
    ),
    OpenAiDefinition(
        module="openai.resources.chat.completions",
        object="AsyncCompletions",
        method="create",
        type="chat",
        sync=False,
    ),
    OpenAiDefinition(
        module="openai.resources.completions",
        object="AsyncCompletions",
        method="create",
        type="completion",
        sync=False,
    ),
]


class OpenAiArgsExtractor:
    def __init__(
        self,
        name=None,
        metadata=None,
        trace_id=None,
        session_id=None,
        user_id=None,
        tags=None,
        parent_observation_id=None,
        langfuse_prompt=None,  # we cannot use prompt because it's an argument of the old OpenAI completions API
        **kwargs,
    ):
        self.args = {}
        self.args["name"] = name
        self.args["metadata"] = (
            metadata
            if "response_format" not in kwargs
            else {**(metadata or {}), "response_format": kwargs["response_format"]}
        )
        self.args["trace_id"] = trace_id
        self.args["session_id"] = session_id
        self.args["user_id"] = user_id
        self.args["tags"] = tags
        self.args["parent_observation_id"] = parent_observation_id
        self.args["langfuse_prompt"] = langfuse_prompt
        self.kwargs = kwargs

    def get_langfuse_args(self):
        return {**self.args, **self.kwargs}

    def get_openai_args(self):
        return self.kwargs


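To illustrate how the extractor separates the two kinds of kwargs, here is a quick sketch (argument values are made up):

```python
extractor = OpenAiArgsExtractor(
    name="my-generation",  # Langfuse-specific
    user_id="user-123",    # Langfuse-specific
    model="gpt-4o-mini",   # passed through to OpenAI
    messages=[{"role": "user", "content": "Hi"}],
)

extractor.get_openai_args()
# {'model': 'gpt-4o-mini', 'messages': [...]}  -- safe to forward to the SDK

extractor.get_langfuse_args()
# union of both: name, user_id, trace_id, ..., plus model and messages
```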
def _langfuse_wrapper(func):
    def _with_langfuse(open_ai_definitions, initialize):
        def wrapper(wrapped, instance, args, kwargs):
            return func(open_ai_definitions, initialize, wrapped, args, kwargs)

        return wrapper

    return _with_langfuse


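The decorator turns a five-argument handler into a factory for wrapt-compatible wrappers (wrapt calls wrappers with `(wrapped, instance, args, kwargs)`). A sketch of the call shape, with a hypothetical handler name:

```python
@_langfuse_wrapper
def _my_handler(definition, initialize, wrapped, args, kwargs):
    # custom pre/post logic would go here
    return wrapped(*args, **kwargs)

# _my_handler(definition, initialize) now returns a function with the
# (wrapped, instance, args, kwargs) signature that wrap_function_wrapper expects.
wrapper = _my_handler(OPENAI_METHODS_V1[0], lambda: None)
```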
def _extract_chat_prompt(kwargs: Any):
    """Extract the user input from prompts. Returns an array of messages or a dict with messages and functions."""
    prompt = {}

    if kwargs.get("functions") is not None:
        prompt.update({"functions": kwargs["functions"]})

    if kwargs.get("function_call") is not None:
        prompt.update({"function_call": kwargs["function_call"]})

    if kwargs.get("tools") is not None:
        prompt.update({"tools": kwargs["tools"]})

    if prompt:
        # if the user provided functions or tools, send them together with the messages to Langfuse
        prompt.update(
            {
                "messages": _filter_image_data(kwargs.get("messages", [])),
            }
        )
        return prompt
    else:
        # vanilla case: only send messages in OpenAI format to Langfuse
        return _filter_image_data(kwargs.get("messages", []))


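For instance (made-up kwargs), with tools present the messages are nested under a dict; without them the messages list is returned directly:

```python
_extract_chat_prompt({
    "messages": [{"role": "user", "content": "What is 2+2?"}],
    "tools": [{"type": "function", "function": {"name": "add"}}],
})
# -> {'tools': [...], 'messages': [{'role': 'user', 'content': 'What is 2+2?'}]}

_extract_chat_prompt({"messages": [{"role": "user", "content": "Hi"}]})
# -> [{'role': 'user', 'content': 'Hi'}]
```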
def _extract_chat_response(kwargs: Any):
    """Extract the LLM output from the response."""
    response = {
        "role": kwargs.get("role", None),
    }

    if kwargs.get("function_call") is not None:
        response.update({"function_call": kwargs["function_call"]})

    if kwargs.get("tool_calls") is not None:
        response.update({"tool_calls": kwargs["tool_calls"]})

    response.update(
        {
            "content": kwargs.get("content", None),
        }
    )
    return response


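Applied to a chat message dict (illustrative values):

```python
_extract_chat_response({"role": "assistant", "content": "4"})
# -> {'role': 'assistant', 'content': '4'}

_extract_chat_response({
    "role": "assistant",
    "content": None,
    "tool_calls": [{"id": "call_1", "function": {"name": "add", "arguments": "{}"}}],
})
# -> {'role': 'assistant', 'tool_calls': [...], 'content': None}
```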
def _get_langfuse_data_from_kwargs(
    resource: OpenAiDefinition, langfuse: Langfuse, start_time, kwargs
):
    name = kwargs.get("name", "OpenAI-generation")

    if name is None:
        name = "OpenAI-generation"

    if name is not None and not isinstance(name, str):
        raise TypeError("name must be a string")

    decorator_context_observation_id = langfuse_context.get_current_observation_id()
    decorator_context_trace_id = langfuse_context.get_current_trace_id()

    trace_id = kwargs.get("trace_id", None) or decorator_context_trace_id
    if trace_id is not None and not isinstance(trace_id, str):
        raise TypeError("trace_id must be a string")

    session_id = kwargs.get("session_id", None)
    if session_id is not None and not isinstance(session_id, str):
        raise TypeError("session_id must be a string")

    user_id = kwargs.get("user_id", None)
    if user_id is not None and not isinstance(user_id, str):
        raise TypeError("user_id must be a string")

    tags = kwargs.get("tags", None)
    if tags is not None and (
        not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags)
    ):
        raise TypeError("tags must be a list of strings")

    # Update trace params in decorator context if specified in openai call
    if decorator_context_trace_id:
        langfuse_context.update_current_trace(
            session_id=session_id, user_id=user_id, tags=tags
        )

    parent_observation_id = kwargs.get("parent_observation_id", None) or (
        decorator_context_observation_id
        if decorator_context_observation_id != decorator_context_trace_id
        else None
    )
    if parent_observation_id is not None and not isinstance(parent_observation_id, str):
        raise TypeError("parent_observation_id must be a string")
    if parent_observation_id is not None and trace_id is None:
        raise ValueError("parent_observation_id requires trace_id to be set")

    metadata = kwargs.get("metadata", {})

    if metadata is not None and not isinstance(metadata, dict):
        raise TypeError("metadata must be a dictionary")

    model = kwargs.get("model", None) or None

    prompt = None

    if resource.type == "completion":
        prompt = kwargs.get("prompt", None)
    elif resource.type == "chat":
        prompt = _extract_chat_prompt(kwargs)

    is_nested_trace = False
    if trace_id:
        is_nested_trace = True
        langfuse.trace(id=trace_id, session_id=session_id, user_id=user_id, tags=tags)
    else:
        trace_id = (
            decorator_context_trace_id
            or langfuse.trace(
                session_id=session_id,
                user_id=user_id,
                tags=tags,
                name=name,
                input=prompt,
                metadata=metadata,
            ).id
        )

    parsed_temperature = (
        kwargs.get("temperature", 1)
        if not isinstance(kwargs.get("temperature", 1), NotGiven)
        else 1
    )

    parsed_max_tokens = (
        kwargs.get("max_tokens", float("inf"))
        if not isinstance(kwargs.get("max_tokens", float("inf")), NotGiven)
        else float("inf")
    )

    parsed_top_p = (
        kwargs.get("top_p", 1)
        if not isinstance(kwargs.get("top_p", 1), NotGiven)
        else 1
    )

    parsed_frequency_penalty = (
        kwargs.get("frequency_penalty", 0)
        if not isinstance(kwargs.get("frequency_penalty", 0), NotGiven)
        else 0
    )

    parsed_presence_penalty = (
        kwargs.get("presence_penalty", 0)
        if not isinstance(kwargs.get("presence_penalty", 0), NotGiven)
        else 0
    )

    parsed_seed = (
        kwargs.get("seed", None)
        if not isinstance(kwargs.get("seed", None), NotGiven)
        else None
    )

    modelParameters = {
        "temperature": parsed_temperature,
        "max_tokens": parsed_max_tokens,
        "top_p": parsed_top_p,
        "frequency_penalty": parsed_frequency_penalty,
        "presence_penalty": parsed_presence_penalty,
    }
    if parsed_seed is not None:
        modelParameters["seed"] = parsed_seed

    langfuse_prompt = kwargs.get("langfuse_prompt", None)

    return {
        "name": name,
        "metadata": metadata,
        "trace_id": trace_id,
        "parent_observation_id": parent_observation_id,
        "user_id": user_id,
        "start_time": start_time,
        "input": prompt,
        "model_parameters": modelParameters,
        "model": model or None,
        "prompt": langfuse_prompt,
    }, is_nested_trace


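Each `parsed_*` block above applies the same rule: keep the caller's value unless it is OpenAI's `NotGiven` sentinel, in which case fall back to the documented default. The repetition could be factored into a small helper; a sketch, not part of this module:

```python
def _parse_param(kwargs, key, default):
    # NotGiven is openai's sentinel for "parameter omitted"; treat it as the default.
    value = kwargs.get(key, default)
    return default if isinstance(value, NotGiven) else value

# e.g. _parse_param(kwargs, "temperature", 1) matches parsed_temperature above
```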
def _create_langfuse_update(
    completion, generation: StatefulGenerationClient, completion_start_time, model=None
):
    update = {
        "end_time": _get_timestamp(),
        "output": completion,
        "completion_start_time": completion_start_time,
    }
    if model is not None:
        update["model"] = model

    generation.update(**update)


def _extract_streamed_openai_response(resource, chunks):
    completion = defaultdict(str) if resource.type == "chat" else ""
    model = None
    completion_start_time = None

    for index, i in enumerate(chunks):
        if index == 0:
            completion_start_time = _get_timestamp()

        if _is_openai_v1():
            i = i.__dict__

        model = model or i.get("model", None) or None

        choices = i.get("choices", [])

        for choice in choices:
            if _is_openai_v1():
                choice = choice.__dict__
            if resource.type == "chat":
                delta = choice.get("delta", None)

                if _is_openai_v1():
                    delta = delta.__dict__

                if delta.get("role", None) is not None:
                    completion["role"] = delta["role"]

                if delta.get("content", None) is not None:
                    completion["content"] = (
                        delta.get("content", None)
                        if completion["content"] is None
                        else completion["content"] + delta.get("content", None)
                    )
                elif delta.get("function_call", None) is not None:
                    curr = completion["function_call"]
                    tool_call_chunk = delta.get("function_call", None)

                    if not curr:
                        completion["function_call"] = {
                            "name": getattr(tool_call_chunk, "name", ""),
                            "arguments": getattr(tool_call_chunk, "arguments", ""),
                        }

                    else:
                        curr["name"] = curr["name"] or getattr(
                            tool_call_chunk, "name", None
                        )
                        curr["arguments"] += getattr(tool_call_chunk, "arguments", "")

                elif delta.get("tool_calls", None) is not None:
                    curr = completion["tool_calls"]
                    tool_call_chunk = getattr(
                        delta.get("tool_calls", None)[0], "function", None
                    )

                    if not curr:
                        completion["tool_calls"] = {
                            "name": getattr(tool_call_chunk, "name", ""),
                            "arguments": getattr(tool_call_chunk, "arguments", ""),
                        }

                    else:
                        curr["name"] = curr["name"] or getattr(
                            tool_call_chunk, "name", None
                        )
                        # default to "" so concatenation never fails on a missing attribute
                        curr["arguments"] += getattr(tool_call_chunk, "arguments", "")

            if resource.type == "completion":
                completion += choice.get("text", None) or ""

    def get_response_for_chat():
        return (
            completion["content"]
            or (
                completion["function_call"]
                and {
                    "role": "assistant",
                    "function_call": completion["function_call"],
                }
            )
            or (
                completion["tool_calls"]
                and {
                    "role": "assistant",
                    "tool_calls": [{"function": completion["tool_calls"]}],
                }
            )
            or None
        )

    return (
        model,
        completion_start_time,
        get_response_for_chat() if resource.type == "chat" else completion,
    )


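To see what the accumulator produces, one can feed it fake chunks; `types.SimpleNamespace` stands in for the SDK's chunk objects so the `.__dict__` access works, assuming openai>=1.0 is installed so `_is_openai_v1()` is true (data made up):

```python
from types import SimpleNamespace as NS

chunks = [
    NS(model="gpt-4o-mini", choices=[NS(delta=NS(role="assistant", content="Hel"))]),
    NS(model="gpt-4o-mini", choices=[NS(delta=NS(role=None, content="lo"))]),
]

# OPENAI_METHODS_V1[0] is the sync chat definition
model, start, output = _extract_streamed_openai_response(OPENAI_METHODS_V1[0], chunks)
# model == "gpt-4o-mini", output == "Hello"
```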
def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response):
    model = response.get("model", None) or None

    completion = None
    if resource.type == "completion":
        choices = response.get("choices", [])
        if len(choices) > 0:
            choice = choices[-1]

            completion = choice.text if _is_openai_v1() else choice.get("text", None)
    elif resource.type == "chat":
        choices = response.get("choices", [])
        if len(choices) > 0:
            choice = choices[-1]
            completion = (
                _extract_chat_response(choice.message.__dict__)
                if _is_openai_v1()
                else choice.get("message", None)
            )

    usage = response.get("usage", None)

    # guard against usage being absent from the response
    return model, completion, usage.__dict__ if _is_openai_v1() and usage is not None else usage


def _is_openai_v1():
    return Version(openai.__version__) >= Version("1.0.0")


def _is_streaming_response(response):
    return (
        isinstance(response, types.GeneratorType)
        or isinstance(response, types.AsyncGeneratorType)
        or (_is_openai_v1() and isinstance(response, openai.Stream))
        or (_is_openai_v1() and isinstance(response, openai.AsyncStream))
    )


@_langfuse_wrapper
def _wrap(open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs):
    new_langfuse: Langfuse = initialize()

    start_time = _get_timestamp()
    arg_extractor = OpenAiArgsExtractor(*args, **kwargs)

    generation, is_nested_trace = _get_langfuse_data_from_kwargs(
        open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()
    )
    generation = new_langfuse.generation(**generation)
    try:
        openai_response = wrapped(**arg_extractor.get_openai_args())

        if _is_streaming_response(openai_response):
            return LangfuseResponseGeneratorSync(
                resource=open_ai_resource,
                response=openai_response,
                generation=generation,
                langfuse=new_langfuse,
                is_nested_trace=is_nested_trace,
            )

        else:
            model, completion, usage = _get_langfuse_data_from_default_response(
                open_ai_resource,
                openai_response.__dict__ if _is_openai_v1() else openai_response,
            )
            generation.update(
                model=model, output=completion, end_time=_get_timestamp(), usage=usage
            )

            # Avoid updating the trace if a trace_id was provided by the user.
            if not is_nested_trace:
                new_langfuse.trace(id=generation.trace_id, output=completion)

        return openai_response
    except Exception as ex:
        log.warning(ex)
        model = kwargs.get("model", None) or None
        generation.update(
            end_time=_get_timestamp(),
            status_message=str(ex),
            level="ERROR",
            model=model,
            usage={"input_cost": 0, "output_cost": 0, "total_cost": 0},
        )
        raise ex


@_langfuse_wrapper
async def _wrap_async(
    open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs
):
    new_langfuse = initialize()
    start_time = _get_timestamp()
    arg_extractor = OpenAiArgsExtractor(*args, **kwargs)

    generation, is_nested_trace = _get_langfuse_data_from_kwargs(
        open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()
    )
    generation = new_langfuse.generation(**generation)
    try:
        openai_response = await wrapped(**arg_extractor.get_openai_args())

        if _is_streaming_response(openai_response):
            return LangfuseResponseGeneratorAsync(
                resource=open_ai_resource,
                response=openai_response,
                generation=generation,
                langfuse=new_langfuse,
                is_nested_trace=is_nested_trace,
            )

        else:
            model, completion, usage = _get_langfuse_data_from_default_response(
                open_ai_resource,
                openai_response.__dict__ if _is_openai_v1() else openai_response,
            )
            generation.update(
                model=model,
                output=completion,
                end_time=_get_timestamp(),
                usage=usage,
            )
            # Avoid updating the trace if a trace_id was provided by the user.
            if not is_nested_trace:
                new_langfuse.trace(id=generation.trace_id, output=completion)

        return openai_response
    except Exception as ex:
        model = kwargs.get("model", None) or None
        generation.update(
            end_time=_get_timestamp(),
            status_message=str(ex),
            level="ERROR",
            model=model,
            usage={"input_cost": 0, "output_cost": 0, "total_cost": 0},
        )
        raise ex


class OpenAILangfuse:
    _langfuse: Optional[Langfuse] = None

    def initialize(self):
        self._langfuse = LangfuseSingleton().get(
            public_key=openai.langfuse_public_key,
            secret_key=openai.langfuse_secret_key,
            host=openai.langfuse_host,
            debug=openai.langfuse_debug,
            enabled=openai.langfuse_enabled,
            sdk_integration="openai",
            sample_rate=openai.langfuse_sample_rate,
        )

        return self._langfuse

    def flush(self):
        self._langfuse.flush()

    def langfuse_auth_check(self):
        """Check if the provided Langfuse credentials (public and secret key) are valid.

        Raises:
            Exception: If no projects were found for the provided credentials.

        Note:
            This method is blocking. It is discouraged to use it in production code.
        """
        if self._langfuse is None:
            self.initialize()

        return self._langfuse.auth_check()

    def register_tracing(self):
        resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0

        for resource in resources:
            wrap_function_wrapper(
                resource.module,
                f"{resource.object}.{resource.method}",
                _wrap(resource, self.initialize)
                if resource.sync
                else _wrap_async(resource, self.initialize),
            )

        setattr(openai, "langfuse_public_key", None)
        setattr(openai, "langfuse_secret_key", None)
        setattr(openai, "langfuse_host", None)
        setattr(openai, "langfuse_debug", None)
        setattr(openai, "langfuse_enabled", True)
        setattr(openai, "langfuse_sample_rate", None)
        setattr(openai, "langfuse_auth_check", self.langfuse_auth_check)
        setattr(openai, "flush_langfuse", self.flush)


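`register_tracing` also exposes module-level configuration on the openai namespace, so a project can point the integration at a self-hosted instance before the first traced call (values are illustrative):

```python
from langfuse.openai import openai

openai.langfuse_host = "https://langfuse.example.com"  # illustrative self-hosted URL
openai.langfuse_debug = True
openai.langfuse_enabled = True

# Blocking credential check -- useful in setup scripts, discouraged in production.
openai.langfuse_auth_check()
```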
modifier = OpenAILangfuse()
modifier.register_tracing()


# DEPRECATED: Use `openai.langfuse_auth_check()` instead
def auth_check():
    if modifier._langfuse is None:
        modifier.initialize()

    return modifier._langfuse.auth_check()


def _filter_image_data(messages: List[dict]):
    """https://platform.openai.com/docs/guides/vision?lang=python

    The messages array remains the same, but the 'image_url' is removed from the 'content' array.
    It is only removed if the URL is an inline data URL, i.e. it starts with 'data:image/'.
    """
    output_messages = copy.deepcopy(messages)

    for message in output_messages:
        content = (
            message.get("content", None)
            if isinstance(message, dict)
            else getattr(message, "content", None)
        )

        if content is not None:
            for index, item in enumerate(content):
                if isinstance(item, dict) and item.get("image_url", None) is not None:
                    url = item["image_url"]["url"]
                    if url.startswith("data:image/"):
                        del content[index]["image_url"]

    return output_messages


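For example (made-up message), inline base64 images are dropped while remote URLs are kept, and the caller's list is untouched thanks to the deepcopy:

```python
messages = [{
    "role": "user",
    "content": [
        {"type": "text", "text": "What is in this image?"},
        {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64,/9j/4AAQ..."}},
        {"type": "image_url", "image_url": {"url": "https://example.com/cat.png"}},
    ],
}]

filtered = _filter_image_data(messages)
# the base64 entry loses its 'image_url' key; the https entry is unchanged
assert "image_url" not in filtered[0]["content"][1]
assert "image_url" in filtered[0]["content"][2]
```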
class LangfuseResponseGeneratorSync:
    def __init__(
        self,
        *,
        resource,
        response,
        generation,
        langfuse,
        is_nested_trace,
    ):
        self.items = []

        self.resource = resource
        self.response = response
        self.generation = generation
        self.langfuse = langfuse
        self.is_nested_trace = is_nested_trace

    def __iter__(self):
        try:
            for i in self.response:
                self.items.append(i)

                yield i
        finally:
            self._finalize()

    def __next__(self):
        try:
            item = self.response.__next__()
            self.items.append(item)

            return item

        except StopIteration:
            self._finalize()

            raise

    def __enter__(self):
        return self.__iter__()

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _finalize(self):
        model, completion_start_time, completion = _extract_streamed_openai_response(
            self.resource, self.items
        )

        # Avoid updating the trace if a trace_id was provided by the user.
        if not self.is_nested_trace:
            self.langfuse.trace(id=self.generation.trace_id, output=completion)

        _create_langfuse_update(
            completion, self.generation, completion_start_time, model=model
        )


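In practice this wrapper is returned transparently from a streaming call, so existing iteration code keeps working while `_finalize` records the reassembled output once the stream is exhausted (model name illustrative):

```python
stream = openai.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Tell me a joke"}],
    stream=True,
)

for chunk in stream:  # chunks are buffered in self.items as they pass through
    print(chunk.choices[0].delta.content or "", end="")
# when the loop ends, _finalize() updates the generation and trace in Langfuse
```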
class LangfuseResponseGeneratorAsync:
    def __init__(
        self,
        *,
        resource,
        response,
        generation,
        langfuse,
        is_nested_trace,
    ):
        self.items = []

        self.resource = resource
        self.response = response
        self.generation = generation
        self.langfuse = langfuse
        self.is_nested_trace = is_nested_trace

    async def __aiter__(self):
        try:
            async for i in self.response:
                self.items.append(i)

                yield i
        finally:
            await self._finalize()

    async def __anext__(self):
        try:
            item = await self.response.__anext__()
            self.items.append(item)

            return item

        except StopAsyncIteration:
            await self._finalize()

            raise

    async def __aenter__(self):
        return self.__aiter__()

    async def __aexit__(self, exc_type, exc_value, traceback):
        pass

    async def _finalize(self):
        model, completion_start_time, completion = _extract_streamed_openai_response(
            self.resource, self.items
        )

        # Avoid updating the trace if a trace_id was provided by the user.
        if not self.is_nested_trace:
            self.langfuse.trace(id=self.generation.trace_id, output=completion)

        _create_langfuse_update(
            completion, self.generation, completion_start_time, model=model
        )

    async def close(self) -> None:
        """Close the response and release the connection.

        Automatically called if the response body is read to completion.
        """
        await self.response.close()