json_engine.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. from collections.abc import Iterable
  3. from json import loads
  4. from urllib.parse import urlencode
  5. from searx.utils import to_string, html_to_text
  6. search_url = None
  7. url_query = None
  8. content_query = None
  9. title_query = None
  10. content_html_to_text = False
  11. title_html_to_text = False
  12. paging = False
  13. suggestion_query = ''
  14. results_query = ''
  15. cookies = {}
  16. headers = {}
  17. '''Some engines might offer different result based on cookies or headers.
  18. Possible use-case: To set safesearch cookie or header to moderate.'''
  19. # parameters for engines with paging support
  20. #
  21. # number of results on each page
  22. # (only needed if the site requires not a page number, but an offset)
  23. page_size = 1
  24. # number of the first page (usually 0 or 1)
  25. first_page_num = 1
  26. def iterate(iterable):
  27. if type(iterable) == dict:
  28. it = iterable.items()
  29. else:
  30. it = enumerate(iterable)
  31. for index, value in it:
  32. yield str(index), value
  33. def is_iterable(obj):
  34. if type(obj) == str:
  35. return False
  36. return isinstance(obj, Iterable)
  37. def parse(query):
  38. q = []
  39. for part in query.split('/'):
  40. if part == '':
  41. continue
  42. else:
  43. q.append(part)
  44. return q
  45. def do_query(data, q):
  46. ret = []
  47. if not q:
  48. return ret
  49. qkey = q[0]
  50. for key, value in iterate(data):
  51. if len(q) == 1:
  52. if key == qkey:
  53. ret.append(value)
  54. elif is_iterable(value):
  55. ret.extend(do_query(value, q))
  56. else:
  57. if not is_iterable(value):
  58. continue
  59. if key == qkey:
  60. ret.extend(do_query(value, q[1:]))
  61. else:
  62. ret.extend(do_query(value, q))
  63. return ret
  64. def query(data, query_string):
  65. q = parse(query_string)
  66. return do_query(data, q)
  67. def request(query, params):
  68. query = urlencode({'q': query})[2:]
  69. fp = {'query': query}
  70. if paging and search_url.find('{pageno}') >= 0:
  71. fp['pageno'] = (params['pageno'] - 1) * page_size + first_page_num
  72. params['cookies'].update(cookies)
  73. params['headers'].update(headers)
  74. params['url'] = search_url.format(**fp)
  75. params['query'] = query
  76. return params
  77. def identity(arg):
  78. return arg
  79. def response(resp):
  80. results = []
  81. json = loads(resp.text)
  82. title_filter = html_to_text if title_html_to_text else identity
  83. content_filter = html_to_text if content_html_to_text else identity
  84. if results_query:
  85. rs = query(json, results_query)
  86. if not len(rs):
  87. return results
  88. for result in rs[0]:
  89. try:
  90. url = query(result, url_query)[0]
  91. title = query(result, title_query)[0]
  92. except:
  93. continue
  94. try:
  95. content = query(result, content_query)[0]
  96. except:
  97. content = ""
  98. results.append(
  99. {
  100. 'url': to_string(url),
  101. 'title': title_filter(to_string(title)),
  102. 'content': content_filter(to_string(content)),
  103. }
  104. )
  105. else:
  106. for url, title, content in zip(query(json, url_query), query(json, title_query), query(json, content_query)):
  107. results.append(
  108. {
  109. 'url': to_string(url),
  110. 'title': title_filter(to_string(title)),
  111. 'content': content_filter(to_string(content)),
  112. }
  113. )
  114. if not suggestion_query:
  115. return results
  116. for suggestion in query(json, suggestion_query):
  117. results.append({'suggestion': suggestion})
  118. return results