123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- # SPDX-License-Identifier: AGPL-3.0-or-later
- # lint: pylint
- # pylint: disable=missing-module-docstring, missing-class-docstring
- import sys
- from hashlib import sha256
- from importlib import import_module
- from os import listdir, makedirs, remove, stat, utime
- from os.path import abspath, basename, dirname, exists, join
- from shutil import copyfile
- from pkgutil import iter_modules
- from logging import getLogger
- from typing import List, Tuple
- from searx import logger, settings
- class Plugin: # pylint: disable=too-few-public-methods
- """This class is currently never initialized and only used for type hinting."""
- id: str
- name: str
- description: str
- default_on: bool
- js_dependencies: Tuple[str]
- css_dependencies: Tuple[str]
- preference_section: str
- logger = logger.getChild("plugins")
- required_attrs = (
- # fmt: off
- ("name", str),
- ("description", str),
- ("default_on", bool)
- # fmt: on
- )
- optional_attrs = (
- # fmt: off
- ("js_dependencies", tuple),
- ("css_dependencies", tuple),
- ("preference_section", str),
- # fmt: on
- )
- def sha_sum(filename):
- with open(filename, "rb") as f:
- file_content_bytes = f.read()
- return sha256(file_content_bytes).hexdigest()
- def sync_resource(base_path, resource_path, name, target_dir, plugin_dir):
- dep_path = join(base_path, resource_path)
- file_name = basename(dep_path)
- resource_path = join(target_dir, file_name)
- if not exists(resource_path) or sha_sum(dep_path) != sha_sum(resource_path):
- try:
- copyfile(dep_path, resource_path)
- # copy atime_ns and mtime_ns, so the weak ETags (generated by
- # the HTTP server) do not change
- dep_stat = stat(dep_path)
- utime(resource_path, ns=(dep_stat.st_atime_ns, dep_stat.st_mtime_ns))
- except IOError:
- logger.critical("failed to copy plugin resource {0} for plugin {1}".format(file_name, name))
- sys.exit(3)
- # returning with the web path of the resource
- return join("plugins/external_plugins", plugin_dir, file_name)
- def prepare_package_resources(plugin, plugin_module_name):
- plugin_base_path = dirname(abspath(plugin.__file__))
- plugin_dir = plugin_module_name
- target_dir = join(settings["ui"]["static_path"], "plugins/external_plugins", plugin_dir)
- try:
- makedirs(target_dir, exist_ok=True)
- except IOError:
- logger.critical("failed to create resource directory {0} for plugin {1}".format(target_dir, plugin_module_name))
- sys.exit(3)
- resources = []
- if hasattr(plugin, "js_dependencies"):
- resources.extend(map(basename, plugin.js_dependencies))
- plugin.js_dependencies = [
- sync_resource(plugin_base_path, x, plugin_module_name, target_dir, plugin_dir)
- for x in plugin.js_dependencies
- ]
- if hasattr(plugin, "css_dependencies"):
- resources.extend(map(basename, plugin.css_dependencies))
- plugin.css_dependencies = [
- sync_resource(plugin_base_path, x, plugin_module_name, target_dir, plugin_dir)
- for x in plugin.css_dependencies
- ]
- for f in listdir(target_dir):
- if basename(f) not in resources:
- resource_path = join(target_dir, basename(f))
- try:
- remove(resource_path)
- except IOError:
- logger.critical(
- "failed to remove unused resource file {0} for plugin {1}".format(resource_path, plugin_module_name)
- )
- sys.exit(3)
- def load_plugin(plugin_module_name, external):
- # pylint: disable=too-many-branches
- try:
- plugin = import_module(plugin_module_name)
- except (
- SyntaxError,
- KeyboardInterrupt,
- SystemExit,
- SystemError,
- ImportError,
- RuntimeError,
- ) as e:
- logger.critical("%s: fatal exception", plugin_module_name, exc_info=e)
- sys.exit(3)
- except BaseException:
- logger.exception("%s: exception while loading, the plugin is disabled", plugin_module_name)
- return None
- # difference with searx: use module name instead of the user name
- plugin.id = plugin_module_name
- #
- plugin.logger = getLogger(plugin_module_name)
- for plugin_attr, plugin_attr_type in required_attrs:
- if not hasattr(plugin, plugin_attr):
- logger.critical('%s: missing attribute "%s", cannot load plugin', plugin, plugin_attr)
- sys.exit(3)
- attr = getattr(plugin, plugin_attr)
- if not isinstance(attr, plugin_attr_type):
- type_attr = str(type(attr))
- logger.critical(
- '{1}: attribute "{0}" is of type {2}, must be of type {3}, cannot load plugin'.format(
- plugin, plugin_attr, type_attr, plugin_attr_type
- )
- )
- sys.exit(3)
- for plugin_attr, plugin_attr_type in optional_attrs:
- if not hasattr(plugin, plugin_attr) or not isinstance(getattr(plugin, plugin_attr), plugin_attr_type):
- setattr(plugin, plugin_attr, plugin_attr_type())
- if not hasattr(plugin, "preference_section"):
- plugin.preference_section = "general"
- # query plugin
- if plugin.preference_section == "query":
- for plugin_attr in ("query_keywords", "query_examples"):
- if not hasattr(plugin, plugin_attr):
- logger.critical('missing attribute "{0}", cannot load plugin: {1}'.format(plugin_attr, plugin))
- sys.exit(3)
- if settings.get("enabled_plugins"):
- # searx compatibility: plugin.name in settings['enabled_plugins']
- plugin.default_on = plugin.name in settings["enabled_plugins"] or plugin.id in settings["enabled_plugins"]
- # copy ressources if this is an external plugin
- if external:
- prepare_package_resources(plugin, plugin_module_name)
- logger.debug("%s: loaded", plugin_module_name)
- return plugin
- def load_and_initialize_plugin(plugin_module_name, external, init_args):
- plugin = load_plugin(plugin_module_name, external)
- if plugin and hasattr(plugin, 'init'):
- try:
- return plugin if plugin.init(*init_args) else None
- except Exception: # pylint: disable=broad-except
- plugin.logger.exception("Exception while calling init, the plugin is disabled")
- return None
- return plugin
- class PluginStore:
- def __init__(self):
- self.plugins: List[Plugin] = []
- def __iter__(self):
- for plugin in self.plugins:
- yield plugin
- def register(self, plugin):
- self.plugins.append(plugin)
- def call(self, ordered_plugin_list, plugin_type, *args, **kwargs):
- # pylint: disable=no-self-use
- ret = True
- for plugin in ordered_plugin_list:
- if hasattr(plugin, plugin_type):
- try:
- ret = getattr(plugin, plugin_type)(*args, **kwargs)
- if not ret:
- break
- except Exception: # pylint: disable=broad-except
- plugin.logger.exception("Exception while calling %s", plugin_type)
- return ret
- plugins = PluginStore()
- def plugin_module_names():
- yield_plugins = set()
- # embedded plugins
- for module in iter_modules(path=[dirname(__file__)]):
- yield (__name__ + "." + module.name, False)
- yield_plugins.add(module.name)
- # external plugins
- for module_name in settings['plugins']:
- if module_name not in yield_plugins:
- yield (module_name, True)
- yield_plugins.add(module_name)
- def initialize(app):
- for module_name, external in plugin_module_names():
- plugin = load_and_initialize_plugin(module_name, external, (app, settings))
- if plugin:
- plugins.register(plugin)
|