Commit d1f09fce authored by Jerome Flesch's avatar Jerome Flesch

Core: add plugin 'config_file'

Signed-off-by: Jerome Flesch's avatarJerome Flesch <jflesch@openpaper.work>
parent b19e388d
......@@ -55,7 +55,8 @@ class Core(object):
self.explicits = []
self.plugins = {}
self._to_initialize = [] # because initialization order matters
self._to_initialize = set()
self._initialized = set() # avoid double-init
self.interfaces = collections.defaultdict(list)
self.callbacks = collections.defaultdict(list)
......@@ -74,6 +75,9 @@ class Core(object):
by the user (used to track which modules must possibly be saved
in a configuration file)
"""
if module_name in self.plugins:
return
LOGGER.info(
"Loading plugin '%s' (explicit=%b) ...",
module_name, explicit
......@@ -101,7 +105,7 @@ class Core(object):
if explicit:
self.explicits.append(module_name)
self._to_initialize.append(plugin)
self._to_initialize.add(plugin)
LOGGER.info("Plugin '%s' loaded", module_name)
return plugin
......@@ -148,27 +152,27 @@ class Core(object):
to_examine.append(self.load(dep_default))
assert(len(self.interfaces[dep_interface]) > 0)
def _init(self, plugin, initialized=set()):
if plugin in initialized:
def _init(self, plugin):
if plugin in self._initialized:
return
deps = plugin.get_deps()
if 'plugins' in deps:
for dep_plugin in deps['plugins']:
assert(dep_plugin in self.plugins)
self._init(self.plugins[dep_plugin], initialized)
self._init(self.plugins[dep_plugin])
if 'interfaces' in deps:
for (dep_interface, _) in deps['interfaces']:
dep_plugins = self.interfaces[dep_interface]
assert(len(dep_plugins) > 0)
for dep_plugin in dep_plugins:
self._init(dep_plugin, initialized)
self._init(dep_plugin)
LOGGER.info("Initializing plugin '%s' ...", type(plugin))
plugin.init()
initialized.add(plugin)
self._initialized.add(plugin)
def init(self):
"""
......@@ -178,12 +182,12 @@ class Core(object):
BEWARE of dependency loops !
"""
LOGGER.info("Initializing core")
LOGGER.info("Initializing all plugins")
self._load_deps()
for plugin in self._to_initialize:
self._init(plugin)
self._to_initialize = []
LOGGER.info("Core initialized")
self._to_initialize = set()
LOGGER.info("All plugins initialized")
def get(self, module_name):
"""
......
"""
Manages a configuration file using configparser.
"""
import configparser
import logging
import os
import os.path
from . import PluginBase
LOGGER = logging.getLogger(__name__)
class ConfigList(object):
SEPARATOR = ", "
def __init__(self, string=None):
self.elements = []
if string is not None:
elements = string.split(self.SEPARATOR, 1)
for i in elements:
(t, v) = i.split(_TYPE_SEPARATOR, 1)
self.elements.append(_STR_TO_TYPE[t](v))
def __iter__(self):
return iter(self.elements)
def __contains__(self, o):
return o in self.elements
def __get_item__(self, *args, **kwargs):
return self.elements.__get_items__(*args, **kwargs)
def __str__(self):
out = []
for e in self.elements:
out.append("{}{}{}".format(
_TYPE_TO_STR[type(e)], _TYPE_SEPARATOR, str(e)
))
return self.SEPARATOR.join(out)
_STR_TO_TYPE = {
"bool": bool,
"int": int,
"float": float,
"str": str,
"list": ConfigList,
}
_TYPE_TO_STR = {v: k for (k, v) in _STR_TO_TYPE.items()}
_TYPE_SEPARATOR = ":"
class Plugin(PluginBase):
def __init__(self):
self.config = configparser.RawConfigParser()
self.base_path = os.getenv(
"XDG_CONFIG_HOME",
os.path.expanduser("~/.config")
)
self.config_file_path_fmt = os.path.join(
"{directory}", "{app_name}.conf"
)
def get_interfaces(self):
return ['configuration']
def config_load(self, application_name):
config_path = self.config_file_path_fmt.format(
directory=self.base_path,
app_name=application_name,
)
self.config = configparser.RawConfigParser()
LOGGER.info("Loading configuration '%s' ...", config_path)
with open(config_path, 'r') as fd:
self.config.read_file(fd)
def config_save(self, application_name):
config_path = self.config_file_path_fmt.format(
directory=self.base_path,
app_name=application_name,
)
LOGGER.info("Writing configuration '%s' ...", config_path)
with open(config_path, 'w') as fd:
self.config.write(fd)
def config_load_plugins(self, core):
"""
Load and init the plugin list from the configuration.
"""
modules = self.config_get("plugins", "modules", ConfigList())
LOGGER.info(
"Loading plugins from configuration: %s",
str(modules)
)
for module in modules:
core.load(module)
core.init()
def config_add_plugin(self, module_name):
LOGGER.info("Adding plugin '%s' to configuration", module_name)
modules = self.config_get("plugins", "modules", ConfigList())
modules.elements.append(module_name)
self.config_put("plugins", "modules", modules)
def config_remove_plugin(self, module_name):
LOGGER.info("Removing plugin '%s' from configuration", module_name)
modules = self.config_get("plugins", "modules", ConfigList())
modules.elements.remove(module_name)
self.config_put("plugins", "modules", modules)
def config_put(self, section, key, value):
"""
Section must be a string.
Key must be a string.
"""
LOGGER.debug("Configuration: %s:%s <-- %s", section, key, str(value))
t = _TYPE_TO_STR[type(value)]
value = "{}{}{}".format(t, _TYPE_SEPARATOR, str(value))
if section not in self.config:
self.config[section] = {key: value}
else:
self.config[section][key] = value
def config_get(self, section, key, default=None):
try:
value = self.config[section][key]
(t, value) = value.split(_TYPE_SEPARATOR, 1)
r = _STR_TO_TYPE[t](value)
LOGGER.debug("Configuration: %s:%s --> %s", section, key, str(r))
return r
except KeyError:
if default is None:
raise KeyError(
"Configuration: {}:{} not found".format(section, key)
)
LOGGER.debug(
"Configuration: %s:%s --> %s (default value)",
section, key, str(default)
)
return default
import tempfile
import unittest
import unittest.mock
import openpaperwork_core
class TestReadWrite(unittest.TestCase):
def test_simple_getset(self):
core = openpaperwork_core.Core()
core.load('openpaperwork_core.config_file')
core.init()
core.call_all('config_put', 'test_section', 'test_key', 'test_value')
v = core.call_one('config_get', 'test_section', 'test_key')
self.assertEqual(v, 'test_value')
v = core.call_one('config_get', 'wrong_section', 'test_key', 'default')
self.assertEqual(v, 'default')
with self.assertRaises(KeyError):
core.call_one('config_get', 'test_section', 'wrong_key')
core.call_all('config_add_plugin', 'some_test_module')
def test_simple_readwrite(self):
core = openpaperwork_core.Core()
core.load('openpaperwork_core.config_file')
core.get('openpaperwork_core.config_file').base_path = tempfile.mkdtemp(
prefix='openpaperwork_core_config_tests'
)
core.init()
core.call_all('config_put', 'test_section', 'test_key', 'test_value')
core.call_all('config_add_plugin', 'some_test_module')
core.call_all('config_save', 'openpaperwork_test')
core.call_all('config_load', 'openpaperwork_test')
v = core.call_one('config_get', 'test_section', 'test_key')
self.assertEqual(v, 'test_value')
v = core.call_one('config_get', 'wrong_section', 'test_key', 'default')
self.assertEqual(v, 'default')
with self.assertRaises(KeyError):
core.call_one('config_get', 'test_section', 'wrong_key')
@unittest.mock.patch("importlib.import_module")
def test_simple_load_module(self, import_module):
import openpaperwork_core.config_file
core = openpaperwork_core.Core()
import_module.return_value = openpaperwork_core.config_file
core.load('openpaperwork_core.config_file')
import_module.assert_called_once_with('openpaperwork_core.config_file')
core.init()
class TestModule(object):
class Plugin(openpaperwork_core.PluginBase):
def __init__(self):
self.initialized = False
def init(self):
self.initialized = True
core.call_all('config_add_plugin', 'some_test_module')
core.call_all('config_add_plugin', 'some_test_module_2')
import_module.reset_mock()
import_module.side_effect = [TestModule(), TestModule()]
core.call_all('config_load_plugins', core)
import_module.assert_called_with('some_test_module_2')
self.assertEqual(core.get('some_test_module').initialized, True)
self.assertEqual(core.get('some_test_module_2').initialized, True)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment