results.py

# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring, missing-class-docstring
from __future__ import annotations

import warnings
from collections import defaultdict
from threading import RLock
from typing import List, NamedTuple, Set

from searx import logger as log
import searx.engines
from searx.metrics import histogram_observe, counter_add
from searx.result_types import Result, LegacyResult, MainResult
from searx.result_types.answer import AnswerSet, BaseAnswer


def calculate_score(result, priority) -> float:
    weight = 1.0

    for result_engine in result['engines']:
        if hasattr(searx.engines.engines.get(result_engine), 'weight'):
            weight *= float(searx.engines.engines[result_engine].weight)

    weight *= len(result['positions'])
    score = 0

    for position in result['positions']:
        if priority == 'low':
            continue
        if priority == 'high':
            score += weight
        else:
            score += weight / position

    return score
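
# Worked example of the scoring above: a result reported at positions [1, 3]
# by engines with the default weight of 1.0 gets weight = 1.0 * len([1, 3]) = 2.
# With normal priority its score is 2/1 + 2/3 = ~2.67, with priority 'high'
# it is 2 + 2 = 4, and with priority 'low' it stays 0 because every position
# is skipped.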


class Timing(NamedTuple):
    engine: str
    total: float
    load: float


class UnresponsiveEngine(NamedTuple):
    engine: str
    error_type: str
    suspended: bool


class ResultContainer:
    """In the result container, the results are collected, sorted and duplicates
    will be merged."""

    # pylint: disable=too-many-statements

    main_results_map: dict[int, MainResult | LegacyResult]
    infoboxes: list[LegacyResult]
    suggestions: set[str]
    answers: AnswerSet
    corrections: set[str]

    def __init__(self):
        self.main_results_map = {}
        self.infoboxes = []
        self.suggestions = set()
        self.answers = AnswerSet()
        self.corrections = set()

        self._number_of_results: list[int] = []
        self.engine_data: dict[str, dict[str, str]] = defaultdict(dict)
        self._closed: bool = False
        self.paging: bool = False
        self.unresponsive_engines: Set[UnresponsiveEngine] = set()
        self.timings: List[Timing] = []
        self.redirect_url: str | None = None
        self.on_result = lambda _: True
        self._lock = RLock()
        self._main_results_sorted: list[MainResult | LegacyResult] = None  # type: ignore
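
    # ``on_result`` is a hook: callers may replace the default (which accepts
    # everything) with a callable that returns False to drop a result before
    # it is merged into the container.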

    def extend(self, engine_name: str | None, results):  # pylint: disable=too-many-branches
        if self._closed:
            log.debug("container is closed, ignoring results: %s", results)
            return
        main_count = 0

        for result in list(results):

            if isinstance(result, Result):
                result.engine = result.engine or engine_name
                result.normalize_result_fields()

                if not self.on_result(result):
                    continue

                if isinstance(result, BaseAnswer):
                    self.answers.add(result)
                elif isinstance(result, MainResult):
                    main_count += 1
                    self._merge_main_result(result, main_count)
                else:
                    # more types need to be implemented in the future ..
                    raise NotImplementedError(f"no handler implemented to process the result of type {result}")

            else:
                result["engine"] = result.get("engine") or engine_name or ""
                result = LegacyResult(result)  # for backward compatibility, will be removed one day
                result.normalize_result_fields()

                if "suggestion" in result:
                    if self.on_result(result):
                        self.suggestions.add(result["suggestion"])
                    continue

                if "answer" in result:
                    if self.on_result(result):
                        warnings.warn(
                            f"answer results from engine {result.engine}"
                            " are without typification / migrate to Answer class.",
                            DeprecationWarning,
                        )
                        self.answers.add(result)  # type: ignore
                    continue

                if "correction" in result:
                    if self.on_result(result):
                        self.corrections.add(result["correction"])
                    continue

                if "infobox" in result:
                    if self.on_result(result):
                        self._merge_infobox(result)
                    continue

                if "number_of_results" in result:
                    if self.on_result(result):
                        self._number_of_results.append(result["number_of_results"])
                    continue

                if "engine_data" in result:
                    if self.on_result(result):
                        if result.engine:
                            self.engine_data[result.engine][result["key"]] = result["engine_data"]
                    continue

                if self.on_result(result):
                    main_count += 1
                    self._merge_main_result(result, main_count)
                    continue

        if engine_name in searx.engines.engines:
            eng = searx.engines.engines[engine_name]
            histogram_observe(main_count, "engine", eng.name, "result", "count")
            if not self.paging and eng.paging:
                self.paging = True
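
    # A minimal usage sketch of ``extend`` (hypothetical engine name and
    # result dicts, chosen here for illustration):
    #
    #   container = ResultContainer()
    #   container.extend("example engine", [
    #       {"url": "https://example.org", "title": "Example", "content": "..."},
    #       {"suggestion": "example query"},
    #   ])
    #   container.close()
    #   ordered = container.get_ordered_results()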

    def _merge_infobox(self, new_infobox: LegacyResult):
        add_infobox = True

        new_id = getattr(new_infobox, "id", None)
        if new_id is not None:
            with self._lock:
                for existing_infobox in self.infoboxes:
                    if new_id == getattr(existing_infobox, "id", None):
                        merge_two_infoboxes(existing_infobox, new_infobox)
                        add_infobox = False

        if add_infobox:
            self.infoboxes.append(new_infobox)

    def _merge_main_result(self, result: MainResult | LegacyResult, position):
        result_hash = hash(result)

        with self._lock:
            merged = self.main_results_map.get(result_hash)
            if not merged:
                # if there is no duplicate in the merged results, append result
                result.positions = [position]
                self.main_results_map[result_hash] = result
                return
            merge_two_main_results(merged, result)
            # add the new position
            merged.positions.append(position)
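
    # Deduplication above keys on hash(result); the Result types are assumed
    # to implement a content-based __hash__ (e.g. over the normalized URL), so
    # the same hit reported by several engines collapses into one entry whose
    # ``positions`` list records every rank it appeared at.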

    def close(self):
        self._closed = True

        for result in self.main_results_map.values():
            result.score = calculate_score(result, result.priority)
            for eng_name in result.engines:
                counter_add(result.score, 'engine', eng_name, 'score')

    def get_ordered_results(self) -> list[MainResult | LegacyResult]:
        """Returns a sorted list of results to be displayed in the main result
        area (:ref:`result types`)."""

        if not self._closed:
            self.close()

        if self._main_results_sorted:
            return self._main_results_sorted

        # first pass, sort results by "score" (descending)
        results = sorted(self.main_results_map.values(), key=lambda x: x.score, reverse=True)

        # pass 2 : group results by category and template
        gresults = []
        categoryPositions = {}
        max_count = 8
        max_distance = 20

        for res in results:
            # do we need to handle more than one category per engine?
            engine = searx.engines.engines.get(res.engine or "")
            if engine:
                res.category = engine.categories[0] if len(engine.categories) > 0 else ""

            category = f"{res.category}:{res.template}:{'img_src' if (res.thumbnail or res.img_src) else ''}"
            grp = categoryPositions.get(category)

            # group with previous results using the same category, if the group
            # can accept more results and is not too far from the current
            # position
            if (grp is not None) and (grp["count"] > 0) and (len(gresults) - grp["index"] < max_distance):
                # group with the previous results using the same category as
                # this one
                index = grp["index"]
                gresults.insert(index, res)

                # update every index after the current one (including the
                # current one)
                for item in categoryPositions.values():
                    v = item["index"]
                    if v >= index:
                        item["index"] = v + 1

                # update this category
                grp["count"] -= 1
            else:
                gresults.append(res)
                # update categoryPositions
                categoryPositions[category] = {"index": len(gresults), "count": max_count}
                continue

        self._main_results_sorted = gresults
        return self._main_results_sorted
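
    # The grouping pass above keeps at most ``max_count`` (8) results of one
    # category/template adjacent and only pulls a result back into its group
    # when that group started fewer than ``max_distance`` (20) positions
    # earlier, so a single category cannot dominate the top of the page.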

    @property
    def number_of_results(self) -> int:
        """Returns the average of the result counts reported by the engines,
        or zero if that average is smaller than the number of results actually
        collected."""
        if not self._closed:
            log.error("call to ResultContainer.number_of_results before ResultContainer.close")
            return 0

        with self._lock:
            resultnum_sum = sum(self._number_of_results)
            if not resultnum_sum or not self._number_of_results:
                return 0

            average = int(resultnum_sum / len(self._number_of_results))
            if average < len(self.get_ordered_results()):
                average = 0
            return average

    def add_unresponsive_engine(self, engine_name: str, error_type: str, suspended: bool = False):
        with self._lock:
            if self._closed:
                log.error("call to ResultContainer.add_unresponsive_engine after ResultContainer.close")
                return
            if searx.engines.engines[engine_name].display_error_messages:
                self.unresponsive_engines.add(UnresponsiveEngine(engine_name, error_type, suspended))

    def add_timing(self, engine_name: str, engine_time: float, page_load_time: float):
        with self._lock:
            if self._closed:
                log.error("call to ResultContainer.add_timing after ResultContainer.close")
                return
            self.timings.append(Timing(engine_name, total=engine_time, load=page_load_time))

    def get_timings(self):
        with self._lock:
            if not self._closed:
                log.error("call to ResultContainer.get_timings before ResultContainer.close")
                return []
            return self.timings


def merge_two_infoboxes(origin: LegacyResult, other: LegacyResult):
    """Merges the values from ``other`` into ``origin``."""
    # pylint: disable=too-many-branches
    weight1 = getattr(searx.engines.engines[origin.engine], "weight", 1)
    weight2 = getattr(searx.engines.engines[other.engine], "weight", 1)

    if weight2 > weight1:
        origin.engine = other.engine

    origin.engines |= other.engines

    if other.urls:
        url_items = origin.get("urls", [])

        for url2 in other.urls:
            unique_url = True
            entity_url2 = url2.get("entity")

            for url1 in origin.get("urls", []):
                if (entity_url2 is not None and entity_url2 == url1.get("entity")) or (
                    url1.get("url") == url2.get("url")
                ):
                    unique_url = False
                    break

            if unique_url:
                url_items.append(url2)

        origin.urls = url_items

    if other.img_src:
        if not origin.img_src:
            origin.img_src = other.img_src
        elif weight2 > weight1:
            origin.img_src = other.img_src

    if other.attributes:
        if not origin.attributes:
            origin.attributes = other.attributes
        else:
            attr_names_1 = set()
            for attr in origin.attributes:
                label = attr.get("label")
                if label:
                    attr_names_1.add(label)
                entity = attr.get("entity")
                if entity:
                    attr_names_1.add(entity)

            for attr in other.attributes:
                if attr.get("label") not in attr_names_1 and attr.get('entity') not in attr_names_1:
                    origin.attributes.append(attr)

    if other.content:
        if not origin.content:
            origin.content = other.content
        elif len(other.content) > len(origin.content):
            origin.content = other.content


def merge_two_main_results(origin: MainResult | LegacyResult, other: MainResult | LegacyResult):
    """Merges the values from ``other`` into ``origin``."""
    if len(other.content) > len(origin.content):
        # use content with more text
        origin.content = other.content

    # use title with more text
    if len(other.title) > len(origin.title):
        origin.title = other.title

    # merge all result's parameters not found in origin
    if isinstance(other, MainResult) and isinstance(origin, MainResult):
        origin.defaults_from(other)
    elif isinstance(other, LegacyResult) and isinstance(origin, LegacyResult):
        origin.defaults_from(other)

    # add engine to list of result-engines
    origin.engines.add(other.engine or "")

    # use https, ftps, .. if possible
    if origin.parsed_url and not origin.parsed_url.scheme.endswith("s"):
        if other.parsed_url and other.parsed_url.scheme.endswith("s"):
            origin.parsed_url = origin.parsed_url._replace(scheme=other.parsed_url.scheme)
            origin.url = origin.parsed_url.geturl()
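
# Example of the scheme upgrade above: merging an http://example.org/page
# duplicate with an https://example.org/page one rewrites ``origin.url`` to
# the https variant while every other merged field is kept as-is.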