123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- # SPDX-License-Identifier: AGPL-3.0-or-later
- """Calculate mathematical expressions using :py:obj`ast.parse` (mode="eval").
- """
- from __future__ import annotations
- from typing import Callable
- import ast
- import re
- import operator
- import multiprocessing
- import babel
- import babel.numbers
- from flask_babel import gettext
- from searx.result_types import EngineResults
- name = "Basic Calculator"
- description = gettext("Calculate mathematical expressions via the search bar")
- default_on = True
- preference_section = 'general'
- plugin_id = 'calculator'
- operators: dict[type, Callable] = {
- ast.Add: operator.add,
- ast.Sub: operator.sub,
- ast.Mult: operator.mul,
- ast.Div: operator.truediv,
- ast.Pow: operator.pow,
- ast.BitXor: operator.xor,
- ast.USub: operator.neg,
- }
- # with multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating
- # the old behavior "fork") but it will not solve the core problem of fork, nor
- # will it remove the deprecation warnings in py3.12 & py3.13. Issue is
- # ddiscussed here: https://github.com/searxng/searxng/issues/4159
- mp_fork = multiprocessing.get_context("fork")
- def _eval_expr(expr):
- """
- >>> _eval_expr('2^6')
- 64
- >>> _eval_expr('2**6')
- 64
- >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
- -5.0
- """
- try:
- return _eval(ast.parse(expr, mode='eval').body)
- except ZeroDivisionError:
- # This is undefined
- return ""
- def _eval(node):
- if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
- return node.value
- if isinstance(node, ast.BinOp):
- return operators[type(node.op)](_eval(node.left), _eval(node.right))
- if isinstance(node, ast.UnaryOp):
- return operators[type(node.op)](_eval(node.operand))
- raise TypeError(node)
- def handler(q: multiprocessing.Queue, func, args, **kwargs): # pylint:disable=invalid-name
- try:
- q.put(func(*args, **kwargs))
- except:
- q.put(None)
- raise
- def timeout_func(timeout, func, *args, **kwargs):
- que = mp_fork.Queue()
- p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs)
- p.start()
- p.join(timeout=timeout)
- ret_val = None
- # pylint: disable=used-before-assignment,undefined-variable
- if not p.is_alive():
- ret_val = que.get()
- else:
- logger.debug("terminate function after timeout is exceeded") # type: ignore
- p.terminate()
- p.join()
- p.close()
- return ret_val
- def post_search(request, search) -> EngineResults:
- results = EngineResults()
- # only show the result of the expression on the first page
- if search.search_query.pageno > 1:
- return results
- query = search.search_query.query
- # in order to avoid DoS attacks with long expressions, ignore long expressions
- if len(query) > 100:
- return results
- # replace commonly used math operators with their proper Python operator
- query = query.replace("x", "*").replace(":", "/")
- # use UI language
- ui_locale = babel.Locale.parse(request.preferences.get_value('locale'), sep='-')
- # parse the number system in a localized way
- def _decimal(match: re.Match) -> str:
- val = match.string[match.start() : match.end()]
- val = babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn")
- return str(val)
- decimal = ui_locale.number_symbols["latn"]["decimal"]
- group = ui_locale.number_symbols["latn"]["group"]
- query = re.sub(f"[0-9]+[{decimal}|{group}][0-9]+[{decimal}|{group}]?[0-9]?", _decimal, query)
- # only numbers and math operators are accepted
- if any(str.isalpha(c) for c in query):
- return results
- # in python, powers are calculated via **
- query_py_formatted = query.replace("^", "**")
- # Prevent the runtime from being longer than 50 ms
- res = timeout_func(0.05, _eval_expr, query_py_formatted)
- if res is None or res == "":
- return results
- res = babel.numbers.format_decimal(res, locale=ui_locale)
- results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}"))
- return results
|