json_engine.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. from collections.abc import Iterable
  3. from json import loads
  4. from urllib.parse import urlencode
  5. from searx.utils import to_string
  6. search_url = None
  7. url_query = None
  8. content_query = None
  9. title_query = None
  10. paging = False
  11. suggestion_query = ''
  12. results_query = ''
  13. # parameters for engines with paging support
  14. #
  15. # number of results on each page
  16. # (only needed if the site requires not a page number, but an offset)
  17. page_size = 1
  18. # number of the first page (usually 0 or 1)
  19. first_page_num = 1
  20. def iterate(iterable):
  21. if type(iterable) == dict:
  22. it = iterable.items()
  23. else:
  24. it = enumerate(iterable)
  25. for index, value in it:
  26. yield str(index), value
  27. def is_iterable(obj):
  28. if type(obj) == str:
  29. return False
  30. return isinstance(obj, Iterable)
  31. def parse(query):
  32. q = []
  33. for part in query.split('/'):
  34. if part == '':
  35. continue
  36. else:
  37. q.append(part)
  38. return q
  39. def do_query(data, q):
  40. ret = []
  41. if not q:
  42. return ret
  43. qkey = q[0]
  44. for key, value in iterate(data):
  45. if len(q) == 1:
  46. if key == qkey:
  47. ret.append(value)
  48. elif is_iterable(value):
  49. ret.extend(do_query(value, q))
  50. else:
  51. if not is_iterable(value):
  52. continue
  53. if key == qkey:
  54. ret.extend(do_query(value, q[1:]))
  55. else:
  56. ret.extend(do_query(value, q))
  57. return ret
  58. def query(data, query_string):
  59. q = parse(query_string)
  60. return do_query(data, q)
  61. def request(query, params):
  62. query = urlencode({'q': query})[2:]
  63. fp = {'query': query}
  64. if paging and search_url.find('{pageno}') >= 0:
  65. fp['pageno'] = (params['pageno'] - 1) * page_size + first_page_num
  66. params['url'] = search_url.format(**fp)
  67. params['query'] = query
  68. return params
  69. def response(resp):
  70. results = []
  71. json = loads(resp.text)
  72. if results_query:
  73. rs = query(json, results_query)
  74. if not len(rs):
  75. return results
  76. for result in rs[0]:
  77. try:
  78. url = query(result, url_query)[0]
  79. title = query(result, title_query)[0]
  80. except:
  81. continue
  82. try:
  83. content = query(result, content_query)[0]
  84. except:
  85. content = ""
  86. results.append({
  87. 'url': to_string(url),
  88. 'title': to_string(title),
  89. 'content': to_string(content),
  90. })
  91. else:
  92. for url, title, content in zip(
  93. query(json, url_query),
  94. query(json, title_query),
  95. query(json, content_query)
  96. ):
  97. results.append({
  98. 'url': to_string(url),
  99. 'title': to_string(title),
  100. 'content': to_string(content),
  101. })
  102. if not suggestion_query:
  103. return results
  104. for suggestion in query(json, suggestion_query):
  105. results.append({'suggestion': suggestion})
  106. return results