123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- """Implementation of caching solutions.
- - :py:obj:`searx.cache.ExpireCache` and its :py:obj:`searx.cache.ExpireCacheCfg`
- ----
- """
- from __future__ import annotations
- __all__ = ["ExpireCacheCfg", "ExpireCacheStats", "ExpireCache", "ExpireCacheSQLite"]
- import abc
- import dataclasses
- import datetime
- import hashlib
- import hmac
- import os
- import pickle
- import sqlite3
- import string
- import tempfile
- import time
- import typing
- import msgspec
- from searx import sqlitedb
- from searx import logger
- from searx import get_setting
- log = logger.getChild("cache")
class ExpireCacheCfg(msgspec.Struct):  # pylint: disable=too-few-public-methods
    """Configuration of a :py:obj:`ExpireCache` cache."""

    name: str
    """Name of the cache."""

    db_url: str = ""
    """URL of the SQLite DB, the path to the database file.  If unset, a
    default DB file ``sxng_cache_{name}.db`` (with the normalized cache name)
    is used in the directory returned by :py:obj:`tempfile.gettempdir`."""

    MAX_VALUE_LEN: int = 1024 * 10
    """Max length of a *serialized* value."""

    MAXHOLD_TIME: int = 60 * 60 * 24 * 7  # 7 days
    """Hold time (default in sec.), after which a value is removed from the cache."""

    MAINTENANCE_PERIOD: int = 60 * 60  # 1h
    """Maintenance period in seconds / when :py:obj:`MAINTENANCE_MODE` is set to
    ``auto``."""

    MAINTENANCE_MODE: typing.Literal["auto", "off"] = "auto"
    """Type of maintenance mode

    ``auto``:
      Maintenance is carried out automatically as part of the maintenance
      intervals (:py:obj:`MAINTENANCE_PERIOD`); no external process is required.

    ``off``:
      Maintenance is switched off and must be carried out by an external process
      if required.
    """

    password: bytes = get_setting("server.secret_key").encode()  # type: ignore
    """Password used by :py:obj:`ExpireCache.secret_hash`.

    The default password is taken from :ref:`secret_key <server.secret_key>`.
    When the password is changed, the hashed keys in the cache can no longer be
    used, which is why all values in the cache are deleted when the password is
    changed.
    """

    def __post_init__(self):
        # An explicitly configured DB location always wins.
        if self.db_url:
            return
        # Otherwise fall back to a per-cache file in the system's temp folder;
        # the cache name is normalized so it is safe to use in a file name.
        db_file = f"sxng_cache_{ExpireCache.normalize_name(self.name)}.db"
        self.db_url = tempfile.gettempdir() + os.sep + db_file
@dataclasses.dataclass
class ExpireCacheStats:
    """Dataclass which provides information on the status of the cache."""

    cached_items: dict[str, list[tuple[str, typing.Any, int]]]
    """Values in the cache mapped by context name.

    .. code: python

       {
           "context name": [
               ("foo key", "foo value", <expire>),
               ("bar key", "bar value", <expire>),
               # ...
           ],
           # ...
       }
    """

    def report(self):
        """Return a multi-line, human readable summary of :py:obj:`cached_items`.

        One line is emitted per cached key/value pair (or one ``empty`` line
        for a context without items), followed by the number of contexts and
        the total number of key/value pairs.  The expire time stamp is
        rendered in local time.
        """

        def _item_line(ctx_name, key, value, expire):
            # expire is a unix time stamp --> show it as local date & time
            valid_until = datetime.datetime.fromtimestamp(expire).strftime("%Y-%m-%d %H:%M:%S")
            return f"[{ctx_name:20s}] {valid_until} {key:12}" f" --> ({type(value).__name__}) {value} "

        out = []
        kv_total = 0

        for ctx_name, entries in self.cached_items.items():
            if entries:
                for key, value, expire in entries:
                    out.append(_item_line(ctx_name, key, value, expire))
                    kv_total += 1
            else:
                out.append(f"[{ctx_name:20s}] empty")

        out.append(f"Number of contexts: {len(self.cached_items)}")
        out.append(f"number of key/value pairs: {kv_total}")
        return "\n".join(out)
