# SPDX-License-Identifier: AGPL-3.0-or-later
"""With *command engines* administrators can run engines to integrate arbitrary
shell commands.

.. attention::

   When creating and enabling a ``command`` engine on a public instance, you
   must be careful to avoid leaking private data.

The easiest solution is to limit the access by setting ``tokens`` as described
in section :ref:`private engines`.  The engine base is flexible.  Only your
imagination can limit the power of this engine (and maybe security concerns).

Configuration
=============

The following options are available:

``command``:
  A comma separated list of the elements of the command.  A special token
  ``{{QUERY}}`` tells where to put the search terms of the user. Example:

  .. code:: yaml

     ['ls', '-l', '-h', '{{QUERY}}']

``delimiter``:
  A mapping containing the delimiter ``chars`` and the *titles* of each element
  in ``keys``.

``parse_regex``:
  A dict containing the regular expressions for each result key.

``query_type``:

  The expected type of user search terms.  Possible values: ``path`` and
  ``enum``.

  ``path``:
    Checks if the user provided path is inside the working directory.  If not,
    the query is not executed.

  ``enum``:
    Is a list of allowed search terms.  If the user submits something which is
    not included in the list, the query returns an error.

``query_enum``:
  A list containing allowed search terms if ``query_type`` is set to ``enum``.

``working_dir``:
  The directory where the command has to be executed.  Default: ``./``.

``result_separator``:
  The character that separates results. Default: ``\\n``.

Example
=======

The example engine below can be used to find files with a specific name in the
configured working directory:

.. code:: yaml

  - name: find
    engine: command
    command: ['find', '.', '-name', '{{QUERY}}']
    query_type: path
    shortcut: fnd
    delimiter:
        chars: ' '
        keys: ['line']

Implementations
===============

"""

import re
from os.path import expanduser, isabs, realpath, commonprefix, commonpath
from shlex import split as shlex_split
from subprocess import Popen, PIPE, DEVNULL
from threading import Thread

from searx import logger


engine_type = 'offline'
paging = True

# Engine options; ``init`` overwrites command, working_dir, delimiter,
# parse_regex and environment_variables from the engine settings.
command = []
delimiter = {}
parse_regex = {}
query_type = ''
query_enum = []
environment_variables = {}
working_dir = realpath('.')
result_separator = '\n'
result_template = 'key-value.html'
timeout = 4.0

_command_logger = logger.getChild('command')
# result key --> compiled regular expression, filled by ``init``
_compiled_parse_regex = {}


def init(engine_settings):
    """Validate ``engine_settings`` and copy them into the module globals.

    :param engine_settings: mapping with the engine's configuration
    :raises ValueError: when the parsing options are inconsistent or the
        ``command`` key is missing
    """
    check_parsing_options(engine_settings)

    if 'command' not in engine_settings:
        raise ValueError('engine command : missing configuration key: command')

    global command, working_dir, delimiter, parse_regex, environment_variables

    command = engine_settings['command']

    if 'working_dir' in engine_settings:
        working_dir = engine_settings['working_dir']
        if not isabs(engine_settings['working_dir']):
            working_dir = realpath(working_dir)

    if 'parse_regex' in engine_settings:
        parse_regex = engine_settings['parse_regex']
        # compile once at init time instead of once per parsed result
        for result_key, regex in parse_regex.items():
            _compiled_parse_regex[result_key] = re.compile(regex, flags=re.MULTILINE)

    if 'delimiter' in engine_settings:
        delimiter = engine_settings['delimiter']

    if 'environment_variables' in engine_settings:
        environment_variables = engine_settings['environment_variables']


def search(query, params):
    """Run the configured command for ``query`` and return the parsed results.

    The command is executed in a worker thread that is joined for at most
    ``timeout`` seconds; whatever has been collected by then is returned.

    :param query: search terms of the user
    :param params: request parameters, ``params['pageno']`` selects the page
    :returns: list of result dicts
    :raises ValueError: when the query violates the ``query_type`` restriction
    """
    cmd = _get_command_to_run(query)
    if not cmd:
        return []

    results = []
    reader_thread = Thread(target=_get_results_from_process, args=(results, cmd, params['pageno']))
    reader_thread.start()
    # NOTE: on timeout the worker is not stopped, it may still append to
    # ``results`` after this function returned.
    reader_thread.join(timeout=timeout)

    return results


def _get_command_to_run(query):
    """Build the argument list by substituting ``{{QUERY}}`` in ``command``.

    The query is tokenized with shell-like rules and validated before each
    ``{{QUERY}}`` element of ``command`` is replaced by these tokens.
    """
    params = shlex_split(query)
    __check_query_params(params)

    cmd = []
    for c in command:
        if c == '{{QUERY}}':
            cmd.extend(params)
        else:
            cmd.append(c)

    return cmd


