Source code for searx.sqlitedb

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Implementations to make access to SQLite databases a little more convenient.

:py:obj:`SQLiteAppl`
  Abstract class with which DB applications can be implemented.

:py:obj:`SQLiteProperties`:
  Class to manage properties stored in a database.

Examplarical implementations based on :py:obj:`SQLiteAppl`:

:py:obj:`searx.cache.ExpireCacheSQLite` :
  Cache that manages key/value pairs in a SQLite DB, in which the key/value
  pairs are deleted after an "expire" time.  This type of cache is used, for
  example, for the engines, see :py:obj:`searx.enginelib.EngineCache`.

:py:obj:`searx.favicons.cache.FaviconCacheSQLite` :
  Favicon cache that manages the favicon BLOBs in a SQLite DB.

----
"""
from __future__ import annotations

import abc
import datetime
import re
import sqlite3
import sys
import threading
import uuid

from searx import logger

logger = logger.getChild("sqlitedb")

THREAD_LOCAL = threading.local()


[docs] class DBSession: """A *thead-local* DB session"""
[docs] @classmethod def get_connect(cls, app: SQLiteAppl) -> sqlite3.Connection: """Returns a thread local DB connection. The connection is only established once per thread. """ if getattr(THREAD_LOCAL, "DBSession_map", None) is None: THREAD_LOCAL.DBSession_map = {} session = THREAD_LOCAL.DBSession_map.get(app.db_url) if session is None: session = cls(app) return session.conn
def __init__(self, app: SQLiteAppl): self.uuid = uuid.uuid4() self.app = app self._conn = None # self.__del__ will be called, when thread ends if getattr(THREAD_LOCAL, "DBSession_map", None) is None: THREAD_LOCAL.DBSession_map = {} THREAD_LOCAL.DBSession_map[self.app.db_url] = self @property def conn(self) -> sqlite3.Connection: msg = f"[{threading.current_thread().ident}] DBSession: " f"{self.app.__class__.__name__}({self.app.db_url})" if self._conn is None: self._conn = self.app.connect() logger.debug("%s --> created new connection", msg) # else: # logger.debug("%s --> already connected", msg) return self._conn def __del__(self): try: if self._conn is not None: # HINT: Don't use Python's logging facility in a destructor, it # will produce error reports when python aborts the process or # thread, because at this point objects that the logging module # needs, do not exist anymore. # msg = f"DBSession: close [{self.uuid}] {self.app.__class__.__name__}({self.app.db_url})" # logger.debug(msg) self._conn.close() except Exception: # pylint: disable=broad-exception-caught pass
[docs] class SQLiteAppl(abc.ABC): """Abstract base class for implementing convenient DB access in SQLite applications. In the constructor, a :py:obj:`SQLiteProperties` instance is already aggregated under ``self.properties``.""" DDL_CREATE_TABLES: dict[str, str] = {} DB_SCHEMA: int = 1 """As soon as changes are made to the DB schema, the version number must be increased. Changes to the version number require the DB to be recreated (or migrated / if an migration path exists and is implemented).""" SQLITE_THREADING_MODE = { 0: "single-thread", 1: "multi-thread", 3: "serialized"}[sqlite3.threadsafety] # fmt:skip """Threading mode of the SQLite library. Depends on the options used at compile time and is different for different distributions and architectures. Possible values are 0:``single-thread``, 1:``multi-thread``, 3:``serialized`` (see :py:obj:`sqlite3.threadsafety`). Pre- Python 3.11 this value was hard coded to 1. Depending on this value, optimizations are made, e.g. in “serialized” mode it is not necessary to create a separate DB connector for each thread. """ SQLITE_JOURNAL_MODE = "WAL" """``SQLiteAppl`` applications are optimzed for WAL_ mode, its not recommend to change the journal mode (see :py:obj:`SQLiteAppl.tear_down`). .. _WAL: https://sqlite.org/wal.html """ SQLITE_CONNECT_ARGS = { # "timeout": 5.0, # "detect_types": 0, "check_same_thread": bool(SQLITE_THREADING_MODE != "serialized"), "cached_statements": 0, # https://github.com/python/cpython/issues/118172 # "uri": False, "isolation_level": None, } # fmt:skip """Connection arguments (:py:obj:`sqlite3.connect`) ``check_same_thread``: Is disabled by default when :py:obj:`SQLITE_THREADING_MODE` is ``serialized``. The check is more of a hindrance in this case because it would prevent a DB connector from being used in multiple threads. ``cached_statements``: Is set to ``0`` by default. Note: Python 3.12+ fetch result are not consistent in multi-threading application and causing an API misuse error. The multithreading use in SQLiteAppl is intended and supported if threadsafety is set to 3 (aka "serialized"). CPython supports “serialized” from version 3.12 on, but unfortunately only with errors: - https://github.com/python/cpython/issues/118172 - https://github.com/python/cpython/issues/123873 The workaround for SQLite3 multithreading cache inconsistency ist to set option ``cached_statements`` to ``0`` by default. """ def __init__(self, db_url): self.db_url = db_url self.properties = SQLiteProperties(db_url) self._init_done = False self._compatibility() # atexit.register(self.tear_down) # def tear_down(self): # """:ref:`Vacuuming the WALs` upon normal interpreter termination # (:py:obj:`atexit.register`). # .. _SQLite: Vacuuming the WALs: https://www.theunterminatedstring.com/sqlite-vacuuming/ # """ # self.DB.execute("PRAGMA wal_checkpoint(TRUNCATE)") def _compatibility(self): if self.SQLITE_THREADING_MODE == "serialized": self._DB = None else: msg = ( f"SQLite library is compiled with {self.SQLITE_THREADING_MODE} mode," " read https://docs.python.org/3/library/sqlite3.html#sqlite3.threadsafety" ) if threading.active_count() > 1: logger.error(msg) else: logger.warning(msg) if sqlite3.sqlite_version_info <= (3, 35): # See "Generalize UPSERT:" in https://sqlite.org/releaselog/3_35_0.html logger.critical( "SQLite runtime library version %s is not supported (require >= 3.35)", sqlite3.sqlite_version ) def _connect(self) -> sqlite3.Connection: conn = sqlite3.Connection(self.db_url, **self.SQLITE_CONNECT_ARGS) # type: ignore conn.execute(f"PRAGMA journal_mode={self.SQLITE_JOURNAL_MODE}") self.register_functions(conn) return conn
[docs] def connect(self) -> sqlite3.Connection: """Creates a new DB connection (:py:obj:`SQLITE_CONNECT_ARGS`). If not already done, the DB schema is set up. The caller must take care of closing the resource. Alternatively, :py:obj:`SQLiteAppl.DB` can also be used (the resource behind `self.DB` is automatically closed when the process or thread is terminated). """ if sys.version_info < (3, 12): # Prior Python 3.12 there is no "autocommit" option self.SQLITE_CONNECT_ARGS.pop("autocommit", None) msg = ( f"[{threading.current_thread().ident}] {self.__class__.__name__}({self.db_url})" f" {self.SQLITE_CONNECT_ARGS} // {self.SQLITE_JOURNAL_MODE}" ) logger.debug(msg) with self._connect() as conn: self.init(conn) return conn
[docs] def register_functions(self, conn): """Create user-defined_ SQL functions. ``REGEXP(<pattern>, <field>)`` : 0 | 1 `re.search`_ returns (int) 1 for a match and 0 for none match of ``<pattern>`` in ``<field>``. .. code:: sql SELECT '12' AS field WHERE REGEXP('^[0-9][0-9]$', field) -- 12 SELECT REGEXP('[0-9][0-9]', 'X12Y') -- 1 SELECT REGEXP('[0-9][0-9]', 'X1Y') -- 0 .. _user-defined: https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.create_function .. _deterministic: https://sqlite.org/deterministic.html .. _re.search: https://docs.python.org/3/library/re.html#re.search """ conn.create_function("regexp", 2, lambda x, y: 1 if re.search(x, y) else 0, deterministic=True)
@property def DB(self) -> sqlite3.Connection: """Provides a DB connection. The connection is a *singleton* and therefore well suited for read access. If :py:obj:`SQLITE_THREADING_MODE` is ``serialized`` only one DB connection is created for all threads. .. note:: For dedicated `transaction control`_, it is recommended to create a new connection (:py:obj:`SQLiteAppl.connect`). .. _transaction control: https://docs.python.org/3/library/sqlite3.html#sqlite3-controlling-transactions """ conn = None if self.SQLITE_THREADING_MODE == "serialized": # Theoretically it is possible to reuse the DB cursor across threads # as of Python 3.12, in practice the threading of the cursor seems # to me a little faulty that I prefer to establish one connection # per thread. # # may we can activate this code one day .. # if self._DB is None: # self._DB = self.connect() # conn = self._DB conn = DBSession.get_connect(self) else: conn = DBSession.get_connect(self) # Since more than one instance of SQLiteAppl share the same DB # connection, we need to make sure that each SQLiteAppl instance has run # its init method at least once. self.init(conn) return conn
[docs] def init(self, conn: sqlite3.Connection) -> bool: """Initializes the DB schema and properties, is only executed once even if called several times. If the initialization has not yet taken place, it is carried out and a `True` is returned to the caller at the end. If the initialization has already been carried out in the past, `False` is returned. """ if self._init_done: return False self._init_done = True logger.debug("init DB: %s", self.db_url) self.properties.init(conn) ver = self.properties("DB_SCHEMA") if ver is None: with conn: self.create_schema(conn) else: ver = int(ver) if ver != self.DB_SCHEMA: raise sqlite3.DatabaseError("Expected DB schema v%s, DB schema is v%s" % (self.DB_SCHEMA, ver)) logger.debug("DB_SCHEMA = %s", ver) return True
def create_schema(self, conn: sqlite3.Connection): logger.debug("create schema ..") self.properties.set("DB_SCHEMA", self.DB_SCHEMA) self.properties.set("LAST_MAINTENANCE", "") with conn: for table_name, sql in self.DDL_CREATE_TABLES.items(): conn.execute(sql) self.properties.set(f"Table {table_name} created", table_name)
[docs] class SQLiteProperties(SQLiteAppl): """Simple class to manage properties of a DB application in the DB. The object has its own DB connection and transaction area. .. code:: sql CREATE TABLE IF NOT EXISTS properties ( name TEXT, value TEXT, m_time INTEGER DEFAULT (strftime('%s', 'now')), PRIMARY KEY (name)) """ SQLITE_JOURNAL_MODE = "WAL" DDL_PROPERTIES = """\ CREATE TABLE IF NOT EXISTS properties ( name TEXT, value TEXT, m_time INTEGER DEFAULT (strftime('%s', 'now')), -- last modified (unix epoch) time in sec. PRIMARY KEY (name))""" """Table to store properties of the DB application""" SQL_GET = "SELECT value FROM properties WHERE name = ?" SQL_M_TIME = "SELECT m_time FROM properties WHERE name = ?" SQL_SET = ( "INSERT INTO properties (name, value) VALUES (?, ?)" " ON CONFLICT(name) DO UPDATE" " SET value=excluded.value, m_time=strftime('%s', 'now')" ) SQL_DELETE = "DELETE FROM properties WHERE name = ?" SQL_TABLE_EXISTS = ( "SELECT name FROM sqlite_master" " WHERE type='table' AND name='properties'" ) # fmt:skip SQLITE_CONNECT_ARGS = dict(SQLiteAppl.SQLITE_CONNECT_ARGS) def __init__(self, db_url: str): # pylint: disable=super-init-not-called self.db_url = db_url self._init_done = False self._compatibility()
[docs] def init(self, conn: sqlite3.Connection) -> bool: """Initializes DB schema of the properties in the DB.""" if self._init_done: return False self._init_done = True logger.debug("init properties of DB: %s", self.db_url) res = conn.execute(self.SQL_TABLE_EXISTS) if res.fetchone() is None: # DB schema needs to be be created self.create_schema(conn) return True
def __call__(self, name: str, default=None): """Returns the value of the property ``name`` or ``default`` if property not exists in DB.""" res = self.DB.execute(self.SQL_GET, (name,)).fetchone() if res is None: return default return res[0]
[docs] def set(self, name: str, value: str | int): """Set ``value`` of property ``name`` in DB. If property already exists, update the ``m_time`` (and the value).""" with self.DB: self.DB.execute(self.SQL_SET, (name, value))
[docs] def delete(self, name: str) -> int: """Delete of property ``name`` from DB.""" with self.DB: cur = self.DB.execute(self.SQL_DELETE, (name,)) return cur.rowcount
[docs] def row(self, name: str, default=None): """Returns the DB row of property ``name`` or ``default`` if property not exists in DB.""" res = self.DB.execute("SELECT * FROM properties WHERE name = ?", (name,)) row = res.fetchone() if row is None: return default col_names = [column[0] for column in row.description] return dict(zip(col_names, row))
[docs] def m_time(self, name: str, default: int = 0) -> int: """Last modification time of this property.""" res = self.DB.execute(self.SQL_M_TIME, (name,)) row = res.fetchone() if row is None: return default return int(row[0])
def create_schema(self, conn): with conn: conn.execute(self.DDL_PROPERTIES) def __str__(self) -> str: lines = [] for row in self.DB.execute("SELECT name, value, m_time FROM properties"): name, value, m_time = row m_time = datetime.datetime.fromtimestamp(m_time).strftime("%Y-%m-%d %H:%M:%S") lines.append(f"[last modified: {m_time}] {name:20s}: {value}") return "\n".join(lines)