Untitled

mail@pastecode.io avatar
unknown
plain_text
16 days ago
3.1 kB
2
Indexable
Never
from in_memory_db import InMemoryDB
from collections import defaultdict, deque


class InMemoryDBImpl(InMemoryDB):
    def __init__(self):
        self.db = {}
        self.modification_counts = {}
        self.locks = {}  # Track which user has locked which key
        self.lock_queues = defaultdict(deque)  # Queues for waiting lock requests

    def set_or_inc(self, key: str, field: str, value: int) -> int:
        if key not in self.db:
            self.db[key] = {}
            self.modification_counts[key] = 0

        if field in self.db[key]:
            self.db[key][field] += value
        else:
            self.db[key][field] = value

        self.modification_counts[key] += 1
        return self.db[key][field]

    def get(self, key: str, field: str) -> int | None:
        if key in self.db and field in self.db[key]:
            return self.db[key][field]
        return None

    def delete(self, key: str, field: str) -> bool:
        if key in self.db and field in self.db[key]:
            del self.db[key][field]
            self.modification_counts[key] += 1

            if not self.db[key]:
                del self.db[key]
                del self.modification_counts[key]
            return True
        return False

    def top_n_keys(self, n: int) -> list[str]:
        sorted_keys = sorted(self.modification_counts.items(), key=lambda item: (-item[1], item[0]))[:n]
        return [f"{key}({count})" for key, count in sorted_keys]

    def set_or_inc_by_caller(self, key: str, field: str, value: int, caller_id: str) -> int | None:
        if key in self.db and (key not in self.locks or self.locks[key] == caller_id):
            result = self.set_or_inc(key, field, value)
            return result
        elif key in self.db and field in self.db[key]:
            return self.db[key][field]
        else:
            return None

    def delete_by_caller(self, key: str, field: str, caller_id: str) -> bool:
        if key in self.db and (key not in self.locks or self.locks[key] == caller_id):
            return self.delete(key, field)
        return False

    def lock(self, caller_id: str, key: str) -> str | None:
        if key not in self.db:
            return "invalid_request"
        if key in self.locks:
            if self.locks[key] == caller_id:
                return None  # Already locked by the same user
            else:
                if caller_id not in self.lock_queues[key]:
                    self.lock_queues[key].append(caller_id)
                return "wait"
        else:
            self.locks[key] = caller_id
            return "acquired"

    def unlock(self, key: str) -> str | None:
        if key not in self.db:
            if key in self.lock_queues:
                del self.lock_queues[key]
            return "invalid_request"
        if key in self.locks:
            del self.locks[key]
            if self.lock_queues[key]:
                next_caller = self.lock_queues[key].popleft()
                self.locks[key] = next_caller
            return "released"
        return None
Leave a Comment