import asyncio
import threading
import time
from pymilvus import Collection, connections
from typing import List, Dict, Any, Optional, Callable
# Async implementation of insert with progress reporting
async def insert_with_progress(
    collection: Collection,
    data: List[Dict[str, Any]],
    batch_size: int = 1000,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> List[int]:
    """
    Insert data into a Milvus collection in batches, reporting progress
    (as a percentage, 0-100) through an optional callback.
    """
    total_records = len(data)
    if total_records == 0:
        return []
    inserted_count = 0
    all_ids: List[int] = []

    # Process in batches
    for i in range(0, total_records, batch_size):
        batch = data[i:i + batch_size]

        # Collection.insert is a blocking call in the pymilvus ORM, so run it
        # in a worker thread to keep the event loop responsive.
        mr = await asyncio.to_thread(collection.insert, batch)
        all_ids.extend(mr.primary_keys)

        # Track progress
        inserted_count += len(batch)
        progress = (inserted_count / total_records) * 100
        if progress_callback:
            progress_callback(progress)

    # Flush so the inserted data is sealed and persisted; flush blocks as well.
    await asyncio.to_thread(collection.flush)

    if progress_callback:
        progress_callback(100)
    return all_ids
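
# Minimal usage sketch for the coroutine above (an assumption, not part of the
# original snippet): `coll` stands for an already-created pymilvus Collection and
# `rows` for data matching its schema. The MilvusClient class further down shows
# the threaded, synchronous route instead.
#
#   rows = [{"id": i, "vector": [0.0] * 128, "text": f"text_{i}"} for i in range(100)]
#   ids = asyncio.run(insert_with_progress(coll, rows, batch_size=50, progress_callback=print))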

# Synchronous wrapper using run_coroutine_threadsafe
def sync_insert_with_progress(
    collection: Collection,
    data: List[Dict[str, Any]],
    loop: asyncio.AbstractEventLoop,
    batch_size: int = 1000,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> List[int]:
    """
    Synchronous wrapper for insert_with_progress.

    The event loop passed in must already be running in another thread;
    run_coroutine_threadsafe schedules the coroutine on it and this call
    blocks until the result is available.
    """
    # Create the coroutine
    coro = insert_with_progress(
        collection,
        data,
        batch_size=batch_size,
        progress_callback=progress_callback,
    )
    # Submit to the running event loop and wait for the result
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    return future.result()

# Example of using the sync wrapper in a larger application
class MilvusClient:
    def __init__(self, host="localhost", port="19530"):
        # Connect to Milvus
        connections.connect("default", host=host, port=port)
        # Start a dedicated event loop in a background thread
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self._run_event_loop, daemon=True)
        self.thread.start()

    def _run_event_loop(self):
        """Run the event loop in the background thread."""
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def insert_data(self, collection_name, data, batch_size=1000):
        """Insert data into an existing collection with progress tracking."""
        collection = Collection(collection_name)

        # Progress tracking
        start_time = time.time()
        last_update_time = start_time

        def progress_callback(progress):
            nonlocal last_update_time
            current_time = time.time()
            # Only print every 0.5 seconds (or at completion) to limit output
            if current_time - last_update_time >= 0.5 or progress >= 100:
                elapsed = current_time - start_time
                print(f"Progress: {progress:.2f}%, Time elapsed: {elapsed:.2f}s")
                last_update_time = current_time

        # Call the sync wrapper from this (calling) thread
        result = sync_insert_with_progress(
            collection,
            data,
            self.loop,
            batch_size=batch_size,
            progress_callback=progress_callback,
        )
        return result

    def close(self):
        """Shut down the event loop, join the worker thread, and disconnect."""
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()
        connections.disconnect("default")
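
# --- Collection setup sketch (assumption, not part of the original snippet) ---
# The example below presumes a collection named "my_collection" already exists with
# a schema matching the sample rows: an INT64 primary key "id", a 128-dim
# FLOAT_VECTOR "vector", and a VARCHAR "text" field. One way to create it with the
# standard pymilvus ORM, shown here only as a hedged sketch:
def create_example_collection(name="my_collection", dim=128):
    from pymilvus import CollectionSchema, DataType, FieldSchema
    fields = [
        FieldSchema("id", DataType.INT64, is_primary=True, auto_id=False),
        FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim),
        FieldSchema("text", DataType.VARCHAR, max_length=256),
    ]
    schema = CollectionSchema(fields, description="Example collection for the insert demo")
    # Constructing a Collection with a schema creates it on the server if it is missing.
    return Collection(name, schema)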

# Example usage
if __name__ == "__main__":
    # Create client
    client = MilvusClient()
    try:
        # Example data (the collection "my_collection" is assumed to already exist;
        # see the schema sketch above)
        data = [{"id": i, "vector": [i / 100] * 128, "text": f"text_{i}"} for i in range(10000)]
        # Insert data with progress tracking
        ids = client.insert_data("my_collection", data, batch_size=500)
        print(f"Inserted {len(ids)} records")
    finally:
        # Clean up
        client.close()