langfuse._task_manager.task_manager
@private
"""@private"""

import atexit
import logging
import queue
from queue import Queue
from typing import List, Optional

from langfuse.api.client import FernLangfuse
from langfuse.request import LangfuseClient
from langfuse.types import MaskFunction
from langfuse.utils import _get_timestamp

from .ingestion_consumer import IngestionConsumer
from .media_manager import MediaManager
from .media_upload_consumer import MediaUploadConsumer


class TaskManager(object):
    """Own the ingestion and media upload queues and the consumers that drain them.

    Construction creates two bounded queues, a ``MediaManager`` bound to the
    media upload queue, and ``threads`` consumers of each kind, which are
    started immediately. ``join`` is registered with ``atexit``.
    """

    _log = logging.getLogger(__name__)
    _ingestion_consumers: List[IngestionConsumer]
    _media_upload_consumers: List[MediaUploadConsumer]
    _media_manager: MediaManager
    _enabled: bool
    _threads: int
    _max_task_queue_size: int
    _ingestion_queue: Queue
    _media_upload_queue: Queue
    _client: LangfuseClient
    _api_client: FernLangfuse
    _flush_at: int
    _flush_interval: float
    _max_retries: int
    _public_key: str
    _sdk_name: str
    _sdk_version: str
    _sdk_integration: str
    _sample_rate: float
    _mask: Optional[MaskFunction]

    def __init__(
        self,
        *,
        client: LangfuseClient,
        api_client: FernLangfuse,
        flush_at: int,
        flush_interval: float,
        max_retries: int,
        threads: int,
        public_key: str,
        sdk_name: str,
        sdk_version: str,
        sdk_integration: str,
        enabled: bool = True,
        max_task_queue_size: int = 100_000,
        sample_rate: float = 1,
        mask: Optional[MaskFunction] = None,
    ):
        """Store the configuration, build the queues and start the consumers.

        All arguments are keyword-only and are kept on the instance under the
        same name with a leading underscore. ``max_task_queue_size`` is the
        ``maxsize`` of both queues; ``threads`` is the number of consumers
        started per queue by ``init_resources``.
        """
        # Plain configuration, later handed to the consumers.
        self._client = client
        self._api_client = api_client
        self._flush_at = flush_at
        self._flush_interval = flush_interval
        self._max_retries = max_retries
        self._threads = threads
        self._public_key = public_key
        self._sdk_name = sdk_name
        self._sdk_version = sdk_version
        self._sdk_integration = sdk_integration
        self._enabled = enabled
        self._max_task_queue_size = max_task_queue_size
        self._sample_rate = sample_rate
        self._mask = mask

        # Both queues share the same capacity limit.
        self._ingestion_queue = Queue(self._max_task_queue_size)
        self._media_upload_queue = Queue(self._max_task_queue_size)
        self._media_manager = MediaManager(
            api_client=api_client,
            media_upload_queue=self._media_upload_queue,
            max_retries=max_retries,
        )

        self._ingestion_consumers = []
        self._media_upload_consumers = []
        self.init_resources()

        # cleans up when the python interpreter closes
        atexit.register(self.join)

    def init_resources(self):
        """Create, start and record the consumers for both queues.

        ``self._threads`` ingestion consumers are started first, followed by
        the same number of media upload consumers. Identifiers run from 0.
        """
        for worker_id in range(self._threads):
            ingestion_worker = IngestionConsumer(
                ingestion_queue=self._ingestion_queue,
                identifier=worker_id,
                client=self._client,
                media_manager=self._media_manager,
                flush_at=self._flush_at,
                flush_interval=self._flush_interval,
                max_retries=self._max_retries,
                public_key=self._public_key,
                sdk_name=self._sdk_name,
                sdk_version=self._sdk_version,
                sdk_integration=self._sdk_integration,
                sample_rate=self._sample_rate,
                mask=self._mask,
            )
            ingestion_worker.start()
            self._ingestion_consumers.append(ingestion_worker)

        for worker_id in range(self._threads):
            upload_worker = MediaUploadConsumer(
                identifier=worker_id,
                media_manager=self._media_manager,
            )
            upload_worker.start()
            self._media_upload_consumers.append(upload_worker)

    def add_task(self, event: dict):
        """Stamp ``event`` with the current timestamp and enqueue it without blocking.

        Args:
            event: Event payload. It is mutated in place: its ``"timestamp"``
                key is set to the value of ``_get_timestamp()``.

        Returns:
            ``None`` when the manager is disabled or the event was enqueued;
            ``False`` when the queue is full or any other exception occurred.
            Exceptions are logged, never propagated.
        """
        if not self._enabled:
            return None

        try:
            event["timestamp"] = _get_timestamp()
            self._ingestion_queue.put(event, block=False)
        except queue.Full:
            # The event is dropped; the queue is bounded by max_task_queue_size.
            self._log.warning("ingestion queue is full, dropping event")
            return False
        except Exception as e:
            self._log.exception(f"Exception in adding task {e}")
            return False

        return None

    def flush(self):
        """Block until both queues report every queued item as processed.

        The ingestion queue is waited on first, then the media upload queue.
        The logged sizes are snapshots taken just before each wait, hence
        approximate.
        """
        log = self._log
        log.debug("flushing ingestion and media upload queues")

        # Ingestion queue
        pending_events = self._ingestion_queue.qsize()
        self._ingestion_queue.join()
        log.debug(
            f"Successfully flushed ~{pending_events} items from ingestion queue"
        )

        # Media upload queue
        pending_uploads = self._media_upload_queue.qsize()
        self._media_upload_queue.join()
        log.debug(
            f"Successfully flushed ~{pending_uploads} items from media upload queue"
        )

    def join(self):
        """Pause every consumer and wait for its thread to finish.

        Ingestion consumers are handled first, then media upload consumers.
        Blocks execution until finished.
        """
        log = self._log

        log.debug(
            f"joining {len(self._ingestion_consumers)} ingestion consumer threads"
        )
        # pause all consumers before joining them so we don't have to wait for
        # multiple flush intervals to join them all.
        for worker in self._ingestion_consumers:
            worker.pause()

        for worker in self._ingestion_consumers:
            try:
                worker.join()
            except RuntimeError:
                # consumer thread has not started
                pass

            log.debug(f"IngestionConsumer thread {worker._identifier} joined")

        log.debug(
            f"joining {len(self._media_upload_consumers)} media upload consumer threads"
        )
        for worker in self._media_upload_consumers:
            worker.pause()

        for worker in self._media_upload_consumers:
            try:
                worker.join()
            except RuntimeError:
                # consumer thread has not started
                pass

            log.debug(f"MediaUploadConsumer thread {worker._identifier} joined")

    def shutdown(self):
        """Flush all messages and cleanly shutdown the client."""
        self._log.debug("shutdown initiated")

        # Drain the queues first, then stop the consumers.
        self.flush()
        self.join()

        self._log.debug("shutdown completed")
