# SPDX-License-Identifier: AGPL-3.0-or-later
"""Calculate mathematical expressions using :py:obj:`ast.parse` (mode="eval")."""
from __future__ import annotations
import typing
import ast
import math
import re
import operator
import multiprocessing
import babel
import babel.numbers
from flask_babel import gettext
from searx.result_types import EngineResults
from searx.plugins import Plugin, PluginInfo
if typing.TYPE_CHECKING:
from searx.search import SearchWithPlugins
from searx.extended_types import SXNG_Request
from searx.plugins import PluginCfg
[docs]
class SXNGPlugin(Plugin):
    """Plugin that evaluates mathematical expressions typed into the search
    bar.  The result is displayed in the area for the "answers".
    """

    id = "calculator"

    def __init__(self, plg_cfg: "PluginCfg") -> None:
        super().__init__(plg_cfg)

        self.info = PluginInfo(
            id=self.id,
            name=gettext("Basic Calculator"),
            description=gettext("Calculate mathematical expressions via the search bar"),
            preference_section="general",
        )

    def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults:
        """Try to evaluate the search query as a mathematical expression.

        The query is normalized (``x`` -> ``*``, ``:`` -> ``/``, ``^`` ->
        ``**``, localized numbers -> Python literals) and then evaluated in a
        separate process with a time limit of 50 ms.

        :param request: used to read the UI locale from the preferences.
        :param search: provides the query string and the page number.
        :return: results holding one ``Answer`` (``"<query> = <result>"``) when
            the query could be evaluated, otherwise empty results.
        """
        results = EngineResults()

        # only show the result of the expression on the first page
        if search.search_query.pageno > 1:
            return results

        query = search.search_query.query
        # in order to avoid DoS attacks with long expressions, ignore long expressions
        if len(query) > 100:
            return results

        # replace commonly used math operators with their proper Python operator
        query = query.replace("x", "*").replace(":", "/")

        # use UI language
        ui_locale = babel.Locale.parse(request.preferences.get_value("locale"), sep="-")

        # parse the number system in a localized way
        def _decimal(match: re.Match) -> str:
            val = match.group(0)
            try:
                return str(babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn"))
            except babel.numbers.NumberFormatError:
                # not a number in this locale (e.g. "1.2.3"): leave the text
                # untouched and let the expression parser decide
                return val

        decimal = ui_locale.number_symbols["latn"]["decimal"]
        group = ui_locale.number_symbols["latn"]["group"]
        # Character class of the locale's separators.  The symbols are escaped
        # and no "|" is added: inside [...] a "|" is a literal character and
        # would make the bitwise-or expression "5|3" look like a number.
        separators = f"[{re.escape(decimal + group)}]"
        query = re.sub(f"[0-9]+{separators}[0-9]+{separators}?[0-9]?", _decimal, query)

        # in python, powers are calculated via **
        query_py_formatted = query.replace("^", "**")

        # Prevent the runtime from being longer than 50 ms
        res = timeout_func(0.05, _eval_expr, query_py_formatted)
        # None: timeout or invalid expression / "": the result is undefined
        if res is None or res[0] == "":
            return results

        res, is_boolean = res
        if is_boolean:
            res = "True" if res != 0 else "False"
        else:
            res = babel.numbers.format_decimal(res, locale=ui_locale)
        results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}"))

        return results
def _compare(ops: list[ast.cmpop], values: list[int | float]) -> int:
    """Evaluate a (possibly chained) comparison and return ``1`` if every
    link of the chain holds, otherwise ``0``.

    The i-th operator is applied to the i-th and (i+1)-th value::

        2 < 3       ->  ops=[ast.Lt]           values=[2, 3]
        2 < 3 <= 4  ->  ops=[ast.Lt, ast.LtE]  values=[2, 3, 4]

    Operators without a numeric meaning (``ast.Is``, ``ast.IsNot``,
    ``ast.In``, ``ast.NotIn``) never hold, so the comparison is ``0``.
    """
    predicates = (
        (ast.Eq, operator.eq),
        (ast.NotEq, operator.ne),
        (ast.Lt, operator.lt),
        (ast.LtE, operator.le),
        (ast.Gt, operator.gt),
        (ast.GtE, operator.ge),
    )

    def _holds(cmp_op, lhs, rhs) -> bool:
        for node_type, predicate in predicates:
            if isinstance(cmp_op, node_type):
                return bool(predicate(lhs, rhs))
        # unsupported comparison operator
        return False

    # all() stops at the first link of the chain that does not hold
    links = zip(ops, values, values[1:])
    return int(all(_holds(cmp_op, lhs, rhs) for cmp_op, lhs, rhs in links))
