# SPDX-License-Identifier: AGPL-3.0-or-later
"""Implementations to make access to SQLite databases a little more convenient.
:py:obj:`SQLiteAppl`
Abstract class with which DB applications can be implemented.
:py:obj:`SQLiteProperties`:
Class to manage properties stored in a database.
Example implementations based on :py:obj:`SQLiteAppl`:
:py:obj:`searx.cache.ExpireCacheSQLite` :
Cache that manages key/value pairs in a SQLite DB, in which the key/value
pairs are deleted after an "expire" time. This type of cache is used, for
example, for the engines, see :py:obj:`searx.enginelib.EngineCache`.
:py:obj:`searx.favicons.cache.FaviconCacheSQLite` :
Favicon cache that manages the favicon BLOBs in a SQLite DB.
----
"""
from __future__ import annotations
import abc
import datetime
import re
import sqlite3
import sys
import threading
import uuid
from searx import logger
# Module logger: the "sqlitedb" child of the searx logger imported above (the
# imported name is rebound to the child logger).
logger = logger.getChild("sqlitedb")
# Thread-local storage; DBSession keeps its per-thread session map in the
# attribute ``DBSession_map`` of this object.
THREAD_LOCAL = threading.local()
[docs]
class DBSession:
    """A *thread-local* DB session.

    Sessions are registered in ``THREAD_LOCAL.DBSession_map`` under the
    ``db_url`` of the application they were created for.  The DB connection of
    a session is opened lazily on first access of :py:obj:`DBSession.conn`.
    """

    @classmethod
    def get_connect(cls, app: SQLiteAppl) -> sqlite3.Connection:
        """Return the DB connection of the current thread's session for
        ``app.db_url``.  A session is created (and registered) when the current
        thread has none for this ``db_url`` yet.
        """
        session_map = getattr(THREAD_LOCAL, "DBSession_map", None)
        if session_map is None:
            session_map = THREAD_LOCAL.DBSession_map = {}

        found = session_map.get(app.db_url)
        if found is None:
            # the constructor registers the new session in the map
            found = cls(app)
        return found.conn

    def __init__(self, app: SQLiteAppl):
        self.uuid = uuid.uuid4()
        self.app = app
        self._conn = None

        # Register this session in the thread-local map; self.__del__ will be
        # called, when thread ends.
        registry = getattr(THREAD_LOCAL, "DBSession_map", None)
        if registry is None:
            registry = {}
            THREAD_LOCAL.DBSession_map = registry
        registry[self.app.db_url] = self

    @property
    def conn(self) -> sqlite3.Connection:
        """DB connection of this session, obtained from ``self.app.connect()``
        on first access and reused afterwards."""
        msg = f"[{threading.current_thread().ident}] DBSession: " f"{self.app.__class__.__name__}({self.app.db_url})"
        if self._conn is not None:
            return self._conn

        self._conn = self.app.connect()
        logger.debug("%s --> created new connection", msg)
        return self._conn

    def __del__(self):
        # HINT: Don't use Python's logging facility in a destructor, it will
        # produce error reports when python aborts the process or thread,
        # because at this point objects that the logging module needs, do not
        # exist anymore.
        try:
            connection = self._conn
            if connection is None:
                return
            connection.close()
        except Exception:  # pylint: disable=broad-exception-caught
            # best effort: errors while closing are ignored on purpose
            pass