class
TaskManager:
class TaskManager(object):
    """Own the ingestion and media upload queues and the consumers that drain them.

    Construction creates two bounded queues, a ``MediaManager`` bound to the
    media upload queue, and ``threads`` consumers of each kind, which are
    started immediately. ``join`` is registered with ``atexit``.
    """

    _log = logging.getLogger(__name__)
    _ingestion_consumers: List[IngestionConsumer]
    _media_upload_consumers: List[MediaUploadConsumer]
    _media_manager: MediaManager
    _enabled: bool
    _threads: int
    _max_task_queue_size: int
    _ingestion_queue: Queue
    _media_upload_queue: Queue
    _client: LangfuseClient
    _api_client: FernLangfuse
    _flush_at: int
    _flush_interval: float
    _max_retries: int
    _public_key: str
    _sdk_name: str
    _sdk_version: str
    _sdk_integration: str
    _sample_rate: float
    _mask: Optional[MaskFunction]

    def __init__(
        self,
        *,
        client: LangfuseClient,
        api_client: FernLangfuse,
        flush_at: int,
        flush_interval: float,
        max_retries: int,
        threads: int,
        public_key: str,
        sdk_name: str,
        sdk_version: str,
        sdk_integration: str,
        enabled: bool = True,
        max_task_queue_size: int = 100_000,
        sample_rate: float = 1,
        mask: Optional[MaskFunction] = None,
    ):
        """Store the configuration, build the queues and start the consumers.

        All arguments are keyword-only and are kept on the instance under the
        same name with a leading underscore. ``max_task_queue_size`` is the
        ``maxsize`` of both queues; ``threads`` is the number of consumers
        started per queue by ``init_resources``.
        """
        # Plain configuration, later handed to the consumers.
        self._client = client
        self._api_client = api_client
        self._flush_at = flush_at
        self._flush_interval = flush_interval
        self._max_retries = max_retries
        self._threads = threads
        self._public_key = public_key
        self._sdk_name = sdk_name
        self._sdk_version = sdk_version
        self._sdk_integration = sdk_integration
        self._enabled = enabled
        self._max_task_queue_size = max_task_queue_size
        self._sample_rate = sample_rate
        self._mask = mask

        # Both queues share the same capacity limit.
        self._ingestion_queue = Queue(self._max_task_queue_size)
        self._media_upload_queue = Queue(self._max_task_queue_size)
        self._media_manager = MediaManager(
            api_client=api_client,
            media_upload_queue=self._media_upload_queue,
            max_retries=max_retries,
        )

        self._ingestion_consumers = []
        self._media_upload_consumers = []
        self.init_resources()

        # cleans up when the python interpreter closes
        atexit.register(self.join)

    def init_resources(self):
        """Create, start and record the consumers for both queues.

        ``self._threads`` ingestion consumers are started first, followed by
        the same number of media upload consumers. Identifiers run from 0.
        """
        for worker_id in range(self._threads):
            ingestion_worker = IngestionConsumer(
                ingestion_queue=self._ingestion_queue,
                identifier=worker_id,
                client=self._client,
                media_manager=self._media_manager,
                flush_at=self._flush_at,
                flush_interval=self._flush_interval,
                max_retries=self._max_retries,
                public_key=self._public_key,
                sdk_name=self._sdk_name,
                sdk_version=self._sdk_version,
                sdk_integration=self._sdk_integration,
                sample_rate=self._sample_rate,
                mask=self._mask,
            )
            ingestion_worker.start()
            self._ingestion_consumers.append(ingestion_worker)

        for worker_id in range(self._threads):
            upload_worker = MediaUploadConsumer(
                identifier=worker_id,
                media_manager=self._media_manager,
            )
            upload_worker.start()
            self._media_upload_consumers.append(upload_worker)

    def add_task(self, event: dict):
        """Stamp ``event`` with the current timestamp and enqueue it without blocking.

        Args:
            event: Event payload. It is mutated in place: its ``"timestamp"``
                key is set to the value of ``_get_timestamp()``.

        Returns:
            ``None`` when the manager is disabled or the event was enqueued;
            ``False`` when the queue is full or any other exception occurred.
            Exceptions are logged, never propagated.
        """
        if not self._enabled:
            return None

        try:
            event["timestamp"] = _get_timestamp()
            self._ingestion_queue.put(event, block=False)
        except queue.Full:
            # The event is dropped; the queue is bounded by max_task_queue_size.
            self._log.warning("ingestion queue is full, dropping event")
            return False
        except Exception as e:
            self._log.exception(f"Exception in adding task {e}")
            return False

        return None

    def flush(self):
        """Block until both queues report every queued item as processed.

        The ingestion queue is waited on first, then the media upload queue.
        The logged sizes are snapshots taken just before each wait, hence
        approximate.
        """
        log = self._log
        log.debug("flushing ingestion and media upload queues")

        # Ingestion queue
        pending_events = self._ingestion_queue.qsize()
        self._ingestion_queue.join()
        log.debug(
            f"Successfully flushed ~{pending_events} items from ingestion queue"
        )

        # Media upload queue
        pending_uploads = self._media_upload_queue.qsize()
        self._media_upload_queue.join()
        log.debug(
            f"Successfully flushed ~{pending_uploads} items from media upload queue"
        )

    def join(self):
        """Pause every consumer and wait for its thread to finish.

        Ingestion consumers are handled first, then media upload consumers.
        Blocks execution until finished.
        """
        log = self._log

        log.debug(
            f"joining {len(self._ingestion_consumers)} ingestion consumer threads"
        )
        # pause all consumers before joining them so we don't have to wait for
        # multiple flush intervals to join them all.
        for worker in self._ingestion_consumers:
            worker.pause()

        for worker in self._ingestion_consumers:
            try:
                worker.join()
            except RuntimeError:
                # consumer thread has not started
                pass

            log.debug(f"IngestionConsumer thread {worker._identifier} joined")

        log.debug(
            f"joining {len(self._media_upload_consumers)} media upload consumer threads"
        )
        for worker in self._media_upload_consumers:
            worker.pause()

        for worker in self._media_upload_consumers:
            try:
                worker.join()
            except RuntimeError:
                # consumer thread has not started
                pass

            log.debug(f"MediaUploadConsumer thread {worker._identifier} joined")

    def shutdown(self):
        """Flush all messages and cleanly shutdown the client."""
        self._log.debug("shutdown initiated")

        # Drain the queues first, then stop the consumers.
        self.flush()
        self.join()

        self._log.debug("shutdown completed")
