# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring, invalid-name

import typing
import inspect
from json import JSONDecodeError
from urllib.parse import urlparse
from httpx import HTTPError, HTTPStatusError
from searx.exceptions import (
    SearxXPathSyntaxException,
    SearxEngineXPathException,
    SearxEngineAPIException,
    SearxEngineAccessDeniedException,
)
from searx import searx_parent_dir, settings
from searx.engines import engines


# Maps an engine name to a dict of {ErrorContext: number of occurrences}.
errors_per_engines = {}


class ErrorContext:  # pylint: disable=missing-class-docstring
    # Hashable record describing where and why an error happened.  Instances
    # are used as dictionary keys in ``errors_per_engines``, hence the
    # field-by-field ``__eq__`` / ``__hash__`` pair below.

    __slots__ = (
        'filename',
        'function',
        'line_no',
        'code',
        'exception_classname',
        'log_message',
        'log_parameters',
        'secondary',
    )

    def __init__(  # pylint: disable=too-many-arguments
        self, filename, function, line_no, code, exception_classname, log_message, log_parameters, secondary
    ):
        self.filename = filename
        self.function = function
        self.line_no = line_no
        self.code = code
        self.exception_classname = exception_classname
        self.log_message = log_message
        self.log_parameters = log_parameters
        self.secondary = secondary

    def __eq__(self, o) -> bool:  # pylint: disable=invalid-name
        """Return True when ``o`` is an ErrorContext with identical fields."""
        if not isinstance(o, ErrorContext):
            return False
        return (
            self.filename == o.filename
            and self.function == o.function
            and self.line_no == o.line_no
            and self.code == o.code
            and self.exception_classname == o.exception_classname
            and self.log_message == o.log_message
            and self.log_parameters == o.log_parameters
            and self.secondary == o.secondary
        )

    def __hash__(self):
        """Hash over the same fields that ``__eq__`` compares."""
        return hash(
            (
                self.filename,
                self.function,
                self.line_no,
                self.code,
                self.exception_classname,
                self.log_message,
                self.log_parameters,
                self.secondary,
            )
        )

    def __repr__(self):
        # NOTE: ``function`` is not part of the representation.  The format
        # string is emitted in the log by add_error_context(), so it is kept
        # unchanged.
        return "ErrorContext({!r}, {!r}, {!r}, {!r}, {!r}, {!r}) {!r}".format(
            self.filename,
            self.line_no,
            self.code,
            self.exception_classname,
            self.log_message,
            self.log_parameters,
            self.secondary,
        )


def add_error_context(engine_name: str, error_context: ErrorContext) -> None:
    """Increment the counter of ``error_context`` for ``engine_name`` and log it.

    The counter lives in the module-level ``errors_per_engines``; the context
    is also written as a warning to the logger of ``engines[engine_name]``.
    """
    errors_for_engine = errors_per_engines.setdefault(engine_name, {})
    errors_for_engine[error_context] = errors_for_engine.get(error_context, 0) + 1
    engines[engine_name].logger.warning('%s', str(error_context))


def get_trace(traces):
    """Pick the frame record that is used to describe the error location.

    ``traces`` is walked from its last item to its first.  The first record
    whose file sits directly in a ``searx/engines`` or
    ``searx/search/processors`` directory is returned; when none matches, the
    last record of ``traces`` is returned.
    """
    for trace in reversed(traces):
        split_filename = trace.filename.split('/')
        # [-3:-1] / [-4:-1]: the directories that directly contain the file.
        if '/'.join(split_filename[-3:-1]) == 'searx/engines':
            return trace
        if '/'.join(split_filename[-4:-1]) == 'searx/search/processors':
            return trace
    return traces[-1]


def get_hostname(exc: HTTPError) -> typing.Optional[str]:
    """Return the network location of the URL attached to ``exc``.

    The request URL is preferred, the response URL is the fallback.  ``None``
    is returned when the exception carries neither.
    """
    url = None
    # exc.request is a property that raises when exc._request is not set, so
    # the private attribute is probed first (same guard as in
    # get_request_exception_messages).
    if getattr(exc, '_request', None) is not None:  # pylint: disable=protected-access
        url = exc.request.url
    # Not every HTTPError has a ``response`` attribute.
    if url is None and getattr(exc, 'response', None) is not None:
        url = exc.response.url
    if url is None:
        return None
    # urlparse() works on str / bytes: convert the URL object first.
    return urlparse(str(url)).netloc