def _iter_raw_results(stream):
    """Yield the chunks of ``stream`` that are delimited by ``result_separator``.

    A trailing chunk that is not terminated by a separator is yielded when the
    stream is exhausted.
    """
    leftover = ''
    for line in iter(stream.readline, b''):
        # the last element is the incomplete tail of the buffer; it is the
        # empty string when the buffer ended with a separator
        *complete, leftover = (leftover + line.decode('utf-8')).split(result_separator)
        yield from complete
    if leftover:
        yield leftover


def _get_results_from_process(results, cmd, pageno):
    """Run ``cmd`` and append the parsed results of page ``pageno`` to ``results``.

    :param results: list the parsed results are appended to
    :param cmd: argument list of the command to execute
    :param pageno: page number, starting at 1
    :returns: ``results`` when the page has been filled before the output
        ended, otherwise ``None``
    :raises RuntimeError: when the command exits with a non-zero return code
    """
    count = 0
    start, end = __get_results_limits(pageno)
    # NOTE(review): ``working_dir`` is not passed as ``cwd`` -- confirm whether
    # the command is expected to run inside the working directory.
    # stderr is never read: discard it instead of piping it, a filled pipe
    # would block the child process forever.
    with Popen(cmd, stdout=PIPE, stderr=DEVNULL, env=environment_variables) as process:
        for raw_result in _iter_raw_results(process.stdout):
            result = __parse_single_result(raw_result)
            if not result:
                _command_logger.debug('skipped result: %s', raw_result)
                continue

            if start <= count <= end:
                result['template'] = result_template
                results.append(result)

            count += 1
            if end < count:
                # page is complete, the remaining output is of no interest
                return results

        return_code = process.wait(timeout=timeout)
        if return_code != 0:
            raise RuntimeError('non-zero return code when running command', cmd, return_code)


def __get_results_limits(pageno):
    """Return the inclusive ``(start, end)`` result indexes of page ``pageno``.

    Pages hold 10 results and are numbered from 1.
    """
    start = (pageno - 1) * 10
    end = start + 9
    return start, end


def __check_query_params(params):
    """Validate the query tokens against ``query_type``.

    :param params: list of query tokens
    :raises ValueError: when ``query_type`` is ``path`` and the last token is
        missing or points outside of ``working_dir``, or when ``query_type``
        is ``enum`` and a token is not listed in ``query_enum``
    """
    if not query_type:
        return

    if query_type == 'path':
        if not params:
            raise ValueError('requested path is missing')
        query_path = realpath(expanduser(params[-1]))
        # Compare whole path components of canonical paths: a character based
        # prefix test would accept ``/srv/data-secret`` for ``/srv/data``.
        base_dir = realpath(working_dir)
        if commonpath([query_path, base_dir]) != base_dir:
            raise ValueError('requested path is outside of configured working directory')
    elif query_type == 'enum' and query_enum:
        for param in params:
            if param not in query_enum:
                raise ValueError('submitted query params is not allowed', param, 'allowed params:', query_enum)


def check_parsing_options(engine_settings):
    """Checks if delimiter based parsing or regex parsing is configured correctly

    Exactly one of ``delimiter`` and ``parse_regex`` has to be configured and a
    ``delimiter`` needs the keys ``chars`` and ``keys``.

    :raises ValueError: when the configuration does not meet these rules
    """
    if 'delimiter' not in engine_settings and 'parse_regex' not in engine_settings:
        raise ValueError('failed to init settings for parsing lines: missing delimiter or parse_regex')
    if 'delimiter' in engine_settings and 'parse_regex' in engine_settings:
        raise ValueError('failed to init settings for parsing lines: too many settings')

    if 'delimiter' in engine_settings:
        if 'chars' not in engine_settings['delimiter'] or 'keys' not in engine_settings['delimiter']:
            raise ValueError('failed to init settings for parsing lines: delimiter requires chars and keys')


def __parse_single_result(raw_result):
    """Parses command line output based on configuration

    :param raw_result: one chunk of the command's output
    :returns: dict mapping the configured keys to the parsed values or
        ``None`` when ``raw_result`` does not match the configuration
    """
    result = {}

    if delimiter:
        keys = delimiter['keys']
        elements = raw_result.split(delimiter['chars'], maxsplit=len(keys) - 1)
        if len(elements) != len(keys):
            return None
        result.update(zip(keys, elements))

    if parse_regex:
        for result_key, regex in _compiled_parse_regex.items():
            found = regex.search(raw_result)
            if not found:
                return None
            result[result_key] = found.group(0)

    return result