# Maps the type of an AST operator node to the callable implementing it.
# ``_eval`` looks up ``type(node.op)`` of binary and unary operations here;
# node types without an entry are not supported by the calculator.
operators: dict[type, typing.Callable] = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
ast.BitXor: operator.xor,
ast.BitOr: operator.or_,
ast.BitAnd: operator.and_,
ast.USub: operator.neg,
ast.RShift: operator.rshift,
ast.LShift: operator.lshift,
ast.Mod: operator.mod,
# NOTE: ``_eval`` calls ``_compare`` directly for ast.Compare nodes
ast.Compare: _compare,
}
# Names that may be used in an expression and the value ``_eval`` substitutes
# for each of them.
math_constants = {
'e': math.e,
'pi': math.pi,
}
# With multiprocessing.get_context("fork") we are ready for Py3.14 (by
# emulating the old default start method "fork"), but this neither solves the
# core problem of fork nor removes the deprecation warnings in py3.12 &
# py3.13.  The issue is discussed here:
# https://github.com/searxng/searxng/issues/4159
mp_fork = multiprocessing.get_context("fork")
def _eval_expr(expr):
    """
    Evaluates the given textual expression.

    Returns a tuple of (numericResult, isBooleanResult).  If the result is
    undefined (division by zero) or not a real number (complex), the tuple
    ``("", False)`` is returned.  Expressions that can't be parsed or that use
    unsupported syntax raise an exception (e.g. :py:obj:`SyntaxError`).

    The expression is evaluated with Python semantics, so ``^`` is the bitwise
    XOR; callers who want ``^`` to mean "power" have to replace it by ``**``
    beforehand.

    >>> _eval_expr('2^6')
    (4, False)
    >>> _eval_expr('2**6')
    (64, False)
    >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
    (-5.0, False)
    >>> _eval_expr('1 < 3')
    (1, True)
    >>> _eval_expr('5 < 3')
    (0, True)
    >>> _eval_expr('17 == 11+1+5 == 7+5+5')
    (1, True)
    >>> _eval_expr('1 / 0')
    ('', False)
    >>> _eval_expr('(-8) ** 0.5')
    ('', False)
    """
    try:
        root_expr = ast.parse(expr, mode='eval').body
        value = _eval(root_expr)
    except ZeroDivisionError:
        # This is undefined
        return "", False

    if isinstance(value, complex):
        # e.g. a negative base with a fractional exponent: there is no real
        # result and a complex number can't be formatted as a decimal
        return "", False

    return value, isinstance(root_expr, ast.Compare)
def _eval(node):
    """Recursively compute the value of the AST *node* of an expression.

    Supported are numeric constants, the names from ``math_constants``, unary
    and binary operations whose operator is listed in ``operators`` and
    (chained) comparisons.

    :raises TypeError: if the node is of any other kind.
    :raises KeyError: if the operator of a unary or binary operation has no
        entry in ``operators``.
    """
    if isinstance(node, ast.Constant):
        if isinstance(node.value, (int, float)):
            return node.value
    elif isinstance(node, ast.Name):
        if node.id in math_constants:
            return math_constants[node.id]
    elif isinstance(node, ast.Compare):
        operands = [_eval(operand) for operand in (node.left, *node.comparators)]
        return _compare(node.ops, operands)
    elif isinstance(node, ast.UnaryOp):
        # look up the operator first: unsupported operators fail before the
        # operand is evaluated
        apply = operators[type(node.op)]
        return apply(_eval(node.operand))
    elif isinstance(node, ast.BinOp):
        apply = operators[type(node.op)]
        return apply(_eval(node.left), _eval(node.right))

    # everything else (strings, calls, unknown names, ...) is rejected
    raise TypeError(node)
def handler(q: multiprocessing.Queue, func, args, **kwargs):  # pylint:disable=invalid-name
    """Call ``func(*args, **kwargs)`` and put its return value into queue *q*.

    If anything is raised on the way, ``None`` is put into the queue instead
    and the exception is re-raised.
    """
    try:
        outcome = func(*args, **kwargs)
        q.put(outcome)
    except BaseException:
        # report the failure to the consumer of the queue before propagating
        q.put(None)
        raise
def timeout_func(timeout, func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` in a forked process for at most *timeout*
    seconds.

    :param timeout: time in seconds to wait for the process to finish.
    :param func: the callable to run; it is invoked via :py:func:`handler`.
    :return: the return value of *func*, or ``None`` if the time limit was
        exceeded (the process is terminated), *func* raised an exception or
        the process ended without delivering a result.
    """
    import logging  # pylint: disable=import-outside-toplevel
    import queue  # pylint: disable=import-outside-toplevel

    que = mp_fork.Queue()
    p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs)
    p.start()
    p.join(timeout=timeout)
    ret_val = None

    if p.is_alive():
        # Stop the worker before doing anything else, so that a failure while
        # logging can never leave a runaway process behind.
        p.terminate()
        # Use the module's ``logger`` if one has been set from outside,
        # otherwise fall back to a logger named after this module.
        log = globals().get("logger") or logging.getLogger(__name__)
        log.debug("terminate function after timeout is exceeded")
    else:
        # The process has ended, so its result has already been flushed to the
        # queue.  The timeout only guards against a worker that died without
        # putting anything, where a plain get() would block forever.
        try:
            ret_val = que.get(timeout=1)
        except queue.Empty:
            ret_val = None

    p.join()
    p.close()
    return ret_val