calculator.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. """Calculate mathematical expressions using :py:obj:`ast.parse` (mode="eval")."""
  3. from __future__ import annotations
  4. import typing
  5. import ast
  6. import math
  7. import re
  8. import operator
  9. import multiprocessing
  10. import babel
  11. import babel.numbers
  12. from flask_babel import gettext
  13. from searx.result_types import EngineResults
  14. from searx.plugins import Plugin, PluginInfo
  15. if typing.TYPE_CHECKING:
  16. from searx.search import SearchWithPlugins
  17. from searx.extended_types import SXNG_Request
  18. from searx.plugins import PluginCfg
  19. class SXNGPlugin(Plugin):
  20. """Plugin converts strings to different hash digests. The results are
  21. displayed in area for the "answers".
  22. """
  23. id = "calculator"
  24. def __init__(self, plg_cfg: "PluginCfg") -> None:
  25. super().__init__(plg_cfg)
  26. self.info = PluginInfo(
  27. id=self.id,
  28. name=gettext("Basic Calculator"),
  29. description=gettext("Calculate mathematical expressions via the search bar"),
  30. preference_section="general",
  31. )
  32. def timeout_func(self, timeout, func, *args, **kwargs):
  33. que = mp_fork.Queue()
  34. p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs)
  35. p.start()
  36. p.join(timeout=timeout)
  37. ret_val = None
  38. # pylint: disable=used-before-assignment,undefined-variable
  39. if not p.is_alive():
  40. ret_val = que.get()
  41. else:
  42. self.log.debug("terminate function (%s: %s // %s) after timeout is exceeded", func.__name__, args, kwargs)
  43. p.terminate()
  44. p.join()
  45. p.close()
  46. return ret_val
  47. def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults:
  48. results = EngineResults()
  49. # only show the result of the expression on the first page
  50. if search.search_query.pageno > 1:
  51. return results
  52. query = search.search_query.query
  53. # in order to avoid DoS attacks with long expressions, ignore long expressions
  54. if len(query) > 100:
  55. return results
  56. # replace commonly used math operators with their proper Python operator
  57. query = query.replace("x", "*").replace(":", "/")
  58. # Is this a term that can be calculated?
  59. word, constants = "", set()
  60. for x in query:
  61. # Alphabetic characters are defined as "Letters" in the Unicode
  62. # character database and are the constants in an equation.
  63. if x.isalpha():
  64. word += x.strip()
  65. elif word:
  66. constants.add(word)
  67. word = ""
  68. # In the term of an arithmetic operation there should be no other
  69. # alphabetic characters besides the constants
  70. if constants - set(math_constants):
  71. return results
  72. # use UI language
  73. ui_locale = babel.Locale.parse(request.preferences.get_value("locale"), sep="-")
  74. # parse the number system in a localized way
  75. def _decimal(match: re.Match) -> str:
  76. val = match.string[match.start() : match.end()]
  77. val = babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn")
  78. return str(val)
  79. decimal = ui_locale.number_symbols["latn"]["decimal"]
  80. group = ui_locale.number_symbols["latn"]["group"]
  81. query = re.sub(f"[0-9]+[{decimal}|{group}][0-9]+[{decimal}|{group}]?[0-9]?", _decimal, query)
  82. # in python, powers are calculated via **
  83. query_py_formatted = query.replace("^", "**")
  84. # Prevent the runtime from being longer than 50 ms
  85. res = self.timeout_func(0.05, _eval_expr, query_py_formatted)
  86. if res is None or res[0] == "":
  87. return results
  88. res, is_boolean = res
  89. if is_boolean:
  90. res = "True" if res != 0 else "False"
  91. else:
  92. res = babel.numbers.format_decimal(res, locale=ui_locale)
  93. results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}"))
  94. return results
  95. def _compare(ops: list[ast.cmpop], values: list[int | float]) -> int:
  96. """
  97. 2 < 3 becomes ops=[ast.Lt] and values=[2,3]
  98. 2 < 3 <= 4 becomes ops=[ast.Lt, ast.LtE] and values=[2,3, 4]
  99. """
  100. for op, a, b in zip(ops, values, values[1:]): # pylint: disable=invalid-name
  101. if isinstance(op, ast.Eq) and a == b:
  102. continue
  103. if isinstance(op, ast.NotEq) and a != b:
  104. continue
  105. if isinstance(op, ast.Lt) and a < b:
  106. continue
  107. if isinstance(op, ast.LtE) and a <= b:
  108. continue
  109. if isinstance(op, ast.Gt) and a > b:
  110. continue
  111. if isinstance(op, ast.GtE) and a >= b:
  112. continue
  113. # Ignore impossible ops:
  114. # * ast.Is
  115. # * ast.IsNot
  116. # * ast.In
  117. # * ast.NotIn
  118. # the result is False for a and b and operation op
  119. return 0
  120. # the results for all the ops are True
  121. return 1
  122. operators: dict[type, typing.Callable] = {
  123. ast.Add: operator.add,
  124. ast.Sub: operator.sub,
  125. ast.Mult: operator.mul,
  126. ast.Div: operator.truediv,
  127. ast.Pow: operator.pow,
  128. ast.BitXor: operator.xor,
  129. ast.BitOr: operator.or_,
  130. ast.BitAnd: operator.and_,
  131. ast.USub: operator.neg,
  132. ast.RShift: operator.rshift,
  133. ast.LShift: operator.lshift,
  134. ast.Mod: operator.mod,
  135. ast.Compare: _compare,
  136. }
  137. math_constants = {
  138. 'e': math.e,
  139. 'pi': math.pi,
  140. }
  141. # with multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating
  142. # the old behavior "fork") but it will not solve the core problem of fork, nor
  143. # will it remove the deprecation warnings in py3.12 & py3.13. Issue is
  144. # ddiscussed here: https://github.com/searxng/searxng/issues/4159
  145. mp_fork = multiprocessing.get_context("fork")
  146. def _eval_expr(expr):
  147. """
  148. Evaluates the given textual expression.
  149. Returns a tuple of (numericResult, isBooleanResult).
  150. >>> _eval_expr('2^6')
  151. 64, False
  152. >>> _eval_expr('2**6')
  153. 64, False
  154. >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
  155. -5.0, False
  156. >>> _eval_expr('1 < 3')
  157. 1, True
  158. >>> _eval_expr('5 < 3')
  159. 0, True
  160. >>> _eval_expr('17 == 11+1+5 == 7+5+5')
  161. 1, True
  162. """
  163. try:
  164. root_expr = ast.parse(expr, mode='eval').body
  165. return _eval(root_expr), isinstance(root_expr, ast.Compare)
  166. except (SyntaxError, TypeError, ZeroDivisionError):
  167. # Expression that can't be evaluated (i.e. not a math expression)
  168. return "", False
  169. def _eval(node):
  170. if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
  171. return node.value
  172. if isinstance(node, ast.BinOp):
  173. return operators[type(node.op)](_eval(node.left), _eval(node.right))
  174. if isinstance(node, ast.UnaryOp):
  175. return operators[type(node.op)](_eval(node.operand))
  176. if isinstance(node, ast.Compare):
  177. return _compare(node.ops, [_eval(node.left)] + [_eval(c) for c in node.comparators])
  178. if isinstance(node, ast.Name) and node.id in math_constants:
  179. return math_constants[node.id]
  180. raise TypeError(node)
  181. def handler(q: multiprocessing.Queue, func, args, **kwargs): # pylint:disable=invalid-name
  182. try:
  183. q.put(func(*args, **kwargs))
  184. except:
  185. q.put(None)
  186. raise