[docs]
class SQLiteAppl(abc.ABC):
    """Abstract base class for implementing convenient DB access in SQLite
    applications.  In the constructor, a :py:obj:`SQLiteProperties` instance is
    already aggregated under ``self.properties``."""

    DDL_CREATE_TABLES: dict[str, str] = {}

    DB_SCHEMA: int = 1
    """As soon as changes are made to the DB schema, the version number must be
    increased.  Changes to the version number require the DB to be recreated (or
    migrated / if a migration path exists and is implemented)."""

    SQLITE_THREADING_MODE = {
        0: "single-thread",
        1: "multi-thread",
        3: "serialized"}[sqlite3.threadsafety]  # fmt:skip
    """Threading mode of the SQLite library.  Depends on the options used at
    compile time and is different for different distributions and architectures.

    Possible values are 0:``single-thread``, 1:``multi-thread``,
    3:``serialized`` (see :py:obj:`sqlite3.threadsafety`).  Pre- Python 3.11
    this value was hard coded to 1.

    Depending on this value, optimizations are made, e.g. in “serialized” mode
    it is not necessary to create a separate DB connector for each thread.
    """

    SQLITE_JOURNAL_MODE = "WAL"
    """``SQLiteAppl`` applications are optimized for WAL_ mode, it is not
    recommended to change the journal mode (see :py:obj:`SQLiteAppl.tear_down`).

    .. _WAL: https://sqlite.org/wal.html
    """

    SQLITE_CONNECT_ARGS = {
        # "timeout": 5.0,
        # "detect_types": 0,
        "check_same_thread": bool(SQLITE_THREADING_MODE != "serialized"),
        "cached_statements": 0,  # https://github.com/python/cpython/issues/118172
        # "uri": False,
        "isolation_level": None,
    }  # fmt:skip
    """Connection arguments (:py:obj:`sqlite3.connect`)

    ``check_same_thread``:
      Is disabled by default when :py:obj:`SQLITE_THREADING_MODE` is
      ``serialized``.  The check is more of a hindrance in this case because it
      would prevent a DB connector from being used in multiple threads.

    ``cached_statements``:
      Is set to ``0`` by default.  Note: Python 3.12+ fetch result are not
      consistent in multi-threading application and causing an API misuse error.

      The multithreading use in SQLiteAppl is intended and supported if
      threadsafety is set to 3 (aka "serialized").  CPython supports “serialized”
      from version 3.12 on, but unfortunately only with errors:

      - https://github.com/python/cpython/issues/118172
      - https://github.com/python/cpython/issues/123873

      The workaround for SQLite3 multithreading cache inconsistency is to set
      option ``cached_statements`` to ``0`` by default.
    """

    def __init__(self, db_url):
        """Store ``db_url``, aggregate a :py:obj:`SQLiteProperties` object for
        the same ``db_url`` and run the compatibility checks.  No DB connection
        is opened here."""
        self.db_url = db_url
        self.properties = SQLiteProperties(db_url)
        self._init_done = False
        self._compatibility()
        # atexit.register(self.tear_down)

    # def tear_down(self):
    #     """:ref:`Vacuuming the WALs` upon normal interpreter termination
    #     (:py:obj:`atexit.register`).
    #     .. _SQLite: Vacuuming the WALs: https://www.theunterminatedstring.com/sqlite-vacuuming/
    #     """
    #     self.DB.execute("PRAGMA wal_checkpoint(TRUNCATE)")

    def _compatibility(self):
        """Check threading mode and version of the SQLite runtime library; issues
        are reported to the log, no exception is raised."""
        if self.SQLITE_THREADING_MODE == "serialized":
            self._DB = None
        else:
            msg = (
                f"SQLite library is compiled with {self.SQLITE_THREADING_MODE} mode,"
                " read https://docs.python.org/3/library/sqlite3.html#sqlite3.threadsafety"
            )
            # more severe when other threads are already running
            if threading.active_count() > 1:
                logger.error(msg)
            else:
                logger.warning(msg)

        # sqlite_version_info is a 3-tuple (major, minor, micro), i.e. this is
        # true for every release before 3.35.0
        if sqlite3.sqlite_version_info < (3, 35):
            # See "Generalize UPSERT:" in https://sqlite.org/releaselog/3_35_0.html
            logger.critical(
                "SQLite runtime library version %s is not supported (require >= 3.35)", sqlite3.sqlite_version
            )

    def _connect(self) -> sqlite3.Connection:
        """Open a DB connection using :py:obj:`SQLITE_CONNECT_ARGS`, set the
        journal mode and register the user-defined SQL functions."""
        conn = sqlite3.Connection(self.db_url, **self.SQLITE_CONNECT_ARGS)  # type: ignore
        conn.execute(f"PRAGMA journal_mode={self.SQLITE_JOURNAL_MODE}")
        self.register_functions(conn)
        return conn

    def connect(self) -> sqlite3.Connection:
        """Creates a new DB connection (:py:obj:`SQLITE_CONNECT_ARGS`).  If not
        already done, the DB schema is set up.  The caller must take care of
        closing the resource.  Alternatively, :py:obj:`SQLiteAppl.DB` can also
        be used (the resource behind `self.DB` is automatically closed when the
        process or thread is terminated).
        """
        if sys.version_info < (3, 12):
            # Prior Python 3.12 there is no "autocommit" option
            self.SQLITE_CONNECT_ARGS.pop("autocommit", None)

        msg = (
            f"[{threading.current_thread().ident}] {self.__class__.__name__}({self.db_url})"
            f" {self.SQLITE_CONNECT_ARGS} // {self.SQLITE_JOURNAL_MODE}"
        )
        logger.debug(msg)

        # The with-block only scopes a transaction around the initialization,
        # it does not close the connection.
        with self._connect() as conn:
            self.init(conn)
        return conn

    def register_functions(self, conn):
        """Create user-defined_ SQL functions.

        ``REGEXP(<pattern>, <field>)`` : 0 | 1
           `re.search`_ returns (int) 1 for a match and 0 for none match of
           ``<pattern>`` in ``<field>``.  If ``<pattern>`` or ``<field>`` is
           ``NULL`` the result is 0.

           .. code:: sql

              SELECT '12' AS field WHERE REGEXP('^[0-9][0-9]$', field)
              -- 12

              SELECT REGEXP('[0-9][0-9]', 'X12Y')
              -- 1
              SELECT REGEXP('[0-9][0-9]', 'X1Y')
              -- 0
              SELECT REGEXP('[0-9][0-9]', NULL)
              -- 0

        .. _user-defined: https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.create_function
        .. _deterministic: https://sqlite.org/deterministic.html
        .. _re.search: https://docs.python.org/3/library/re.html#re.search
        """

        def _regexp(pattern, field) -> int:
            # SQL NULL arrives as None; re.search() would raise a TypeError
            # which aborts the entire SQL statement.
            if pattern is None or field is None:
                return 0
            return 1 if re.search(pattern, field) else 0

        conn.create_function("regexp", 2, _regexp, deterministic=True)

    @property
    def DB(self) -> sqlite3.Connection:
        """Provides a DB connection.  The connection is taken from the
        thread-local :py:obj:`DBSession` of this ``db_url`` and is therefore
        well suited for read access.  Independent of
        :py:obj:`SQLITE_THREADING_MODE`, one DB connection per thread is used.

        .. note::

           For dedicated `transaction control`_, it is recommended to create a
           new connection (:py:obj:`SQLiteAppl.connect`).

        .. _transaction control:
            https://docs.python.org/3/library/sqlite3.html#sqlite3-controlling-transactions
        """
        # Theoretically it is possible to reuse the DB cursor across threads as
        # of Python 3.12 (threading mode "serialized"), in practice the
        # threading of the cursor seems to me a little faulty that I prefer to
        # establish one connection per thread.
        #
        # may we can activate this code one day (for the "serialized" mode) ..
        #
        #   if self._DB is None:
        #       self._DB = self.connect()
        #   conn = self._DB
        conn = DBSession.get_connect(self)

        # Since more than one instance of SQLiteAppl share the same DB
        # connection, we need to make sure that each SQLiteAppl instance has run
        # its init method at least once.
        self.init(conn)
        return conn

    def init(self, conn: sqlite3.Connection) -> bool:
        """Initializes the DB schema and properties, is only executed once even
        if called several times.

        If the initialization has not yet taken place, it is carried out and a
        `True` is returned to the caller at the end.  If the initialization has
        already been carried out in the past, `False` is returned.

        :raises sqlite3.DatabaseError: if the schema version stored in the DB
          differs from :py:obj:`DB_SCHEMA`.
        """
        if self._init_done:
            return False
        # The flag is set before the work is done: a call of init() that is
        # issued while the initialization is still running returns immediately.
        self._init_done = True

        logger.debug("init DB: %s", self.db_url)
        self.properties.init(conn)

        ver = self.properties("DB_SCHEMA")
        if ver is None:
            # no schema version stored in the DB, the schema is created
            with conn:
                self.create_schema(conn)
        else:
            ver = int(ver)
            if ver != self.DB_SCHEMA:
                raise sqlite3.DatabaseError(f"Expected DB schema v{self.DB_SCHEMA}, DB schema is v{ver}")
            logger.debug("DB_SCHEMA = %s", ver)
        return True

    def create_schema(self, conn: sqlite3.Connection):
        """Store the schema version in the properties and execute the DDL
        statements from :py:obj:`DDL_CREATE_TABLES` on ``conn``.  For each
        table a property ``Table <name> created`` is set."""
        logger.debug("create schema ..")
        self.properties.set("DB_SCHEMA", self.DB_SCHEMA)
        self.properties.set("LAST_MAINTENANCE", "")
        with conn:
            for table_name, sql in self.DDL_CREATE_TABLES.items():
                conn.execute(sql)
                self.properties.set(f"Table {table_name} created", table_name)
