| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 | 
							- import typing
 
- import inspect
 
- from json import JSONDecodeError
 
- from urllib.parse import urlparse
 
- from httpx import HTTPError, HTTPStatusError
 
- from searx.exceptions import (SearxXPathSyntaxException, SearxEngineXPathException, SearxEngineAPIException,
 
-                               SearxEngineAccessDeniedException)
 
- from searx import logger, searx_parent_dir
 
- errors_per_engines = {}
 
- class ErrorContext:
 
-     __slots__ = ('filename', 'function', 'line_no', 'code', 'exception_classname',
 
-                  'log_message', 'log_parameters', 'secondary')
 
-     def __init__(self, filename, function, line_no, code, exception_classname, log_message, log_parameters, secondary):
 
-         self.filename = filename
 
-         self.function = function
 
-         self.line_no = line_no
 
-         self.code = code
 
-         self.exception_classname = exception_classname
 
-         self.log_message = log_message
 
-         self.log_parameters = log_parameters
 
-         self.secondary = secondary
 
-     def __eq__(self, o) -> bool:
 
-         if not isinstance(o, ErrorContext):
 
-             return False
 
-         return self.filename == o.filename and self.function == o.function and self.line_no == o.line_no\
 
-             and self.code == o.code and self.exception_classname == o.exception_classname\
 
-             and self.log_message == o.log_message and self.log_parameters == o.log_parameters \
 
-             and self.secondary == o.secondary
 
-     def __hash__(self):
 
-         return hash((self.filename, self.function, self.line_no, self.code, self.exception_classname, self.log_message,
 
-                      self.log_parameters, self.secondary))
 
-     def __repr__(self):
 
-         return "ErrorContext({!r}, {!r}, {!r}, {!r}, {!r}, {!r}) {!r}".\
 
-             format(self.filename, self.line_no, self.code, self.exception_classname, self.log_message,
 
-                    self.log_parameters, self.secondary)
 
- def add_error_context(engine_name: str, error_context: ErrorContext) -> None:
 
-     errors_for_engine = errors_per_engines.setdefault(engine_name, {})
 
-     errors_for_engine[error_context] = errors_for_engine.get(error_context, 0) + 1
 
-     logger.debug('%s: %s', engine_name, str(error_context))
 
- def get_trace(traces):
 
-     for trace in reversed(traces):
 
-         split_filename = trace.filename.split('/')
 
-         if '/'.join(split_filename[-3:-1]) == 'searx/engines':
 
-             return trace
 
-         if '/'.join(split_filename[-4:-1]) == 'searx/search/processors':
 
-             return trace
 
-     return traces[-1]
 
- def get_hostname(exc: HTTPError) -> typing.Optional[None]:
 
-     url = exc.request.url
 
-     if url is None and exc.response is not None:
 
-         url = exc.response.url
 
-     return urlparse(url).netloc
 
- def get_request_exception_messages(exc: HTTPError)\
 
-         -> typing.Tuple[typing.Optional[str], typing.Optional[str], typing.Optional[str]]:
 
-     url = None
 
-     status_code = None
 
-     reason = None
 
-     hostname = None
 
-     if hasattr(exc, 'request') and exc.request is not None:
 
-         url = exc.request.url
 
-     if url is None and hasattr(exc, 'response') and exc.respones is not None:
 
-         url = exc.response.url
 
-     if url is not None:
 
-         hostname = url.host
 
-     if isinstance(exc, HTTPStatusError):
 
-         status_code = str(exc.response.status_code)
 
-         reason = exc.response.reason_phrase
 
-     return (status_code, reason, hostname)
 
- def get_messages(exc, filename) -> typing.Tuple:
 
-     if isinstance(exc, JSONDecodeError):
 
-         return (exc.msg, )
 
-     if isinstance(exc, TypeError):
 
-         return (str(exc), )
 
-     if isinstance(exc, ValueError) and 'lxml' in filename:
 
-         return (str(exc), )
 
-     if isinstance(exc, HTTPError):
 
-         return get_request_exception_messages(exc)
 
-     if isinstance(exc, SearxXPathSyntaxException):
 
-         return (exc.xpath_str, exc.message)
 
-     if isinstance(exc, SearxEngineXPathException):
 
-         return (exc.xpath_str, exc.message)
 
-     if isinstance(exc, SearxEngineAPIException):
 
-         return (str(exc.args[0]), )
 
-     if isinstance(exc, SearxEngineAccessDeniedException):
 
-         return (exc.message, )
 
-     return ()
 
- def get_exception_classname(exc: Exception) -> str:
 
-     exc_class = exc.__class__
 
-     exc_name = exc_class.__qualname__
 
-     exc_module = exc_class.__module__
 
-     if exc_module is None or exc_module == str.__class__.__module__:
 
-         return exc_name
 
-     return exc_module + '.' + exc_name
 
- def get_error_context(framerecords, exception_classname, log_message, log_parameters, secondary) -> ErrorContext:
 
-     searx_frame = get_trace(framerecords)
 
-     filename = searx_frame.filename
 
-     if filename.startswith(searx_parent_dir):
 
-         filename = filename[len(searx_parent_dir) + 1:]
 
-     function = searx_frame.function
 
-     line_no = searx_frame.lineno
 
-     code = searx_frame.code_context[0].strip()
 
-     del framerecords
 
-     return ErrorContext(filename, function, line_no, code, exception_classname, log_message, log_parameters, secondary)
 
- def count_exception(engine_name: str, exc: Exception, secondary: bool = False) -> None:
 
-     framerecords = inspect.trace()
 
-     try:
 
-         exception_classname = get_exception_classname(exc)
 
-         log_parameters = get_messages(exc, framerecords[-1][1])
 
-         error_context = get_error_context(framerecords, exception_classname, None, log_parameters, secondary)
 
-         add_error_context(engine_name, error_context)
 
-     finally:
 
-         del framerecords
 
- def count_error(engine_name: str, log_message: str, log_parameters: typing.Optional[typing.Tuple] = None,
 
-                 secondary: bool = False) -> None:
 
-     framerecords = list(reversed(inspect.stack()[1:]))
 
-     try:
 
-         error_context = get_error_context(framerecords, None, log_message, log_parameters or (), secondary)
 
-         add_error_context(engine_name, error_context)
 
-     finally:
 
-         del framerecords
 
 
  |