| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 | # SPDX-License-Identifier: AGPL-3.0-or-later# lint: pylint# pylint: disable=missing-module-docstring, missing-class-docstringimport sysfrom hashlib import sha256from importlib import import_modulefrom os import listdir, makedirs, remove, stat, utimefrom os.path import abspath, basename, dirname, exists, joinfrom shutil import copyfilefrom pkgutil import iter_modulesfrom logging import getLoggerfrom searx import logger, settingslogger = logger.getChild("plugins")required_attrs = (    ("name", str),    ("description", str),    ("default_on", bool))optional_attrs = (    ("js_dependencies", tuple),    ("css_dependencies", tuple),    ("preference_section", str),)def sha_sum(filename):    with open(filename, "rb") as f:        file_content_bytes = f.read()        return sha256(file_content_bytes).hexdigest()def sync_resource(base_path, resource_path, name, target_dir, plugin_dir):    dep_path = join(base_path, resource_path)    file_name = basename(dep_path)    resource_path = join(target_dir, file_name)    if not exists(resource_path) or sha_sum(dep_path) != sha_sum(resource_path):        try:            copyfile(dep_path, resource_path)            # copy atime_ns and mtime_ns, so the weak ETags (generated by            # the HTTP server) do not change            dep_stat = stat(dep_path)            utime(resource_path, ns=(dep_stat.st_atime_ns, dep_stat.st_mtime_ns))        except IOError:            logger.critical(                "failed to copy plugin resource {0} for plugin {1}".format(                    file_name, name                )            )            sys.exit(3)    # returning with the web path of the resource    return join("plugins/external_plugins", plugin_dir, file_name)def prepare_package_resources(plugin, plugin_module_name):    plugin_base_path = dirname(abspath(plugin.__file__))    plugin_dir = plugin_module_name    target_dir = join(        settings["ui"]["static_path"], "plugins/external_plugins", plugin_dir    )    try:        makedirs(target_dir, exist_ok=True)    except IOError:        logger.critical(            "failed to create resource directory {0} for plugin {1}".format(                target_dir, plugin_module_name            )        )        sys.exit(3)    resources = []    if hasattr(plugin, "js_dependencies"):        resources.extend(map(basename, plugin.js_dependencies))        plugin.js_dependencies = ([            sync_resource(                plugin_base_path, x, plugin_module_name, target_dir, plugin_dir            ) for x in plugin.js_dependencies            ])    if hasattr(plugin, "css_dependencies"):        resources.extend(map(basename, plugin.css_dependencies))        plugin.css_dependencies = ([            sync_resource(                plugin_base_path, x, plugin_module_name, target_dir, plugin_dir            ) for x in plugin.css_dependencies            ])    for f in listdir(target_dir):        if basename(f) not in resources:            resource_path = join(target_dir, basename(f))            try:                remove(resource_path)            except IOError:                logger.critical(                    "failed to remove unused resource file {0} for plugin {1}".format(                        resource_path, plugin_module_name                    )                )                sys.exit(3)def load_plugin(plugin_module_name, external):    # pylint: disable=too-many-branches    try:        plugin = import_module(plugin_module_name)    except (        SyntaxError,        KeyboardInterrupt,        SystemExit,        SystemError,        ImportError,        RuntimeError,    ) as e:        logger.critical("%s: fatal exception", plugin_module_name, exc_info=e)        sys.exit(3)    except BaseException:        logger.exception("%s: exception while loading, the plugin is disabled", plugin_module_name)        return None    # difference with searx: use module name instead of the user name    plugin.id = plugin_module_name    #    plugin.logger = getLogger(plugin_module_name)    for plugin_attr, plugin_attr_type in required_attrs:        if not hasattr(plugin, plugin_attr):            logger.critical(                '%s: missing attribute "%s", cannot load plugin', plugin, plugin_attr            )            sys.exit(3)        attr = getattr(plugin, plugin_attr)        if not isinstance(attr, plugin_attr_type):            type_attr = str(type(attr))            logger.critical(                '{1}: attribute "{0}" is of type {2}, must be of type {3}, cannot load plugin'.format(                    plugin, plugin_attr, type_attr, plugin_attr_type                )            )            sys.exit(3)    for plugin_attr, plugin_attr_type in optional_attrs:        if not hasattr(plugin, plugin_attr) or not isinstance(            getattr(plugin, plugin_attr), plugin_attr_type        ):            setattr(plugin, plugin_attr, plugin_attr_type())    if not hasattr(plugin, "preference_section"):        plugin.preference_section = "general"    # query plugin    if plugin.preference_section == "query":        for plugin_attr in ("query_keywords", "query_examples"):            if not hasattr(plugin, plugin_attr):                logger.critical(                    'missing attribute "{0}", cannot load plugin: {1}'.format(                        plugin_attr, plugin                    )                )                sys.exit(3)    if settings.get("enabled_plugins"):        # searx compatibility: plugin.name in settings['enabled_plugins']        plugin.default_on = (            plugin.name in settings["enabled_plugins"]            or plugin.id in settings["enabled_plugins"]        )    # copy ressources if this is an external plugin    if external:        prepare_package_resources(plugin, plugin_module_name)    logger.debug("%s: loaded", plugin_module_name)    return plugindef load_and_initialize_plugin(plugin_module_name, external, init_args):    plugin = load_plugin(plugin_module_name, external)    if plugin and hasattr(plugin, 'init'):        try:            return plugin if plugin.init(*init_args) else None        except Exception:  # pylint: disable=broad-except            plugin.logger.exception(                "Exception while calling init, the plugin is disabled"            )            return None    return pluginclass PluginStore:    def __init__(self):        self.plugins = []    def __iter__(self):        for plugin in self.plugins:            yield plugin    def register(self, plugin):        self.plugins.append(plugin)    def call(self, ordered_plugin_list, plugin_type, *args, **kwargs):        # pylint: disable=no-self-use        ret = True        for plugin in ordered_plugin_list:            if hasattr(plugin, plugin_type):                try:                    ret = getattr(plugin, plugin_type)(*args, **kwargs)                    if not ret:                        break                except Exception:  # pylint: disable=broad-except                    plugin.logger.exception("Exception while calling %s", plugin_type)        return retplugins = PluginStore()def plugin_module_names():    yield_plugins = set()    # embedded plugins    for module in iter_modules(path=[dirname(__file__)]):        yield (__name__ + "." + module.name, False)        yield_plugins.add(module.name)    # external plugins    for module_name in settings['plugins']:        if module_name not in yield_plugins:            yield (module_name, True)            yield_plugins.add(module_name)def initialize(app):    for module_name, external in plugin_module_names():        plugin = load_and_initialize_plugin(module_name, external, (app, settings))        if plugin:            plugins.register(plugin)
 |