# Untitled
#
# mail@pastecode.io avatar
# unknown
# plain_text
# 4 months ago
# 3.3 kB
# 2
# Indexable
from in_memory_db import InMemoryDB
from collections import defaultdict, deque


class InMemoryDBImpl(InMemoryDB):
    def __init__(self):
        self.db = {}
        self.modification_counts = {}
        self.locks = {}
        self.lock_queues = defaultdict(deque)

    def set_or_inc(self, key: str, field: str, value: int) -> int:
        if key not in self.db:
            self.db[key] = {}
            self.modification_counts[key] = 0

        if field in self.db[key]:
            self.db[key][field] += value
        else:
            self.db[key][field] = value

        self.modification_counts[key] += 1
        return self.db[key][field]

    def get(self, key: str, field: str) -> int | None:
        if key in self.db and field in self.db[key]:
            return self.db[key][field]
        return None

    def delete(self, key: str, field: str) -> bool:
        if key in self.db and field in self.db[key]:
            del self.db[key][field]
            self.modification_counts[key] += 1

            if not self.db[key]:  # If the record has no fields left
                del self.db[key]
                del self.modification_counts[key]
                if key in self.locks:
                    del self.locks[key]
                    if self.lock_queues[key]:
                        next_caller = self.lock_queues[key].popleft()
                        self.locks[key] = next_caller
            return True
        return False

    def top_n_keys(self, n: int) -> list[str]:
        sorted_keys = sorted(self.modification_counts.items(), key=lambda item: (-item[1], item[0]))[:n]
        return [f"{key}({count})" for key, count in sorted_keys]

    def set_or_inc_by_caller(self, key: str, field: str, value: int, caller_id: str) -> int | None:
        if key not in self.db:  # Handle case where key doesn't exist
            return None
        if key in self.locks and self.locks[key] != caller_id:
            return self.get(key, field)  # Return existing value if locked by someone else
        return self.set_or_inc(key, field, value)

    def delete_by_caller(self, key: str, field: str, caller_id: str) -> bool:
        if key not in self.db:  # Handle case where key doesn't exist
            return False
        if key in self.locks and self.locks[key] != caller_id:
            return False  # Cannot delete if locked by someone else
        return self.delete(key, field)

    def lock(self, caller_id: str, key: str) -> str | None:
        if key not in self.db:
            return "invalid_request"
        if key in self.locks:
            if self.locks[key] == caller_id:
                return None  # Already locked by the same user
            else:
                if caller_id not in self.lock_queues[key]:
                    self.lock_queues[key].append(caller_id)
                return "wait"
        else:
            self.locks[key] = caller_id
            return "acquired"

    def unlock(self, key: str) -> str | None:
        if key not in self.db and key not in self.lock_queues:
            return "invalid_request" 
        if key in self.locks:
            del self.locks[key]
            if self.lock_queues[key]:
                next_caller = self.lock_queues[key].popleft()
                self.locks[key] = next_caller
            return "released"
        return None
# Leave a Comment