Untitled
unknown
plain_text
24 days ago
3.9 kB
3
Indexable
import asyncio
import threading
import time
from typing import Any, Callable, Dict, List, Optional

from pymilvus import Collection, connections


async def insert_with_progress(
    collection: Collection,
    data: List[Dict[str, Any]],
    batch_size: int = 1000,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> List[int]:
    """Insert ``data`` into a Milvus collection in batches, reporting progress.

    Args:
        collection: Target Milvus collection.
        data: Records to insert (one dict per entity).
        batch_size: Number of records per insert call.
        progress_callback: Optional callable invoked with the completion
            percentage (0-100) after each batch and once more at the end.

    Returns:
        The primary keys of all inserted records.
    """
    total_records = len(data)
    inserted_count = 0
    all_ids: List[int] = []

    # pymilvus Collection.insert / Collection.flush are BLOCKING calls and do
    # not return awaitables — awaiting their results directly raises TypeError.
    # Run them in the default executor so the event loop stays responsive.
    loop = asyncio.get_running_loop()

    for start in range(0, total_records, batch_size):
        batch = data[start:start + batch_size]
        mr = await loop.run_in_executor(None, collection.insert, batch)

        inserted_count += len(batch)
        if progress_callback:
            progress_callback((inserted_count / total_records) * 100)

        all_ids.extend(mr.primary_keys)

    # Persist the inserted data; blocking, so also pushed to the executor.
    await loop.run_in_executor(None, collection.flush)

    if progress_callback:
        progress_callback(100)
    return all_ids


def sync_insert_with_progress(
    collection: Collection,
    data: List[Dict[str, Any]],
    loop: asyncio.AbstractEventLoop,
    batch_size: int = 1000,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> List[int]:
    """Synchronous wrapper for :func:`insert_with_progress`.

    Submits the coroutine to ``loop`` (which must be running in another
    thread) and blocks the calling thread until the insert completes.

    Args:
        collection: Target Milvus collection.
        data: Records to insert.
        loop: A running event loop owned by a different thread.
        batch_size: Number of records per insert call.
        progress_callback: Optional progress callable (percentage 0-100).

    Returns:
        The primary keys of all inserted records.
    """
    coro = insert_with_progress(
        collection,
        data,
        batch_size=batch_size,
        progress_callback=progress_callback,
    )
    # Thread-safe hand-off to the background loop; .result() blocks until done
    # and re-raises any exception from the coroutine.
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    return future.result()


class MilvusClient:
    """Milvus client that runs an asyncio event loop in a background thread.

    Synchronous callers use :meth:`insert_data`, which bridges into the
    background loop via ``asyncio.run_coroutine_threadsafe``.
    """

    def __init__(self, host: str = "localhost", port: str = "19530"):
        # Connect to Milvus under the default connection alias.
        connections.connect("default", host=host, port=port)

        # Start a dedicated event loop in a daemon thread so sync code can
        # submit coroutines to it.
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self._run_event_loop, daemon=True)
        self.thread.start()

    def _run_event_loop(self) -> None:
        """Run the event loop in the background thread until stopped."""
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def insert_data(
        self,
        collection_name: str,
        data: List[Dict[str, Any]],
        batch_size: int = 1000,
    ) -> List[int]:
        """Insert ``data`` into ``collection_name`` with progress printing.

        Returns the primary keys of the inserted records.
        """
        collection = Collection(collection_name)

        start_time = time.time()
        last_update_time = start_time

        def progress_callback(progress: float) -> None:
            nonlocal last_update_time
            current_time = time.time()
            # Throttle output to at most one line per 0.5 s, but always
            # print the final 100% update.
            if current_time - last_update_time >= 0.5 or progress >= 100:
                elapsed = current_time - start_time
                print(f"Progress: {progress:.2f}%, Time elapsed: {elapsed:.2f}s")
                last_update_time = current_time

        return sync_insert_with_progress(
            collection,
            data,
            self.loop,
            batch_size=batch_size,
            progress_callback=progress_callback,
        )

    def close(self) -> None:
        """Stop the event loop, join its thread, and disconnect from Milvus."""
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        connections.disconnect("default")


# Example usage
if __name__ == "__main__":
    client = MilvusClient()
    try:
        # Example data: 10k entities with a 128-dim vector each.
        data = [
            {"id": i, "vector": [i / 100] * 128, "text": f"text_{i}"}
            for i in range(10000)
        ]

        # Insert data with progress tracking
        ids = client.insert_data("my_collection", data, batch_size=500)
        print(f"Inserted {len(ids)} records")
    finally:
        # Clean up
        client.close()
Editor is loading...
Leave a Comment