sqlite.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. # lint: pylint
  3. """SQLite database (Offline)
  4. """
  5. import sqlite3
  6. import contextlib
  7. engine_type = 'offline'
  8. database = ""
  9. query_str = ""
  10. limit = 10
  11. paging = True
  12. result_template = 'key-value.html'
  13. def init(engine_settings):
  14. if 'query_str' not in engine_settings:
  15. raise ValueError('query_str cannot be empty')
  16. if not engine_settings['query_str'].lower().startswith('select '):
  17. raise ValueError('only SELECT query is supported')
  18. @contextlib.contextmanager
  19. def sqlite_cursor():
  20. """Implements a `Context Manager`_ for a :py:obj:`sqlite3.Cursor`.
  21. Open database in read only mode: if the database doesn't exist.
  22. The default mode creates an empty file on the file system.
  23. see:
  24. * https://docs.python.org/3/library/sqlite3.html#sqlite3.connect
  25. * https://www.sqlite.org/uri.html
  26. """
  27. global database # pylint: disable=global-statement
  28. uri = 'file:' + database + '?mode=ro'
  29. with contextlib.closing(sqlite3.connect(uri, uri=True)) as connect:
  30. connect.row_factory = sqlite3.Row
  31. with contextlib.closing(connect.cursor()) as cursor:
  32. yield cursor
  33. def search(query, params):
  34. global query_str, result_template # pylint: disable=global-statement
  35. results = []
  36. query_params = {
  37. 'query': query,
  38. 'wildcard': r'%' + query.replace(' ', r'%') + r'%',
  39. 'limit': limit,
  40. 'offset': (params['pageno'] - 1) * limit
  41. }
  42. query_to_run = query_str + ' LIMIT :limit OFFSET :offset'
  43. with sqlite_cursor() as cur:
  44. cur.execute(query_to_run, query_params)
  45. col_names = [cn[0] for cn in cur.description]
  46. for row in cur.fetchall():
  47. item = dict( zip(col_names, map(str, row)) )
  48. item['template'] = result_template
  49. logger.debug("append result --> %s", item)
  50. results.append(item)
  51. return results