calculator.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. """Calculate mathematical expressions using ack#eval
  3. """
  4. import ast
  5. import operator
  6. from multiprocessing import Process, Queue
  7. from flask_babel import gettext
  8. from searx.plugins import logger
# Plugin metadata.  NOTE(review): presumably these module-level names are read
# by the searx plugin loader -- confirm against searx.plugins.

# Human-readable name of the plugin.
name = "Basic Calculator"
# Description of the plugin; passed through gettext so it can be translated.
description = gettext("Calculate mathematical expressions via the search bar")
default_on = True
preference_section = 'general'
# Identifier of the plugin; also used as the name of the child logger below.
plugin_id = 'calculator'
# Rebind ``logger`` to a plugin-specific child of the logger imported from
# searx.plugins.
logger = logger.getChild(plugin_id)
  15. operators = {
  16. ast.Add: operator.add,
  17. ast.Sub: operator.sub,
  18. ast.Mult: operator.mul,
  19. ast.Div: operator.truediv,
  20. ast.Pow: operator.pow,
  21. ast.BitXor: operator.xor,
  22. ast.USub: operator.neg,
  23. }
  24. def _eval_expr(expr):
  25. """
  26. >>> _eval_expr('2^6')
  27. 4
  28. >>> _eval_expr('2**6')
  29. 64
  30. >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
  31. -5.0
  32. """
  33. return _eval(ast.parse(expr, mode='eval').body)
  34. def _eval(node):
  35. if isinstance(node, ast.Constant) and isinstance(node.value, int):
  36. return node.value
  37. if isinstance(node, ast.BinOp):
  38. return operators[type(node.op)](_eval(node.left), _eval(node.right))
  39. if isinstance(node, ast.UnaryOp):
  40. return operators[type(node.op)](_eval(node.operand))
  41. raise TypeError(node)
  42. def timeout_func(timeout, func, *args, **kwargs):
  43. def handler(q: Queue, func, args, **kwargs): # pylint:disable=invalid-name
  44. try:
  45. q.put(func(*args, **kwargs))
  46. except:
  47. q.put(None)
  48. raise
  49. que = Queue()
  50. p = Process(target=handler, args=(que, func, args), kwargs=kwargs)
  51. p.start()
  52. p.join(timeout=timeout)
  53. ret_val = None
  54. if not p.is_alive():
  55. ret_val = que.get()
  56. else:
  57. logger.debug("terminate function after timeout is exceeded")
  58. p.terminate()
  59. p.join()
  60. p.close()
  61. return ret_val
  62. def post_search(_request, search):
  63. # only show the result of the expression on the first page
  64. if search.search_query.pageno > 1:
  65. return True
  66. query = search.search_query.query
  67. # in order to avoid DoS attacks with long expressions, ignore long expressions
  68. if len(query) > 100:
  69. return True
  70. # replace commonly used math operators with their proper Python operator
  71. query = query.replace("x", "*").replace(":", "/")
  72. # only numbers and math operators are accepted
  73. if any(str.isalpha(c) for c in query):
  74. return True
  75. # in python, powers are calculated via **
  76. query_py_formatted = query.replace("^", "**")
  77. # Prevent the runtime from being longer than 50 ms
  78. result = timeout_func(0.05, _eval_expr, query_py_formatted)
  79. if result is None:
  80. return True
  81. result = str(result)
  82. if result != query:
  83. search.result_container.answers['calculate'] = {'answer': f"{query} = {result}"}
  84. return True