| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 | # SPDX-License-Identifier: AGPL-3.0-or-later"""MySQL is said to be the most popular open source database.  Before enablingMySQL engine, you must install the package ``mysql-connector-python``.The authentication plugin is configurable by setting ``auth_plugin`` in theattributes.  By default it is set to ``caching_sha2_password``.Example=======This is an example configuration for querying a MySQL server:.. code:: yaml   - name: my_database     engine: mysql_server     database: my_database     username: searxng     password: password     limit: 5     query_str: 'SELECT * from my_table WHERE my_column=%(query)s'Implementations==============="""try:    import mysql.connector  # type: ignoreexcept ImportError:    # import error is ignored because the admin has to install mysql manually to use    # the engine    passengine_type = 'offline'auth_plugin = 'caching_sha2_password'host = "127.0.0.1"port = 3306database = ""username = ""password = ""query_str = ""limit = 10paging = Trueresult_template = 'key-value.html'_connection = Nonedef init(engine_settings):    global _connection  # pylint: disable=global-statement    if 'query_str' not in engine_settings:        raise ValueError('query_str cannot be empty')    if not engine_settings['query_str'].lower().startswith('select '):        raise ValueError('only SELECT query is supported')    _connection = mysql.connector.connect(        database=database,        user=username,        password=password,        host=host,        port=port,        auth_plugin=auth_plugin,    )def search(query, params):    query_params = {'query': query}    query_to_run = query_str + ' LIMIT {0} OFFSET {1}'.format(limit, (params['pageno'] - 1) * limit)    with _connection.cursor() as cur:        cur.execute(query_to_run, query_params)        return _fetch_results(cur)def _fetch_results(cur):    results = []    for res in cur:        result = dict(zip(cur.column_names, map(str, res)))        result['template'] = result_template        results.append(result)    return results
 |