calculator.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. """Calculate mathematical expressions using :py:obj:`ast.parse` (mode="eval").
  3. """
  4. from __future__ import annotations
  5. from typing import Callable
  6. import ast
  7. import re
  8. import operator
  9. import multiprocessing
  10. import babel
  11. import babel.numbers
  12. from flask_babel import gettext
  13. from searx.result_types import EngineResults
  14. name = "Basic Calculator"
  15. description = gettext("Calculate mathematical expressions via the search bar")
  16. default_on = True
  17. preference_section = 'general'
  18. plugin_id = 'calculator'
  19. operators: dict[type, Callable] = {
  20. ast.Add: operator.add,
  21. ast.Sub: operator.sub,
  22. ast.Mult: operator.mul,
  23. ast.Div: operator.truediv,
  24. ast.Pow: operator.pow,
  25. ast.BitXor: operator.xor,
  26. ast.USub: operator.neg,
  27. }
  28. # with multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating
  29. # the old behavior "fork") but it will not solve the core problem of fork, nor
  30. # will it remove the deprecation warnings in py3.12 & py3.13. Issue is
  31. # ddiscussed here: https://github.com/searxng/searxng/issues/4159
  32. mp_fork = multiprocessing.get_context("fork")
  33. def _eval_expr(expr):
  34. """
  35. >>> _eval_expr('2^6')
  36. 64
  37. >>> _eval_expr('2**6')
  38. 64
  39. >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
  40. -5.0
  41. """
  42. try:
  43. return _eval(ast.parse(expr, mode='eval').body)
  44. except ZeroDivisionError:
  45. # This is undefined
  46. return ""
  47. def _eval(node):
  48. if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
  49. return node.value
  50. if isinstance(node, ast.BinOp):
  51. return operators[type(node.op)](_eval(node.left), _eval(node.right))
  52. if isinstance(node, ast.UnaryOp):
  53. return operators[type(node.op)](_eval(node.operand))
  54. raise TypeError(node)
  55. def handler(q: multiprocessing.Queue, func, args, **kwargs): # pylint:disable=invalid-name
  56. try:
  57. q.put(func(*args, **kwargs))
  58. except:
  59. q.put(None)
  60. raise
  61. def timeout_func(timeout, func, *args, **kwargs):
  62. que = mp_fork.Queue()
  63. p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs)
  64. p.start()
  65. p.join(timeout=timeout)
  66. ret_val = None
  67. # pylint: disable=used-before-assignment,undefined-variable
  68. if not p.is_alive():
  69. ret_val = que.get()
  70. else:
  71. logger.debug("terminate function after timeout is exceeded") # type: ignore
  72. p.terminate()
  73. p.join()
  74. p.close()
  75. return ret_val
  76. def post_search(request, search) -> EngineResults:
  77. results = EngineResults()
  78. # only show the result of the expression on the first page
  79. if search.search_query.pageno > 1:
  80. return results
  81. query = search.search_query.query
  82. # in order to avoid DoS attacks with long expressions, ignore long expressions
  83. if len(query) > 100:
  84. return results
  85. # replace commonly used math operators with their proper Python operator
  86. query = query.replace("x", "*").replace(":", "/")
  87. # use UI language
  88. ui_locale = babel.Locale.parse(request.preferences.get_value('locale'), sep='-')
  89. # parse the number system in a localized way
  90. def _decimal(match: re.Match) -> str:
  91. val = match.string[match.start() : match.end()]
  92. val = babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn")
  93. return str(val)
  94. decimal = ui_locale.number_symbols["latn"]["decimal"]
  95. group = ui_locale.number_symbols["latn"]["group"]
  96. query = re.sub(f"[0-9]+[{decimal}|{group}][0-9]+[{decimal}|{group}]?[0-9]?", _decimal, query)
  97. # only numbers and math operators are accepted
  98. if any(str.isalpha(c) for c in query):
  99. return results
  100. # in python, powers are calculated via **
  101. query_py_formatted = query.replace("^", "**")
  102. # Prevent the runtime from being longer than 50 ms
  103. res = timeout_func(0.05, _eval_expr, query_py_formatted)
  104. if res is None or res == "":
  105. return results
  106. res = babel.numbers.format_decimal(res, locale=ui_locale)
  107. results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}"))
  108. return results