# Untitled
#
# mail@pastecode.io avatar
# unknown
# python
# 2 years ago
# 1.7 kB
# 3
# Indexable
# Never
class RedisZSetCallback(RedisCallback):
    """Callback that stores each update as a member of a Redis sorted set (ZSet).

    ``write`` places updates on ``self.queue``; ``writer`` drains that queue
    and issues ``ZADD`` commands through ``self.redis``. Every update is
    stored JSON-encoded under the key ``"<key>-<exchange>-<symbol>"`` and is
    scored with the value found at ``score_key``.

    NOTE(review): ``self.queue``, ``self.redis``, ``self.key``,
    ``self.running``, ``self.writer_interval``, ``read_queue`` and
    ``read_many_queue`` are not defined in this class; they are presumably
    provided by ``RedisCallback`` -- confirm against the base class.
    """

    def __init__(self, host='127.0.0.1', port=6379, socket=None, key=None, numeric_type=float, score_key='timestamp', **kwargs):
        """
        score_key: str
            the value at this key will be used to store the data in the ZSet in redis. The
            default is timestamp. If you wish to look up the data by a different value,
            use this to change it. It must be a numeric value.

        All remaining arguments are forwarded unchanged to ``RedisCallback.__init__``.
        """
        # Set before the base initializer runs, as in the original ordering.
        self.score_key = score_key
        super().__init__(host=host, port=port, socket=socket, key=key, numeric_type=numeric_type, **kwargs)

    def _zset_key(self, data: dict) -> str:
        """Return the sorted-set key for *data*: ``<key>-<exchange>-<symbol>``."""
        return f"{self.key}-{data['exchange']}-{data['symbol']}"

    async def write(self, data: dict):
        """Queue *data*, paired with its score, for the writer loop.

        Raises:
            KeyError: if *data* has no entry for ``self.score_key``.
        """
        score = data[self.score_key]
        await self.queue.put({'score': score, 'data': data})

    async def writer(self):
        """Drain the queue into Redis until ``self.running`` becomes false.

        While the queue is empty the loop sleeps for ``self.writer_interval``.
        When more than one update is pending they are sent in a single
        non-transactional pipeline; a lone update is written directly.
        ``nx=True`` is passed to ``ZADD`` so that only new members are added.

        The Redis connection is closed and ``self.exited`` is set in a
        ``finally`` block, so this cleanup also runs when a Redis or queue
        operation raises or the task is cancelled; the exception itself
        still propagates to the caller.
        """
        try:
            while self.running:
                pending = self.queue.qsize()
                if pending == 0:
                    await asyncio.sleep(self.writer_interval)
                elif pending > 1:
                    async with self.read_many_queue(pending) as updates:
                        async with self.redis.pipeline(transaction=False) as pipe:
                            for update in updates:
                                payload = update['data']
                                pipe = pipe.zadd(self._zset_key(payload), {json.dumps(payload): update['score']}, nx=True)
                            await pipe.execute()
                else:
                    async with self.read_queue() as update:
                        payload = update['data']
                        await self.redis.zadd(self._zset_key(payload), {json.dumps(payload): update['score']}, nx=True)
        finally:
            try:
                await self.redis.close()
                await self.redis.connection_pool.disconnect()
            finally:
                # Set even if closing the connection fails, so the writer is
                # always recorded as having exited.
                self.exited = True