Untitled

mail@pastecode.io avatar
unknown
plain_text
4 months ago
4.1 kB
2
Indexable
from collections import deque
from in_memory_db import InMemoryDB

class InMemoryDBImpl(InMemoryDB):
    """In-memory record store with modification tracking and per-key locks.

    Each key maps to a record of ``field -> int`` values. Every successful
    write or delete bumps the key's modification count, and a key can be
    locked by one caller at a time while other callers wait in FIFO order.
    """

    def __init__(self):
        # key -> {field -> value}; a key exists only while it has fields.
        self.db = {}
        # key -> number of successful set/inc/delete operations on the key.
        self.modification_counts = {}
        # key -> caller_id currently holding the lock on that key.
        self.locks = {}
        # key -> deque of caller_ids waiting for the lock, oldest first.
        self.lock_queues = {}

    def set_or_inc(self, key: str, field: str, value: int) -> int | None:
        """Set ``field`` to ``value``, or add ``value`` to it if it exists.

        If the key is locked (by anyone) nothing is modified and the current
        value of the field is returned instead (``None`` if it is absent).

        Returns:
            The value of the field after the call, or ``None``.
        """
        if key in self.locks:
            return self.get(key, field)
        return self._set_or_inc_internal(key, field, value)

    def set_or_inc_by_caller(self, key: str, field: str, value: int, caller_id: str) -> int | None:
        """Like ``set_or_inc``, but the lock holder is allowed to write.

        The write is skipped only when the key is locked by a caller other
        than ``caller_id``; in that case the current field value is returned
        (``None`` if it is absent).
        """
        if key in self.locks and self.locks[key] != caller_id:
            return self.get(key, field)
        return self._set_or_inc_internal(key, field, value)

    def _set_or_inc_internal(self, key: str, field: str, value: int) -> int:
        """Apply a set-or-increment without any lock check.

        Creates the record (with a modification count of 0) when the key is
        new, then counts this write as one modification.
        """
        record = self.db.get(key)
        if record is None:
            record = self.db[key] = {}
            self.modification_counts[key] = 0
        if field in record:
            record[field] += value
        else:
            record[field] = value
        self.modification_counts[key] += 1
        return record[field]

    def get(self, key: str, field: str) -> int | None:
        """Return the value of ``field`` in ``key``, or ``None`` if missing."""
        record = self.db.get(key)
        if record is None:
            return None
        return record.get(field)

    def delete(self, key: str, field: str) -> bool:
        """Delete ``field`` from ``key`` unless the key is locked.

        Returns:
            ``True`` if the field existed and was removed, ``False`` if the
            key is locked or the field does not exist.
        """
        if key in self.locks:
            return False
        return self._delete_internal(key, field)

    def delete_by_caller(self, key: str, field: str, caller_id: str) -> bool:
        """Like ``delete``, but the lock holder is allowed to delete.

        Returns ``False`` without modifying anything when the key is locked
        by a caller other than ``caller_id``.
        """
        if key in self.locks and self.locks[key] != caller_id:
            return False
        return self._delete_internal(key, field)

    def _delete_internal(self, key: str, field: str) -> bool:
        """Remove a field without any lock check.

        Removing the last field removes the whole key, including its
        modification count, its lock and any queued lock requests.
        """
        record = self.db.get(key)
        if record is None or field not in record:
            return False

        del record[field]
        self.modification_counts[key] += 1
        if not record:
            # The record is empty: forget every trace of the key.
            del self.db[key]
            del self.modification_counts[key]
            self._cleanup_locks(key)
        return True

    def top_n_keys(self, n: int) -> list[str]:
        """Return up to ``n`` keys formatted as ``"key(count)"``.

        Keys are ordered by modification count, highest first; ties are
        broken by key in ascending order. A non-positive ``n`` yields an
        empty list.
        """
        if n <= 0:
            # A negative bound in a slice would otherwise drop keys from the
            # end of the ranking instead of returning nothing.
            return []
        ranked = sorted(
            self.modification_counts.items(),
            key=lambda item: (-item[1], item[0]),
        )
        return [f"{key}({count})" for key, count in ranked[:n]]

    def lock(self, caller_id: str, key: str) -> str | None:
        """Request the lock on ``key`` for ``caller_id``.

        Returns:
            ``"invalid_request"`` if the key does not exist, ``"acquired"``
            if the lock was free, ``"wait"`` if the caller was queued behind
            the current holder, or ``None`` if the caller already holds the
            lock or is already in the queue.
        """
        if key not in self.db:
            return "invalid_request"

        if key not in self.locks:
            self.locks[key] = caller_id
            return "acquired"

        if self.locks[key] == caller_id:
            return None  # Already locked by the same caller

        waiters = self.lock_queues.setdefault(key, deque())
        if caller_id in waiters:
            return None  # Already in the queue
        waiters.append(caller_id)
        return "wait"

    def unlock(self, key: str) -> str | None:
        """Release the lock on ``key`` and pass it to the next waiter.

        Returns:
            ``"released"`` if a lock was released, ``None`` if the key exists
            but is not locked, or ``"invalid_request"`` if the key neither
            exists nor is locked.
        """
        if key not in self.locks:
            return None if key in self.db else "invalid_request"

        del self.locks[key]

        waiters = self.lock_queues.get(key)
        if waiters:
            # The longest-waiting caller becomes the new holder.
            self.locks[key] = waiters.popleft()
        if not waiters:
            # Do not keep an empty queue around once nobody is waiting.
            self.lock_queues.pop(key, None)
        return "released"

    def _cleanup_locks(self, key: str):
        """Drop the lock and any queued lock requests for a deleted key."""
        self.locks.pop(key, None)
        self.lock_queues.pop(key, None)
Leave a Comment