diff --git a/relay/cache.py b/relay/cache.py index 9c28108..63f1a8c 100644 --- a/relay/cache.py +++ b/relay/cache.py @@ -6,7 +6,7 @@ import typing from abc import ABC, abstractmethod from dataclasses import asdict, dataclass -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from redis import Redis from .database import get_database @@ -94,6 +94,7 @@ class Cache(ABC): self.app = app self.setup() + @abstractmethod def get(self, namespace: str, key: str) -> Item: ... @@ -119,11 +120,26 @@ class Cache(ABC): ... + @abstractmethod + def delete_old(self, days: int = 14) -> None: + ... + + + @abstractmethod + def clear(self) -> None: + ... + + @abstractmethod def setup(self) -> None: ... + @abstractmethod + def close(self) -> None: + ... + + def set_item(self, item: Item) -> Item: return self.set( item.namespace, @@ -201,12 +217,32 @@ class SqlCache(Cache): pass + def delete_old(self, days: int = 14) -> None: + limit = datetime.now(tz = timezone.utc) - timedelta(days = days) + params = {"limit": limit.timestamp()} + + with self._db.connection() as conn: + with conn.execute("DELETE FROM cache WHERE updated < :limit", params): + pass + + + def clear(self) -> None: + with self._db.connection() as conn: + with conn.execute("DELETE FROM cache"): + pass + + def setup(self) -> None: with self._db.connection() as conn: with conn.exec_statement(f'create-cache-table-{self._db.type.name.lower()}', None): pass + def close(self) -> None: + self._db.close() + self._db = None + + @register_cache class RedisCache(Cache): name: str = 'redis' @@ -239,7 +275,7 @@ class RedisCache(Cache): def get_keys(self, namespace: str) -> Iterator[str]: - for key in self._rd.keys(self.get_key_name(namespace, '*')): + for key in self._rd.scan_iter(self.get_key_name(namespace, '*')): *_, key_name = key.split(':', 2) yield key_name @@ -247,7 +283,7 @@ class RedisCache(Cache): def get_namespaces(self) -> Iterator[str]: namespaces = [] - for key in self._rd.keys(f'{self.prefix}:*'): + for key in self._rd.scan_iter(f'{self.prefix}:*'): _, namespace, _ = key.split(':', 2) if namespace not in namespaces: @@ -269,6 +305,21 @@ class RedisCache(Cache): self._rd.delete(self.get_key_name(namespace, key)) + def delete_old(self, days: int = 14) -> None: + limit = datetime.now(tz = timezone.utc) - timedelta(days = days) + + for full_key in self._rd.scan_iter(f'{self.prefix}:*'): + _, namespace, key = full_key.split(':', 2) + item = self.get(namespace, key) + + if item.updated < limit: + self.delete_item(item) + + + def clear(self) -> None: + self._rd.delete(f"{self.prefix}:*") + + def setup(self) -> None: options = { 'client_name': f'ActivityRelay_{self.app.config.domain}', @@ -286,3 +337,8 @@ class RedisCache(Cache): options['port'] = self.app.config.rd_port self._rd = Redis(**options) + + + def close(self) -> None: + self._rd.close() + self._rd = None