Untitled

 avatar
unknown
plain_text
24 days ago
3.9 kB
3
Indexable
import asyncio
import threading
import time
from typing import Any, Callable, Dict, List, Optional

from pymilvus import Collection, connections

# Async implementation of insert with progress
async def insert_with_progress(
    collection: Collection,
    data: List[Dict[str, Any]],
    batch_size: int = 1000,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> List[int]:
    """
    Insert data into Milvus collection with progress tracking.

    ``collection.insert`` and ``collection.flush`` are blocking calls that
    return plain results, not awaitables. Each one is therefore handed to
    the running loop's default executor, so the event loop stays free while
    a batch is in flight.

    Args:
        collection: Target collection. Only its ``insert`` and ``flush``
            methods are used.
        data: Row dicts to insert, sent ``batch_size`` rows at a time.
        batch_size: Maximum number of rows per ``insert`` call. Must be >= 1.
        progress_callback: Optional callable. After every batch it receives
            the percentage of rows inserted so far. Once the final flush
            has completed it receives ``100``.

    Returns:
        The ``primary_keys`` of every insert result, in insertion order.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        # A zero step would make range() raise, and a negative step would
        # skip every row while still reporting 100%.
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    loop = asyncio.get_running_loop()
    total_records = len(data)
    inserted_count = 0
    all_ids: List[int] = []

    for start in range(0, total_records, batch_size):
        batch = data[start:start + batch_size]

        # Run the blocking network call on a worker thread; awaiting the
        # call's return value directly would raise TypeError.
        mr = await loop.run_in_executor(None, collection.insert, batch)
        all_ids.extend(mr.primary_keys)

        inserted_count += len(batch)
        if progress_callback:
            progress_callback(inserted_count / total_records * 100)

    # Wait for the flush to finish before reporting completion.
    await loop.run_in_executor(None, collection.flush)

    if progress_callback:
        progress_callback(100)

    return all_ids

# Synchronous wrapper using run_coroutine_threadsafe
def sync_insert_with_progress(
    collection: Collection,
    data: List[Dict[str, Any]],
    loop: asyncio.AbstractEventLoop,
    batch_size: int = 1000,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> List[int]:
    """
    Synchronous wrapper for insert_with_progress.

    Schedules the coroutine on ``loop`` with
    ``asyncio.run_coroutine_threadsafe`` and blocks the calling thread until
    it finishes. ``loop`` is expected to be driven by a different thread.
    Any exception raised by the coroutine is re-raised here by
    ``Future.result()``.

    Args:
        collection: Target collection, forwarded unchanged.
        data: Row dicts to insert, forwarded unchanged.
        loop: Event loop that will execute the coroutine.
        batch_size: Rows per insert call, forwarded unchanged.
        progress_callback: Progress hook, forwarded unchanged. It is invoked
            on the thread running ``loop``, not on the caller's thread.

    Returns:
        The primary keys returned by ``insert_with_progress``.

    Raises:
        RuntimeError: If called from the thread currently running ``loop``.
            Blocking on ``future.result()`` there would stop the loop from
            ever executing the coroutine, which would deadlock forever.
    """
    try:
        current_loop = asyncio.get_running_loop()
    except RuntimeError:
        current_loop = None  # No loop running on this thread: the normal case.
    if current_loop is loop:
        raise RuntimeError(
            "sync_insert_with_progress() cannot be called from the thread "
            "running the target event loop; await insert_with_progress() instead"
        )

    # Create the coroutine only after the guard, so a rejected call does not
    # leave a never-awaited coroutine behind.
    coro = insert_with_progress(
        collection,
        data,
        batch_size=batch_size,
        progress_callback=progress_callback,
    )

    # Submit to the loop's thread and wait for the result.
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    return future.result()

# Example of using the sync wrapper in a larger application
class MilvusClient:
    """Blocking facade over ``insert_with_progress``.

    On construction it opens the ``"default"`` pymilvus connection. It also
    starts a private asyncio event loop on a daemon thread. ``insert_data``
    submits work to that loop and waits for the result, so callers can stay
    fully synchronous. Call ``close()``, or use the client as a context
    manager, to stop the loop and disconnect.
    """

    def __init__(self, host="localhost", port="19530"):
        # Connect to Milvus under the "default" alias.
        connections.connect("default", host=host, port=port)

        # Run the event loop on its own thread so that the blocking wrapper
        # can wait on it from the caller's thread.
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self._run_event_loop, daemon=True)
        self.thread.start()

    def __enter__(self):
        """Return the client itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the client. Exceptions from the ``with`` body still propagate."""
        self.close()

    def _run_event_loop(self):
        """Run the event loop in the background thread until it is stopped."""
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def insert_data(self, collection_name, data, batch_size=1000):
        """Insert ``data`` into ``collection_name``, printing throttled progress.

        Blocks until every batch has been inserted and flushed. Returns the
        primary keys reported by the inserts.
        """
        collection = Collection(collection_name)

        # monotonic() is unaffected by wall-clock adjustments, so the
        # elapsed time can neither jump nor go negative mid-insert.
        start_time = time.monotonic()
        last_update_time = start_time

        def progress_callback(progress):
            # Invoked on the background loop thread, not the caller's thread.
            nonlocal last_update_time
            current_time = time.monotonic()

            # Print at most every 0.5 seconds, but never drop the 100% report.
            if current_time - last_update_time >= 0.5 or progress >= 100:
                elapsed = current_time - start_time
                print(f"Progress: {progress:.2f}%, Time elapsed: {elapsed:.2f}s")
                last_update_time = current_time

        return sync_insert_with_progress(
            collection,
            data,
            self.loop,
            batch_size=batch_size,
            progress_callback=progress_callback,
        )

    def close(self):
        """Stop and close the event loop, join its thread, then disconnect.

        Safe to call more than once; calls after the first are no-ops.
        """
        if self.loop.is_closed():
            return
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        # run_forever() has returned, so the loop can now release its
        # resources (selector, default executor).
        self.loop.close()
        connections.disconnect("default")

# Example usage
if __name__ == "__main__":
    # Demo: push 10,000 synthetic rows through the client in batches of 500.
    milvus = MilvusClient()

    try:
        # Each row has an integer "id", a 128-dim "vector" filled with id/100,
        # and a "text" label.
        rows = [
            {"id": n, "vector": [n / 100] * 128, "text": f"text_{n}"}
            for n in range(10000)
        ]

        inserted_ids = milvus.insert_data("my_collection", rows, batch_size=500)
        print(f"Inserted {len(inserted_ids)} records")
    finally:
        # Release the loop thread and the connection even if the insert fails.
        milvus.close()
Editor is loading...
Leave a Comment