Untitled
unknown
python
3 years ago
1.7 kB
5
Indexable
class RedisZSetCallback(RedisCallback): def __init__(self, host='127.0.0.1', port=6379, socket=None, key=None, numeric_type=float, score_key='timestamp', **kwargs): """ score_key: str the value at this key will be used to store the data in the ZSet in redis. The default is timestamp. If you wish to look up the data by a different value, use this to change it. It must be a numeric value. """ self.score_key = score_key super().__init__(host=host, port=port, socket=socket, key=key, numeric_type=numeric_type, **kwargs) async def write(self, data: dict): score = data[self.score_key] await self.queue.put({'score': score, 'data': data}) async def writer(self): while self.running: count = self.queue.qsize() if count == 0: await asyncio.sleep(self.writer_interval) elif count > 1: async with self.read_many_queue(count) as updates: async with self.redis.pipeline(transaction=False) as pipe: for update in updates: pipe = pipe.zadd(f"{self.key}-{update['data']['exchange']}-{update['data']['symbol']}", {json.dumps(update['data']): update['score']}, nx=True) await pipe.execute() else: async with self.read_queue() as update: await self.redis.zadd(f"{self.key}-{update['data']['exchange']}-{update['data']['symbol']}", {json.dumps(update['data']): update['score']}, nx=True) await self.redis.close() await self.redis.connection_pool.disconnect() self.exited = True
Editor is loading...