| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 | 
							- # SPDX-License-Identifier: AGPL-3.0-or-later
 
- """Calculate mathematical expressions using :py:obj:`ast.parse` (mode="eval").
 
- """
 
- from __future__ import annotations
 
- import typing
 
- import ast
 
- import re
 
- import operator
 
- import multiprocessing
 
- import babel
 
- import babel.numbers
 
- from flask_babel import gettext
 
- from searx.result_types import EngineResults
 
- from searx.plugins import Plugin, PluginInfo
 
- if typing.TYPE_CHECKING:
 
-     from searx.search import SearchWithPlugins
 
-     from searx.extended_types import SXNG_Request
 
-     from searx.plugins import PluginCfg
 
- class SXNGPlugin(Plugin):
 
-     """Plugin converts strings to different hash digests.  The results are
 
-     displayed in area for the "answers".
 
-     """
 
-     id = "calculator"
 
-     def __init__(self, plg_cfg: "PluginCfg") -> None:
 
-         super().__init__(plg_cfg)
 
-         self.info = PluginInfo(
 
-             id=self.id,
 
-             name=gettext("Basic Calculator"),
 
-             description=gettext("Calculate mathematical expressions via the search bar"),
 
-             preference_section="general",
 
-         )
 
-     def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults:
 
-         results = EngineResults()
 
-         # only show the result of the expression on the first page
 
-         if search.search_query.pageno > 1:
 
-             return results
 
-         query = search.search_query.query
 
-         # in order to avoid DoS attacks with long expressions, ignore long expressions
 
-         if len(query) > 100:
 
-             return results
 
-         # replace commonly used math operators with their proper Python operator
 
-         query = query.replace("x", "*").replace(":", "/")
 
-         # use UI language
 
-         ui_locale = babel.Locale.parse(request.preferences.get_value("locale"), sep="-")
 
-         # parse the number system in a localized way
 
-         def _decimal(match: re.Match) -> str:
 
-             val = match.string[match.start() : match.end()]
 
-             val = babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn")
 
-             return str(val)
 
-         decimal = ui_locale.number_symbols["latn"]["decimal"]
 
-         group = ui_locale.number_symbols["latn"]["group"]
 
-         query = re.sub(f"[0-9]+[{decimal}|{group}][0-9]+[{decimal}|{group}]?[0-9]?", _decimal, query)
 
-         # only numbers and math operators are accepted
 
-         if any(str.isalpha(c) for c in query):
 
-             return results
 
-         # in python, powers are calculated via **
 
-         query_py_formatted = query.replace("^", "**")
 
-         # Prevent the runtime from being longer than 50 ms
 
-         res = timeout_func(0.05, _eval_expr, query_py_formatted)
 
-         if res is None or res == "":
 
-             return results
 
-         res = babel.numbers.format_decimal(res, locale=ui_locale)
 
-         results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}"))
 
-         return results
 
- operators: dict[type, typing.Callable] = {
 
-     ast.Add: operator.add,
 
-     ast.Sub: operator.sub,
 
-     ast.Mult: operator.mul,
 
-     ast.Div: operator.truediv,
 
-     ast.Pow: operator.pow,
 
-     ast.BitXor: operator.xor,
 
-     ast.USub: operator.neg,
 
- }
 
- # with multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating
 
- # the old behavior "fork") but it will not solve the core problem of fork, nor
 
- # will it remove the deprecation warnings in py3.12 & py3.13.  Issue is
 
- # ddiscussed here: https://github.com/searxng/searxng/issues/4159
 
- mp_fork = multiprocessing.get_context("fork")
 
- def _eval_expr(expr):
 
-     """
 
-     >>> _eval_expr('2^6')
 
-     64
 
-     >>> _eval_expr('2**6')
 
-     64
 
-     >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
 
-     -5.0
 
-     """
 
-     try:
 
-         return _eval(ast.parse(expr, mode='eval').body)
 
-     except ZeroDivisionError:
 
-         # This is undefined
 
-         return ""
 
- def _eval(node):
 
-     if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
 
-         return node.value
 
-     if isinstance(node, ast.BinOp):
 
-         return operators[type(node.op)](_eval(node.left), _eval(node.right))
 
-     if isinstance(node, ast.UnaryOp):
 
-         return operators[type(node.op)](_eval(node.operand))
 
-     raise TypeError(node)
 
- def handler(q: multiprocessing.Queue, func, args, **kwargs):  # pylint:disable=invalid-name
 
-     try:
 
-         q.put(func(*args, **kwargs))
 
-     except:
 
-         q.put(None)
 
-         raise
 
- def timeout_func(timeout, func, *args, **kwargs):
 
-     que = mp_fork.Queue()
 
-     p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs)
 
-     p.start()
 
-     p.join(timeout=timeout)
 
-     ret_val = None
 
-     # pylint: disable=used-before-assignment,undefined-variable
 
-     if not p.is_alive():
 
-         ret_val = que.get()
 
-     else:
 
-         logger.debug("terminate function after timeout is exceeded")  # type: ignore
 
-         p.terminate()
 
-     p.join()
 
-     p.close()
 
-     return ret_val
 
 
  |