TaskManager( *, client: langfuse.request.LangfuseClient, api_client: langfuse.api.client.FernLangfuse, flush_at: int, flush_interval: float, max_retries: int, threads: int, public_key: str, sdk_name: str, sdk_version: str, sdk_integration: str, enabled: bool = True, max_task_queue_size: int = 100000, sample_rate: float = 1, mask: Optional[langfuse.types.MaskFunction] = None)
def __init__(
    self,
    *,
    client: LangfuseClient,
    api_client: FernLangfuse,
    flush_at: int,
    flush_interval: float,
    max_retries: int,
    threads: int,
    public_key: str,
    sdk_name: str,
    sdk_version: str,
    sdk_integration: str,
    enabled: bool = True,
    max_task_queue_size: int = 100_000,
    sample_rate: float = 1,
    mask: Optional[MaskFunction] = None,
):
    """Store the configuration, build the queues and start the consumers.

    All arguments are keyword-only and are kept on the instance under the
    same name with a leading underscore. ``max_task_queue_size`` is the
    ``maxsize`` of both queues. After the state is in place,
    ``init_resources()`` is called and ``join`` is registered with ``atexit``.
    """
    # Plain configuration, kept for the consumers created in init_resources().
    self._client = client
    self._api_client = api_client
    self._flush_at = flush_at
    self._flush_interval = flush_interval
    self._max_retries = max_retries
    self._threads = threads
    self._public_key = public_key
    self._sdk_name = sdk_name
    self._sdk_version = sdk_version
    self._sdk_integration = sdk_integration
    self._enabled = enabled
    self._max_task_queue_size = max_task_queue_size
    self._sample_rate = sample_rate
    self._mask = mask

    # Both queues share the same capacity limit.
    self._ingestion_queue = Queue(self._max_task_queue_size)
    self._media_upload_queue = Queue(self._max_task_queue_size)
    self._media_manager = MediaManager(
        api_client=api_client,
        media_upload_queue=self._media_upload_queue,
        max_retries=max_retries,
    )

    # Filled by init_resources().
    self._ingestion_consumers = []
    self._media_upload_consumers = []
    self.init_resources()

    # cleans up when the python interpreter closes
    atexit.register(self.join)
