Untitled
unknown
plain_text
a year ago
3.1 kB
8
Indexable
from in_memory_db import InMemoryDB
from collections import defaultdict, deque
class InMemoryDBImpl(InMemoryDB):
def __init__(self):
self.db = {}
self.modification_counts = {}
self.locks = {} # Track which user has locked which key
self.lock_queues = defaultdict(deque) # Queues for waiting lock requests
def set_or_inc(self, key: str, field: str, value: int) -> int:
if key not in self.db:
self.db[key] = {}
self.modification_counts[key] = 0
if field in self.db[key]:
self.db[key][field] += value
else:
self.db[key][field] = value
self.modification_counts[key] += 1
return self.db[key][field]
def get(self, key: str, field: str) -> int | None:
if key in self.db and field in self.db[key]:
return self.db[key][field]
return None
def delete(self, key: str, field: str) -> bool:
if key in self.db and field in self.db[key]:
del self.db[key][field]
self.modification_counts[key] += 1
if not self.db[key]:
del self.db[key]
del self.modification_counts[key]
return True
return False
def top_n_keys(self, n: int) -> list[str]:
sorted_keys = sorted(self.modification_counts.items(), key=lambda item: (-item[1], item[0]))[:n]
return [f"{key}({count})" for key, count in sorted_keys]
def set_or_inc_by_caller(self, key: str, field: str, value: int, caller_id: str) -> int | None:
if key in self.db and (key not in self.locks or self.locks[key] == caller_id):
result = self.set_or_inc(key, field, value)
return result
elif key in self.db and field in self.db[key]:
return self.db[key][field]
else:
return None
def delete_by_caller(self, key: str, field: str, caller_id: str) -> bool:
if key in self.db and (key not in self.locks or self.locks[key] == caller_id):
return self.delete(key, field)
return False
def lock(self, caller_id: str, key: str) -> str | None:
if key not in self.db:
return "invalid_request"
if key in self.locks:
if self.locks[key] == caller_id:
return None # Already locked by the same user
else:
if caller_id not in self.lock_queues[key]:
self.lock_queues[key].append(caller_id)
return "wait"
else:
self.locks[key] = caller_id
return "acquired"
def unlock(self, key: str) -> str | None:
if key not in self.db:
if key in self.lock_queues:
del self.lock_queues[key]
return "invalid_request"
if key in self.locks:
del self.locks[key]
if self.lock_queues[key]:
next_caller = self.lock_queues[key].popleft()
self.locks[key] = next_caller
return "released"
return None
Editor is loading...
Leave a Comment