| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 | 
							- # SPDX-License-Identifier: AGPL-3.0-or-later
 
- # lint: pylint
 
- # pylint: disable=missing-module-docstring, missing-class-docstring
 
- import sys
 
- from hashlib import sha256
 
- from importlib import import_module
 
- from os import listdir, makedirs, remove, stat, utime
 
- from os.path import abspath, basename, dirname, exists, join
 
- from shutil import copyfile
 
- from pkgutil import iter_modules
 
- from logging import getLogger
 
- from searx import logger, settings
 
- logger = logger.getChild("plugins")
 
- required_attrs = (
 
-     ("name", str),
 
-     ("description", str),
 
-     ("default_on", bool)
 
- )
 
- optional_attrs = (
 
-     ("js_dependencies", tuple),
 
-     ("css_dependencies", tuple),
 
-     ("preference_section", str),
 
- )
 
- def sha_sum(filename):
 
-     with open(filename, "rb") as f:
 
-         file_content_bytes = f.read()
 
-         return sha256(file_content_bytes).hexdigest()
 
- def sync_resource(base_path, resource_path, name, target_dir, plugin_dir):
 
-     dep_path = join(base_path, resource_path)
 
-     file_name = basename(dep_path)
 
-     resource_path = join(target_dir, file_name)
 
-     if not exists(resource_path) or sha_sum(dep_path) != sha_sum(resource_path):
 
-         try:
 
-             copyfile(dep_path, resource_path)
 
-             # copy atime_ns and mtime_ns, so the weak ETags (generated by
 
-             # the HTTP server) do not change
 
-             dep_stat = stat(dep_path)
 
-             utime(resource_path, ns=(dep_stat.st_atime_ns, dep_stat.st_mtime_ns))
 
-         except IOError:
 
-             logger.critical(
 
-                 "failed to copy plugin resource {0} for plugin {1}".format(
 
-                     file_name, name
 
-                 )
 
-             )
 
-             sys.exit(3)
 
-     # returning with the web path of the resource
 
-     return join("plugins/external_plugins", plugin_dir, file_name)
 
- def prepare_package_resources(plugin, plugin_module_name):
 
-     plugin_base_path = dirname(abspath(plugin.__file__))
 
-     plugin_dir = plugin_module_name
 
-     target_dir = join(
 
-         settings["ui"]["static_path"], "plugins/external_plugins", plugin_dir
 
-     )
 
-     try:
 
-         makedirs(target_dir, exist_ok=True)
 
-     except IOError:
 
-         logger.critical(
 
-             "failed to create resource directory {0} for plugin {1}".format(
 
-                 target_dir, plugin_module_name
 
-             )
 
-         )
 
-         sys.exit(3)
 
-     resources = []
 
-     if hasattr(plugin, "js_dependencies"):
 
-         resources.extend(map(basename, plugin.js_dependencies))
 
-         plugin.js_dependencies = ([
 
-             sync_resource(
 
-                 plugin_base_path, x, plugin_module_name, target_dir, plugin_dir
 
-             ) for x in plugin.js_dependencies
 
-             ])
 
-     if hasattr(plugin, "css_dependencies"):
 
-         resources.extend(map(basename, plugin.css_dependencies))
 
-         plugin.css_dependencies = ([
 
-             sync_resource(
 
-                 plugin_base_path, x, plugin_module_name, target_dir, plugin_dir
 
-             ) for x in plugin.css_dependencies
 
-             ])
 
-     for f in listdir(target_dir):
 
-         if basename(f) not in resources:
 
-             resource_path = join(target_dir, basename(f))
 
-             try:
 
-                 remove(resource_path)
 
-             except IOError:
 
-                 logger.critical(
 
-                     "failed to remove unused resource file {0} for plugin {1}".format(
 
-                         resource_path, plugin_module_name
 
-                     )
 
-                 )
 
-                 sys.exit(3)
 
- def load_plugin(plugin_module_name, external):
 
-     # pylint: disable=too-many-branches
 
-     try:
 
-         plugin = import_module(plugin_module_name)
 
-     except (
 
-         SyntaxError,
 
-         KeyboardInterrupt,
 
-         SystemExit,
 
-         SystemError,
 
-         ImportError,
 
-         RuntimeError,
 
-     ) as e:
 
-         logger.critical("%s: fatal exception", plugin_module_name, exc_info=e)
 
-         sys.exit(3)
 
-     except BaseException:
 
