| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400 | # SPDX-License-Identifier: AGPL-3.0-or-later"""Configuration class :py:class:`Config` with deep-update, schema validationand deprecated names.The :py:class:`Config` class implements a configuration that is based onstructured dictionaries.  The configuration schema is defined in a dictionarystructure and the configuration data is given in a dictionary structure."""from __future__ import annotationsfrom typing import Anyimport copyimport typingimport loggingimport pathlibtry:    import tomllib    pytomlpp = None    USE_TOMLLIB = Trueexcept ImportError:    import pytomlpp    tomllib = None    USE_TOMLLIB = False__all__ = ['Config', 'UNSET', 'SchemaIssue']log = logging.getLogger(__name__)class FALSE:    """Class of ``False`` singelton"""    # pylint: disable=multiple-statements    def __init__(self, msg):        self.msg = msg    def __bool__(self):        return False    def __str__(self):        return self.msg    __repr__ = __str__UNSET = FALSE('<UNSET>')class SchemaIssue(ValueError):    """Exception to store and/or raise a message from a schema issue."""    def __init__(self, level: typing.Literal['warn', 'invalid'], msg: str):        self.level = level        super().__init__(msg)    def __str__(self):        return f"[cfg schema {self.level}] {self.args[0]}"class Config:    """Base class used for configuration"""    UNSET = UNSET    @classmethod    def from_toml(cls, schema_file: pathlib.Path, cfg_file: pathlib.Path, deprecated: dict) -> Config:        # init schema        log.debug("load schema file: %s", schema_file)        cfg = cls(cfg_schema=toml_load(schema_file), deprecated=deprecated)        if not cfg_file.exists():            log.warning("missing config file: %s", cfg_file)            return cfg        # load configuration        log.debug("load config file: %s", cfg_file)        upd_cfg = toml_load(cfg_file)        is_valid, issue_list = cfg.validate(upd_cfg)        for msg in issue_list:            log.error(str(msg))        if not is_valid:            raise TypeError(f"schema of {cfg_file} is invalid!")        cfg.update(upd_cfg)        return cfg    def __init__(self, cfg_schema: typing.Dict, deprecated: typing.Dict[str, str]):        """Construtor of class Config.        :param cfg_schema: Schema of the configuration        :param deprecated: dictionary that maps deprecated configuration names to a messages        These values are needed for validation, see :py:obj:`validate`.        """        self.cfg_schema = cfg_schema        self.deprecated = deprecated        self.cfg = copy.deepcopy(cfg_schema)    def __getitem__(self, key: str) -> Any:        return self.get(key)    def validate(self, cfg: dict):        """Validation of dictionary ``cfg`` on :py:obj:`Config.SCHEMA`.        Validation is done by :py:obj:`validate`."""        return validate(self.cfg_schema, cfg, self.deprecated)    def update(self, upd_cfg: dict):        """Update this configuration by ``upd_cfg``."""        dict_deepupdate(self.cfg, upd_cfg)    def default(self, name: str):        """Returns default value of field ``name`` in ``self.cfg_schema``."""        return value(name, self.cfg_schema)    def get(self, name: str, default: Any = UNSET, replace: bool = True) -> Any:        """Returns the value to which ``name`` points in the configuration.        If there is no such ``name`` in the config and the ``default`` is        :py:obj:`UNSET`, a :py:obj:`KeyError` is raised.        """        parent = self._get_parent_dict(name)        val = parent.get(name.split('.')[-1], UNSET)        if val is UNSET:            if default is UNSET:                raise KeyError(name)            val = default        if replace and isinstance(val, str):            val = val % self        return val    def set(self, name: str, val):        """Set the value to which ``name`` points in the configuration.        If there is no such ``name`` in the config, a :py:obj:`KeyError` is        raised.        """        parent = self._get_parent_dict(name)        parent[name.split('.')[-1]] = val    def _get_parent_dict(self, name):        parent_name = '.'.join(name.split('.')[:-1])        if parent_name:            parent = value(parent_name, self.cfg)        else:            parent = self.cfg        if (parent is UNSET) or (not isinstance(parent, dict)):            raise KeyError(parent_name)        return parent    def path(self, name: str, default=UNSET):        """Get a :py:class:`pathlib.Path` object from a config string."""        val = self.get(name, default)        if val is UNSET:            if default is UNSET:                raise KeyError(name)            return default        return pathlib.Path(str(val))    def pyobj(self, name, default=UNSET):        """Get python object refered by full qualiffied name (FQN) in the config        string."""        fqn = self.get(name, default)        if fqn is UNSET:            if default is UNSET:                raise KeyError(name)            return default        (modulename, name) = str(fqn).rsplit('.', 1)        m = __import__(modulename, {}, {}, [name], 0)        return getattr(m, name)def toml_load(file_name):    if USE_TOMLLIB:        # Python >= 3.11        try:            with open(file_name, "rb") as f:                return tomllib.load(f)        except tomllib.TOMLDecodeError as exc:            msg = str(exc).replace('\t', '').replace('\n', ' ')            log.error("%s: %s", file_name, msg)            raise    # fallback to pytomlpp for Python < 3.11    try:        return pytomlpp.load(file_name)    except pytomlpp.DecodeError as exc:        msg = str(exc).replace('\t', '').replace('\n', ' ')        log.error("%s: %s", file_name, msg)        raise# working with dictionariesdef value(name: str, data_dict: dict):    """Returns the value to which ``name`` points in the ``dat_dict``.    .. code: python        >>> data_dict = {                "foo": {"bar": 1 },                "bar": {"foo": 2 },                "foobar": [1, 2, 3],            }        >>> value('foobar', data_dict)        [1, 2, 3]        >>> value('foo.bar', data_dict)        1        >>> value('foo.bar.xxx', data_dict)        <UNSET>    """    ret_val = data_dict    for part in name.split('.'):        if isinstance(ret_val, dict):            ret_val = ret_val.get(part, UNSET)        if ret_val is UNSET:            break    return ret_valdef validate(    schema_dict: typing.Dict, data_dict: typing.Dict, deprecated: typing.Dict[str, str]) -> typing.Tuple[bool, list]:    """Deep validation of dictionary in ``data_dict`` against dictionary in    ``schema_dict``.  Argument deprecated is a dictionary that maps deprecated    configuration names to a messages::        deprecated = {            "foo.bar" : "config 'foo.bar' is deprecated, use 'bar.foo'",            "..."     : "..."        }    The function returns a python tuple ``(is_valid, issue_list)``:    ``is_valid``:      A bool value indicating ``data_dict`` is valid or not.    ``issue_list``:      A list of messages (:py:obj:`SchemaIssue`) from the validation::          [schema warn] data_dict: deprecated 'fontlib.foo': <DEPRECATED['foo.bar']>          [schema invalid] data_dict: key unknown 'fontlib.foo'          [schema invalid] data_dict: type mismatch 'fontlib.foo': expected ..., is ...    If ``schema_dict`` or ``data_dict`` is not a dictionary type a    :py:obj:`SchemaIssue` is raised.    """    names = []    is_valid = True    issue_list = []    if not isinstance(schema_dict, dict):        raise SchemaIssue('invalid', "schema_dict is not a dict type")    if not isinstance(data_dict, dict):        raise SchemaIssue('invalid', f"data_dict issue{'.'.join(names)} is not a dict type")    is_valid, issue_list = _validate(names, issue_list, schema_dict, data_dict, deprecated)    return is_valid, issue_listdef _validate(    names: typing.List,    issue_list: typing.List,    schema_dict: typing.Dict,    data_dict: typing.Dict,    deprecated: typing.Dict[str, str],) -> typing.Tuple[bool, typing.List]:    is_valid = True    for key, data_value in data_dict.items():        names.append(key)        name = '.'.join(names)        deprecated_msg = deprecated.get(name)        # print("XXX %s: key %s //   data_value: %s" % (name, key, data_value))        if deprecated_msg:            issue_list.append(SchemaIssue('warn', f"data_dict '{name}': deprecated - {deprecated_msg}"))        schema_value = value(name, schema_dict)        # print("YYY %s: key %s // schema_value: %s" % (name, key, schema_value))        if schema_value is UNSET:            if not deprecated_msg:                issue_list.append(SchemaIssue('invalid', f"data_dict '{name}': key unknown in schema_dict"))                is_valid = False        elif type(schema_value) != type(data_value):  # pylint: disable=unidiomatic-typecheck            issue_list.append(                SchemaIssue(                    'invalid',                    (f"data_dict: type mismatch '{name}':" f" expected {type(schema_value)}, is: {type(data_value)}"),                )            )            is_valid = False        elif isinstance(data_value, dict):            _valid, _ = _validate(names, issue_list, schema_dict, data_value, deprecated)            is_valid = is_valid and _valid        names.pop()    return is_valid, issue_listdef dict_deepupdate(base_dict: dict, upd_dict: dict, names=None):    """Deep-update of dictionary in ``base_dict`` by dictionary in ``upd_dict``.    For each ``upd_key`` & ``upd_val`` pair in ``upd_dict``:    0. If types of ``base_dict[upd_key]`` and ``upd_val`` do not match raise a       :py:obj:`TypeError`.    1. If ``base_dict[upd_key]`` is a dict: recursively deep-update it by ``upd_val``.    2. If ``base_dict[upd_key]`` not exist: set ``base_dict[upd_key]`` from a       (deep-) copy of ``upd_val``.    3. If ``upd_val`` is a list, extend list in ``base_dict[upd_key]`` by the       list in ``upd_val``.    4. If ``upd_val`` is a set, update set in ``base_dict[upd_key]`` by set in       ``upd_val``.    """    # pylint: disable=too-many-branches    if not isinstance(base_dict, dict):        raise TypeError("argument 'base_dict' is not a ditionary type")    if not isinstance(upd_dict, dict):        raise TypeError("argument 'upd_dict' is not a ditionary type")    if names is None:        names = []    for upd_key, upd_val in upd_dict.items():        # For each upd_key & upd_val pair in upd_dict:        if isinstance(upd_val, dict):            if upd_key in base_dict:                # if base_dict[upd_key] exists, recursively deep-update it                if not isinstance(base_dict[upd_key], dict):                    raise TypeError(f"type mismatch {'.'.join(names)}: is not a dict type in base_dict")                dict_deepupdate(                    base_dict[upd_key],                    upd_val,                    names                    + [                        upd_key,                    ],                )            else:                # if base_dict[upd_key] not exist, set base_dict[upd_key] from deepcopy of upd_val                base_dict[upd_key] = copy.deepcopy(upd_val)        elif isinstance(upd_val, list):            if upd_key in base_dict:                # if base_dict[upd_key] exists, base_dict[up_key] is extended by                # the list from upd_val                if not isinstance(base_dict[upd_key], list):                    raise TypeError(f"type mismatch {'.'.join(names)}: is not a list type in base_dict")                base_dict[upd_key].extend(upd_val)            else:                # if base_dict[upd_key] doesn't exists, set base_dict[key] from a deepcopy of the                # list in upd_val.                base_dict[upd_key] = copy.deepcopy(upd_val)        elif isinstance(upd_val, set):            if upd_key in base_dict:                # if base_dict[upd_key] exists, base_dict[up_key] is updated by the set in upd_val                if not isinstance(base_dict[upd_key], set):                    raise TypeError(f"type mismatch {'.'.join(names)}: is not a set type in base_dict")                base_dict[upd_key].update(upd_val.copy())            else:                # if base_dict[upd_key] doesn't exists, set base_dict[upd_key] from a copy of the                # set in upd_val                base_dict[upd_key] = upd_val.copy()        else:            # for any other type of upd_val replace or add base_dict[upd_key] by a copy            # of upd_val            base_dict[upd_key] = copy.copy(upd_val)
 |