
mail@pastecode.io avatar
4 months ago
3.8 kB
from collections import deque
from in_memory_db import InMemoryDB

class InMemoryDBImpl(InMemoryDB):
    def __init__(self):
        self.db = {}
        self.modification_counts = {}
        self.locks = {}
        self.lock_queues = {}

    def set_or_inc(self, key: str, field: str, value: int) -> int | None:
        if key in self.locks:
            return self.get(key, field)
        return self._set_or_inc_internal(key, field, value)

    def set_or_inc_by_caller(self, key: str, field: str, value: int, caller_id: str) -> int | None:
        if key in self.locks and self.locks[key] != caller_id:
            return self.get(key, field)
        return self._set_or_inc_internal(key, field, value)

    def _set_or_inc_internal(self, key: str, field: str, value: int) -> int:
        if key not in self.db:
            self.db[key] = {}
            self.modification_counts[key] = 0
        if field in self.db[key]:
            self.db[key][field] += value
            self.db[key][field] = value
        self.modification_counts[key] += 1
        return self.db[key][field]

    def get(self, key: str, field: str) -> int | None:
        if key in self.db and field in self.db[key]:
            return self.db[key][field]
        return None

    def delete(self, key: str, field: str) -> bool:
        if key in self.locks:
            return False
        return self._delete_internal(key, field)

    def delete_by_caller(self, key: str, field: str, caller_id: str) -> bool:
        if key in self.locks and self.locks[key] != caller_id:
            return False
        return self._delete_internal(key, field)

    def _delete_internal(self, key: str, field: str) -> bool:
        if key in self.db and field in self.db[key]:
            del self.db[key][field]
            self.modification_counts[key] += 1
            if not self.db[key]:
                del self.db[key]
                del self.modification_counts[key]
                self._cleanup_locks(key)  # Clean up locks and queues when a key is fully deleted
            return True
        return False

    def top_n_keys(self, n: int) -> list[str]:
        sorted_keys = sorted(
            key=lambda item: (-item[1], item[0])
        return [f"{key}({count})" for key, count in sorted_keys[:n]]

    def lock(self, caller_id: str, key: str) -> str | None:
        if key not in self.db:
            return "invalid_request"
        if key in self.locks:
            if self.locks[key] == caller_id:
                return None  # Already locked by the same user
            if key in self.lock_queues and caller_id in self.lock_queues[key]:
                return None  # Already in the queue
            if key not in self.lock_queues:
                self.lock_queues[key] = deque()
            return "wait"
        # Lock the key for the caller
        self.locks[key] = caller_id
        return "acquired"

    def unlock(self, key: str) -> str | None:
        if key not in self.locks:
            if key not in self.db:
                # The key was deleted, but the lock should still be released
                return "released"
            return None  # Key exists but is not locked
        if key in self.lock_queues and self.lock_queues[key]:
            next_user = self.lock_queues[key].popleft()
            self.locks[key] = next_user
        elif key in self.lock_queues:
            del self.lock_queues[key]
        return "released"

    def _cleanup_locks(self, key: str):
        if key in self.locks:
            del self.locks[key]
        if key in self.lock_queues:
            del self.lock_queues[key]
Leave a Comment