langfuse.openai

If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.

- import openai
+ from langfuse.openai import openai

Langfuse automatically tracks:

  • All prompts/completions with support for streaming, async and functions
  • Latencies
  • API Errors
  • Model usage (tokens) and cost (USD)

The integration is fully interoperable with the observe() decorator and the low-level tracing SDK.

See docs for more details: https://langfuse.com/docs/integrations/openai
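
As a minimal usage sketch (the model name and identifiers below are placeholders): after the import swap, a traced call looks like a regular OpenAI call, and the extra Langfuse keyword arguments shown here are optional — the wrapper strips them out before forwarding the request to OpenAI (see `OpenAiArgsExtractor` in the source below).

```python
from langfuse.openai import openai

completion = openai.chat.completions.create(
    model="gpt-3.5-turbo",  # placeholder model
    messages=[{"role": "user", "content": "Hello!"}],
    # Optional Langfuse attributes, removed before the request reaches OpenAI:
    name="hello-world-generation",
    metadata={"env": "dev"},
    session_id="session-123",
    user_id="user-456",
    tags=["example"],
)
```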

  1"""If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.
  2
  3```diff
  4- import openai
  5+ from langfuse.openai import openai
  6```
  7
  8Langfuse automatically tracks:
  9
 10- All prompts/completions with support for streaming, async and functions
 11- Latencies
 12- API Errors
 13- Model usage (tokens) and cost (USD)
 14
 15The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK.
 16
 17See docs for more details: https://langfuse.com/docs/integrations/openai
 18"""

import copy
import logging
import types

from collections import defaultdict
from typing import Any, List, Optional

from packaging.version import Version
from wrapt import wrap_function_wrapper

from langfuse import Langfuse
from langfuse.client import StatefulGenerationClient
from langfuse.decorators import langfuse_context
from langfuse.utils import _get_timestamp
from langfuse.utils.langfuse_singleton import LangfuseSingleton

try:
    import openai
except ImportError:
    raise ModuleNotFoundError(
        "Please install OpenAI to use this feature: 'pip install openai'"
    )

try:
    from openai import AsyncAzureOpenAI, AsyncOpenAI, AzureOpenAI, OpenAI  # noqa: F401
except ImportError:
    AsyncAzureOpenAI = None
    AsyncOpenAI = None
    AzureOpenAI = None
    OpenAI = None

log = logging.getLogger("langfuse")


class OpenAiDefinition:
    module: str
    object: str
    method: str
    type: str
    sync: bool

    def __init__(self, module: str, object: str, method: str, type: str, sync: bool):
        self.module = module
        self.object = object
        self.method = method
        self.type = type
        self.sync = sync


OPENAI_METHODS_V0 = [
    OpenAiDefinition(
        module="openai",
        object="ChatCompletion",
        method="create",
        type="chat",
        sync=True,
    ),
    OpenAiDefinition(
        module="openai",
        object="Completion",
        method="create",
        type="completion",
        sync=True,
    ),
]


OPENAI_METHODS_V1 = [
    OpenAiDefinition(
        module="openai.resources.chat.completions",
        object="Completions",
        method="create",
        type="chat",
        sync=True,
    ),
    OpenAiDefinition(
        module="openai.resources.completions",
        object="Completions",
        method="create",
        type="completion",
        sync=True,
    ),
    OpenAiDefinition(
        module="openai.resources.chat.completions",
        object="AsyncCompletions",
        method="create",
        type="chat",
        sync=False,
    ),
    OpenAiDefinition(
        module="openai.resources.completions",
        object="AsyncCompletions",
        method="create",
        type="completion",
        sync=False,
    ),
]


class OpenAiArgsExtractor:
    """Split Langfuse-specific keyword arguments from the arguments forwarded to the OpenAI SDK."""

    def __init__(
        self,
        name=None,
        metadata=None,
        trace_id=None,
        session_id=None,
        user_id=None,
        tags=None,
        parent_observation_id=None,
        langfuse_prompt=None,  # we cannot use prompt because it's an argument of the old OpenAI completions API
        **kwargs,
    ):
        self.args = {}
        self.args["name"] = name
        self.args["metadata"] = metadata
        self.args["trace_id"] = trace_id
        self.args["session_id"] = session_id
        self.args["user_id"] = user_id
        self.args["tags"] = tags
        self.args["parent_observation_id"] = parent_observation_id
        self.args["langfuse_prompt"] = langfuse_prompt
        self.kwargs = kwargs

    def get_langfuse_args(self):
        return {**self.args, **self.kwargs}

    def get_openai_args(self):
        return self.kwargs


def _langfuse_wrapper(func):
    # Adapt func to the (wrapped, instance, args, kwargs) signature expected by
    # wrapt's wrap_function_wrapper, while binding the resource definition and
    # the Langfuse initializer.
    def _with_langfuse(open_ai_definitions, initialize):
        def wrapper(wrapped, instance, args, kwargs):
            return func(open_ai_definitions, initialize, wrapped, args, kwargs)

        return wrapper

    return _with_langfuse


def _extract_chat_prompt(kwargs: Any):
    """Extract the user input from the call arguments. Returns a list of messages, or a dict with messages and functions/tools."""
    prompt = {}

    if kwargs.get("functions") is not None:
        prompt.update({"functions": kwargs["functions"]})

    if kwargs.get("function_call") is not None:
        prompt.update({"function_call": kwargs["function_call"]})

    if kwargs.get("tools") is not None:
        prompt.update({"tools": kwargs["tools"]})

    if prompt:
        # if the user provided functions or tools, send them to Langfuse together with the messages
        prompt.update(
            {
                "messages": _filter_image_data(kwargs.get("messages", [])),
            }
        )
        return prompt
    else:
        # vanilla case, only send messages in openai format to langfuse
        return _filter_image_data(kwargs.get("messages", []))


def _extract_chat_response(kwargs: Any):
    """Extract the LLM output from the response."""
    response = {
        "role": kwargs.get("role", None),
    }

    if kwargs.get("function_call") is not None:
        response.update({"function_call": kwargs["function_call"]})

    if kwargs.get("tool_calls") is not None:
        response.update({"tool_calls": kwargs["tool_calls"]})

    response.update(
        {
            "content": kwargs.get("content", None),
        }
    )
    return response


def _get_langfuse_data_from_kwargs(
    resource: OpenAiDefinition, langfuse: Langfuse, start_time, kwargs
):
    name = kwargs.get("name", "OpenAI-generation")

    if name is None:
        name = "OpenAI-generation"

    if name is not None and not isinstance(name, str):
        raise TypeError("name must be a string")

    decorator_context_observation_id = langfuse_context.get_current_observation_id()
    decorator_context_trace_id = langfuse_context.get_current_trace_id()

    trace_id = kwargs.get("trace_id", None) or decorator_context_trace_id
    if trace_id is not None and not isinstance(trace_id, str):
        raise TypeError("trace_id must be a string")

    session_id = kwargs.get("session_id", None)
    if session_id is not None and not isinstance(session_id, str):
        raise TypeError("session_id must be a string")

    user_id = kwargs.get("user_id", None)
    if user_id is not None and not isinstance(user_id, str):
        raise TypeError("user_id must be a string")

    tags = kwargs.get("tags", None)
    if tags is not None and (
        not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags)
    ):
        raise TypeError("tags must be a list of strings")

    # Update trace params in decorator context if specified in openai call
    if decorator_context_trace_id:
        langfuse_context.update_current_trace(
            session_id=session_id, user_id=user_id, tags=tags
        )

    parent_observation_id = kwargs.get("parent_observation_id", None) or (
        decorator_context_observation_id
        if decorator_context_observation_id != decorator_context_trace_id
        else None
    )
    if parent_observation_id is not None and not isinstance(parent_observation_id, str):
        raise TypeError("parent_observation_id must be a string")
    if parent_observation_id is not None and trace_id is None:
        raise ValueError("parent_observation_id requires trace_id to be set")

    metadata = kwargs.get("metadata", {})

    if metadata is not None and not isinstance(metadata, dict):
        raise TypeError("metadata must be a dictionary")

    model = kwargs.get("model", None) or None

    prompt = None

    if resource.type == "completion":
        prompt = kwargs.get("prompt", None)
    elif resource.type == "chat":
        prompt = _extract_chat_prompt(kwargs)

    is_nested_trace = False
    if trace_id:
        is_nested_trace = True
        langfuse.trace(id=trace_id, session_id=session_id, user_id=user_id, tags=tags)
    else:
        trace_id = (
            decorator_context_trace_id
            or langfuse.trace(
                session_id=session_id,
                user_id=user_id,
                tags=tags,
                name=name,
                input=prompt,
                metadata=metadata,
            ).id
        )

    modelParameters = {
        "temperature": kwargs.get("temperature", 1),
        "max_tokens": kwargs.get("max_tokens", float("inf")),  # casing?
        "top_p": kwargs.get("top_p", 1),
        "frequency_penalty": kwargs.get("frequency_penalty", 0),
        "presence_penalty": kwargs.get("presence_penalty", 0),
    }

    langfuse_prompt = kwargs.get("langfuse_prompt", None)

    return {
        "name": name,
        "metadata": metadata,
        "trace_id": trace_id,
        "parent_observation_id": parent_observation_id,
        "user_id": user_id,
        "start_time": start_time,
        "input": prompt,
        "model_parameters": modelParameters,
        "model": model or None,
        "prompt": langfuse_prompt,
    }, is_nested_trace


def _create_langfuse_update(
    completion, generation: StatefulGenerationClient, completion_start_time, model=None
):
    update = {
        "end_time": _get_timestamp(),
        "output": completion,
        "completion_start_time": completion_start_time,
    }
    if model is not None:
        update["model"] = model

    generation.update(**update)


def _extract_streamed_openai_response(resource, chunks):
    completion = defaultdict(str) if resource.type == "chat" else ""
    model = None
    completion_start_time = None

    for index, i in enumerate(chunks):
        if index == 0:
            completion_start_time = _get_timestamp()

        if _is_openai_v1():
            i = i.__dict__

        model = model or i.get("model", None) or None

        choices = i.get("choices", [])

        for choice in choices:
            if _is_openai_v1():
                choice = choice.__dict__
            if resource.type == "chat":
                delta = choice.get("delta", None)

                if _is_openai_v1():
                    delta = delta.__dict__

                if delta.get("role", None) is not None:
                    completion["role"] = delta["role"]

                if delta.get("content", None) is not None:
                    completion["content"] = (
                        delta.get("content", None)
                        if completion["content"] is None
                        else completion["content"] + delta.get("content", None)
                    )
                elif delta.get("function_call", None) is not None:
                    curr = completion["function_call"]
                    tool_call_chunk = delta.get("function_call", None)

                    if not curr:
                        completion["function_call"] = {
                            "name": getattr(tool_call_chunk, "name", ""),
                            "arguments": getattr(tool_call_chunk, "arguments", ""),
                        }

                    else:
                        curr["name"] = curr["name"] or getattr(
                            tool_call_chunk, "name", None
                        )
                        curr["arguments"] += getattr(tool_call_chunk, "arguments", "")

                elif delta.get("tool_calls", None) is not None:
                    curr = completion["tool_calls"]
                    tool_call_chunk = getattr(
                        delta.get("tool_calls", None)[0], "function", None
                    )

                    if not curr:
                        completion["tool_calls"] = {
                            "name": getattr(tool_call_chunk, "name", ""),
                            "arguments": getattr(tool_call_chunk, "arguments", ""),
                        }

                    else:
                        curr["name"] = curr["name"] or getattr(
                            tool_call_chunk, "name", None
                        )
                        # default to "" (not None) so the concatenation cannot raise a TypeError
                        curr["arguments"] += getattr(tool_call_chunk, "arguments", "")

            if resource.type == "completion":
                completion += choice.get("text") or ""

    def get_response_for_chat():
        return (
            completion["content"]
            or (
                completion["function_call"]
                and {
                    "role": "assistant",
                    "function_call": completion["function_call"],
                }
            )
            or (
                completion["tool_calls"]
                and {
                    "role": "assistant",
                    "tool_calls": [{"function": completion["tool_calls"]}],
                }
            )
            or None
        )

    return (
        model,
        completion_start_time,
        get_response_for_chat() if resource.type == "chat" else completion,
    )


def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response):
    model = response.get("model", None) or None

    completion = None
    if resource.type == "completion":
        choices = response.get("choices", [])
        if len(choices) > 0:
            choice = choices[-1]

            completion = choice.text if _is_openai_v1() else choice.get("text", None)
    elif resource.type == "chat":
        choices = response.get("choices", [])
        if len(choices) > 0:
            choice = choices[-1]
            completion = (
                _extract_chat_response(choice.message.__dict__)
                if _is_openai_v1()
                else choice.get("message", None)
            )

    usage = response.get("usage", None)

    # guard against a missing usage object before accessing __dict__
    return (
        model,
        completion,
        usage.__dict__ if _is_openai_v1() and usage is not None else usage,
    )


def _is_openai_v1():
    return Version(openai.__version__) >= Version("1.0.0")


def _is_streaming_response(response):
    return (
        isinstance(response, types.GeneratorType)
        or isinstance(response, types.AsyncGeneratorType)
        or (_is_openai_v1() and isinstance(response, openai.Stream))
        or (_is_openai_v1() and isinstance(response, openai.AsyncStream))
    )


@_langfuse_wrapper
def _wrap(open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs):
    new_langfuse: Langfuse = initialize()

    start_time = _get_timestamp()
    arg_extractor = OpenAiArgsExtractor(*args, **kwargs)

    generation, is_nested_trace = _get_langfuse_data_from_kwargs(
        open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()
    )
    generation = new_langfuse.generation(**generation)
    try:
        openai_response = wrapped(**arg_extractor.get_openai_args())

        if _is_streaming_response(openai_response):
            return LangfuseResponseGeneratorSync(
                resource=open_ai_resource,
                response=openai_response,
                generation=generation,
                langfuse=new_langfuse,
                is_nested_trace=is_nested_trace,
            )

        else:
            model, completion, usage = _get_langfuse_data_from_default_response(
                open_ai_resource,
                openai_response.__dict__ if _is_openai_v1() else openai_response,
            )
            generation.update(
                model=model, output=completion, end_time=_get_timestamp(), usage=usage
            )

            # Avoid the trace update if the trace id was provided by the user.
            if not is_nested_trace:
                new_langfuse.trace(id=generation.trace_id, output=completion)

        return openai_response
    except Exception as ex:
        log.warning(ex)
        model = kwargs.get("model", None) or None
        generation.update(
            end_time=_get_timestamp(),
            status_message=str(ex),
            level="ERROR",
            model=model,
            usage={"input_cost": 0, "output_cost": 0, "total_cost": 0},
        )
        raise ex


@_langfuse_wrapper
async def _wrap_async(
    open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs
):
    new_langfuse = initialize()
    start_time = _get_timestamp()
    arg_extractor = OpenAiArgsExtractor(*args, **kwargs)

    generation, is_nested_trace = _get_langfuse_data_from_kwargs(
        open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()
    )
    generation = new_langfuse.generation(**generation)
    try:
        openai_response = await wrapped(**arg_extractor.get_openai_args())

        if _is_streaming_response(openai_response):
            return LangfuseResponseGeneratorAsync(
                resource=open_ai_resource,
                response=openai_response,
                generation=generation,
                langfuse=new_langfuse,
                is_nested_trace=is_nested_trace,
            )

        else:
            model, completion, usage = _get_langfuse_data_from_default_response(
                open_ai_resource,
                openai_response.__dict__ if _is_openai_v1() else openai_response,
            )
            generation.update(
                model=model,
                output=completion,
                end_time=_get_timestamp(),
                usage=usage,
            )
            # Avoid the trace update if the trace id was provided by the user.
            if not is_nested_trace:
                new_langfuse.trace(id=generation.trace_id, output=completion)

        return openai_response
    except Exception as ex:
        log.warning(ex)  # log the error, for parity with the sync wrapper
        model = kwargs.get("model", None) or None
        generation.update(
            end_time=_get_timestamp(),
            status_message=str(ex),
            level="ERROR",
            model=model,
            usage={"input_cost": 0, "output_cost": 0, "total_cost": 0},
        )
        raise ex


class OpenAILangfuse:
    _langfuse: Optional[Langfuse] = None

    def initialize(self):
        self._langfuse = LangfuseSingleton().get(
            public_key=openai.langfuse_public_key,
            secret_key=openai.langfuse_secret_key,
            host=openai.langfuse_host,
            debug=openai.langfuse_debug,
            enabled=openai.langfuse_enabled,
            sdk_integration="openai",
        )

        return self._langfuse

    def flush(self):
        self._langfuse.flush()

    def langfuse_auth_check(self):
        """Check if the provided Langfuse credentials (public and secret key) are valid.

        Raises:
            Exception: If no projects were found for the provided credentials.

        Note:
            This method is blocking. It is discouraged to use it in production code.
        """
        if self._langfuse is None:
            self.initialize()

        return self._langfuse.auth_check()

    def register_tracing(self):
        resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0

        for resource in resources:
            wrap_function_wrapper(
                resource.module,
                f"{resource.object}.{resource.method}",
                _wrap(resource, self.initialize)
                if resource.sync
                else _wrap_async(resource, self.initialize),
            )

        setattr(openai, "langfuse_public_key", None)
        setattr(openai, "langfuse_secret_key", None)
        setattr(openai, "langfuse_host", None)
        setattr(openai, "langfuse_debug", None)
        setattr(openai, "langfuse_enabled", True)
        setattr(openai, "langfuse_auth_check", self.langfuse_auth_check)
        setattr(openai, "flush_langfuse", self.flush)


modifier = OpenAILangfuse()
modifier.register_tracing()


# DEPRECATED: Use `openai.langfuse_auth_check()` instead
def auth_check():
    if modifier._langfuse is None:
        modifier.initialize()

    return modifier._langfuse.auth_check()

def _filter_image_data(messages: List[dict]):
    """https://platform.openai.com/docs/guides/vision?lang=python

    The messages array remains the same, but the 'image_url' is removed from the 'content' array.
    It is only removed if the value is an inline data URL, i.e. starts with 'data:image/'.

    """
    output_messages = copy.deepcopy(messages)

    for message in output_messages:
        content = (
            message.get("content", None)
            if isinstance(message, dict)
            else getattr(message, "content", None)
        )

        if content is not None:
            for index, item in enumerate(content):
                if isinstance(item, dict) and item.get("image_url", None) is not None:
                    url = item["image_url"]["url"]
                    if url.startswith("data:image/"):
                        del content[index]["image_url"]

    return output_messages


class LangfuseResponseGeneratorSync:
    """Wrap a streaming OpenAI response: collect chunks as they are consumed and update the Langfuse generation once the stream is exhausted."""

    def __init__(
        self,
        *,
        resource,
        response,
        generation,
        langfuse,
        is_nested_trace,
    ):
        self.items = []

        self.resource = resource
        self.response = response
        self.generation = generation
        self.langfuse = langfuse
        self.is_nested_trace = is_nested_trace

    def __iter__(self):
        try:
            for i in self.response:
                self.items.append(i)

                yield i
        finally:
            self._finalize()

    def __enter__(self):
        return self.__iter__()

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _finalize(self):
        model, completion_start_time, completion = _extract_streamed_openai_response(
            self.resource, self.items
        )

        # Avoid the trace update if the trace id was provided by the user.
        if not self.is_nested_trace:
            self.langfuse.trace(id=self.generation.trace_id, output=completion)

        _create_langfuse_update(
            completion, self.generation, completion_start_time, model=model
        )
697
698
699class LangfuseResponseGeneratorAsync:
700    def __init__(
701        self,
702        *,
703        resource,
704        response,
705        generation,
706        langfuse,
707        is_nested_trace,
708    ):
709        self.items = []
710
711        self.resource = resource
712        self.response = response
713        self.generation = generation
714        self.langfuse = langfuse
715        self.is_nested_trace = is_nested_trace
716
717    async def __aiter__(self):
718        try:
719            async for i in self.response:
720                self.items.append(i)
721
722                yield i
723        finally:
724            await self._finalize()
725
726    async def __aenter__(self):
727        return self.__aiter__()
728
729    async def __aexit__(self, exc_type, exc_value, traceback):
730        pass
731
732    async def _finalize(self):
733        model, completion_start_time, completion = _extract_streamed_openai_response(
734            self.resource, self.items
735        )
736
737        # Avoiding the trace-update if trace-id is provided by user.
738        if not self.is_nested_trace:
739            self.langfuse.trace(id=self.generation.trace_id, output=completion)
740
741        _create_langfuse_update(
742            completion, self.generation, completion_start_time, model=model
743        )
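
As a configuration sketch (the key values are placeholders; if the attributes are left unset, the Langfuse client falls back to its own defaults, typically environment variables): the module-level attributes registered in `register_tracing` configure the underlying Langfuse client before the first traced call, and `flush_langfuse()` drains buffered events, e.g. in short-lived scripts.

```python
from langfuse.openai import openai

# Optional client configuration via the attributes set in register_tracing().
openai.langfuse_public_key = "pk-lf-..."  # placeholder
openai.langfuse_secret_key = "sk-lf-..."  # placeholder
openai.langfuse_host = "https://cloud.langfuse.com"

openai.langfuse_auth_check()  # blocking credential check; discouraged in production

completion = openai.chat.completions.create(
    model="gpt-3.5-turbo",  # placeholder model
    messages=[{"role": "user", "content": "Ping"}],
)

openai.flush_langfuse()  # send buffered events before the process exits
```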
log = <Logger langfuse (WARNING)>
class OpenAiDefinition:
55class OpenAiDefinition:
56    module: str
57    object: str
58    method: str
59    type: str
60    sync: bool
61
62    def __init__(self, module: str, object: str, method: str, type: str, sync: bool):
63        self.module = module
64        self.object = object
65        self.method = method
66        self.type = type
67        self.sync = sync
OpenAiDefinition(module: str, object: str, method: str, type: str, sync: bool)
62    def __init__(self, module: str, object: str, method: str, type: str, sync: bool):
63        self.module = module
64        self.object = object
65        self.method = method
66        self.type = type
67        self.sync = sync
module: str
object: str
method: str
type: str
sync: bool
OPENAI_METHODS_V0 = [<OpenAiDefinition object>, <OpenAiDefinition object>]
OPENAI_METHODS_V1 = [<OpenAiDefinition object>, <OpenAiDefinition object>, <OpenAiDefinition object>, <OpenAiDefinition object>]
class OpenAiArgsExtractor:
120class OpenAiArgsExtractor:
121    def __init__(
122        self,
123        name=None,
124        metadata=None,
125        trace_id=None,
126        session_id=None,
127        user_id=None,
128        tags=None,
129        parent_observation_id=None,
130        langfuse_prompt=None,  # we cannot use prompt because it's an argument of the old OpenAI completions API
131        **kwargs,
132    ):
133        self.args = {}
134        self.args["name"] = name
135        self.args["metadata"] = metadata
136        self.args["trace_id"] = trace_id
137        self.args["session_id"] = session_id
138        self.args["user_id"] = user_id
139        self.args["tags"] = tags
140        self.args["parent_observation_id"] = parent_observation_id
141        self.args["langfuse_prompt"] = langfuse_prompt
142        self.kwargs = kwargs
143
144    def get_langfuse_args(self):
145        return {**self.args, **self.kwargs}
146
147    def get_openai_args(self):
148        return self.kwargs
OpenAiArgsExtractor( name=None, metadata=None, trace_id=None, session_id=None, user_id=None, tags=None, parent_observation_id=None, langfuse_prompt=None, **kwargs)
121    def __init__(
122        self,
123        name=None,
124        metadata=None,
125        trace_id=None,
126        session_id=None,
127        user_id=None,
128        tags=None,
129        parent_observation_id=None,
130        langfuse_prompt=None,  # we cannot use prompt because it's an argument of the old OpenAI completions API
131        **kwargs,
132    ):
133        self.args = {}
134        self.args["name"] = name
135        self.args["metadata"] = metadata
136        self.args["trace_id"] = trace_id
137        self.args["session_id"] = session_id
138        self.args["user_id"] = user_id
139        self.args["tags"] = tags
140        self.args["parent_observation_id"] = parent_observation_id
141        self.args["langfuse_prompt"] = langfuse_prompt
142        self.kwargs = kwargs
args
kwargs
def get_langfuse_args(self):
144    def get_langfuse_args(self):
145        return {**self.args, **self.kwargs}
def get_openai_args(self):
147    def get_openai_args(self):
148        return self.kwargs
class OpenAILangfuse:
562class OpenAILangfuse:
563    _langfuse: Optional[Langfuse] = None
564
565    def initialize(self):
566        self._langfuse = LangfuseSingleton().get(
567            public_key=openai.langfuse_public_key,
568            secret_key=openai.langfuse_secret_key,
569            host=openai.langfuse_host,
570            debug=openai.langfuse_debug,
571            enabled=openai.langfuse_enabled,
572            sdk_integration="openai",
573        )
574
575        return self._langfuse
576
577    def flush(cls):
578        cls._langfuse.flush()
579
580    def langfuse_auth_check(self):
581        """Check if the provided Langfuse credentials (public and secret key) are valid.
582
583        Raises:
584            Exception: If no projects were found for the provided credentials.
585
586        Note:
587            This method is blocking. It is discouraged to use it in production code.
588        """
589        if self._langfuse is None:
590            self.initialize()
591
592        return self._langfuse.auth_check()
593
594    def register_tracing(self):
595        resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0
596
597        for resource in resources:
598            wrap_function_wrapper(
599                resource.module,
600                f"{resource.object}.{resource.method}",
601                _wrap(resource, self.initialize)
602                if resource.sync
603                else _wrap_async(resource, self.initialize),
604            )
605
606        setattr(openai, "langfuse_public_key", None)
607        setattr(openai, "langfuse_secret_key", None)
608        setattr(openai, "langfuse_host", None)
609        setattr(openai, "langfuse_debug", None)
610        setattr(openai, "langfuse_enabled", True)
611        setattr(openai, "langfuse_auth_check", self.langfuse_auth_check)
612        setattr(openai, "flush_langfuse", self.flush)
def initialize(self):
565    def initialize(self):
566        self._langfuse = LangfuseSingleton().get(
567            public_key=openai.langfuse_public_key,
568            secret_key=openai.langfuse_secret_key,
569            host=openai.langfuse_host,
570            debug=openai.langfuse_debug,
571            enabled=openai.langfuse_enabled,
572            sdk_integration="openai",
573        )
574
575        return self._langfuse
def flush(cls):
577    def flush(cls):
578        cls._langfuse.flush()
def langfuse_auth_check(self):
580    def langfuse_auth_check(self):
581        """Check if the provided Langfuse credentials (public and secret key) are valid.
582
583        Raises:
584            Exception: If no projects were found for the provided credentials.
585
586        Note:
587            This method is blocking. It is discouraged to use it in production code.
588        """
589        if self._langfuse is None:
590            self.initialize()
591
592        return self._langfuse.auth_check()

Check if the provided Langfuse credentials (public and secret key) are valid.

Raises:
  • Exception: If no projects were found for the provided credentials.
Note:

This method is blocking. It is discouraged to use it in production code.

def register_tracing(self):
594    def register_tracing(self):
595        resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0
596
597        for resource in resources:
598            wrap_function_wrapper(
599                resource.module,
600                f"{resource.object}.{resource.method}",
601                _wrap(resource, self.initialize)
602                if resource.sync
603                else _wrap_async(resource, self.initialize),
604            )
605
606        setattr(openai, "langfuse_public_key", None)
607        setattr(openai, "langfuse_secret_key", None)
608        setattr(openai, "langfuse_host", None)
609        setattr(openai, "langfuse_debug", None)
610        setattr(openai, "langfuse_enabled", True)
611        setattr(openai, "langfuse_auth_check", self.langfuse_auth_check)
612        setattr(openai, "flush_langfuse", self.flush)
modifier = <OpenAILangfuse object>
def auth_check():
620def auth_check():
621    if modifier._langfuse is None:
622        modifier.initialize()
623
624    return modifier._langfuse.auth_check()
class LangfuseResponseGeneratorSync:
653class LangfuseResponseGeneratorSync:
654    def __init__(
655        self,
656        *,
657        resource,
658        response,
659        generation,
660        langfuse,
661        is_nested_trace,
662    ):
663        self.items = []
664
665        self.resource = resource
666        self.response = response
667        self.generation = generation
668        self.langfuse = langfuse
669        self.is_nested_trace = is_nested_trace
670
671    def __iter__(self):
672        try:
673            for i in self.response:
674                self.items.append(i)
675
676                yield i
677        finally:
678            self._finalize()
679
680    def __enter__(self):
681        return self.__iter__()
682
683    def __exit__(self, exc_type, exc_value, traceback):
684        pass
685
686    def _finalize(self):
687        model, completion_start_time, completion = _extract_streamed_openai_response(
688            self.resource, self.items
689        )
690
691        # Avoiding the trace-update if trace-id is provided by user.
692        if not self.is_nested_trace:
693            self.langfuse.trace(id=self.generation.trace_id, output=completion)
694
695        _create_langfuse_update(
696            completion, self.generation, completion_start_time, model=model
697        )
LangfuseResponseGeneratorSync(*, resource, response, generation, langfuse, is_nested_trace)
654    def __init__(
655        self,
656        *,
657        resource,
658        response,
659        generation,
660        langfuse,
661        is_nested_trace,
662    ):
663        self.items = []
664
665        self.resource = resource
666        self.response = response
667        self.generation = generation
668        self.langfuse = langfuse
669        self.is_nested_trace = is_nested_trace
items
resource
response
generation
langfuse
is_nested_trace
class LangfuseResponseGeneratorAsync:
700class LangfuseResponseGeneratorAsync:
701    def __init__(
702        self,
703        *,
704        resource,
705        response,
706        generation,
707        langfuse,
708        is_nested_trace,
709    ):
710        self.items = []
711
712        self.resource = resource
713        self.response = response
714        self.generation = generation
715        self.langfuse = langfuse
716        self.is_nested_trace = is_nested_trace
717
718    async def __aiter__(self):
719        try:
720            async for i in self.response:
721                self.items.append(i)
722
723                yield i
724        finally:
725            await self._finalize()
726
727    async def __aenter__(self):
728        return self.__aiter__()
729
730    async def __aexit__(self, exc_type, exc_value, traceback):
731        pass
732
733    async def _finalize(self):
734        model, completion_start_time, completion = _extract_streamed_openai_response(
735            self.resource, self.items
736        )
737
738        # Avoiding the trace-update if trace-id is provided by user.
739        if not self.is_nested_trace:
740            self.langfuse.trace(id=self.generation.trace_id, output=completion)
741
742        _create_langfuse_update(
743            completion, self.generation, completion_start_time, model=model
744        )
LangfuseResponseGeneratorAsync(*, resource, response, generation, langfuse, is_nested_trace)
701    def __init__(
702        self,
703        *,
704        resource,
705        response,
706        generation,
707        langfuse,
708        is_nested_trace,
709    ):
710        self.items = []
711
712        self.resource = resource
713        self.response = response
714        self.generation = generation
715        self.langfuse = langfuse
716        self.is_nested_trace = is_nested_trace
items
resource
response
generation
langfuse
is_nested_trace