[docs]
class SQLiteProperties(SQLiteAppl):
    """Simple class to manage properties of a DB application in the DB.  The
    object has its own DB connection and transaction area.

    .. code:: sql

       CREATE TABLE IF NOT EXISTS properties (
         name       TEXT,
         value      TEXT,
         m_time     INTEGER DEFAULT (strftime('%s', 'now')),
         PRIMARY KEY (name))

    """

    SQLITE_JOURNAL_MODE = "WAL"

    DDL_PROPERTIES = """\
CREATE TABLE IF NOT EXISTS properties (
name TEXT,
value TEXT,
m_time INTEGER DEFAULT (strftime('%s', 'now')), -- last modified (unix epoch) time in sec.
PRIMARY KEY (name))"""
    """Table to store properties of the DB application"""

    SQL_GET = "SELECT value FROM properties WHERE name = ?"
    SQL_M_TIME = "SELECT m_time FROM properties WHERE name = ?"
    SQL_SET = (
        "INSERT INTO properties (name, value) VALUES (?, ?)"
        "    ON CONFLICT(name) DO UPDATE"
        "   SET value=excluded.value, m_time=strftime('%s', 'now')"
    )
    SQL_DELETE = "DELETE FROM properties WHERE name = ?"
    SQL_TABLE_EXISTS = (
        "SELECT name FROM sqlite_master"
        " WHERE type='table' AND name='properties'"
    )  # fmt:skip

    # own copy of the connection arguments (not shared with SQLiteAppl)
    SQLITE_CONNECT_ARGS = dict(SQLiteAppl.SQLITE_CONNECT_ARGS)

    def __init__(self, db_url: str):  # pylint: disable=super-init-not-called
        # The constructor of the base class is not called on purpose: it would
        # aggregate another SQLiteProperties object under ``self.properties``.
        self.db_url = db_url
        self._init_done = False
        self._compatibility()

    def init(self, conn: sqlite3.Connection) -> bool:
        """Initializes DB schema of the properties in the DB.

        Returns `True` if the initialization has been carried out by this call
        and `False` if it has already been done before.
        """
        if self._init_done:
            return False
        self._init_done = True
        logger.debug("init properties of DB: %s", self.db_url)
        res = conn.execute(self.SQL_TABLE_EXISTS)
        if res.fetchone() is None:  # DB schema needs to be created
            self.create_schema(conn)
        return True

    def __call__(self, name: str, default=None):
        """Returns the value of the property ``name`` or ``default`` if property
        not exists in DB."""
        res = self.DB.execute(self.SQL_GET, (name,)).fetchone()
        if res is None:
            return default
        return res[0]

    def set(self, name: str, value: str | int):
        """Set ``value`` of property ``name`` in DB.  If property already
        exists, update the ``m_time`` (and the value)."""
        conn = self.DB
        with conn:
            conn.execute(self.SQL_SET, (name, value))

    def delete(self, name: str) -> int:
        """Delete of property ``name`` from DB.  Returns the number of deleted
        rows."""
        conn = self.DB
        with conn:
            cur = conn.execute(self.SQL_DELETE, (name,))
        return cur.rowcount

    def row(self, name: str, default=None):
        """Returns the DB row of property ``name`` or ``default`` if property
        not exists in DB.  The row is returned as a dictionary that maps the
        column names to the values of the row."""
        res = self.DB.execute("SELECT * FROM properties WHERE name = ?", (name,))
        row = res.fetchone()
        if row is None:
            return default
        # The column names are an attribute of the cursor, the fetched row
        # itself is a plain tuple and has no ``description``.
        col_names = [column[0] for column in res.description]
        return dict(zip(col_names, row))

    def m_time(self, name: str, default: int = 0) -> int:
        """Last modification time of this property (``default`` if the property
        does not exist in DB)."""
        res = self.DB.execute(self.SQL_M_TIME, (name,))
        row = res.fetchone()
        if row is None:
            return default
        return int(row[0])

    def create_schema(self, conn):
        """Create the ``properties`` table (:py:obj:`DDL_PROPERTIES`)."""
        with conn:
            conn.execute(self.DDL_PROPERTIES)

    def __str__(self) -> str:
        """One line per property: last modification time, name and value."""
        lines = []
        for row in self.DB.execute("SELECT name, value, m_time FROM properties"):
            name, value, m_time = row
            m_time = datetime.datetime.fromtimestamp(m_time).strftime("%Y-%m-%d %H:%M:%S")
            lines.append(f"[last modified: {m_time}] {name:20s}: {value}")
        return "\n".join(lines)