class ExpireCache(abc.ABC):
    """Abstract base class for the implementation of a key/value cache
    with expire date."""

    cfg: ExpireCacheCfg

    hash_token = "hash_token"

    @abc.abstractmethod
    def set(self, key: str, value: typing.Any, expire: int | None, ctx: str | None = None) -> bool:
        """Set *key* to *value*.  To set a timeout on key use argument
        ``expire`` (in sec.).  If expire is unset the default is taken from
        :py:obj:`ExpireCacheCfg.MAXHOLD_TIME`.  After the timeout has expired,
        the key will automatically be deleted.

        The ``ctx`` argument specifies the context of the ``key``.  A key is
        only unique in its context.

        The concrete implementations of this abstraction determine how the
        context is mapped in the connected database.  In SQL databases, for
        example, the context is a DB table or in a Key/Value DB it could be
        a prefix for the key.

        If the context is not specified (the default is ``None``) then a
        default context should be used, e.g. a default table for SQL databases
        or a default prefix in a Key/Value DB.
        """

    @abc.abstractmethod
    def get(self, key: str, default=None, ctx: str | None = None) -> typing.Any:
        """Return *value* of *key* in the context ``ctx``.  If key is unset,
        the ``default`` (``None`` if not given) is returned."""

    @abc.abstractmethod
    def maintenance(self, force: bool = False, truncate: bool = False) -> bool:
        """Performs maintenance on the cache.

        ``force``:
          Maintenance should be carried out even if the maintenance interval has
          not yet been reached.

        ``truncate``:
          Truncate the entire cache, which is necessary, for example, if the
          password has changed.
        """

    @abc.abstractmethod
    def state(self) -> ExpireCacheStats:
        """Returns a :py:obj:`ExpireCacheStats`, which provides information
        about the status of the cache."""

    @staticmethod
    def build_cache(cfg: ExpireCacheCfg) -> ExpireCache:
        """Factory to build a caching instance.

        .. note::

           Currently, only the SQLite adapter is available, but other database
           types could be implemented in the future, e.g. a Valkey (Redis)
           adapter.
        """
        return ExpireCacheSQLite(cfg)

    @staticmethod
    def normalize_name(name: str) -> str:
        """Returns a normalized name that can be used as a file name or as a SQL
        table name (is used, for example, to normalize the context name).

        Only ASCII letters, digits and the characters ``-``, ``_`` and ``.``
        are kept, everything else is dropped."""
        allowed = "-_." + string.ascii_letters + string.digits
        return "".join(filter(allowed.__contains__, name))

    def serialize(self, value: typing.Any) -> bytes:
        """Serialize ``value`` into bytes (:py:obj:`pickle.dumps`)."""
        return pickle.dumps(value)

    def deserialize(self, value: bytes) -> typing.Any:
        """Rebuild the object from the bytes created by :py:obj:`serialize`.

        NOTE(review): :py:obj:`pickle.loads` must only be fed with trusted
        data -- whoever can write to the cache storage can execute code here.
        """
        return pickle.loads(value)

    def secret_hash(self, name: str | bytes) -> str:
        """Creates a hash of the argument ``name``.  The hash value is formed
        from the ``name`` combined with the :py:obj:`password
        <ExpireCacheCfg.password>`.  Can be used, for example, to make the
        ``key`` stored in the DB unreadable for third parties.

        A ``str`` is encoded as UTF-8 first; the result is the hex digest of a
        SHA-256 HMAC."""
        raw = name.encode("utf-8") if isinstance(name, str) else name
        # name + password are used as the HMAC key (no message); this must
        # stay as it is, otherwise hashes that are already stored would no
        # longer match.
        return hmac.new(raw + self.cfg.password, digestmod='sha256').hexdigest()
