config.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. """Configuration class :py:class:`Config` with deep-update, schema validation
  3. and deprecated names.
  4. The :py:class:`Config` class implements a configuration that is based on
  5. structured dictionaries. The configuration schema is defined in a dictionary
  6. structure and the configuration data is given in a dictionary structure.
  7. """
  8. from __future__ import annotations
  9. from typing import Any
  10. import copy
  11. import typing
  12. import logging
  13. import pathlib
  14. try:
  15. import tomllib
  16. pytomlpp = None
  17. USE_TOMLLIB = True
  18. except ImportError:
  19. import pytomlpp
  20. tomllib = None
  21. USE_TOMLLIB = False
  22. __all__ = ['Config', 'UNSET', 'SchemaIssue']
  23. log = logging.getLogger(__name__)
  24. class FALSE:
  25. """Class of ``False`` singelton"""
  26. # pylint: disable=multiple-statements
  27. def __init__(self, msg):
  28. self.msg = msg
  29. def __bool__(self):
  30. return False
  31. def __str__(self):
  32. return self.msg
  33. __repr__ = __str__
  34. UNSET = FALSE('<UNSET>')
  35. class SchemaIssue(ValueError):
  36. """Exception to store and/or raise a message from a schema issue."""
  37. def __init__(self, level: typing.Literal['warn', 'invalid'], msg: str):
  38. self.level = level
  39. super().__init__(msg)
  40. def __str__(self):
  41. return f"[cfg schema {self.level}] {self.args[0]}"
  42. class Config:
  43. """Base class used for configuration"""
  44. UNSET = UNSET
  45. @classmethod
  46. def from_toml(cls, schema_file: pathlib.Path, cfg_file: pathlib.Path, deprecated: dict) -> Config:
  47. # init schema
  48. log.debug("load schema file: %s", schema_file)
  49. cfg = cls(cfg_schema=toml_load(schema_file), deprecated=deprecated)
  50. if not cfg_file.exists():
  51. log.warning("missing config file: %s", cfg_file)
  52. return cfg
  53. # load configuration
  54. log.debug("load config file: %s", cfg_file)
  55. upd_cfg = toml_load(cfg_file)
  56. is_valid, issue_list = cfg.validate(upd_cfg)
  57. for msg in issue_list:
  58. log.error(str(msg))
  59. if not is_valid:
  60. raise TypeError(f"schema of {cfg_file} is invalid!")
  61. cfg.update(upd_cfg)
  62. return cfg
  63. def __init__(self, cfg_schema: typing.Dict, deprecated: typing.Dict[str, str]):
  64. """Construtor of class Config.
  65. :param cfg_schema: Schema of the configuration
  66. :param deprecated: dictionary that maps deprecated configuration names to a messages
  67. These values are needed for validation, see :py:obj:`validate`.
  68. """
  69. self.cfg_schema = cfg_schema
  70. self.deprecated = deprecated
  71. self.cfg = copy.deepcopy(cfg_schema)
  72. def __getitem__(self, key: str) -> Any:
  73. return self.get(key)
  74. def validate(self, cfg: dict):
  75. """Validation of dictionary ``cfg`` on :py:obj:`Config.SCHEMA`.
  76. Validation is done by :py:obj:`validate`."""
  77. return validate(self.cfg_schema, cfg, self.deprecated)
  78. def update(self, upd_cfg: dict):
  79. """Update this configuration by ``upd_cfg``."""
  80. dict_deepupdate(self.cfg, upd_cfg)
  81. def default(self, name: str):
  82. """Returns default value of field ``name`` in ``self.cfg_schema``."""
  83. return value(name, self.cfg_schema)
  84. def get(self, name: str, default: Any = UNSET, replace: bool = True) -> Any:
  85. """Returns the value to which ``name`` points in the configuration.
  86. If there is no such ``name`` in the config and the ``default`` is
  87. :py:obj:`UNSET`, a :py:obj:`KeyError` is raised.
  88. """
  89. parent = self._get_parent_dict(name)
  90. val = parent.get(name.split('.')[-1], UNSET)
  91. if val is UNSET:
  92. if default is UNSET:
  93. raise KeyError(name)
  94. val = default
  95. if replace and isinstance(val, str):
  96. val = val % self
  97. return val
  98. def set(self, name: str, val):
  99. """Set the value to which ``name`` points in the configuration.
  100. If there is no such ``name`` in the config, a :py:obj:`KeyError` is
  101. raised.
  102. """
  103. parent = self._get_parent_dict(name)
  104. parent[name.split('.')[-1]] = val
  105. def _get_parent_dict(self, name):
  106. parent_name = '.'.join(name.split('.')[:-1])
  107. if parent_name:
  108. parent = value(parent_name, self.cfg)
  109. else:
  110. parent = self.cfg
  111. if (parent is UNSET) or (not isinstance(parent, dict)):
  112. raise KeyError(parent_name)
  113. return parent
  114. def path(self, name: str, default=UNSET):
  115. """Get a :py:class:`pathlib.Path` object from a config string."""
  116. val = self.get(name, default)
  117. if val is UNSET:
  118. if default is UNSET:
  119. raise KeyError(name)
  120. return default
  121. return pathlib.Path(str(val))
  122. def pyobj(self, name, default=UNSET):
  123. """Get python object refered by full qualiffied name (FQN) in the config
  124. string."""
  125. fqn = self.get(name, default)
  126. if fqn is UNSET:
  127. if default is UNSET:
  128. raise KeyError(name)
  129. return default
  130. (modulename, name) = str(fqn).rsplit('.', 1)
  131. m = __import__(modulename, {}, {}, [name], 0)
  132. return getattr(m, name)
  133. def toml_load(file_name):
  134. if USE_TOMLLIB:
  135. # Python >= 3.11
  136. try:
  137. with open(file_name, "rb") as f:
  138. return tomllib.load(f)
  139. except tomllib.TOMLDecodeError as exc:
  140. msg = str(exc).replace('\t', '').replace('\n', ' ')
  141. log.error("%s: %s", file_name, msg)
  142. raise
  143. # fallback to pytomlpp for Python < 3.11
  144. try:
  145. return pytomlpp.load(file_name)
  146. except pytomlpp.DecodeError as exc:
  147. msg = str(exc).replace('\t', '').replace('\n', ' ')
  148. log.error("%s: %s", file_name, msg)
  149. raise
  150. # working with dictionaries
  151. def value(name: str, data_dict: dict):
  152. """Returns the value to which ``name`` points in the ``dat_dict``.
  153. .. code: python
  154. >>> data_dict = {
  155. "foo": {"bar": 1 },
  156. "bar": {"foo": 2 },
  157. "foobar": [1, 2, 3],
  158. }
  159. >>> value('foobar', data_dict)
  160. [1, 2, 3]
  161. >>> value('foo.bar', data_dict)
  162. 1
  163. >>> value('foo.bar.xxx', data_dict)
  164. <UNSET>
  165. """
  166. ret_val = data_dict
  167. for part in name.split('.'):
  168. if isinstance(ret_val, dict):
  169. ret_val = ret_val.get(part, UNSET)
  170. if ret_val is UNSET:
  171. break
  172. return ret_val
  173. def validate(
  174. schema_dict: typing.Dict, data_dict: typing.Dict, deprecated: typing.Dict[str, str]
  175. ) -> typing.Tuple[bool, list]:
  176. """Deep validation of dictionary in ``data_dict`` against dictionary in
  177. ``schema_dict``. Argument deprecated is a dictionary that maps deprecated
  178. configuration names to a messages::
  179. deprecated = {
  180. "foo.bar" : "config 'foo.bar' is deprecated, use 'bar.foo'",
  181. "..." : "..."
  182. }
  183. The function returns a python tuple ``(is_valid, issue_list)``:
  184. ``is_valid``:
  185. A bool value indicating ``data_dict`` is valid or not.
  186. ``issue_list``:
  187. A list of messages (:py:obj:`SchemaIssue`) from the validation::
  188. [schema warn] data_dict: deprecated 'fontlib.foo': <DEPRECATED['foo.bar']>
  189. [schema invalid] data_dict: key unknown 'fontlib.foo'
  190. [schema invalid] data_dict: type mismatch 'fontlib.foo': expected ..., is ...
  191. If ``schema_dict`` or ``data_dict`` is not a dictionary type a
  192. :py:obj:`SchemaIssue` is raised.
  193. """
  194. names = []
  195. is_valid = True
  196. issue_list = []
  197. if not isinstance(schema_dict, dict):
  198. raise SchemaIssue('invalid', "schema_dict is not a dict type")
  199. if not isinstance(data_dict, dict):
  200. raise SchemaIssue('invalid', f"data_dict issue{'.'.join(names)} is not a dict type")
  201. is_valid, issue_list = _validate(names, issue_list, schema_dict, data_dict, deprecated)
  202. return is_valid, issue_list
  203. def _validate(
  204. names: typing.List,
  205. issue_list: typing.List,
  206. schema_dict: typing.Dict,
  207. data_dict: typing.Dict,
  208. deprecated: typing.Dict[str, str],
  209. ) -> typing.Tuple[bool, typing.List]:
  210. is_valid = True
  211. for key, data_value in data_dict.items():
  212. names.append(key)
  213. name = '.'.join(names)
  214. deprecated_msg = deprecated.get(name)
  215. # print("XXX %s: key %s // data_value: %s" % (name, key, data_value))
  216. if deprecated_msg:
  217. issue_list.append(SchemaIssue('warn', f"data_dict '{name}': deprecated - {deprecated_msg}"))
  218. schema_value = value(name, schema_dict)
  219. # print("YYY %s: key %s // schema_value: %s" % (name, key, schema_value))
  220. if schema_value is UNSET:
  221. if not deprecated_msg:
  222. issue_list.append(SchemaIssue('invalid', f"data_dict '{name}': key unknown in schema_dict"))
  223. is_valid = False
  224. elif type(schema_value) != type(data_value): # pylint: disable=unidiomatic-typecheck
  225. issue_list.append(
  226. SchemaIssue(
  227. 'invalid',
  228. (f"data_dict: type mismatch '{name}':" f" expected {type(schema_value)}, is: {type(data_value)}"),
  229. )
  230. )
  231. is_valid = False
  232. elif isinstance(data_value, dict):
  233. _valid, _ = _validate(names, issue_list, schema_dict, data_value, deprecated)
  234. is_valid = is_valid and _valid
  235. names.pop()
  236. return is_valid, issue_list
  237. def dict_deepupdate(base_dict: dict, upd_dict: dict, names=None):
  238. """Deep-update of dictionary in ``base_dict`` by dictionary in ``upd_dict``.
  239. For each ``upd_key`` & ``upd_val`` pair in ``upd_dict``:
  240. 0. If types of ``base_dict[upd_key]`` and ``upd_val`` do not match raise a
  241. :py:obj:`TypeError`.
  242. 1. If ``base_dict[upd_key]`` is a dict: recursively deep-update it by ``upd_val``.
  243. 2. If ``base_dict[upd_key]`` not exist: set ``base_dict[upd_key]`` from a
  244. (deep-) copy of ``upd_val``.
  245. 3. If ``upd_val`` is a list, extend list in ``base_dict[upd_key]`` by the
  246. list in ``upd_val``.
  247. 4. If ``upd_val`` is a set, update set in ``base_dict[upd_key]`` by set in
  248. ``upd_val``.
  249. """
  250. # pylint: disable=too-many-branches
  251. if not isinstance(base_dict, dict):
  252. raise TypeError("argument 'base_dict' is not a ditionary type")
  253. if not isinstance(upd_dict, dict):
  254. raise TypeError("argument 'upd_dict' is not a ditionary type")
  255. if names is None:
  256. names = []
  257. for upd_key, upd_val in upd_dict.items():
  258. # For each upd_key & upd_val pair in upd_dict:
  259. if isinstance(upd_val, dict):
  260. if upd_key in base_dict:
  261. # if base_dict[upd_key] exists, recursively deep-update it
  262. if not isinstance(base_dict[upd_key], dict):
  263. raise TypeError(f"type mismatch {'.'.join(names)}: is not a dict type in base_dict")
  264. dict_deepupdate(
  265. base_dict[upd_key],
  266. upd_val,
  267. names
  268. + [
  269. upd_key,
  270. ],
  271. )
  272. else:
  273. # if base_dict[upd_key] not exist, set base_dict[upd_key] from deepcopy of upd_val
  274. base_dict[upd_key] = copy.deepcopy(upd_val)
  275. elif isinstance(upd_val, list):
  276. if upd_key in base_dict:
  277. # if base_dict[upd_key] exists, base_dict[up_key] is extended by
  278. # the list from upd_val
  279. if not isinstance(base_dict[upd_key], list):
  280. raise TypeError(f"type mismatch {'.'.join(names)}: is not a list type in base_dict")
  281. base_dict[upd_key].extend(upd_val)
  282. else:
  283. # if base_dict[upd_key] doesn't exists, set base_dict[key] from a deepcopy of the
  284. # list in upd_val.
  285. base_dict[upd_key] = copy.deepcopy(upd_val)
  286. elif isinstance(upd_val, set):
  287. if upd_key in base_dict:
  288. # if base_dict[upd_key] exists, base_dict[up_key] is updated by the set in upd_val
  289. if not isinstance(base_dict[upd_key], set):
  290. raise TypeError(f"type mismatch {'.'.join(names)}: is not a set type in base_dict")
  291. base_dict[upd_key].update(upd_val.copy())
  292. else:
  293. # if base_dict[upd_key] doesn't exists, set base_dict[upd_key] from a copy of the
  294. # set in upd_val
  295. base_dict[upd_key] = upd_val.copy()
  296. else:
  297. # for any other type of upd_val replace or add base_dict[upd_key] by a copy
  298. # of upd_val
  299. base_dict[upd_key] = copy.copy(upd_val)