kv store multi-Thread
unknown
python
a year ago
3.8 kB
32
Indexable
'''
1. Concurrency control: concurrent reads and writes to the shared data structures must not cause data races or leave them in an inconsistent state.
2. Read-write lock:
* Best suited to workloads where the read/write ratio is high.
* Mechanism: multiple threads may read concurrently, but only one thread may write at a time (and only while no thread is reading).
3. Python has no built-in read-write lock, but one can be implemented with threading.Condition.
'''
# Starvation: In high contention scenarios, readers might starve writers (or vice versa).
# This can be mitigated by tweaking the lock implementation to prioritize writes if needed.
import threading
from collections import deque
class ReadWriteLock:
    """Reader-writer lock built on a single ``threading.Condition``.

    Any number of threads may hold the read lock at the same time. A writer
    keeps the condition's underlying lock for the whole duration of the
    write, which shuts out other writers and any new readers, and it does
    not proceed until the count of active readers has dropped to zero.

    Limitations:
        * Writers get no priority: readers that arrive while a writer is
          still waiting are admitted, so a steady stream of readers can
          delay a writer indefinitely.
        * A thread that holds the read lock must not call ``acquire_write``;
          it would wait for its own read hold to be released.
    """

    def __init__(self):
        self._readers = 0  # number of read holds currently outstanding
        # Condition() creates its own RLock, so the write lock is re-entrant
        # for the thread that owns it.
        self._lock = threading.Condition()

    def acquire_read(self):
        """Take a shared read hold; blocks while a writer owns the lock."""
        with self._lock:
            self._readers += 1

    def release_read(self):
        """Drop one read hold and wake waiting writers when none remain.

        Raises:
            RuntimeError: if there is no outstanding read hold. Letting the
                counter go negative would make a later reader look like
                "no readers" and allow a writer to run alongside it.
        """
        with self._lock:
            if self._readers <= 0:
                raise RuntimeError(
                    "release_read() called without a matching acquire_read()"
                )
            self._readers -= 1
            if self._readers == 0:
                self._lock.notify_all()

    def acquire_write(self):
        """Take the exclusive write lock, waiting until no reader is active.

        On return the caller owns the underlying lock and must hand it back
        with ``release_write``.
        """
        self._lock.acquire()
        try:
            while self._readers > 0:
                # wait() releases the lock while sleeping and re-acquires it
                # before returning, so readers can drain in the meantime.
                self._lock.wait()
        except BaseException:
            # wait() re-acquires the lock even when it is interrupted; give
            # it back so an aborted writer does not block everyone forever.
            self._lock.release()
            raise

    def release_write(self):
        """Release the write lock taken by ``acquire_write``."""
        self._lock.release()
class WindowKeyValueStore:
    """Thread-safe key-value store whose entries expire after a time window.

    Every ``put`` records a ``(timestamp, key, value)`` event. An event is
    expired once ``current_time - timestamp > window_seconds`` (an entry
    whose age equals the window is still live). Expired entries are evicted
    lazily, at the start of whichever operation first observes them.

    Eviction only inspects the oldest end of the event queue, so callers are
    presumably expected to pass non-decreasing ``current_time`` values.

    Locking: every mutation (including eviction) happens under the write
    lock. Reads are served under the shared read lock only when nothing has
    expired; otherwise the read lock is dropped and the operation is redone
    under the write lock, because evicting would mutate shared state.
    """

    def __init__(self, window_seconds=10):
        """Create an empty store with a window of ``window_seconds``."""
        self.window_seconds = window_seconds
        self.events = deque()  # (timestamp, key, value), oldest first
        self.key_map = {}  # key -> value for every live key
        self.value_sum = 0  # running sum of the values in key_map
        self.lock = ReadWriteLock()

    def put(self, key, val, current_time):
        """Insert or overwrite ``key`` with ``val``, stamped ``current_time``."""
        self.lock.acquire_write()
        try:
            self._remove_old_events(current_time)
            # Overwrite = drop the old entry, then record a fresh event.
            self._discard_key(key)
            self.events.append((current_time, key, val))
            self.key_map[key] = val
            self.value_sum += val
        finally:
            self.lock.release_write()

    def get(self, key, current_time):
        """Return the live value of ``key``, or -1 if absent or expired."""
        self.lock.acquire_read()
        try:
            # Fast path: nothing to evict, so this is a pure read.
            if not self._has_expired_events(current_time):
                return self.key_map.get(key, -1)
        finally:
            self.lock.release_read()

        # Slow path: eviction mutates state and needs exclusive access.
        # The read lock is released first, so this is not a lock upgrade.
        self.lock.acquire_write()
        try:
            self._remove_old_events(current_time)
            return self.key_map.get(key, -1)
        finally:
            self.lock.release_write()

    def delete(self, key, current_time):
        """Remove ``key`` if it is present; a missing key is not an error."""
        self.lock.acquire_write()
        try:
            self._remove_old_events(current_time)
            self._discard_key(key)
        finally:
            self.lock.release_write()

    def getAverage(self, current_time):
        """Return the mean of all live values, or 0.0 when the store is empty."""
        self.lock.acquire_read()
        try:
            # Fast path: nothing to evict, so this is a pure read.
            if not self._has_expired_events(current_time):
                return self._current_average()
        finally:
            self.lock.release_read()

        # Slow path: evict under the write lock, then compute the mean.
        self.lock.acquire_write()
        try:
            self._remove_old_events(current_time)
            return self._current_average()
        finally:
            self.lock.release_write()

    def _has_expired_events(self, current_time):
        """Tell whether the oldest event is expired. Caller holds a lock."""
        if not self.events:
            return False
        return current_time - self.events[0][0] > self.window_seconds

    def _current_average(self):
        """Mean of the stored values, 0.0 if none. Caller holds a lock."""
        if not self.events:
            return 0.0
        return self.value_sum / len(self.events)

    def _discard_key(self, key):
        """Drop ``key`` and its event, if present. Caller holds the write lock."""
        if key in self.key_map:
            self.value_sum -= self.key_map.pop(key)
            self._remove_key_from_events(key)

    def _remove_old_events(self, current_time):
        """Evict expired events from the old end. Caller holds the write lock."""
        while self.events and current_time - self.events[0][0] > self.window_seconds:
            _, key, val = self.events.popleft()
            # Only touch the map if it still refers to this very event.
            if key in self.key_map and self.key_map[key] == val:
                self.value_sum -= val
                del self.key_map[key]

    def _remove_key_from_events(self, key):
        """Rebuild the queue without ``key``. Caller holds the write lock."""
        self.events = deque((t, k, v) for t, k, v in self.events if k != key)
# Example usage:
# WindowKeyValueStore can now handle concurrent put and get operations safely.
Leave a Comment