hostnames.py

# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring, too-many-branches

import re
from urllib.parse import urlunparse, urlparse

from flask_babel import gettext

from searx import settings
from searx.plugins import logger
from searx.settings_loader import get_yaml_file

name = gettext('Hostnames plugin')
description = gettext('Rewrite hostnames, remove results or prioritize them based on the hostname')
default_on = False
preference_section = 'general'

plugin_id = 'hostnames'
logger = logger.getChild(plugin_id)

# result key holding the pre-parsed URL (a urllib.parse.ParseResult)
parsed = 'parsed_url'
# additional result fields containing URLs that are rewritten or removed as well
_url_fields = ['iframe_src', 'audio_src']


def _load_regular_expressions(settings_key):
    setting_value = settings.get(plugin_id, {}).get(settings_key)

    if not setting_value:
        return {}

    # load external file with configuration
    if isinstance(setting_value, str):
        setting_value = get_yaml_file(setting_value)

    # a plain list yields a set of compiled patterns (remove / priority rules)
    if isinstance(setting_value, list):
        return {re.compile(r) for r in setting_value}

    # a mapping yields compiled pattern -> replacement string (replace rules)
    if isinstance(setting_value, dict):
        return {re.compile(p): r for (p, r) in setting_value.items()}

    return {}


replacements = _load_regular_expressions('replace')
removables = _load_regular_expressions('remove')
high_priority = _load_regular_expressions('high_priority')
low_priority = _load_regular_expressions('low_priority')
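
# Illustrative sketch only: the four keys above are read from the ``hostnames:``
# section of settings.yml (or from an external YAML file referenced there by a
# string value). The concrete patterns below are examples, not shipped defaults:
#
#   hostnames:
#     replace:
#       '(.*\.)?youtube\.com$': 'invidious.example.org'
#     remove:
#       - '(.*\.)?facebook\.com$'
#     low_priority:
#       - '(.*\.)?google\.com$'
#     high_priority:
#       - '(.*\.)?wikipedia\.org$'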


def _matches_parsed_url(result, pattern):
    return parsed in result and pattern.search(result[parsed].netloc)


def on_result(_request, _search, result):
    # 1. rewrite hostnames that match a 'replace' pattern
    for pattern, replacement in replacements.items():
        if _matches_parsed_url(result, pattern):
            logger.debug(result['url'])
            result[parsed] = result[parsed]._replace(netloc=pattern.sub(replacement, result[parsed].netloc))
            result['url'] = urlunparse(result[parsed])
            logger.debug(result['url'])

        for url_field in _url_fields:
            if not result.get(url_field):
                continue

            url_src = urlparse(result[url_field])
            if pattern.search(url_src.netloc):
                url_src = url_src._replace(netloc=pattern.sub(replacement, url_src.netloc))
                result[url_field] = urlunparse(url_src)

    # 2. drop results whose hostname matches a 'remove' pattern
    for pattern in removables:
        if _matches_parsed_url(result, pattern):
            return False

        for url_field in _url_fields:
            if not result.get(url_field):
                continue

            url_src = urlparse(result[url_field])
            if pattern.search(url_src.netloc):
                del result[url_field]

    # 3. tag the remaining results with a priority based on their hostname
    for pattern in low_priority:
        if _matches_parsed_url(result, pattern):
            result['priority'] = 'low'

    for pattern in high_priority:
        if _matches_parsed_url(result, pattern):
            result['priority'] = 'high'

    return True
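
# Minimal usage sketch (not part of the plugin; the pattern below is illustrative):
# assuming a 'replace' entry mapping '(www\.)?example\.com$' to 'example.org' was
# configured before import, on_result() rewrites the result in place and keeps it:
#
#   from urllib.parse import urlparse
#   result = {'url': 'https://www.example.com/page',
#             'parsed_url': urlparse('https://www.example.com/page')}
#   on_result(None, None, result)   # returns True
#   result['url']                   # -> 'https://example.org/page'
#
# A False return value (the 'remove' branch above) signals that the result
# should be dropped from the result list.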