Untitled

 avatar
unknown
plain_text
5 months ago
1.5 kB
2
Indexable
from cachetools import TTLCache
from threading import Lock
from typing import Dict, Any

class ThreadSafeTTLCache:
    def __init__(self, maxsize: int, ttl: int):
        self.cache = TTLCache(maxsize=maxsize, ttl=ttl)
        self.lock = Lock()

    def get(self, key: str) -> Any:
        with self.lock:
            return self.cache.get(key)
    
    def set(self, key: str, value: Any) -> None:
        with self.lock:
            self.cache[key] = value

    def remove(self, key: str) -> None:
        with self.lock:
            self.cache.pop(key, None)
    
    def clear(self) -> None:
        with self.lock:
            self.cache.clear()

    def cache_info(self):
        with self.lock:
            return {
                "size": len(self.cache),
                "maxsize": self.cache.maxsize,
                "ttl": self.cache.ttl,
                "hits": self.cache.hits,
                "misses": self.cache.misses
            }

# Create a cache with a time-to-live for tokens (e.g. 3600 seconds in 1 hour) and upto 15K tokens
token_cache = ThreadSafeTTLCache(maxsize=15000, ttl=3600)

def get_token_from_cache(token: str) -> Dict[str, Any]:
    return token_cache.get(token)

def set_token_to_cache(token: str, token_data: Dict[str, Any])-> None:
    token_cache.set(token, token_data)

def remove_token_from_cache(token: str) -> None:
    token_cache.remove(token)

def clear_token_cache() -> None:
    token_cache.clear()
Editor is loading...
Leave a Comment