Source code for searx.plugins.calculator

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Calculate mathematical expressions using :py:obj:`ast.parse` (mode="eval")."""

from __future__ import annotations
import typing

import ast
import math
import re
import operator
import multiprocessing

import babel
import babel.numbers
from flask_babel import gettext

from searx.result_types import EngineResults
from searx.plugins import Plugin, PluginInfo

if typing.TYPE_CHECKING:
    from searx.search import SearchWithPlugins
    from searx.extended_types import SXNG_Request
    from searx.plugins import PluginCfg


[docs] class SXNGPlugin(Plugin): """Plugin converts strings to different hash digests. The results are displayed in area for the "answers". """ id = "calculator" def __init__(self, plg_cfg: "PluginCfg") -> None: super().__init__(plg_cfg) self.info = PluginInfo( id=self.id, name=gettext("Basic Calculator"), description=gettext("Calculate mathematical expressions via the search bar"), preference_section="general", )
def _compare(ops: list[ast.cmpop], values: list[int | float]) -> int: """ 2 < 3 becomes ops=[ast.Lt] and values=[2,3] 2 < 3 <= 4 becomes ops=[ast.Lt, ast.LtE] and values=[2,3, 4] """ for op, a, b in zip(ops, values, values[1:]): # pylint: disable=invalid-name if isinstance(op, ast.Eq) and a == b: continue if isinstance(op, ast.NotEq) and a != b: continue if isinstance(op, ast.Lt) and a < b: continue if isinstance(op, ast.LtE) and a <= b: continue if isinstance(op, ast.Gt) and a > b: continue if isinstance(op, ast.GtE) and a >= b: continue # Ignore impossible ops: # * ast.Is # * ast.IsNot # * ast.In # * ast.NotIn # the result is False for a and b and operation op return 0 # the results for all the ops are True return 1 operators: dict[type, typing.Callable] = { ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul, ast.Div: operator.truediv, ast.Pow: operator.pow, ast.BitXor: operator.xor, ast.BitOr: operator.or_, ast.BitAnd: operator.and_, ast.USub: operator.neg, ast.RShift: operator.rshift, ast.LShift: operator.lshift, ast.Mod: operator.mod, ast.Compare: _compare, } math_constants = { 'e': math.e, 'pi': math.pi, } # with multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating # the old behavior "fork") but it will not solve the core problem of fork, nor # will it remove the deprecation warnings in py3.12 & py3.13. Issue is # ddiscussed here: https://github.com/searxng/searxng/issues/4159 mp_fork = multiprocessing.get_context("fork") def _eval_expr(expr): """ Evaluates the given textual expression. Returns a tuple of (numericResult, isBooleanResult). >>> _eval_expr('2^6') 64, False >>> _eval_expr('2**6') 64, False >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)') -5.0, False >>> _eval_expr('1 < 3') 1, True >>> _eval_expr('5 < 3') 0, True >>> _eval_expr('17 == 11+1+5 == 7+5+5') 1, True """ try: root_expr = ast.parse(expr, mode='eval').body return _eval(root_expr), isinstance(root_expr, ast.Compare) except ZeroDivisionError: # This is undefined return "", False def _eval(node): if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)): return node.value if isinstance(node, ast.BinOp): return operators[type(node.op)](_eval(node.left), _eval(node.right)) if isinstance(node, ast.UnaryOp): return operators[type(node.op)](_eval(node.operand)) if isinstance(node, ast.Compare): return _compare(node.ops, [_eval(node.left)] + [_eval(c) for c in node.comparators]) if isinstance(node, ast.Name) and node.id in math_constants: return math_constants[node.id] raise TypeError(node) def handler(q: multiprocessing.Queue, func, args, **kwargs): # pylint:disable=invalid-name try: q.put(func(*args, **kwargs)) except: q.put(None) raise def timeout_func(timeout, func, *args, **kwargs): que = mp_fork.Queue() p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs) p.start() p.join(timeout=timeout) ret_val = None # pylint: disable=used-before-assignment,undefined-variable if not p.is_alive(): ret_val = que.get() else: logger.debug("terminate function after timeout is exceeded") # type: ignore p.terminate() p.join() p.close() return ret_val