kv store multi-Thread

mail@pastecode.io avatar
unknown
python
a month ago
3.8 kB
17
Indexable
Never
	'''
	1.Concurrency Control: We need to ensure that concurrent r/w to shared data structures do not lead to data races or inconsistent state.
	2.Read-Write Lock: 
	 * r/w ratio is high 
	 * mechanism: allow multiple threads to read concurrently + only 1 thread can write at a time.
	3.Python does not have a built-in ReadWriteLock, but we can implement one using threading.Condition.
	'''
	
	# Starvation: In high contention scenarios, readers might starve writers (or vice versa). 
	# This can be mitigated by tweaking the lock implementation to prioritize writes if needed.
	
import threading
from collections import deque

class ReadWriteLock:
    def __init__(self):
        self._readers = 0
        self._lock = threading.Condition()

    def acquire_read(self): # Allows multiple threads to acquire the read lock simultaneously.
        with self._lock:
            self._readers += 1

    def release_read(self): # Decrements the reader count and notifies waiting writers if no readers are left.
        with self._lock:
            self._readers -= 1
            if self._readers == 0:
                self._lock.notify_all()

    def acquire_write(self): #  Blocks until no readers or writers hold the lock.
        self._lock.acquire()
        while self._readers > 0:
            self._lock.wait()

    def release_write(self): # Releases the write lock.
        self._lock.release()

class WindowKeyValueStore:
    def __init__(self, window_seconds=10):
        self.window_seconds = window_seconds
        self.events = deque()  # stores (timestamp, key, value)
        self.key_map = {}
        self.value_sum = 0
        self.lock = ReadWriteLock()

    def put(self, key, val, current_time): # Use acquire_write() and release_write() to ensure exclusive access for writes.
        self.lock.acquire_write()
        try:
            self._remove_old_events(current_time)
            if key in self.key_map:
                self.delete(key, current_time)
            self.events.append((current_time, key, val))
            self.key_map[key] = val
            self.value_sum += val
        finally:
            self.lock.release_write()

    def get(self, key, current_time): # Use acquire_read() and release_read() to allow concurrent reads.
        self.lock.acquire_read()
        try:
            self._remove_old_events(current_time)
            return self.key_map.get(key, -1)
        finally:
            self.lock.release_read()

    def delete(self, key, current_time): # Use acquire_write() and release_write() to ensure exclusive access for writes.
        self.lock.acquire_write()
        try:
            self._remove_old_events(current_time)
            if key in self.key_map:
                val = self.key_map[key]
                self.value_sum -= val
                del self.key_map[key]
                self._remove_key_from_events(key)
        finally:
            self.lock.release_write()

    def getAverage(self, current_time): # Use acquire_read() and release_read() to allow concurrent reads.
        self.lock.acquire_read()
        try:
            self._remove_old_events(current_time)
            if not self.events:
                return 0.0
            return self.value_sum / len(self.events)
        finally:
            self.lock.release_read()

    def _remove_old_events(self, current_time):
        while self.events and current_time - self.events[0][0] > self.window_seconds:
            _, key, val = self.events.popleft()
            if key in self.key_map and self.key_map[key] == val:
                self.value_sum -= val
                del self.key_map[key]

    def _remove_key_from_events(self, key):
        self.events = deque((t, k, v) for t, k, v in self.events if k != key)

# Example usage:
# WindowKeyValueStore can now handle concurrent put and get operations safely
Leave a Comment