# HG changeset patch # User Alain Leufroy # Date 1614720450 -3600 # Tue Mar 02 22:27:30 2021 +0100 # Node ID 200ba211497961bb9b5f6de8dd87753fcf8dd024 # Parent 85a05129a9f59a575f6734edf68a6be9a921811e rewrite code to make it a bit more easy to improve and add tests Tests for list are still missing. But it's late. diff --git a/Makefile b/Makefile --- a/Makefile +++ b/Makefile @@ -1,2 +1,8 @@ normalize: black -S overlayctl + +test: + python -m unittest test_overlayctl + +devtest: + hg manifest | entr -s 'pytest test_overlayctl.py --failed-first --exitfirst' diff --git a/overlayctl b/overlayctl --- a/overlayctl +++ b/overlayctl @@ -17,13 +17,13 @@ import json import logging -import operator import os import re import shutil import sys from collections import OrderedDict, defaultdict -from functools import reduce +from contextlib import contextmanager +from functools import lru_cache from itertools import chain from pathlib import Path from subprocess import PIPE, Popen @@ -52,25 +52,6 @@ UNITSDIR = Path(os.environ.get('UNITSDIR', Path('/') / 'etc' / 'systemd' / 'system')) -MOUNTTMPL = ''' -[Unit] -Description = {name} Container overlay -ConditionPathExists = {mountdir} -[Mount] -What = overlay -Where = {mountdir} -Type = overlay -Options = lowerdir={lowerdir},upperdir={upperdir},workdir={workdir} -''' - -AUTOMOUNTTMPL = ''' -[Automount] -Where={mountdir} -[Install] -WantedBy=local-fs.target -''' - - class OverlayctlError(SystemExit): """Raised on exception.""" @@ -85,222 +66,452 @@ """raised when an overlay can not be deleted.""" -def _name2mountdir(name): - """return the mount point path for the given overlay name. +class BaseLayer: + def __init__(self, name): + self.name = name - If the name contains a path separator just return it, otherwise - return a folder named name as a subfolder of MOUNTDIR. - """ - if os.path.sep in name: - return Path(name) - return MOUNTDIR / name - + def __eq__(self, layer): + return isinstance(layer, BaseLayer) and self.mountdir == layer.mountdir -def _name2unit(name, _cache={}): - """return systemd Unit file name for the given overlay name""" - if name in _cache: - return _cache[name] - mountdir = _name2mountdir(name) - _cache[name] = str(sh.systemd_escape(mountdir, path=True)).strip() - return _cache[name] - + def __hash__(self): + return hash(str(self.mountdir)) -def _name2upperdir(name): - """return upperdir path for the given overlay name""" - return UPPERSDIR / _name2unit(name) - - -def _name2workdir(name): - """return workdir path for the given overlay name""" - return WORKSDIR / _name2unit(name) - - -def _name2lowerdir(name): - """return path for the given overlay name to be used as a lowerdir - for another overlays. + @property + def mountdir(self): + """return the mount point path for the given overlay name. - - If name is not a managed overlay, return the corresponding mount point path. - - If name is a managed overlay, return the corresponding name to get around overlayfs - limitation (https://lists.ubuntu.com/archives/kernel-team/2013-March/025775.html) - """ - upper = _name2upperdir(name) - if upper.exists(): - return upper - return _name2mountdir(name) + If the name contains a path separator just return it, otherwise + return a folder named name as a subfolder of MOUNTDIR. + """ + if os.path.sep in self.name: + return Path(self.name) + return MOUNTDIR / self.name - -def _name2infopath(name): - """return the metadata file path for the given overlay name""" - return INFOSDIR / _name2unit(name) - + def __str__(self): + return self.name -def _compute_info(name, lowers): - """return a dictionary containing metadata for the given overlay name - stacking the given lower directories. - Existing overlay metadata are ignored. - """ - info = {} - info['name'] = name - info['lowers'] = lowers - info['unitname'] = _name2unit(name) - info['mountdir'] = _name2mountdir(name) - info['lowerdir'] = [ - _name2lowerdir(lower) for lower in _get_linearized_lowerdirs(name, lowers) - ] - info['upperdir'] = _name2upperdir(name) - info['workdir'] = _name2workdir(name) - info['mountpath'] = (UNITSDIR / info['unitname']).with_suffix('.mount') - info['automountpath'] = (UNITSDIR / info['unitname']).with_suffix('.automount') - return info + @property + def lowerdir(self): + raise NotImplementedError + + @property + def lowers(self): + """Lower layers.""" + raise NotImplementedError + + def is_managed(self): + raise NotImplementedError -def _read_info(name, missing_ok=False): - """return saved metadata of an existing overlay name. +class Layer(BaseLayer): + """An overlay layer.""" + + def __init__(self, name): + super(Layer, self).__init__(name) + self._lowers = None + #XXX cache properties + + @property + def lowers(self): + """Lower layers.""" + if self._lowers is None: + self.load() + return self._lowers + + @lowers.setter + def lowers(self, lowers): + # XXX detect cyclic dependencies. + self._lowers = lowers + + @property + def upperdir(self): + """return upperdir path for the given overlay name""" + return UPPERSDIR / self.unitname + + @property + def workdir(self): + """return workdir path for the given overlay name""" + return WORKSDIR / self.unitname + + @property + def lowerdir(self): + """return path for the given overlay name to be used as a lowerdir + for another overlays. - If missing_ok is True, brand new overlay metadata is computed with - no lowers. + - If name is not a managed overlay, return the corresponding mount point path. + - If name is a managed overlay, return the corresponding name to get around overlayfs + limitation (https://lists.ubuntu.com/archives/kernel-team/2013-March/025775.html) + """ + if self.upperdir.exists(): + return self.upperdir + return self.mountdir + + @property + def unitname(self): + """return systemd Unit file name for the given overlay name""" + return systemd_escape(self.mountdir) + + @property + def infopath(self): + """return the metadata file path for the given overlay name""" + return INFOSDIR / self.unitname + + @property + def automountunit(self): + """Systeme automount unit.""" + return AutomountUnit(self.unitname) + + @property + def mountunit(self): + """Systeme mount unit.""" + return MountUnit(self.unitname) + + def __repr__(self): + return f'' + + def __str__(self): + return self.name + + def is_managed(self): + return self.infopath.exists() + + def get_ascendants(self): + return _get_linearized_lowers(self, self.lowers) + + def load(self, missing_ok=True): + """return saved metadata of an existing overlay name. + + If missing_ok is True, brand new overlay metadata is computed with + no lowers. - If a systemd Unit file is found, it is parsed to retrieve real metadata. - """ - info = _compute_info(name, []) - infopath = _name2infopath(name) - if infopath.exists(): - with infopath.open(encoding='utf-8') as fobj: - info = json.load(fobj) - assert name == info['name'] - info['mountpath'] = Path(info['mountpath']) - info['automountpath'] = Path(info['automountpath']) - elif not missing_ok: - raise OSError('%s is not a managed overlay: %s not found' % (name, infopath)) - if info['mountpath'].exists(): - logger.debug('Metadata found for %s', name) - content = info['mountpath'].read_text(encoding='utf-8') - info['upperdir'] = Path( - re.search('upperdir=(.*?)[,$]', content).groups()[0].replace('\\\\', '\\') + If a systemd Unit file is found, it is parsed to retrieve real metadata. + """ + infopath = self.infopath + if infopath.exists(): + with infopath.open(encoding='utf-8') as fobj: + info = json.load(fobj) + assert info['name'] == self.name + if self._lowers is None: + self._lowers = [build_layer(lower) for lower in info['lowers']] + else: + self._lowers = [] + self.check_consistency() + + def check_consistency(self): + """Check that info and units are consistents.""" + if self.mountunit.path.exists(): + logger.debug('Metadata found for %s', self.name) + content = self.mountunit.path.read_text(encoding='utf-8') + assert self.upperdir == Path( + re.search('upperdir=(.*?)[,$]', content).groups()[0].replace('\\\\', '\\') + ) + lowerdirs = re.search('lowerdir=(.*?)[,$]', content).groups()[0].split(':') + assert [lower.lowerdir for lower in self.get_ascendants()] \ + == [Path(p.replace('\\\\', '\\')) for p in lowerdirs if p] + assert self.workdir == Path( + re.search('workdir=(.*?)[,\n]', content).groups()[0].replace('\\\\', '\\') + ) + assert self.mountdir == Path( + re.search('[Ww]here *=(.*?)\n', content).groups()[0].strip() + ) + else: + logger.debug("No metadata found for %s", self.name) + + def dump(self): + """Write the given overlay metadata to disk.""" + data = { + 'name': self.name, + 'lowers': [lower.name for lower in self.lowers] + } + self.infopath.write_text( + json.dumps(data, indent=2), encoding='utf-8' + ) + logger.debug('%s updated', self.infopath) + data['lowerdir'] = ':'.join( + str(lower.lowerdir).replace('\\', '\\\\') + for lower in self.get_ascendants() ) - info['lowerdir'] = [ - Path(p.replace('\\\\', '\\')) - for p in re.search('lowerdir=(.*?)[,$]', content).groups()[0].split(':') - ] - info['workdir'] = Path( - re.search('workdir=(.*?)[,\n]', content).groups()[0].replace('\\\\', '\\') - ) - info['mountdir'] = Path( - re.search('[Ww]here *=(.*?)\n', content).groups()[0].strip() - ) - else: - logger.debug("No metadata found for %s", name) - return info + data['upperdir'] = str(self.upperdir).replace('\\', '\\\\') + data['workdir'] = str(self.workdir).replace('\\', '\\\\') + data['mountdir'] = str(self.mountdir).replace('\\', '\\\\') + self.upperdir.mkdir(parents=True, exist_ok=True) + self.workdir.mkdir(parents=True, exist_ok=True) + self.mountdir.mkdir(parents=True, exist_ok=True) + self.mountunit.dump(**data) + self.automountunit.dump(**data) + + def start(self, permanent=False): + """Start the automount systemd unit. + + Enable the automount unit if permanent is truthy. + """ + self.automountunit.start() + if permanent: + self.automountunit.enable() + + def stop(self, permanent=False): + """Start the automount and mount systemd units. + + Disable the automount unit if permanent is truthy. + """ + self.automountunit.stop() + self.mountunit.stop() + if permanent: + self.automountunit.disable() + + @contextmanager + def temporary_stop(self): + """Temporary ensure that the systemd units are stopped. + + Restart the automount unit if it was preciously started. + """ + is_automonted = self.automountunit.is_active() + is_mounted = self.mountunit.is_active() + self.automountunit.stop() + self.mountunit.stop() + yield + if is_automonted: + self.automountunit.start() + if is_mounted: + self.mountunit.start() + + def delete(self): + """Delete the overlay. + + Stop, disable and delete the systemd units. + Delete the overlay info file and upper/work directories. + """ + self.stop(permanent=True) + self.automountunit.unlink() + self.mountunit.unlink() + systemctl.daemon_reload() + shutil.rmtree(bytes(self.mountdir), ignore_errors=True) + shutil.rmtree(bytes(self.workdir), ignore_errors=True) + shutil.rmtree(bytes(self.upperdir), ignore_errors=True) + self.infopath.unlink() -def _write_info(info): - """Write the given overlay metadata to disk.""" - data = { - 'name': info['name'], - 'lowers': info['lowers'], - 'unitname': info['unitname'], - 'mountpath': str(info['mountpath']), - 'automountpath': str(info['automountpath']), - } - _name2infopath(info['name']).write_text( - json.dumps(data, indent=2), encoding='utf-8' - ) - logger.debug('%s updated', _name2infopath(info['name'])) - data = info.copy() - data['lowerdir'] = ':'.join( - str(path).replace('\\', '\\\\') for path in data['lowerdir'] - ) - data['upperdir'] = str(data['upperdir']).replace('\\', '\\\\') - data['workdir'] = str(data['workdir']).replace('\\', '\\\\') - data['mountpath'] = str(info['mountpath']).replace('\\', '\\\\') - info['mountpath'].write_text(MOUNTTMPL.format(**data), encoding='utf-8') - logger.debug('%s updated', info['mountpath']) - info['automountpath'].write_text(AUTOMOUNTTMPL.format(**info), encoding='utf-8') - logger.debug('%s updated', info['automountpath']) +class UnmanagedLayer(BaseLayer): + + @property + def lowers(self): + """Lower layers.""" + return () + + @property + def lowerdir(self): + """return path for the given overlay name to be used as a lowerdir + for another overlays. + + - If name is not a managed overlay, return the corresponding mount point path. + - If name is a managed overlay, return the corresponding name to get around overlayfs + limitation (https://lists.ubuntu.com/archives/kernel-team/2013-March/025775.html) + """ + return self.mountdir + + def __repr__(self): + return f'' + + def is_managed(self): + return False + + +class GenericUnit: + """Generic systemd Unit interface.""" + + @property + def unittype(self): + """Systemd unit type (a.k.a "automount" or "mount").""" + raise NotImplementedError + + @property + def template(self): + """Template of the systemd unit content.""" + raise NotImplementedError + + def __init__(self, unitname): + self.unitname = unitname + + @property + def path(self): + """File system path of the systemd unit file.""" + return (UNITSDIR / self.unitname).with_suffix('.' + self.unittype) + + def dump(self, **data): + """Write the definition of the systemd unit into the disk.""" + self.path.write_text(self.template.format(**data), encoding='utf-8') + logger.debug('Systemd unit %s updated', self.path.name) + + def start(self): + """Start the systemd unit.""" + systemctl.start(self.path.name) + logger.info('Systemd unit %s started', self.path.name) + + def stop(self): + """Stop the systemd unit.""" + systemctl.stop(self.path.name) + logger.info('Systemd unit %s stopped', self.path.name) + + def enable(self): + """Enable the systemd unit.""" + systemctl.enable(self.path.name) + logger.info('Systemd unit %s enabled', self.path.name) + + def disable(self): + """Disable the systemd unit.""" + systemctl.disable(self.path.name) + logger.info('Systemd unit %s disabled', self.path.name) + + def unlink(self): + """Remove the systemd unit file.""" + self.path.unlink() + + def is_active(self): + """Return True if the unit is active.""" + return 'active' in systemctl.status(self.path.name)['Active'].split() + -def _iter_overlays_info_simple(): - for path in INFOSDIR.glob('*'): - info = json.loads(path.read_text(encoding='utf-8')) - yield _read_info(info['name']) +class MountUnit(GenericUnit): + """Systemd mount unit.""" + + unittype = 'mount' + template = ''' +[Unit] +Description = {name} Container overlay +ConditionPathExists = {mountdir} +[Mount] +What = overlay +Where = {mountdir} +Type = overlay +Options = lowerdir={lowerdir},upperdir={upperdir},workdir={workdir} +''' + + +class AutomountUnit(GenericUnit): + """Systemd automount unit.""" + + unittype = 'automount' + template = ''' +[Automount] +Where={mountdir} +[Install] +WantedBy=local-fs.target +''' + + +class systemctl: -def _iter_overlays_info(ordered=None): - """Yield metadata for each managed overlay""" - infos = _iter_overlays_info_simple() + @classmethod + def start(cls, unitname): + cls._execute('start', unitname) + + @classmethod + def stop(cls, unitname): + cls._execute('stop', unitname) + + @classmethod + def enable(cls, unitname): + cls._execute('enable', unitname) + + @classmethod + def disable(cls, unitname): + cls._execute('enable', unitname) + + @classmethod + def daemon_reload(cls): + cls._execute('daemon-reload') + + @classmethod + def status(cls, unit): + """Return a disctionary containing data from `systemctl status {unit}`.""" + stdout, dummy_stderr = Popen( + ['systemctl', 'status', unit], stdout=PIPE + ).communicate() + return dict( + map(str.strip, line.split(':', 1)) + for line in stdout.decode('utf-8').splitlines() + if ':' in line + ) + + @staticmethod + def _execute(command, *args): + process = Popen(['systemctl', command] + list(args), stdout=PIPE) + stdout, dummy_stderr = process.communicate() + return stdout + + +def systemd_escape(path): + process = Popen(['systemd-escape', '--path', str(path)], stdout=PIPE) + stdout, dummy_stderr = process.communicate() + return stdout.strip().decode(sys.getdefaultencoding()) + + +@lru_cache +def build_layer(name): + """A registry to instanciate Layer object. + + The main idea is to ensure that only one object is instanciated + per layer. In fact each layer may have lower layers that can be + instanciated at any time. + """ + layer = Layer(name) + if layer.is_managed(): + return layer + return UnmanagedLayer(name) + + +def _iter_layers_simple(): + """Yield all known layers.""" + for path in INFOSDIR.glob('*'): + info = json.loads(path.read_text(encoding='utf-8')) + layer = build_layer(info['name']) + layer.lowers = [build_layer(lowername) for lowername in info['lowers']] + yield layer + + +def _iter_layers(ordered=None): + """Yield all known layers. + + :ordered: if truthy, yield layers by order. + """ + layers = list(_iter_layers_simple()) if not ordered: - return infos - infos = {info['name']: info for info in infos} - lowers = set(chain(*[info['lowers'] for info in infos.values()])) - heads = list(set(infos) - lowers) + return layers + lowers = set(chain(*(layer.lowers for layer in layers))) + heads = list(set(layers) - lowers) return ( - infos[name] for name in _get_linearized_lowerdirs(None, heads) if name in infos + layer for layer in _get_linearized_lowers(None, heads) if layer in layers ) -def _iter_tree(info, tree): - infos = [info] - seens = {info['name']} - while infos: - info = infos.pop() - for _info in tree[info['name']]: - if _info['name'] in seens: - continue - infos.append(_info) - seens.add(_info['name']) - yield _info - - -def _iter_descendants(info): - tree = defaultdict(list) - for _info in _iter_overlays_info_simple(): - for lower in _info['lowers']: - tree[lower].append(_info) - return _iter_tree(info, tree) - - -def _iter_ascendants(info): - tree = defaultdict(list) - infos = {i['name']: i for i in _iter_overlays_info_simple()} - tree = {n: [infos[x] for x in i['lowers'] if x in infos] - for n, i in infos.items()} - return _iter_tree(info, tree) +def _iter_tree(current, next_layers_getter): + """Utility to walk throught the layers tree.""" + layers = next_layers_getter(current)[:] + seens = {current} + while layers: + layer = layers.pop(0) + if layer in seens: + continue + layers += next_layers_getter(layer) or [] + seens.add(layer) + yield layer -def _systemctl_status(unit): - """REturn a disctionary containing data from `systemctl status {unit}`.""" - stdout, dummy_stderr = Popen( - ['systemctl', 'status', unit], stdout=PIPE - ).communicate() - return dict( - map(str.strip, line.split(':', 1)) - for line in stdout.decode('utf-8').splitlines() - if ':' in line - ) - - -def _is_mount_active(info): - return 'active' in _systemctl_status(info['mountpath'].name)['Active'].split() +def _iter_ascendants(current): + """Yield layers that depend on the given `layer`.""" + return _iter_tree(current, lambda layer: layer.lowers) -def _is_automount_active(info): - return 'active' in _systemctl_status(info['automountpath'].name)['Active'].split() - - -def _disable_units(info): - if _is_automount_active(info): - sh.systemctl.stop(info['automountpath'].name) - if _is_mount_active(info): - sh.systemctl.stop(info['mountpath'].name) - sh.systemctl.disable(info['automountpath'].name).wait() - sh.systemctl.disable(info['mountpath'].name).wait() - logger.debug('Systemd %s disabled', info['automountpath'].name) - - -def _remove_units(info): - _disable_units(info) - info['automountpath'].unlink() - info['mountpath'].unlink() +def _iter_descendants(current): + """Yield layers the `layer` depends on.""" + tree = defaultdict(list) + for layer in _iter_layers_simple(): + tree[layer] + for parent in layer.lowers: + tree[parent].append(layer) + return _iter_tree(current, tree.get) def c3_merge(graph): @@ -334,23 +545,23 @@ return res -def _get_linearized_lowerdirs(name, lowers): - """Get lower dirs of the given overlay name stacking lowers. It +def _get_linearized_lowers(layer, lowers=None): + """Get lowers of the given overlay name stacking lowers. It resolve every {sub}lowers for which overlay information can be retrieved. """ - lowers = lowers[:] - graph = defaultdict(list, {name: lowers[:]}) + lowers = lowers[:] if lowers else layer.lowers + graph = defaultdict(list, {layer: lowers[:]}) while lowers: lower = lowers.pop(0) # consume - sublowers = _read_info(lower, missing_ok=True)['lowers'] - graph[lower] = sublowers - lowers += sublowers # also add sublowers to resolver + if lower.lowers: + graph[lower] = lower.lowers + lowers += lower.lowers # also add sublowers to resolver result = {} - c3_linearize(name, graph, result) - return result[name][1:] + c3_linearize(layer, graph, result) + return result[layer][1:] -def creater(name, lowers, start, permanent): +def creater(name, lowers, start=None, permanent=None): """create an overlay at `name` that inherites from `lowerdir`. :name: the overlay path. @@ -361,57 +572,55 @@ If a lower path doesn't contains a path separator (a.k.a. '/'), /var/lib/machines/{lower} is used """ - info = _compute_info(name, lowers) - info['upperdir'].mkdir(parents=True, exist_ok=True) - info['workdir'].mkdir(parents=True, exist_ok=True) - _write_info(info) - sh.systemctl('daemon-reload').wait() - logger.info("%s created as %s", info['name'], info['unitname']) + layer = Layer(name) + layer.lowers = [build_layer(lowername) for lowername in lowers] + layer.dump() + systemctl.daemon_reload() if start: - sh.systemctl.start(info['automountpath'].name).wait() - logger.debug("Systemd %s.automount unit started", info['unitname']) + layer.start(permanent) else: - logger.warning("Use `overlayctl start %s` to start the overlay.", info['name']) - if permanent: - sh.systemctl.enable(info['automountpath'].name).wait() - logger.debug("Systemd %s.automount unit enabled", info['unitname']) + logger.warning("Use `overlayctl start %s` to start the overlay.", layer.name) def deleter(name): """Delete a managed overlay and related folder/files""" - lowers = reduce( - operator.or_, (set(info['lowers']) for info in _iter_overlays_info()), set() - ) - lowers = set(_name2unit(lower) for lower in lowers) - if _name2unit(name) in lowers: - raise DeletionError("\"%s\" is a lower directory of another overlay" % name) - info = _read_info(name) - _remove_units(info) - sh.systemctl('daemon-reload').wait() - shutil.rmtree(bytes(info['mountdir']), ignore_errors=True) - shutil.rmtree(bytes(info['workdir'])) - shutil.rmtree(bytes(info['upperdir'])) - _name2infopath(name).unlink() - logger.debug("Related file and folder deleted") - logger.info("\"%s\" deleted", info['name']) + lowers = set(chain(*(x.lowers for x in _iter_layers_simple()))) + layer = build_layer(name) + if not layer.is_managed(): + logger.info("\"%s\" not managed. Nothing to do.", layer.name) + return + descendants = list(_iter_descendants(layer)) + if descendants: + # XXX allow this with automatic updates. + raise DeletionError("\"%s\" is a lower directory of another overlay: %s" + % (name, ', '.join(x.name for x in descendants))) + layer.delete() + systemctl.daemon_reload() + logger.info("\"%s\" deleted", layer.name) -def editer(name, appended=None, prepanded=None, removed=None): +def _get_editor_on_file(filename): + cmd = '%s "%s"' % (os.environ.get('EDITOR', 'vim'), filename) + Popen(cmd, shell=True).wait() + + +def editer(name, appended=None, prepended=None, removed=None): """Ask user to edit lowers of the given overlay name, rewrite the metadata and unit files accordingly, finaly retart systemd units of the edited overlay. """ - info = _read_info(name) - if not appended and not prepanded and not removed: + layer = build_layer(name) + # XXX check descendants not active + if not appended and not prepended and not removed: with NamedTemporaryFile() as fobj: fobj.write(b'# -*- encoding: utf-8 -*-\n') fobj.write(b'# Note: Leave unchanged or fully empty to ignore changes\n') - fobj.write(b'\n'.join(lower.encode('utf-8') for lower in info['lowers'])) + fobj.write(b'\n'.join(lower.name.encode('utf-8') for lower in layer.lowers)) fobj.flush() - Popen(os.environ.get('EDITOR', 'vim') + ' ' + fobj.name, shell=True).wait() + _get_editor_on_file(fobj.name) fobj.seek(0) lowers = [ - line.strip() + build_layer(line.strip()) for line in fobj.read().decode('utf-8').splitlines() if not line.strip().startswith('#') and line.strip() ] @@ -419,110 +628,102 @@ if not fobj.read(1): return # Ignore empty file for convenience else: - prepanded = prepanded or [] - added = appended or [] - removed = removed or () - lowers = prepanded + [ - lower for lower in info['lowers'] if lower not in removed - ] + added - if lowers == info['lowers']: + prepended = map(build_layer, prepended or ()) + appended = map(build_layer, appended or ()) + removed = list(map(build_layer, removed or ())) + lowers = chain( + prepended, + (lower for lower in layer.lowers if lower not in removed), + appended) + if lowers == layer.lowers: return - automounted = _is_automount_active(info) - mounted = _is_mount_active(info) - if automounted: - sh.systemctl.stop(info['automountpath'].name) - if mounted: - sh.systemctl.stop(info['mountpath'].name) - info = _compute_info(name, lowers) - _write_info(info) - sh.systemctl('daemon-reload').wait() - if automounted: - sh.systemctl.start(info['automountpath'].name) - if mounted: - sh.systemctl.start(info['automount'].name) - logger.debug("Systemd \"%s\" restarted", info['automountpath']) - logger.info("\"%s\" updated", info['name']) + with layer.temporary_stop(): + layer.lowers[:] = lowers + layer.dump() + systemctl.daemon_reload() + # XXX propagate to descendants + logger.debug("Systemd \"%s\" restarted", layer.unitname) + logger.info("\"%s\" updated", layer.name) def starter(name, preserve_others=False, stop_others=False, permanent=False): - info = _read_info(name) - if _is_automount_active(info): - logger.info('%s is already started.', info['name']) + layer = build_layer(name) + if not layer.is_managed(): + logger.info("\"%s\" not managed. Nothing to do.", layer.name) return if not preserve_others: to_stop = [] - for _info in chain(_iter_descendants(info), _iter_ascendants(info)): - if _is_automount_active(_info): - to_stop.append(_info) + for _layer in chain(_iter_descendants(layer), _iter_ascendants(layer)): + if not _layer.is_managed(): + continue + if _layer.automountunit.is_active(): + to_stop.append(_layer) if to_stop: if not stop_others: logger.error( 'You should prefere to stop the following overlays: \n\n%s\n', - ', '.join(i['name'] for i in to_stop)) + ', '.join(x.name for x in to_stop)) logger.info( 'You may want to use `--stop-others`. ' 'Or see --help for more options.') return - for _info in to_stop: - _stopper(_info, permanent) + for _layer in to_stop: + _layer.disable() if permanent else _layer.stop() logger.info( 'The following overlays were stopped: \n\n' - + ', '.join(i['name'] for i in to_stop)) - sh.systemctl.start(info['unitname'] + '.automount').wait() - if permanent: - sh.systemctl.enable(info['unitname'] + '.automount').wait() - logger.info("Systemd %s.automount unit enabled", info['unitname']) + + ', '.join(x.name for x in to_stop)) + layer.start(permanent) -def stoper(name, permanent): - _stopper(_read_info(name), permanent) - - -def _stopper(info, permanent): - sh.systemctl.stop(info['unitname'] + '.mount').wait() - sh.systemctl.stop(info['unitname'] + '.automount').wait() - if permanent: - sh.systemctl.disable(info['unitname'] + '.automount').wait() +def stoper(name, permanent=False): + layer = build_layer(name) + if not layer.is_managed(): + logger.info("\"%s\" not managed. Nothing to do.", layer.name) + return + layer.stop(permanent) def lister( unit_name, ordered_deps, reversed_deps, no_info, regexp, depends_on, terminal ): """List managed overlays and display overlay inheritance.""" - infos = _iter_overlays_info(ordered_deps or reversed_deps) - infos = reversed(list(infos)) if reversed_deps else infos + layers = _iter_layers(ordered_deps or reversed_deps) + layers = reversed(list(layers)) if reversed_deps else layers if terminal: - infos = list(infos) - lowers = set(chain(*(info['lowers'] for info in infos))) - infos = (info for info in infos if info['name'] not in lowers) + layers = list(layers) + lowers = set(chain(*(layer.lowers for layer in layers))) + layers = (layer for layer in layers if layer not in lowers) + if regexp: + match = re.compile(regexp).match + layers = (layer for layer in layers if match(layer.name)) found = False - for info in infos: - if regexp and not re.match(regexp, info['name']): - continue - alllowers = _get_linearized_lowerdirs(info['name'], info['lowers']) - otherlowers = tuple(lower for lower in alllowers if lower not in info['lowers']) - depslowers = set(alllowers) | set(otherlowers) + if depends_on: + depends_on = [{build_layer(layername) for layername in group} for group in depends_on] + for layer in layers: + descendants = layer.get_ascendants() + grandchildren = tuple(lower for lower in descendants if lower not in layer.lowers) if depends_on: - if all((depslowers & deps) != deps for deps in depends_on): + descendants = set(descendants) + if not any(descendants.issuperset(deps) for deps in depends_on): continue found = True - info['lowers'] = ', '.join(info['lowers']) - info['others'] = ', '.join(otherlowers) if otherlowers else '' + data = { + 'lowers': ', '.join(map(str, layer.lowers)) if layer.lowers else '', + 'others': ', '.join(map(str, grandchildren)) if grandchildren else '', + 'name': layer.name, + } fmt = ( Fore.GREEN + Style.BRIGHT - + ('{unitname}' if unit_name else '{name}') + + '{name}' + Style.RESET_ALL ) - try: - sh.systemctl.status(info['unitname'] + '.mount') + if layer.mountunit.is_active(): icon = Fore.GREEN + ' 🖴 ' + Style.RESET_ALL - except sh.ErrorReturnCode_3: - try: - sh.systemctl.status(info['unitname'] + '.automount') - icon = ' 🖴 ' - except sh.ErrorReturnCode_3: - icon = '' + elif layer.automountunit.is_active(): + icon = ' 🖴 ' + else: + icon = '' if not no_info: fmt += ( icon @@ -533,45 +734,57 @@ + '\N{Rightwards Dashed Arrow} {others}' + Style.RESET_ALL ) - print(fmt.format(**info)) + print(fmt.format(**data)) if not found: raise NoResult("No overlay matches.") -def deplacer(old, new): +def deplacer(oldname, newname): # Checks - if old == new: - logger.info('Not a new name, nothing to do.') + _new = Layer(newname) + + if _new.mountdir.exists(): + logger.error('"%s" already exists.', newname) return - existings = list(_iter_overlays_info()) - names = set(x['name'] for x in existings) - if new in names: - logger.error('New name "%s" already exists.', new) + + layer = build_layer(oldname) + + if layer == _new: + logger.info('Not a new unit name, nothing to do.') return - superseeders = [x for x in existings if old in x['lowers']] - logger.debug('%s overlays affected: %s', len(superseeders), - ', '.join(x['name'] for x in superseeders)) - mounted = [info['name'] for info in superseeders if _is_mount_active(info)] - if mounted: - logger.error('The following units must be stopped: %s', ', '.join(mounted)) - # Move overlay - old_info = _read_info(old) - new_info = _compute_info(new, old_info['lowers']) - for pathname in ('upperdir', 'workdir'): - new_info[pathname].parent.mkdir(parents=True, exist_ok=True) - old_info[pathname].rename(new_info[pathname]) - _write_info(new_info) - _remove_units(old_info) - _name2infopath(old).unlink() - # Update existing overlays that depends on the moved one. - for info in superseeders: - lowers = info['lowers'] - lowers[lowers.index(old)] = new - _write_info(_compute_info(info['name'], lowers)) - sh.systemctl('daemon-reload').wait() - sh.systemctl.start(new_info['automountpath'].name).wait() - logger.debug("Systemd %s unit enabled and started", new_info['unitname']) - logger.info("%s created as %s", new_info['name'], new_info['unitname']) + + descendants = list(_iter_descendants(layer)) + logger.debug( + '%s overlays affected: %s', len(descendants), + ', '.join(x.name for x in descendants) + ) + activated = [x.name for x in descendants if x.automountunit.is_active()] + if activated: + # XXX add --stop + logger.error( + 'The following units must be stopped: %s', ', '.join(activated)) + + is_automonted = layer.automountunit.is_active() + if is_automonted: + layer.stop() + + # Move overlay dirs. + layer.upperdir.rename(_new.upperdir) + layer.workdir.rename(_new.workdir) + + layer.delete() + + layer.name = newname + layer.dump() + + # Update existing layers that depends on the moved one. + for layer in descendants: + layer.dump() + + systemctl.daemon_reload() + + if is_automonted: + layer.start() def _setup_logger(level): @@ -583,13 +796,17 @@ logging.getLogger('sh').setLevel('WARNING') +def ensure_directories_exist(): + """Ensure required folders exist.""" + for path in (MOUNTDIR, UPPERSDIR, WORKSDIR, INFOSDIR, UNITSDIR): + path.mkdir(parents=True, exist_ok=True) + + def main(): """Main entry point for the CLI""" from argparse import ArgumentParser - - for path in (MOUNTDIR, OVERLAYDIR, UPPERSDIR, WORKSDIR, INFOSDIR, UNITSDIR): - path.mkdir(parents=True, exist_ok=True) + ensure_directories_exist() parser = ArgumentParser(description="Manage images as overlays for machinectl.") parser.add_argument( @@ -786,8 +1003,8 @@ "edit an overlay created by this tool. " "An editor is started if --add nor --remove provided."), ) - edit.add_argument('-p', '--prepand', action='store_true', - help="prepand the following names", ) + edit.add_argument('-p', '--prepend', action='store_true', + help="prepend the following names", ) edit.add_argument( '-a', '--append', action='append', help="append the following names", ) @@ -804,7 +1021,7 @@ ) def _editer(args): - editer(args.mountdir, args.append, args.prepand, args.delete) + editer(args.mountdir, args.append, args.prepend, args.delete) edit.set_defaults(func=_editer) diff --git a/test_overlayctl.py b/test_overlayctl.py new file mode 100644 --- /dev/null +++ b/test_overlayctl.py @@ -0,0 +1,563 @@ +import imp +import io +import json +import os +import re +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest import TestCase, mock + +# Like `import overlayctl` but it does not need a real python lib. +with io.open('overlayctl') as fobj: + overlayctl = imp.load_module( + 'overlayctl', fobj, 'overlayctl', ('', 'r', imp.PY_SOURCE)) + + +class BaseTest(TestCase): + """Base TestCase that create temporary to work within.""" + + maxDiff = None + + def setUp(self): + self._tempdir = TemporaryDirectory() + tempdirpath = Path(self._tempdir.__enter__()) + self.systemctl = overlayctl.systemctl = mock.Mock() + self.mountdir = overlayctl.MOUNTDIR = tempdirpath / 'machines' + self.unitdir = overlayctl.UNITSDIR = tempdirpath / 'systemd' + overlaydir = tempdirpath / 'overlayctl' + self.uppersdir = overlayctl.UPPERSDIR = overlaydir / 'upper' + self.worksdir = overlayctl.WORKSDIR = overlaydir / 'work' + self.infosdir = overlayctl.INFOSDIR = overlaydir / 'info' + overlayctl.ensure_directories_exist() + overlayctl.build_layer.cache_clear() + overlayctl._setup_logger('CRITICAL') + + def tearDown(self): + self._tempdir.cleanup() + + def assert_daemon_reload_called(self): + self.systemctl.daemon_reload.assert_called() + + def get_unit_name(self, systemd_name): + name = '-'.join([ + str(self.mountdir).replace(os.path.sep, '-'), + systemd_name + ]) + return name.lstrip('-') + + def get_mount_dir(self, name): + return self.mountdir / name + + def get_upper_dir(self, systemd_name): + return self.uppersdir / self.get_unit_name(systemd_name) + + def get_work_dir(self, systemd_name): + return self.worksdir / self.get_unit_name(systemd_name) + + def write_info(self, name, lowers): + (self.infosdir / self.get_unit_name(name)).write_text( + json.dumps({'name': name, 'lowers': lowers})) + + def assert_info_equal(self, name, lowers, systemd_name=''): + unitname = self.get_unit_name(systemd_name or name) + filepath = self.infosdir / unitname + info = json.loads(filepath.read_text()) + self.assertDictEqual({'name': name, 'lowers': lowers}, info) + + def assert_overlay_dirs_exists(self, systemd_name, name=None): + self.assertTrue(self.get_upper_dir(systemd_name).is_dir()) + self.assertTrue(self.get_work_dir(systemd_name).is_dir()) + self.assertTrue(self.get_mount_dir(name or systemd_name).is_dir()) + + def get_unit_path(self, systemd_name, unittype): + return (self.unitdir / self.get_unit_name(systemd_name)).with_suffix('.' + unittype) + + def parse_unit(self, systemd_name, unittype): + path = self.get_unit_path(systemd_name, unittype) + content = {} + target = content + sectionmatch = re.compile(r'\[(.*?)\]').match + entrymatch = re.compile(r'^\s*(.*?)\s*=\s*(.*?)\s*$').match + with path.open(encoding='utf-8') as fobj: + for line in fobj: + section = sectionmatch(line) + if line.lstrip().startswith('#'): + continue + if section: + [key] = section.groups() + target = content[key] = {} + continue + entry = entrymatch(line) + if entry: + key, value = entry.groups() + if key == 'Options': + value = { + key: list(filter(None, folders.split(':'))) + for group in value.split(',') + for key, folders in [group.split('=', 1)]} + target[key] = value + return content + + def assert_automount_unit_equal(self, systemd_name, expected): + result = self.parse_unit(systemd_name, 'automount') + self.assertDictEqual(result, expected) + + def assert_mount_unit_equal(self, systemd_name, expected): + result = self.parse_unit(systemd_name, 'mount') + self.assertDictEqual(result, expected) + + def check_mount_unit(self, layer_name, lower_names): + self.assert_mount_unit_equal(layer_name, { + 'Mount': { + 'Options': { + 'lowerdir': [str(lower_name) for lower_name in lower_names], + 'upperdir': [str(self.get_upper_dir(layer_name))], + 'workdir': [str(self.get_work_dir(layer_name))]}, + 'Type': 'overlay', + 'What': 'overlay', + 'Where': str(self.get_mount_dir(layer_name))}, + 'Unit': { + 'ConditionPathExists': str(self.get_mount_dir(layer_name)), + 'Description': '%s Container overlay' % layer_name + }}) + + def assert_layer_started(self, name): + self.systemctl.start.assert_called_once_with(self.get_unit_name(name) + '.automount') + + def assert_layer_enabled(self, name): + self.systemctl.enable.assert_called_once_with(self.get_unit_name(name) + '.automount') + + def assert_layer_stopped(self, name): + self.systemctl.stop.assert_any_call(self.get_unit_name(name) + '.automount') + self.systemctl.stop.assert_any_call(self.get_unit_name(name) + '.mount') + + def assert_layer_disabled(self, name): + self.systemctl.disable.assert_called_once_with(self.get_unit_name(name) + '.automount') + + +class TestCreate(BaseTest): + """Test the `create` command.""" + + def test_creating_a_root_layer(self): + # With no dependencies, the layer can be created. + # I don't know if it's a good idea for now. But it's allowed. + # The layer name is escaped + name = 'test-it' + systemd_name = r'test\x2dit' + overlayctl.creater(name, lowers=(), start=False, permanent=False) + self.assert_daemon_reload_called() + self.assert_info_equal(name=name, lowers=[], systemd_name=systemd_name) + self.assert_overlay_dirs_exists(systemd_name, name) + self.assert_automount_unit_equal(systemd_name, { + 'Automount': {'Where': str(self.get_mount_dir('test-it'))}, + 'Install': {'WantedBy': 'local-fs.target'} + }) + self.assert_mount_unit_equal(systemd_name, { + 'Mount': { + 'Options': { + 'lowerdir': [], + 'upperdir': [str(self.get_upper_dir(r'test\\x2dit'))], + 'workdir': [str(self.get_work_dir(r'test\\x2dit'))]}, + 'Type': 'overlay', + 'What': 'overlay', + 'Where': str(self.get_mount_dir('test-it'))}, + 'Unit': { + 'ConditionPathExists': str(self.get_mount_dir('test-it')), + 'Description': 'test-it Container overlay' + }}) + + def test_creating_a_simple_layer(self): + # With dependencies, the layer is created. The layer can + # depend on another layer, an absolute path to a directory or + # a name of a folder in the mount directory. + self.get_upper_dir('lower1').mkdir() + self.write_info('lower1', []) + lower2dir = self.mountdir / 'lower2' + lower2dir.mkdir() + overlayctl.creater( + 'test', lowers=('lower1', str(lower2dir), 'lower3')) + self.assert_daemon_reload_called() + self.assert_info_equal('test', lowers=[ + 'lower1', str(lower2dir), 'lower3']) + self.assert_overlay_dirs_exists('test') + self.assert_automount_unit_equal('test', { + 'Automount': {'Where': str(self.get_mount_dir('test'))}, + 'Install': {'WantedBy': 'local-fs.target'} + }) + self.check_mount_unit('test', [ + self.get_upper_dir('lower1'), + self.get_mount_dir('lower2'), + self.get_mount_dir('lower3')]) + + def test_automount_started(self): + # with --start, the automount unit is automatically started. + self.systemctl.status.return_value = {'Active': 'inactive'} + self.get_mount_dir('lower').mkdir() + overlayctl.creater('test', lowers=('lower',), start=True) + self.assert_overlay_dirs_exists('test') + self.assert_daemon_reload_called() + self.systemctl.start.assert_called_once_with(self.get_unit_name('test') + '.automount') + + def test_automount_enabled(self): + # with --start --permanent, the automount unit is automatically started and enabled. + self.systemctl.status.return_value = {'Active': 'inactive'} + self.get_mount_dir('lower').mkdir() + overlayctl.creater('test', lowers=('lower',), start=True, permanent=True) + self.assert_overlay_dirs_exists('test') + self.assert_daemon_reload_called() + unit = self.get_unit_name('test') + '.automount' + self.systemctl.start.assert_called_once_with(unit) + self.systemctl.enable.assert_called_once_with(unit) + + def test_creating_an_advanced_layer(self): + # Let's build an app with the following dependency tree. + # app + # ┌─────────────┼──────────┐ + # graphql flask nginx + # │ │ │ + # sqlalchemy │ network + # ┌───┴────┐ ┌──────┘ + # postgresql python + # └────┬────┘ + # archbase + # + # The lower directories are linearized using the c3 algorithm. + self.get_mount_dir('archbase').mkdir() + overlayctl.creater('network', []) + overlayctl.creater('nginx', ['network']) + overlayctl.creater('postgresql', ['archbase']) + overlayctl.creater('python', ['archbase']) + overlayctl.creater('flask', ['python']) + overlayctl.creater('sqlalchemy', ['postgresql', 'python']) + overlayctl.creater('graphql', ['sqlalchemy']) + overlayctl.creater('app', ['graphql', 'flask', 'nginx', ]) + self.check_mount_unit('app', [ + self.get_upper_dir('graphql'), + self.get_upper_dir('flask'), + self.get_upper_dir('nginx'), + self.get_upper_dir('sqlalchemy'), + self.get_upper_dir('network'), + self.get_upper_dir('postgresql'), + self.get_upper_dir('python'), + self.get_mount_dir('archbase'), + ]) + + +class TestDelete(BaseTest): + """Test for the `delete` command.""" + + def test_delete_a_layer(self): + # Deleting a layer cleans up the upper/work directories, the + # info file, the .automount and the .mount systemd. unit file. + # The systemd units are stopped and disabled. + self.systemctl.status.return_value = {'Active': 'inactive'} + overlayctl.creater('test', lowers=('lower',), start=True, permanent=True) + automountunit = self.get_unit_name('test') + '.automount' + mountunit = self.get_unit_name('test') + '.mount' + self.assert_overlay_dirs_exists('test') + self.systemctl.start.assert_called_once_with(automountunit) + self.systemctl.enable.assert_called_once_with(automountunit) + self.systemctl.status.return_value = {'Active': 'active'} + overlayctl.deleter('test') + self.assertFalse(self.get_upper_dir('test').exists()) + self.assertFalse(self.get_work_dir('test').exists()) + self.assertFalse(self.get_mount_dir('test').exists()) + self.assertFalse(self.get_unit_path('test', 'automount').exists()) + self.assertFalse(self.get_unit_path('test', 'automount').exists()) + self.systemctl.stop.assert_any_call(mountunit) + self.systemctl.stop.assert_any_call(automountunit) + self.systemctl.disable.assert_any_call(automountunit) + + def test_cannot_delete_unmanaged_layer(self): + # Unmanaged layer cannot be deleted. + archbase = self.get_mount_dir('archbase') + archbase.mkdir() + overlayctl.deleter('archbase') + self.assertTrue(archbase.exists()) + + def test_cannot_delete_lower_layer(self): + # Its not allowed to delete a layer having descendants managed layer. + self.systemctl.status.return_value = {'Active': 'inactive'} + self.get_mount_dir('archbase').mkdir() + overlayctl.creater('layer1', ['archbase']) + overlayctl.creater('layer2', ['layer1']) + with self.assertRaises(overlayctl.DeletionError): + overlayctl.deleter('layer1') + self.assertTrue(self.get_unit_path('layer1', 'mount').exists()) + # But we can delete a layer on top of unmanaged layers. + overlayctl.deleter('layer2') + overlayctl.deleter('layer1') + + +class TestStart(BaseTest): + """Tests for the `start` command.""" + + def test_starting_a_layer(self): + # Starting a layer starts the automount unit. Mounting is + # handled by systemd on first access to the folder (a.k.a `machinectl start`). + self.get_mount_dir('archbase').mkdir() + overlayctl.creater('layer1', ['archbase']) + self.systemctl.reset_mock() + overlayctl.starter('layer1') + self.assert_layer_started('layer1') + self.systemctl.enable.assert_not_called() + self.systemctl.stop.assert_not_called() + + def test_enabling_layer(self): + # The user can make change permanent after reboot using the + # --permanent option. + self.get_mount_dir('archbase').mkdir() + overlayctl.creater('layer1', ['archbase']) + self.systemctl.reset_mock() + overlayctl.starter('layer1', permanent=True) + self.assert_layer_started('layer1') + self.assert_layer_enabled('layer1') + + def test_cannot_start_an_unmanaged_layer(self): + # Starting an unmanaged layer does nothing. + self.get_mount_dir('archbase').mkdir() + overlayctl.starter('archbase') + self.systemctl.start.assert_not_called() + self.systemctl.enable.assert_not_called() + + def test_cannot_start_a_lower_with_active_descendants(self): + # By default, a layer with active_descendants cannot be started + # because overlayFS may broke thing. + self.systemctl.status.side_effect = [{'Active': 'active'}, {'Active': 'inactive'}] + self.get_mount_dir('archbase').mkdir() + overlayctl.creater('layer1', ['archbase']) + overlayctl.creater('layer2', ['layer1']) + self.systemctl.reset_mock() + overlayctl.starter('layer1') + self.systemctl.start.assert_not_called() + + def test_user_can_force_to_start_a_lower_with_active_descendants(self): + # A layer with active_descendants can be started with the + # --preserve_others option. This may be dangereous, but the + # user is the master. + self.systemctl.status.side_effect = [{'Active': 'inactive'}] + self.get_mount_dir('archbase').mkdir() + overlayctl.creater('layer1', ['archbase']) + overlayctl.creater('layer2', ['layer1']) + self.systemctl.reset_mock() + overlayctl.starter('layer1', preserve_others=True) + self.assert_layer_started('layer1') + + def test_starting_stops_descendants_if_needed(self): + # Active descendant layers can be stopped automatically before + # starting the layer with the --stop-others options. + self.systemctl.status.side_effect = [ + {'Active': 'active'}, {'Active': 'active'}, + {'Active': 'inactive'}] + self.get_mount_dir('archbase').mkdir() + overlayctl.creater('layer1', ['archbase']) + overlayctl.creater('layer2', ['layer1']) + overlayctl.creater('layer3', ['layer2']) + self.systemctl.reset_mock() + overlayctl.starter('layer1', stop_others=True) + self.assert_layer_stopped('layer2') + self.assert_layer_stopped('layer3') + self.assert_layer_started('layer1') + + +class TestStop(BaseTest): + """Tests for the `stop` command.""" + + def test_stop_a_layer(self): + # Simply use the `stop` command to unmount the overlayfs and + # disable the automount unit. + self.get_mount_dir('archbase').mkdir() + overlayctl.creater('layer1', ['archbase']) + self.systemctl.reset_mock() + self.systemctl.status.side_effect = [{'Active': 'active'}] + overlayctl.stoper('layer1') + self.assert_layer_stopped('layer1') + self.systemctl.disable.assert_not_called() + + def test_stop_a_layer_permanently(self): + # The user can disable automounting on reboot with the --permanent option. + self.get_mount_dir('archbase').mkdir() + overlayctl.creater('layer1', ['archbase']) + self.systemctl.reset_mock() + self.systemctl.status.side_effect = [{'Active': 'active'}] + overlayctl.stoper('layer1', permanent=True) + self.assert_layer_stopped('layer1') + self.assert_layer_disabled('layer1') + + +class TestEdit(BaseTest): + + def test_prepending_lowers(self): + # User can augment lowers with the --prepend option. + self.get_mount_dir('archbase').mkdir() + overlayctl.creater('layer0', ['archbase']) + overlayctl.creater('layer1', ['archbase']) + overlayctl.creater('layer2', ['archbase']) + self.systemctl.reset_mock() + self.systemctl.status.return_value = {'Active': 'inactive'} + overlayctl.editer('layer2', appended=['layer0', 'layer1']) + self.assert_daemon_reload_called() + self.check_mount_unit('layer2', [ + self.get_upper_dir('layer0'), + self.get_upper_dir('layer1'), + self.get_mount_dir('archbase'), + ]) + + def test_appending_lowers(self): + # User can augment the overlay bases with the --append option. + self.get_mount_dir('base1').mkdir() + self.get_mount_dir('base2').mkdir() + overlayctl.creater('layer', []) + self.systemctl.reset_mock() + self.systemctl.status.return_value = {'Active': 'inactive'} + overlayctl.editer('layer', appended=['base1', 'base2']) + self.assert_daemon_reload_called() + self.check_mount_unit('layer', [ + self.get_mount_dir('base1'), + self.get_mount_dir('base2'), + ]) + + def test_removing_lowers(self): + # User can reduce the lower layers with the --remove option. + self.get_mount_dir('base').mkdir() + overlayctl.creater('autologin', []) + overlayctl.creater('network', []) + overlayctl.creater('layer', ['network', 'autologin', 'base']) + self.systemctl.reset_mock() + self.systemctl.status.return_value = {'Active': 'inactive'} + overlayctl.editer('layer', removed=['autologin', 'network']) + self.assert_daemon_reload_called() + self.check_mount_unit('layer', [self.get_mount_dir('base')]) + + def test_moving_a_lower(self): + # User can use both use --remove and --append/--prepend. + self.get_mount_dir('base1').mkdir() + self.get_mount_dir('base2').mkdir() + overlayctl.creater('layer', ['base1', 'base2']) + self.systemctl.reset_mock() + self.systemctl.status.return_value = {'Active': 'inactive'} + overlayctl.editer('layer', removed=['base2'], prepended=['base2']) + self.assert_daemon_reload_called() + self.check_mount_unit('layer', [ + self.get_mount_dir('base2'), + self.get_mount_dir('base1'), + ]) + + def test_with_editor(self): + # The user's editor is started with no options. + # In the editor, user can move, add and remove lower freely. + self.get_mount_dir('base1').mkdir() + self.get_mount_dir('base2').mkdir() + overlayctl.creater('lower1', []) + overlayctl.creater('lower2', []) + overlayctl.creater('lower3', []) + overlayctl.creater('layer', ['lower2', 'lower1', 'base1', 'base2']) + self.systemctl.reset_mock() + + def check_and_edit_file(filename): + filepath = Path(filename) + lowers = [line for line in filepath.read_text().splitlines() + if not line.startswith('#')] + self.assertSequenceEqual(lowers, ['lower2', 'lower1', 'base1', 'base2']) + filepath.write_text('\n'.join(['lower1', 'lower2', 'lower3', 'base1'])) + + self.systemctl.status.return_value = {'Active': 'inactive'} + with mock.patch('overlayctl._get_editor_on_file', new=check_and_edit_file): + overlayctl.editer('layer') + self.check_mount_unit('layer', [ + self.get_upper_dir('lower1'), + self.get_upper_dir('lower2'), + self.get_upper_dir('lower3'), + self.get_mount_dir('base1'), + ]) + + +class TestMove(BaseTest): + """Tests for the `move` command.""" + + def test_moving_a_terminal_layer(self): + # Simple case when the user moves a layer without descendants. + # So, we have nothing to propagate. + self.get_mount_dir('base').mkdir() + overlayctl.creater('lower', ['base']) + overlayctl.creater('layer', ['lower']) + self.systemctl.reset_mock() + self.systemctl.status.return_value = {'Active': 'inactive'} + overlayctl.deplacer('layer', 'newlayer') + self.assert_layer_not_exist('layer') + self.assert_layer_exists('newlayer', [ + self.get_upper_dir('lower'), + self.get_mount_dir('base'), + ]) + self.assert_daemon_reload_called() + + def test_descendant_of_moved_are_updated(self): + # When moving a layer, its descendants must be updated accordingly. + self.get_mount_dir('base').mkdir() + overlayctl.creater('layer1', ['base']) + overlayctl.creater('layer2', ['layer1']) + overlayctl.creater('layer3', ['layer2']) + self.systemctl.reset_mock() + self.systemctl.status.return_value = {'Active': 'inactive'} + overlayctl.deplacer('layer1', 'renamed') + self.assert_layer_not_exist('layer1') + self.assert_layer_exists('renamed', [self.get_mount_dir('base')]) + self.assert_layer_exists('layer2', [ + self.get_upper_dir('renamed'), + self.get_mount_dir('base'), + ]) + self.assert_layer_exists('layer3', [ + self.get_upper_dir('layer2'), + self.get_upper_dir('renamed'), + self.get_mount_dir('base'), + ]) + self.assert_daemon_reload_called() + + def test_moved_layer_is_restarted(self): + # Moving an activated terminal layer is ok, It's still + # activated at the end. + self.get_mount_dir('base').mkdir() + overlayctl.creater('layer1', ['base']) + self.systemctl.reset_mock() + self.systemctl.status.return_value = {'Active': 'active'} + overlayctl.deplacer('layer1', 'renamed') + self.systemctl.stop.assert_any_call(self.get_unit_name('layer1') + '.mount') + self.systemctl.stop.assert_any_call(self.get_unit_name('layer1') + '.automount') + self.systemctl.start.assert_any_call(self.get_unit_name('renamed') + '.automount') + + def test_cannot_rename_if_new_name_exists(self): + # Do nothing if the new name is already known. + self.get_mount_dir('lower1').mkdir() + overlayctl.creater('lower2', []) + self.systemctl.reset_mock() + overlayctl.deplacer('lower2', 'lower1') + self.assertTrue(self.get_unit_path('lower2', 'mount').exists()) + self.assertTrue(self.get_mount_dir('lower2').exists()) + self.assertTrue(self.get_mount_dir('lower1').exists()) + + def test_cannot_rename_if_descandant_is_activated(self): + # OverlayFS does not like when lower directories moves, so, + # user cannot move a layer having an active descandant. + self.get_mount_dir('base').mkdir() + overlayctl.creater('lower', ['base']) + overlayctl.creater('layer', ['lower']) + self.systemctl.reset_mock() + self.systemctl.status.return_value = {'Active': 'active'} + overlayctl.deplacer('lower', 'newlower') + + def assert_layer_not_exist(self, name): + self.assertFalse(self.get_unit_path(name, 'mount').exists()) + self.assertFalse(self.get_unit_path(name, 'automount').exists()) + self.assertFalse(self.get_mount_dir(name).exists()) + self.assertFalse(self.get_upper_dir(name).exists()) + self.assertFalse(self.get_work_dir(name).exists()) + + def assert_layer_exists(self, name, lowers): + self.assertTrue(self.get_mount_dir(name).exists()) + self.assertTrue(self.get_upper_dir(name).exists()) + self.assertTrue(self.get_work_dir(name).exists()) + self.assert_automount_unit_equal(name, { + 'Automount': {'Where': str(self.get_mount_dir(name))}, + 'Install': {'WantedBy': 'local-fs.target'} + }) + self.check_mount_unit(name, lowers)