Untitled
unknown
plain_text
5 months ago
1.5 kB
2
Indexable
from cachetools import TTLCache from threading import Lock from typing import Dict, Any class ThreadSafeTTLCache: def __init__(self, maxsize: int, ttl: int): self.cache = TTLCache(maxsize=maxsize, ttl=ttl) self.lock = Lock() def get(self, key: str) -> Any: with self.lock: return self.cache.get(key) def set(self, key: str, value: Any) -> None: with self.lock: self.cache[key] = value def remove(self, key: str) -> None: with self.lock: self.cache.pop(key, None) def clear(self) -> None: with self.lock: self.cache.clear() def cache_info(self): with self.lock: return { "size": len(self.cache), "maxsize": self.cache.maxsize, "ttl": self.cache.ttl, "hits": self.cache.hits, "misses": self.cache.misses } # Create a cache with a time-to-live for tokens (e.g. 3600 seconds in 1 hour) and upto 15K tokens token_cache = ThreadSafeTTLCache(maxsize=15000, ttl=3600) def get_token_from_cache(token: str) -> Dict[str, Any]: return token_cache.get(token) def set_token_to_cache(token: str, token_data: Dict[str, Any])-> None: token_cache.set(token, token_data) def remove_token_from_cache(token: str) -> None: token_cache.remove(token) def clear_token_cache() -> None: token_cache.clear()
Editor is loading...
Leave a Comment