calculator.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. """Calculate mathematical expressions using :py:obj:`ast.parse` (mode="eval")."""
  3. from __future__ import annotations
  4. import typing
  5. import ast
  6. import re
  7. import operator
  8. import multiprocessing
  9. import babel
  10. import babel.numbers
  11. from flask_babel import gettext
  12. from searx.result_types import EngineResults
  13. from searx.plugins import Plugin, PluginInfo
  14. if typing.TYPE_CHECKING:
  15. from searx.search import SearchWithPlugins
  16. from searx.extended_types import SXNG_Request
  17. from searx.plugins import PluginCfg
  18. class SXNGPlugin(Plugin):
  19. """Plugin converts strings to different hash digests. The results are
  20. displayed in area for the "answers".
  21. """
  22. id = "calculator"
  23. def __init__(self, plg_cfg: "PluginCfg") -> None:
  24. super().__init__(plg_cfg)
  25. self.info = PluginInfo(
  26. id=self.id,
  27. name=gettext("Basic Calculator"),
  28. description=gettext("Calculate mathematical expressions via the search bar"),
  29. preference_section="general",
  30. )
  31. def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults:
  32. results = EngineResults()
  33. # only show the result of the expression on the first page
  34. if search.search_query.pageno > 1:
  35. return results
  36. query = search.search_query.query
  37. # in order to avoid DoS attacks with long expressions, ignore long expressions
  38. if len(query) > 100:
  39. return results
  40. # replace commonly used math operators with their proper Python operator
  41. query = query.replace("x", "*").replace(":", "/")
  42. # use UI language
  43. ui_locale = babel.Locale.parse(request.preferences.get_value("locale"), sep="-")
  44. # parse the number system in a localized way
  45. def _decimal(match: re.Match) -> str:
  46. val = match.string[match.start() : match.end()]
  47. val = babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn")
  48. return str(val)
  49. decimal = ui_locale.number_symbols["latn"]["decimal"]
  50. group = ui_locale.number_symbols["latn"]["group"]
  51. query = re.sub(f"[0-9]+[{decimal}|{group}][0-9]+[{decimal}|{group}]?[0-9]?", _decimal, query)
  52. # only numbers and math operators are accepted
  53. if any(str.isalpha(c) for c in query):
  54. return results
  55. # in python, powers are calculated via **
  56. query_py_formatted = query.replace("^", "**")
  57. # Prevent the runtime from being longer than 50 ms
  58. res = timeout_func(0.05, _eval_expr, query_py_formatted)
  59. if res is None or res == "":
  60. return results
  61. res = babel.numbers.format_decimal(res, locale=ui_locale)
  62. results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}"))
  63. return results
  64. operators: dict[type, typing.Callable] = {
  65. ast.Add: operator.add,
  66. ast.Sub: operator.sub,
  67. ast.Mult: operator.mul,
  68. ast.Div: operator.truediv,
  69. ast.Pow: operator.pow,
  70. ast.BitXor: operator.xor,
  71. ast.BitOr: operator.or_,
  72. ast.BitAnd: operator.and_,
  73. ast.USub: operator.neg,
  74. ast.RShift: operator.rshift,
  75. ast.LShift: operator.lshift,
  76. ast.Mod: operator.mod,
  77. }
# With multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating
# the old behavior "fork"), but it will neither solve the core problem of fork
# nor remove the deprecation warnings in py3.12 & py3.13.  The issue is
# discussed here: https://github.com/searxng/searxng/issues/4159
mp_fork = multiprocessing.get_context("fork")
  83. def _eval_expr(expr):
  84. """
  85. >>> _eval_expr('2^6')
  86. 64
  87. >>> _eval_expr('2**6')
  88. 64
  89. >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
  90. -5.0
  91. """
  92. try:
  93. return _eval(ast.parse(expr, mode='eval').body)
  94. except ZeroDivisionError:
  95. # This is undefined
  96. return ""
  97. def _eval(node):
  98. if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
  99. return node.value
  100. if isinstance(node, ast.BinOp):
  101. return operators[type(node.op)](_eval(node.left), _eval(node.right))
  102. if isinstance(node, ast.UnaryOp):
  103. return operators[type(node.op)](_eval(node.operand))
  104. raise TypeError(node)
  105. def handler(q: multiprocessing.Queue, func, args, **kwargs): # pylint:disable=invalid-name
  106. try:
  107. q.put(func(*args, **kwargs))
  108. except:
  109. q.put(None)
  110. raise
  111. def timeout_func(timeout, func, *args, **kwargs):
  112. que = mp_fork.Queue()
  113. p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs)
  114. p.start()
  115. p.join(timeout=timeout)
  116. ret_val = None
  117. # pylint: disable=used-before-assignment,undefined-variable
  118. if not p.is_alive():
  119. ret_val = que.get()
  120. else:
  121. logger.debug("terminate function after timeout is exceeded") # type: ignore
  122. p.terminate()
  123. p.join()
  124. p.close()
  125. return ret_val