kv store multi-Thread
''' 1.Concurrency Control: We need to ensure that concurrent r/w to shared data structures do not lead to data races or inconsistent state. 2.Read-Write Lock: * r/w ratio is high * mechanism: allow multiple threads to read concurrently + only 1 thread can write at a time. 3.Python does not have a built-in ReadWriteLock, but we can implement one using threading.Condition. ''' # Starvation: In high contention scenarios, readers might starve writers (or vice versa). # This can be mitigated by tweaking the lock implementation to prioritize writes if needed. import threading from collections import deque class ReadWriteLock: def __init__(self): self._readers = 0 self._lock = threading.Condition() def acquire_read(self): # Allows multiple threads to acquire the read lock simultaneously. with self._lock: self._readers += 1 def release_read(self): # Decrements the reader count and notifies waiting writers if no readers are left. with self._lock: self._readers -= 1 if self._readers == 0: self._lock.notify_all() def acquire_write(self): # Blocks until no readers or writers hold the lock. self._lock.acquire() while self._readers > 0: self._lock.wait() def release_write(self): # Releases the write lock. self._lock.release() class WindowKeyValueStore: def __init__(self, window_seconds=10): self.window_seconds = window_seconds self.events = deque() # stores (timestamp, key, value) self.key_map = {} self.value_sum = 0 self.lock = ReadWriteLock() def put(self, key, val, current_time): # Use acquire_write() and release_write() to ensure exclusive access for writes. self.lock.acquire_write() try: self._remove_old_events(current_time) if key in self.key_map: self.delete(key, current_time) self.events.append((current_time, key, val)) self.key_map[key] = val self.value_sum += val finally: self.lock.release_write() def get(self, key, current_time): # Use acquire_read() and release_read() to allow concurrent reads. self.lock.acquire_read() try: self._remove_old_events(current_time) return self.key_map.get(key, -1) finally: self.lock.release_read() def delete(self, key, current_time): # Use acquire_write() and release_write() to ensure exclusive access for writes. self.lock.acquire_write() try: self._remove_old_events(current_time) if key in self.key_map: val = self.key_map[key] self.value_sum -= val del self.key_map[key] self._remove_key_from_events(key) finally: self.lock.release_write() def getAverage(self, current_time): # Use acquire_read() and release_read() to allow concurrent reads. self.lock.acquire_read() try: self._remove_old_events(current_time) if not self.events: return 0.0 return self.value_sum / len(self.events) finally: self.lock.release_read() def _remove_old_events(self, current_time): while self.events and current_time - self.events[0][0] > self.window_seconds: _, key, val = self.events.popleft() if key in self.key_map and self.key_map[key] == val: self.value_sum -= val del self.key_map[key] def _remove_key_from_events(self, key): self.events = deque((t, k, v) for t, k, v in self.events if k != key) # Example usage: # WindowKeyValueStore can now handle concurrent put and get operations safely
Leave a Comment