external_bang.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. from searx.data import EXTERNAL_BANGS
  3. LEAF_KEY = chr(16)
  4. def get_node(external_bangs_db, bang):
  5. node = external_bangs_db['trie']
  6. after = ''
  7. before = ''
  8. for bang_letter in bang:
  9. after += bang_letter
  10. if after in node and isinstance(node, dict):
  11. node = node[after]
  12. before += after
  13. after = ''
  14. return node, before, after
  15. def get_bang_definition_and_ac(external_bangs_db, bang):
  16. node, before, after = get_node(external_bangs_db, bang)
  17. bang_definition = None
  18. bang_ac_list = []
  19. if after != '':
  20. for k in node:
  21. if k.startswith(after):
  22. bang_ac_list.append(before + k)
  23. elif isinstance(node, dict):
  24. bang_definition = node.get(LEAF_KEY)
  25. bang_ac_list = [before + k for k in node.keys() if k != LEAF_KEY]
  26. elif isinstance(node, str):
  27. bang_definition = node
  28. bang_ac_list = []
  29. return bang_definition, bang_ac_list
  30. def resolve_bang_definition(bang_definition, query):
  31. url, rank = bang_definition.split(chr(1))
  32. url = url.replace(chr(2), query)
  33. if url.startswith('//'):
  34. url = 'https:' + url
  35. rank = int(rank) if len(rank) > 0 else 0
  36. return (url, rank)
  37. def get_bang_definition_and_autocomplete(bang, external_bangs_db=None):
  38. if external_bangs_db is None:
  39. external_bangs_db = EXTERNAL_BANGS
  40. bang_definition, bang_ac_list = get_bang_definition_and_ac(external_bangs_db, bang)
  41. new_autocomplete = []
  42. current = [*bang_ac_list]
  43. done = set()
  44. while len(current) > 0:
  45. bang_ac = current.pop(0)
  46. done.add(bang_ac)
  47. current_bang_definition, current_bang_ac_list = get_bang_definition_and_ac(external_bangs_db, bang_ac)
  48. if current_bang_definition:
  49. _, order = resolve_bang_definition(current_bang_definition, '')
  50. new_autocomplete.append((bang_ac, order))
  51. for new_bang in current_bang_ac_list:
  52. if new_bang not in done and new_bang not in current:
  53. current.append(new_bang)
  54. new_autocomplete.sort(key=lambda t: (-t[1], t[0]))
  55. new_autocomplete = list(map(lambda t: t[0], new_autocomplete))
  56. return bang_definition, new_autocomplete
  57. def get_bang_url(search_query, external_bangs_db=None):
  58. """
  59. Redirects if the user supplied a correct bang search.
  60. :param search_query: This is a search_query object which contains preferences and the submitted queries.
  61. :return: None if the bang was invalid, else a string of the redirect url.
  62. """
  63. ret_val = None
  64. if external_bangs_db is None:
  65. external_bangs_db = EXTERNAL_BANGS
  66. if search_query.external_bang:
  67. bang_definition, _ = get_bang_definition_and_ac(external_bangs_db, search_query.external_bang)
  68. if bang_definition and isinstance(bang_definition, str):
  69. ret_val = resolve_bang_definition(bang_definition, search_query.query)[0]
  70. return ret_val