class ExpireCacheSQLite(sqlitedb.SQLiteAppl, ExpireCache):
    """Cache that manages key/value pairs in a SQLite DB.  The DB model in the
    SQLite DB is implemented in abstract class :py:obj:`SQLiteAppl
    <searx.sqlitedb.SQLiteAppl>`.

    The following configurations are required / supported:

    - :py:obj:`ExpireCacheCfg.db_url`
    - :py:obj:`ExpireCacheCfg.MAXHOLD_TIME`
    - :py:obj:`ExpireCacheCfg.MAINTENANCE_PERIOD`
    - :py:obj:`ExpireCacheCfg.MAINTENANCE_MODE`
    """

    DB_SCHEMA = 1

    # The key/value tables will be created on demand by self.create_table
    DDL_CREATE_TABLES = {}

    CACHE_TABLE_PREFIX = "CACHE-TABLE-"

    def __init__(self, cfg: ExpireCacheCfg):
        """An instance of the SQLite expire cache is build up from a
        :py:obj:`config <ExpireCacheCfg>`."""

        self.cfg = cfg
        if cfg.db_url == ":memory:":
            log.critical("don't use SQLite DB in :memory: in production!!")
        super().__init__(cfg.db_url)

    def init(self, conn: sqlite3.Connection) -> bool:
        """Initialize the DB and truncate all cache tables if the password
        (:py:obj:`ExpireCacheCfg.password`) differs from the one the DB was
        last used with.

        Returns ``False`` if the ``init`` of the base class returns a false
        value, otherwise ``True``.
        """
        ret_val = super().init(conn)
        if not ret_val:
            return False

        # Only a SHA-256 of the password is stored in the DB properties; it is
        # compared with the hash of the currently configured password.
        new = hashlib.sha256(self.cfg.password).hexdigest()
        old = self.properties(self.hash_token)
        if old != new:
            if old is not None:
                log.warning("[%s] hash token changed: truncate all cache tables", self.cfg.name)
            # Keys hashed with the old password can no longer be looked up.
            self.maintenance(force=True, truncate=True)
            self.properties.set(self.hash_token, new)

        return True

    def maintenance(self, force: bool = False, truncate: bool = False) -> bool:
        """Drop expired items from all cache tables (or, with ``truncate``,
        all items) and checkpoint the WAL.

        ``force``:
          Run even if the maintenance interval has not yet been reached.

        ``truncate``:
          Delete *all* items of all cache tables instead of only the expired.

        Returns ``False`` if the maintenance was skipped, ``True`` otherwise.
        """

        if not force and int(time.time()) < self.next_maintenance_time:
            # no maintenance required yet, next maintenance interval is in the future
            return False

        # Prevent parallel DB maintenance cycles from other DB connections
        # (e.g. in multi thread or process environments).
        self.properties.set("LAST_MAINTENANCE", "")  # hint: this (also) sets the m_time of the property!

        if truncate:
            self.truncate_tables(self.table_names)
            return True

        # drop items by expire time stamp ..
        expire = int(time.time())

        conn = self.connect()
        try:
            # The context manager of the connection only commits (or rolls
            # back) the transaction, it does not close the connection.
            with conn:
                for table in self.table_names:
                    res = conn.execute(f"DELETE FROM {table} WHERE expire < ?", (expire,))
                    log.debug("deleted %s keys from table %s (expire date reached)", res.rowcount, table)

            # Vacuuming the WALs
            # https://www.theunterminatedstring.com/sqlite-vacuuming/
            #
            # The checkpoint is executed after the DELETEs are committed, it
            # can't be executed within an open transaction.
            conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
        finally:
            # close the connection even if one of the statements failed
            conn.close()
        return True

    def create_table(self, table: str) -> bool:
        """Create DB ``table`` if it has not yet been created, no recreates are
        initiated if the table already exists.

        Returns ``True`` if the table was created, ``False`` if it was already
        registered in the DB properties.
        """
        if table in self.table_names:
            # key/value table exists in DB (no need to recreate)
            return False

        log.info("key/value table '%s' NOT exists in DB -> create DB table ..", table)
        sql_table = "\n".join(
            [
                f"CREATE TABLE IF NOT EXISTS {table} (",
                "  key        TEXT,",
                "  value      BLOB,",
                f"  expire     INTEGER DEFAULT (strftime('%s', 'now') + {self.cfg.MAXHOLD_TIME}),",
                "PRIMARY KEY (key))",
            ]
        )
        sql_index = f"CREATE INDEX IF NOT EXISTS index_expire_{table} ON {table}(expire);"

        conn = self.connect()
        try:
            with conn:
                conn.execute(sql_table)
                conn.execute(sql_index)
        finally:
            conn.close()

        # Register the table, :py:obj:`table_names` is built from these
        # properties.
        self.properties.set(f"{self.CACHE_TABLE_PREFIX}-{table}", table)
        return True

    @property
    def table_names(self) -> list[str]:
        """List of key/value tables already created in the DB."""
        sql = "SELECT value FROM properties WHERE name LIKE ?"
        rows = self.DB.execute(sql, (f"{self.CACHE_TABLE_PREFIX}%",)).fetchall()
        return [r[0] for r in rows]

    def truncate_tables(self, table_names: list[str]):
        """Delete all rows of the tables given in ``table_names``.  Always
        returns ``True``."""
        log.debug("truncate table: %s", ",".join(table_names))
        conn = self.connect()
        try:
            with conn:
                for table in table_names:
                    conn.execute(f"DELETE FROM {table}")
        finally:
            conn.close()
        return True

    @property
    def next_maintenance_time(self) -> int:
        """Returns (unix epoch) time of the next maintenance."""

        return self.cfg.MAINTENANCE_PERIOD + self.properties.m_time("LAST_MAINTENANCE", int(time.time()))

    # implement ABC methods of ExpireCache

    def set(self, key: str, value: typing.Any, expire: int | None, ctx: str | None = None) -> bool:
        """Set key/value in DB table given by argument ``ctx``.  If expire is
        unset the default is taken from :py:obj:`ExpireCacheCfg.MAXHOLD_TIME`.
        If ``ctx`` argument is ``None`` (the default), a table name is
        generated from the :py:obj:`ExpireCacheCfg.name`.  If DB table does not
        exists, it will be created (on demand) by :py:obj:`self.create_table
        <ExpireCacheSQLite.create_table>`.

        Returns ``False`` (and stores nothing) if the serialized value is
        larger than :py:obj:`ExpireCacheCfg.MAX_VALUE_LEN`, otherwise ``True``.
        """
        # Automatic maintenance is only carried out in mode "auto", in mode
        # "off" it is up to an external process.
        if self.cfg.MAINTENANCE_MODE == "auto":
            self.maintenance()

        table_name = ctx
        if not table_name:
            table_name = self.normalize_name(self.cfg.name)

        value = self.serialize(value=value)
        if len(value) > self.cfg.MAX_VALUE_LEN:
            # log the key, not the (big, serialized) value
            log.warning(
                "ExpireCache.set(): %s.key='%s' - value too big to cache (len: %s) ", table_name, key, len(value)
            )
            return False

        # expire is a duration in sec. / the DB stores the absolute time stamp
        if not expire:
            expire = self.cfg.MAXHOLD_TIME
        expire = int(time.time()) + expire

        self.create_table(table_name)

        sql = (
            f"INSERT INTO {table_name} (key, value, expire) VALUES (?, ?, ?)"
            f"    ON CONFLICT DO "
            f"UPDATE SET value=?, expire=?"
        )

        # NOTE(review): with an explicit ctx the shared self.DB connection is
        # used, the default context uses a connection of its own -- the reason
        # for this distinction is not visible here, it is kept as it was.
        if ctx:
            with self.DB:
                self.DB.execute(sql, (key, value, expire, value, expire))
        else:
            conn = self.connect()
            try:
                with conn:
                    conn.execute(sql, (key, value, expire, value, expire))
            finally:
                conn.close()

        return True

    def get(self, key: str, default=None, ctx: str | None = None) -> typing.Any:
        """Get value of ``key`` from table given by argument ``ctx``.  If
        ``ctx`` argument is ``None`` (the default), a table name is generated
        from the :py:obj:`ExpireCacheCfg.name`.  If ``key`` not exists (in
        table) or its expire time has been reached, the ``default`` value is
        returned.
        """
        # Automatic maintenance is only carried out in mode "auto", in mode
        # "off" it is up to an external process.
        if self.cfg.MAINTENANCE_MODE == "auto":
            self.maintenance()

        table = ctx
        if not table:
            table = self.normalize_name(self.cfg.name)

        if table not in self.table_names:
            return default

        # Expired rows stay in the table until the next maintenance cycle,
        # they must not be served in the meantime.  The condition is the
        # complement of the ``expire < now`` used by the maintenance.
        sql = f"SELECT value FROM {table} WHERE key = ? AND expire >= ?"
        row = self.DB.execute(sql, (key, int(time.time()))).fetchone()
        if row is None:
            return default

        return self.deserialize(row[0])

    def state(self) -> ExpireCacheStats:
        """Returns a :py:obj:`ExpireCacheStats` with the (deserialized) items
        of all cache tables, including items not yet removed by maintenance."""
        cached_items = {}
        for table in self.table_names:
            rows = self.DB.execute(f"SELECT key, value, expire FROM {table}")
            cached_items[table] = [(row[0], self.deserialize(row[1]), row[2]) for row in rows]
        return ExpireCacheStats(cached_items=cached_items)
|