Browse Source

[feat] implement mariadb engine

Grant Lanham 7 months ago
parent
commit
2a29e16d25

+ 79 - 0
searx/engines/mariadb_server.py

@@ -0,0 +1,79 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+"""MariaDB is a community driven fork of MySQL. Before enabling MariaDB engine,
+you must the install the pip package ``mariadb`` along with the necessary
+prerequities.
+
+`See the following documentation for more details
+<https://mariadb.com/docs/server/connect/programming-languages/c/install/>`_
+
+Example
+=======
+
+This is an example configuration for querying a MariaDB server:
+
+.. code:: yaml
+
+   - name: my_database
+     engine: mariadb_server
+     database: my_database
+     username: searxng
+     password: password
+     limit: 5
+     query_str: 'SELECT * from my_table WHERE my_column=%(query)s'
+
+"""
+
+from typing import TYPE_CHECKING
+
+try:
+    import mariadb
+except ImportError:
+    # import error is ignored because the admin has to install mysql manually to use
+    # the engine
+    pass
+
+if TYPE_CHECKING:
+    import logging
+
+    logger = logging.getLogger()
+
+
+engine_type = 'offline'
+host = "127.0.0.1"
+port = 3306
+database = ""
+username = ""
+password = ""
+query_str = ""
+limit = 10
+paging = True
+result_template = 'key-value.html'
+_connection = None
+
+
+def init(engine_settings):
+    global _connection  # pylint: disable=global-statement
+
+    if 'query_str' not in engine_settings:
+        raise ValueError('query_str cannot be empty')
+
+    if not engine_settings['query_str'].lower().startswith('select '):
+        raise ValueError('only SELECT query is supported')
+
+    _connection = mariadb.connect(database=database, user=username, password=password, host=host, port=port)
+
+
+def search(query, params):
+    query_params = {'query': query}
+    query_to_run = query_str + ' LIMIT {0} OFFSET {1}'.format(limit, (params['pageno'] - 1) * limit)
+    logger.debug("SQL Query: %s", query_to_run)
+
+    with _connection.cursor() as cur:
+        cur.execute(query_to_run, query_params)
+        results = []
+        col_names = [i[0] for i in cur.description]
+        for res in cur:
+            result = dict(zip(col_names, map(str, res)))
+            result['template'] = result_template
+            results.append(result)
+        return results

+ 10 - 0
searx/settings.yml

@@ -2031,6 +2031,16 @@ engines:
   #    query_str: 'SELECT * from mytable WHERE fieldname=%(query)s'
   #    shortcut: mysql
 
+  # Required dependency: mariadb
+  #  - name: mariadb
+  #    engine: mariadb_server
+  #    database: mydatabase
+  #    username: user
+  #    password: pass
+  #    limit: 10
+  #    query_str: 'SELECT * from mytable WHERE fieldname=%(query)s'
+  #    shortcut: mdb
+
   - name: 1337x
     engine: 1337x
     shortcut: 1337x

+ 44 - 0
tests/unit/test_engine_mariadb_server.py

@@ -0,0 +1,44 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+# pylint: disable=missing-module-docstring
+
+from unittest.mock import MagicMock, Mock
+from searx.engines import load_engines, mariadb_server
+from tests import SearxTestCase
+
+
+class MariadbServerTests(SearxTestCase):  # pylint: disable=missing-class-docstring
+    def setUp(self):
+        load_engines(
+            [
+                {
+                    'name': 'mariadb server',
+                    'engine': 'mariadb_server',
+                    'shortcut': 'mdb',
+                    'timeout': 9.0,
+                    'disabled': True,
+                }
+            ]
+        )
+
+    def tearDown(self):
+        load_engines([])
+
+    def test_init_no_query_str_raises(self):
+        self.assertRaises(ValueError, lambda: mariadb_server.init({}))
+
+    def test_init_non_select_raises(self):
+        self.assertRaises(ValueError, lambda: mariadb_server.init({'query_str': 'foobar'}))
+
+    def test_search_returns_results(self):
+        test_string = 'FOOBAR'
+        cursor_mock = MagicMock()
+        with cursor_mock as setup:  # pylint: disable=not-context-manager
+            setup.__iter__ = Mock(return_value=iter([{test_string, 1}]))
+            setup.description = [[test_string]]
+        conn_mock = Mock()
+        conn_mock.cursor.return_value = cursor_mock
+        mariadb_server._connection = conn_mock  # pylint: disable=protected-access
+        results = mariadb_server.search(test_string, {'pageno': 1})
+        self.assertEqual(1, len(results))
+        self.assertIn(test_string, results[0])
+        self.assertEqual(mariadb_server.result_template, results[0]['template'])

+ 0 - 0
tests/unit/test_tineye.py → tests/unit/test_engine_tineye.py