# Untitled
# Snippet pasted from pastecode.io (author: unknown, format: plain_text,
# posted 5 months ago, 3.4 kB).
from collections import deque
from in_memory_db import InMemoryDB

class InMemoryDBImpl(InMemoryDB):
    def __init__(self):
        self.db = {}
        self.modification_counts = {}
        self.locks = {}
        self.lock_queues = {}

    def set_or_inc(self, key: str, field: str, value: int) -> int | None:
        if key in self.locks:
            return None
        return self._set_or_inc_internal(key, field, value)

    def set_or_inc_by_caller(self, key: str, field: str, value: int, caller_id: str) -> int | None:
        if key in self.locks and self.locks[key] != caller_id:
            return None
        return self._set_or_inc_internal(key, field, value)

    def _set_or_inc_internal(self, key: str, field: str, value: int) -> int:
        if key not in self.db:
            self.db[key] = {}
        if key not in self.modification_counts:
            self.modification_counts[key] = 0
        if field in self.db[key]:
            self.db[key][field] += value
        else:
            self.db[key][field] = value
        self.modification_counts[key] += 1
        return self.db[key][field]

    def get(self, key: str, field: str) -> int | None:
        return self.db.get(key, {}).get(field)

    def delete(self, key: str, field: str) -> bool:
        if key in self.locks:
            return False
        return self._delete_internal(key, field)

    def delete_by_caller(self, key: str, field: str, caller_id: str) -> bool:
        if key in self.locks and self.locks[key] != caller_id:
            return False
        return self._delete_internal(key, field)

    def _delete_internal(self, key: str, field: str) -> bool:
        if key in self.db and field in self.db[key]:
            del self.db[key][field]
            self.modification_counts[key] += 1
            if not self.db[key]:
                del self.db[key]
                del self.modification_counts[key]
            return True
        return False

    def top_n_keys(self, n: int) -> list[str]:
        sorted_keys = sorted(
            self.modification_counts.items(),
            key=lambda item: (-item[1], item[0])
        )
        return [f"{key}({count})" for key, count in sorted_keys[:n]]

    def lock(self, caller_id: str, key: str) -> str | None:
        if key not in self.db:
            return "invalid_request"
        if key in self.locks:
            if self.locks[key] == caller_id:
                return None
            if key not in self.lock_queues:
                self.lock_queues[key] = deque()
            if caller_id not in self.lock_queues[key]:
                self.lock_queues[key].append(caller_id)
            return "wait"
        self.locks[key] = caller_id
        return "acquired"

    def unlock(self, key: str) -> str | None:
        if key not in self.locks:
            if key in self.db:
                return "invalid_request"
            else:
                return "invalid_request"
        
        self.locks.pop(key)
        if key in self.lock_queues and self.lock_queues[key]:
            next_caller = self.lock_queues[key].popleft()
            self.locks[key] = next_caller
        elif key in self.lock_queues:
            del self.lock_queues[key]
        
        return "released"

    def _cleanup_locks(self, key: str):
        if key in self.locks:
            del self.locks[key]
        if key in self.lock_queues:
            del self.lock_queues[key]
# (end of pasted snippet)