-         logger.exception("%s: exception while loading, the plugin is disabled", plugin_module_name)
 
-         return None
 
-     # difference with searx: use module name instead of the user name
 
-     plugin.id = plugin_module_name
 
-     #
 
-     plugin.logger = getLogger(plugin_module_name)
 
-     for plugin_attr, plugin_attr_type in required_attrs:
 
-         if not hasattr(plugin, plugin_attr):
 
-             logger.critical(
 
-                 '%s: missing attribute "%s", cannot load plugin', plugin, plugin_attr
 
-             )
 
-             sys.exit(3)
 
-         attr = getattr(plugin, plugin_attr)
 
-         if not isinstance(attr, plugin_attr_type):
 
-             type_attr = str(type(attr))
 
-             logger.critical(
 
-                 '{1}: attribute "{0}" is of type {2}, must be of type {3}, cannot load plugin'.format(
 
-                     plugin, plugin_attr, type_attr, plugin_attr_type
 
-                 )
 
-             )
 
-             sys.exit(3)
 
-     for plugin_attr, plugin_attr_type in optional_attrs:
 
-         if not hasattr(plugin, plugin_attr) or not isinstance(
 
-             getattr(plugin, plugin_attr), plugin_attr_type
 
-         ):
 
-             setattr(plugin, plugin_attr, plugin_attr_type())
 
-     if not hasattr(plugin, "preference_section"):
 
-         plugin.preference_section = "general"
 
-     # query plugin
 
-     if plugin.preference_section == "query":
 
-         for plugin_attr in ("query_keywords", "query_examples"):
 
-             if not hasattr(plugin, plugin_attr):
 
-                 logger.critical(
 
-                     'missing attribute "{0}", cannot load plugin: {1}'.format(
 
-                         plugin_attr, plugin
 
-                     )
 
-                 )
 
-                 sys.exit(3)
 
-     if settings.get("enabled_plugins"):
 
-         # searx compatibility: plugin.name in settings['enabled_plugins']
 
-         plugin.default_on = (
 
-             plugin.name in settings["enabled_plugins"]
 
-             or plugin.id in settings["enabled_plugins"]
 
-         )
 
-     # copy ressources if this is an external plugin
 
-     if external:
 
-         prepare_package_resources(plugin, plugin_module_name)
 
-     logger.debug("%s: loaded", plugin_module_name)
 
-     return plugin
 
- def load_and_initialize_plugin(plugin_module_name, external, init_args):
 
-     plugin = load_plugin(plugin_module_name, external)
 
-     if plugin and hasattr(plugin, 'init'):
 
-         try:
 
-             return plugin if plugin.init(*init_args) else None
 
-         except Exception:  # pylint: disable=broad-except
 
-             plugin.logger.exception(
 
-                 "Exception while calling init, the plugin is disabled"
 
-             )
 
-             return None
 
-     return plugin
 
- class PluginStore:
 
-     def __init__(self):
 
-         self.plugins = []
 
-     def __iter__(self):
 
-         for plugin in self.plugins:
 
-             yield plugin
 
-     def register(self, plugin):
 
-         self.plugins.append(plugin)
 
-     def call(self, ordered_plugin_list, plugin_type, *args, **kwargs):
 
-         # pylint: disable=no-self-use
 
-         ret = True
 
-         for plugin in ordered_plugin_list:
 
-             if hasattr(plugin, plugin_type):
 
-                 try:
 
-                     ret = getattr(plugin, plugin_type)(*args, **kwargs)
 
-                     if not ret:
 
-                         break
 
-                 except Exception:  # pylint: disable=broad-except
 
-                     plugin.logger.exception("Exception while calling %s", plugin_type)
 
-         return ret
 
- plugins = PluginStore()
 
- def plugin_module_names():
 
-     yield_plugins = set()
 
-     # embedded plugins
 
-     for module in iter_modules(path=[dirname(__file__)]):
 
-         yield (__name__ + "." + module.name, False)
 
-         yield_plugins.add(module.name)
 
-     # external plugins
 
-     for module_name in settings['plugins']:
 
-         if module_name not in yield_plugins:
 
-             yield (module_name, True)
 
-             yield_plugins.add(module_name)
 
- def initialize(app):
 
-     for module_name, external in plugin_module_names():
 
-         plugin = load_and_initialize_plugin(module_name, external, (app, settings))
 
-         if plugin:
 
-             plugins.register(plugin)
 
 
  |