def get_request_exception_messages(
    exc: HTTPError,
) -> typing.Tuple[typing.Optional[str], typing.Optional[str], typing.Optional[str]]:
    """Return ``(status_code, reason, hostname)`` for an HTTP error.

    Each item is ``None`` when it can not be read from ``exc``: the status
    code and the reason are only filled for an ``HTTPStatusError``, the
    hostname only when a request or response URL is available.
    """
    url = None
    status_code = None
    reason = None
    hostname = None
    if hasattr(exc, '_request') and exc._request is not None:  # pylint: disable=protected-access
        # exc.request is a property that raises a RuntimeError
        # if exc._request is not defined.
        url = exc.request.url
    if url is None and hasattr(exc, 'response') and exc.response is not None:
        url = exc.response.url
    if url is not None:
        hostname = url.host
    if isinstance(exc, HTTPStatusError):
        status_code = str(exc.response.status_code)
        reason = exc.response.reason_phrase
    return (status_code, reason, hostname)


def get_messages(exc, filename) -> typing.Tuple:  # pylint: disable=too-many-return-statements
    """Return a tuple of messages extracted from ``exc``.

    The content of the tuple depends on the exception type; an empty tuple is
    returned for types that are not handled here.  ``filename`` is only used
    to restrict the ``ValueError`` case to errors raised from a file whose
    name contains ``lxml``.
    """
    if isinstance(exc, JSONDecodeError):
        return (exc.msg,)
    if isinstance(exc, TypeError):
        return (str(exc),)
    if isinstance(exc, ValueError) and 'lxml' in filename:
        return (str(exc),)
    if isinstance(exc, HTTPError):
        return get_request_exception_messages(exc)
    if isinstance(exc, SearxXPathSyntaxException):
        return (exc.xpath_str, exc.message)
    if isinstance(exc, SearxEngineXPathException):
        return (exc.xpath_str, exc.message)
    if isinstance(exc, SearxEngineAPIException):
        return (str(exc.args[0]),)
    if isinstance(exc, SearxEngineAccessDeniedException):
        return (exc.message,)
    return ()


def get_exception_classname(exc: Exception) -> str:
    """Return the class name of ``exc``, prefixed by its module.

    The module prefix is left out when the class has no module or when it is
    defined in the same module as ``str`` (the builtins).
    """
    exc_class = exc.__class__
    exc_name = exc_class.__qualname__
    exc_module = exc_class.__module__
    if exc_module is None or exc_module == str.__module__:
        return exc_name
    return exc_module + '.' + exc_name


def get_error_context(framerecords, exception_classname, log_message, log_parameters, secondary) -> ErrorContext:
    """Build the ErrorContext for the frame selected by get_trace().

    ``framerecords`` is a list of frame records (as returned by the
    ``inspect`` module).  The file name is made relative to
    ``searx_parent_dir`` when it starts with it.
    """
    searx_frame = get_trace(framerecords)
    filename = searx_frame.filename
    if filename.startswith(searx_parent_dir):
        # +1: also drop the path separator that follows the parent directory
        filename = filename[len(searx_parent_dir) + 1 :]
    function = searx_frame.function
    line_no = searx_frame.lineno
    # code_context is None when the source of the frame is not available.
    code_context = searx_frame.code_context
    code = code_context[0].strip() if code_context else ''
    return ErrorContext(filename, function, line_no, code, exception_classname, log_message, log_parameters, secondary)


def count_exception(engine_name: str, exc: Exception, secondary: bool = False) -> None:
    """Record ``exc`` for ``engine_name``.

    Does nothing when ``settings['general']['enable_metrics']`` is false.
    The error location is taken from ``inspect.trace()``, so this function is
    meant to be called while the exception is being handled.
    """
    if not settings['general']['enable_metrics']:
        return
    framerecords = inspect.trace()
    try:
        exception_classname = get_exception_classname(exc)
        # the last record is the frame where the exception was raised
        log_parameters = get_messages(exc, framerecords[-1].filename)
        error_context = get_error_context(framerecords, exception_classname, None, log_parameters, secondary)
        add_error_context(engine_name, error_context)
    finally:
        # drop the references to the frames as soon as possible
        del framerecords


def count_error(
    engine_name: str, log_message: str, log_parameters: typing.Optional[typing.Tuple] = None, secondary: bool = False
) -> None:
    """Record an error that is described by a message instead of an exception.

    Does nothing when ``settings['general']['enable_metrics']`` is false.
    ``log_parameters`` defaults to an empty tuple.
    """
    if not settings['general']['enable_metrics']:
        return
    # [1:] skips the frame of count_error itself; the list is reversed so that
    # the direct caller comes last and is the first one get_trace() examines.
    framerecords = list(reversed(inspect.stack()[1:]))
    try:
        error_context = get_error_context(framerecords, None, log_message, log_parameters or (), secondary)
        add_error_context(engine_name, error_context)
    finally:
        # drop the references to the frames as soon as possible
        del framerecords