def
init_resources(self):
def init_resources(self):
    """Create, start and record the consumers for both queues.

    ``self._threads`` ingestion consumers are started first, followed by the
    same number of media upload consumers. Each consumer receives its loop
    index as ``identifier`` and is appended to the matching list on ``self``.
    """
    worker_ids = range(self._threads)

    # Settings shared by every ingestion consumer; only the identifier varies.
    ingestion_settings = dict(
        ingestion_queue=self._ingestion_queue,
        client=self._client,
        media_manager=self._media_manager,
        flush_at=self._flush_at,
        flush_interval=self._flush_interval,
        max_retries=self._max_retries,
        public_key=self._public_key,
        sdk_name=self._sdk_name,
        sdk_version=self._sdk_version,
        sdk_integration=self._sdk_integration,
        sample_rate=self._sample_rate,
        mask=self._mask,
    )
    for worker_id in worker_ids:
        ingestion_worker = IngestionConsumer(
            identifier=worker_id, **ingestion_settings
        )
        ingestion_worker.start()
        self._ingestion_consumers.append(ingestion_worker)

    for worker_id in worker_ids:
        upload_worker = MediaUploadConsumer(
            identifier=worker_id,
            media_manager=self._media_manager,
        )
        upload_worker.start()
        self._media_upload_consumers.append(upload_worker)
def
add_task(self, event: dict):
def add_task(self, event: dict):
    """Stamp ``event`` with the current timestamp and enqueue it without blocking.

    Args:
        event: Event payload. It is mutated in place: its ``"timestamp"`` key
            is set to the value of ``_get_timestamp()``.

    Returns:
        ``None`` when the manager is disabled or the event was enqueued;
        ``False`` when the queue is full or any other exception occurred.
        Exceptions are logged, never propagated.
    """
    if not self._enabled:
        return None

    try:
        event["timestamp"] = _get_timestamp()
        self._ingestion_queue.put(event, block=False)
    except queue.Full:
        # The event is dropped. The message names this queue instead of the
        # unrelated "analytics-python" library it used to mention.
        self._log.warning("ingestion queue is full, dropping event")
        return False
    except Exception as e:
        # NOTE(review): the trailing `return False` of the original is read as
        # part of this handler (indentation was lost in the listing) - confirm.
        self._log.exception(f"Exception in adding task {e}")
        return False

    return None
def
flush(self):
def flush(self):
    """Block until both queues report every queued item as processed.

    The ingestion queue is waited on first, then the media upload queue. The
    logged sizes are snapshots taken just before each wait, hence approximate.
    """
    log = self._log
    log.debug("flushing ingestion and media upload queues")

    # Ingestion queue
    pending_events = self._ingestion_queue.qsize()
    self._ingestion_queue.join()
    log.debug(
        f"Successfully flushed ~{pending_events} items from ingestion queue"
    )

    # Media upload queue
    pending_uploads = self._media_upload_queue.qsize()
    self._media_upload_queue.join()
    log.debug(
        f"Successfully flushed ~{pending_uploads} items from media upload queue"
    )
Force a flush from the internal queue to the server.
def
join(self):
def join(self):
    """Pause every consumer and wait for its thread to finish.

    Ingestion consumers are handled first, then media upload consumers.
    Within each group all consumers are paused before any is joined.
    A ``RuntimeError`` from ``join()`` is ignored. Blocks execution until
    finished.
    """
    log = self._log

    ingestion_workers = self._ingestion_consumers
    log.debug(f"joining {len(ingestion_workers)} ingestion consumer threads")

    # pause all consumers before joining them so we don't have to wait for
    # multiple flush intervals to join them all.
    for worker in ingestion_workers:
        worker.pause()

    for worker in ingestion_workers:
        try:
            worker.join()
        except RuntimeError:
            # consumer thread has not started
            pass

        log.debug(f"IngestionConsumer thread {worker._identifier} joined")

    upload_workers = self._media_upload_consumers
    log.debug(f"joining {len(upload_workers)} media upload consumer threads")

    for worker in upload_workers:
        worker.pause()

    for worker in upload_workers:
        try:
            worker.join()
        except RuntimeError:
            # consumer thread has not started
            pass

        log.debug(f"MediaUploadConsumer thread {worker._identifier} joined")
End the consumer threads once the queue is empty.
Blocks execution until finished