@@ 17,13 17,13 @@ we stack the upper dir of lowered overla
import json
import logging
-import operator
import os
import re
import shutil
import sys
from collections import OrderedDict, defaultdict
-from functools import reduce
+from contextlib import contextmanager
+from functools import lru_cache
from itertools import chain
from pathlib import Path
from subprocess import PIPE, Popen
@@ 52,25 52,6 @@ INFOSDIR = OVERLAYDIR / 'info'
UNITSDIR = Path(os.environ.get('UNITSDIR', Path('/') / 'etc' / 'systemd' / 'system'))
-MOUNTTMPL = '''
-[Unit]
-Description = {name} Container overlay
-ConditionPathExists = {mountdir}
-[Mount]
-What = overlay
-Where = {mountdir}
-Type = overlay
-Options = lowerdir={lowerdir},upperdir={upperdir},workdir={workdir}
-'''
-
-AUTOMOUNTTMPL = '''
-[Automount]
-Where={mountdir}
-[Install]
-WantedBy=local-fs.target
-'''
-
-
class OverlayctlError(SystemExit):
"""Raised on exception."""
@@ 85,222 66,452 @@ class DeletionError(OSError):
"""raised when an overlay can not be deleted."""
-def _name2mountdir(name):
- """return the mount point path for the given overlay name.
+class BaseLayer:
+ def __init__(self, name):
+ self.name = name
- If the name contains a path separator just return it, otherwise
- return a folder named name as a subfolder of MOUNTDIR.
- """
- if os.path.sep in name:
- return Path(name)
- return MOUNTDIR / name
-
+ def __eq__(self, layer):
+ return isinstance(layer, BaseLayer) and self.mountdir == layer.mountdir
-def _name2unit(name, _cache={}):
- """return systemd Unit file name for the given overlay name"""
- if name in _cache:
- return _cache[name]
- mountdir = _name2mountdir(name)
- _cache[name] = str(sh.systemd_escape(mountdir, path=True)).strip()
- return _cache[name]
-
+ def __hash__(self):
+ return hash(str(self.mountdir))
-def _name2upperdir(name):
- """return upperdir path for the given overlay name"""
- return UPPERSDIR / _name2unit(name)
-
-
-def _name2workdir(name):
- """return workdir path for the given overlay name"""
- return WORKSDIR / _name2unit(name)
-
-
-def _name2lowerdir(name):
- """return path for the given overlay name to be used as a lowerdir
- for another overlays.
+ @property
+ def mountdir(self):
+ """return the mount point path for the given overlay name.
- - If name is not a managed overlay, return the corresponding mount point path.
- - If name is a managed overlay, return the corresponding name to get around overlayfs
- limitation (https://lists.ubuntu.com/archives/kernel-team/2013-March/025775.html)
- """
- upper = _name2upperdir(name)
- if upper.exists():
- return upper
- return _name2mountdir(name)
+ If the name contains a path separator just return it, otherwise
+ return a folder named name as a subfolder of MOUNTDIR.
+ """
+ if os.path.sep in self.name:
+ return Path(self.name)
+ return MOUNTDIR / self.name
-
-def _name2infopath(name):
- """return the metadata file path for the given overlay name"""
- return INFOSDIR / _name2unit(name)
-
+ def __str__(self):
+ return self.name
-def _compute_info(name, lowers):
- """return a dictionary containing metadata for the given overlay name
- stacking the given lower directories.
- Existing overlay metadata are ignored.
- """
- info = {}
- info['name'] = name
- info['lowers'] = lowers
- info['unitname'] = _name2unit(name)
- info['mountdir'] = _name2mountdir(name)
- info['lowerdir'] = [
- _name2lowerdir(lower) for lower in _get_linearized_lowerdirs(name, lowers)
- ]
- info['upperdir'] = _name2upperdir(name)
- info['workdir'] = _name2workdir(name)
- info['mountpath'] = (UNITSDIR / info['unitname']).with_suffix('.mount')
- info['automountpath'] = (UNITSDIR / info['unitname']).with_suffix('.automount')
- return info
+ @property
+ def lowerdir(self):
+ raise NotImplementedError
+
+ @property
+ def lowers(self):
+ """Lower layers."""
+ raise NotImplementedError
+
+ def is_managed(self):
+ raise NotImplementedError
-def _read_info(name, missing_ok=False):
- """return saved metadata of an existing overlay name.
+class Layer(BaseLayer):
+ """An overlay layer."""
+
+ def __init__(self, name):
+ super(Layer, self).__init__(name)
+ self._lowers = None
+ #XXX cache properties
+
+ @property
+ def lowers(self):
+ """Lower layers."""
+ if self._lowers is None:
+ self.load()
+ return self._lowers
+
+ @lowers.setter
+ def lowers(self, lowers):
+ # XXX detect cyclic dependencies.
+ self._lowers = lowers
+
+ @property
+ def upperdir(self):
+ """return upperdir path for the given overlay name"""
+ return UPPERSDIR / self.unitname
+
+ @property
+ def workdir(self):
+ """return workdir path for the given overlay name"""
+ return WORKSDIR / self.unitname
+
+ @property
+ def lowerdir(self):
+ """return path for the given overlay name to be used as a lowerdir
+ for another overlays.
- If missing_ok is True, brand new overlay metadata is computed with
- no lowers.
+ - If name is not a managed overlay, return the corresponding mount point path.
+ - If name is a managed overlay, return the corresponding name to get around overlayfs
+ limitation (https://lists.ubuntu.com/archives/kernel-team/2013-March/025775.html)
+ """
+ if self.upperdir.exists():
+ return self.upperdir
+ return self.mountdir
+
+ @property
+ def unitname(self):
+ """return systemd Unit file name for the given overlay name"""
+ return systemd_escape(self.mountdir)
+
+ @property
+ def infopath(self):
+ """return the metadata file path for the given overlay name"""
+ return INFOSDIR / self.unitname
+
+ @property
+ def automountunit(self):
+ """Systeme automount unit."""
+ return AutomountUnit(self.unitname)
+
+ @property
+ def mountunit(self):
+ """Systeme mount unit."""
+ return MountUnit(self.unitname)
+
+ def __repr__(self):
+ return f'<Layer name="{self.name}" unitname="{self.unitname}">'
+
+ def __str__(self):
+ return self.name
+
+ def is_managed(self):
+ return self.infopath.exists()
+
+ def get_ascendants(self):
+ return _get_linearized_lowers(self, self.lowers)
+
+ def load(self, missing_ok=True):
+ """return saved metadata of an existing overlay name.
+
+ If missing_ok is True, brand new overlay metadata is computed with
+ no lowers.
- If a systemd Unit file is found, it is parsed to retrieve real metadata.
- """
- info = _compute_info(name, [])
- infopath = _name2infopath(name)
- if infopath.exists():
- with infopath.open(encoding='utf-8') as fobj:
- info = json.load(fobj)
- assert name == info['name']
- info['mountpath'] = Path(info['mountpath'])
- info['automountpath'] = Path(info['automountpath'])
- elif not missing_ok:
- raise OSError('%s is not a managed overlay: %s not found' % (name, infopath))
- if info['mountpath'].exists():
- logger.debug('Metadata found for %s', name)
- content = info['mountpath'].read_text(encoding='utf-8')
- info['upperdir'] = Path(
- re.search('upperdir=(.*?)[,$]', content).groups()[0].replace('\\\\', '\\')
+ If a systemd Unit file is found, it is parsed to retrieve real metadata.
+ """
+ infopath = self.infopath
+ if infopath.exists():
+ with infopath.open(encoding='utf-8') as fobj:
+ info = json.load(fobj)
+ assert info['name'] == self.name
+ if self._lowers is None:
+ self._lowers = [build_layer(lower) for lower in info['lowers']]
+ else:
+ self._lowers = []
+ self.check_consistency()
+
+ def check_consistency(self):
+ """Check that info and units are consistents."""
+ if self.mountunit.path.exists():
+ logger.debug('Metadata found for %s', self.name)
+ content = self.mountunit.path.read_text(encoding='utf-8')
+ assert self.upperdir == Path(
+ re.search('upperdir=(.*?)[,$]', content).groups()[0].replace('\\\\', '\\')
+ )
+ lowerdirs = re.search('lowerdir=(.*?)[,$]', content).groups()[0].split(':')
+ assert [lower.lowerdir for lower in self.get_ascendants()] \
+ == [Path(p.replace('\\\\', '\\')) for p in lowerdirs if p]
+ assert self.workdir == Path(
+ re.search('workdir=(.*?)[,\n]', content).groups()[0].replace('\\\\', '\\')
+ )
+ assert self.mountdir == Path(
+ re.search('[Ww]here *=(.*?)\n', content).groups()[0].strip()
+ )
+ else:
+ logger.debug("No metadata found for %s", self.name)
+
+ def dump(self):
+ """Write the given overlay metadata to disk."""
+ data = {
+ 'name': self.name,
+ 'lowers': [lower.name for lower in self.lowers]
+ }
+ self.infopath.write_text(
+ json.dumps(data, indent=2), encoding='utf-8'
+ )
+ logger.debug('%s updated', self.infopath)
+ data['lowerdir'] = ':'.join(
+ str(lower.lowerdir).replace('\\', '\\\\')
+ for lower in self.get_ascendants()
)
- info['lowerdir'] = [
- Path(p.replace('\\\\', '\\'))
- for p in re.search('lowerdir=(.*?)[,$]', content).groups()[0].split(':')
- ]
- info['workdir'] = Path(
- re.search('workdir=(.*?)[,\n]', content).groups()[0].replace('\\\\', '\\')
- )
- info['mountdir'] = Path(
- re.search('[Ww]here *=(.*?)\n', content).groups()[0].strip()
- )
- else:
- logger.debug("No metadata found for %s", name)
- return info
+ data['upperdir'] = str(self.upperdir).replace('\\', '\\\\')
+ data['workdir'] = str(self.workdir).replace('\\', '\\\\')
+ data['mountdir'] = str(self.mountdir).replace('\\', '\\\\')
+ self.upperdir.mkdir(parents=True, exist_ok=True)
+ self.workdir.mkdir(parents=True, exist_ok=True)
+ self.mountdir.mkdir(parents=True, exist_ok=True)
+ self.mountunit.dump(**data)
+ self.automountunit.dump(**data)
+
+ def start(self, permanent=False):
+ """Start the automount systemd unit.
+
+ Enable the automount unit if permanent is truthy.
+ """
+ self.automountunit.start()
+ if permanent:
+ self.automountunit.enable()
+
+ def stop(self, permanent=False):
+ """Start the automount and mount systemd units.
+
+ Disable the automount unit if permanent is truthy.
+ """
+ self.automountunit.stop()
+ self.mountunit.stop()
+ if permanent:
+ self.automountunit.disable()
+
+ @contextmanager
+ def temporary_stop(self):
+ """Temporary ensure that the systemd units are stopped.
+
+ Restart the automount unit if it was preciously started.
+ """
+ is_automonted = self.automountunit.is_active()
+ is_mounted = self.mountunit.is_active()
+ self.automountunit.stop()
+ self.mountunit.stop()
+ yield
+ if is_automonted:
+ self.automountunit.start()
+ if is_mounted:
+ self.mountunit.start()
+
+ def delete(self):
+ """Delete the overlay.
+
+ Stop, disable and delete the systemd units.
+ Delete the overlay info file and upper/work directories.
+ """
+ self.stop(permanent=True)
+ self.automountunit.unlink()
+ self.mountunit.unlink()
+ systemctl.daemon_reload()
+ shutil.rmtree(bytes(self.mountdir), ignore_errors=True)
+ shutil.rmtree(bytes(self.workdir), ignore_errors=True)
+ shutil.rmtree(bytes(self.upperdir), ignore_errors=True)
+ self.infopath.unlink()
-def _write_info(info):
- """Write the given overlay metadata to disk."""
- data = {
- 'name': info['name'],
- 'lowers': info['lowers'],
- 'unitname': info['unitname'],
- 'mountpath': str(info['mountpath']),
- 'automountpath': str(info['automountpath']),
- }
- _name2infopath(info['name']).write_text(
- json.dumps(data, indent=2), encoding='utf-8'
- )
- logger.debug('%s updated', _name2infopath(info['name']))
- data = info.copy()
- data['lowerdir'] = ':'.join(
- str(path).replace('\\', '\\\\') for path in data['lowerdir']
- )
- data['upperdir'] = str(data['upperdir']).replace('\\', '\\\\')
- data['workdir'] = str(data['workdir']).replace('\\', '\\\\')
- data['mountpath'] = str(info['mountpath']).replace('\\', '\\\\')
- info['mountpath'].write_text(MOUNTTMPL.format(**data), encoding='utf-8')
- logger.debug('%s updated', info['mountpath'])
- info['automountpath'].write_text(AUTOMOUNTTMPL.format(**info), encoding='utf-8')
- logger.debug('%s updated', info['automountpath'])
+class UnmanagedLayer(BaseLayer):
+
+ @property
+ def lowers(self):
+ """Lower layers."""
+ return ()
+
+ @property
+ def lowerdir(self):
+ """return path for the given overlay name to be used as a lowerdir
+ for another overlays.
+
+ - If name is not a managed overlay, return the corresponding mount point path.
+ - If name is a managed overlay, return the corresponding name to get around overlayfs
+ limitation (https://lists.ubuntu.com/archives/kernel-team/2013-March/025775.html)
+ """
+ return self.mountdir
+
+ def __repr__(self):
+ return f'<UnmanagedLayer name="{self.name}" mountdir="{self.mountdir}">'
+
+ def is_managed(self):
+ return False
+
+
+class GenericUnit:
+ """Generic systemd Unit interface."""
+
+ @property
+ def unittype(self):
+ """Systemd unit type (a.k.a "automount" or "mount")."""
+ raise NotImplementedError
+
+ @property
+ def template(self):
+ """Template of the systemd unit content."""
+ raise NotImplementedError
+
+ def __init__(self, unitname):
+ self.unitname = unitname
+
+ @property
+ def path(self):
+ """File system path of the systemd unit file."""
+ return (UNITSDIR / self.unitname).with_suffix('.' + self.unittype)
+
+ def dump(self, **data):
+ """Write the definition of the systemd unit into the disk."""
+ self.path.write_text(self.template.format(**data), encoding='utf-8')
+ logger.debug('Systemd unit %s updated', self.path.name)
+
+ def start(self):
+ """Start the systemd unit."""
+ systemctl.start(self.path.name)
+ logger.info('Systemd unit %s started', self.path.name)
+
+ def stop(self):
+ """Stop the systemd unit."""
+ systemctl.stop(self.path.name)
+ logger.info('Systemd unit %s stopped', self.path.name)
+
+ def enable(self):
+ """Enable the systemd unit."""
+ systemctl.enable(self.path.name)
+ logger.info('Systemd unit %s enabled', self.path.name)
+
+ def disable(self):
+ """Disable the systemd unit."""
+ systemctl.disable(self.path.name)
+ logger.info('Systemd unit %s disabled', self.path.name)
+
+ def unlink(self):
+ """Remove the systemd unit file."""
+ self.path.unlink()
+
+ def is_active(self):
+ """Return True if the unit is active."""
+ return 'active' in systemctl.status(self.path.name)['Active'].split()
+
-def _iter_overlays_info_simple():
- for path in INFOSDIR.glob('*'):
- info = json.loads(path.read_text(encoding='utf-8'))
- yield _read_info(info['name'])
+class MountUnit(GenericUnit):
+ """Systemd mount unit."""
+
+ unittype = 'mount'
+ template = '''
+[Unit]
+Description = {name} Container overlay
+ConditionPathExists = {mountdir}
+[Mount]
+What = overlay
+Where = {mountdir}
+Type = overlay
+Options = lowerdir={lowerdir},upperdir={upperdir},workdir={workdir}
+'''
+
+
+class AutomountUnit(GenericUnit):
+ """Systemd automount unit."""
+
+ unittype = 'automount'
+ template = '''
+[Automount]
+Where={mountdir}
+[Install]
+WantedBy=local-fs.target
+'''
+
+
+class systemctl:
-def _iter_overlays_info(ordered=None):
- """Yield metadata for each managed overlay"""
- infos = _iter_overlays_info_simple()
+ @classmethod
+ def start(cls, unitname):
+ cls._execute('start', unitname)
+
+ @classmethod
+ def stop(cls, unitname):
+ cls._execute('stop', unitname)
+
+ @classmethod
+ def enable(cls, unitname):
+ cls._execute('enable', unitname)
+
+ @classmethod
+ def disable(cls, unitname):
+ cls._execute('enable', unitname)
+
+ @classmethod
+ def daemon_reload(cls):
+ cls._execute('daemon-reload')
+
+ @classmethod
+ def status(cls, unit):
+ """Return a disctionary containing data from `systemctl status {unit}`."""
+ stdout, dummy_stderr = Popen(
+ ['systemctl', 'status', unit], stdout=PIPE
+ ).communicate()
+ return dict(
+ map(str.strip, line.split(':', 1))
+ for line in stdout.decode('utf-8').splitlines()
+ if ':' in line
+ )
+
+ @staticmethod
+ def _execute(command, *args):
+ process = Popen(['systemctl', command] + list(args), stdout=PIPE)
+ stdout, dummy_stderr = process.communicate()
+ return stdout
+
+
+def systemd_escape(path):
+ process = Popen(['systemd-escape', '--path', str(path)], stdout=PIPE)
+ stdout, dummy_stderr = process.communicate()
+ return stdout.strip().decode(sys.getdefaultencoding())
+
+
+@lru_cache
+def build_layer(name):
+ """A registry to instanciate Layer object.
+
+ The main idea is to ensure that only one object is instanciated
+ per layer. In fact each layer may have lower layers that can be
+ instanciated at any time.
+ """
+ layer = Layer(name)
+ if layer.is_managed():
+ return layer
+ return UnmanagedLayer(name)
+
+
+def _iter_layers_simple():
+ """Yield all known layers."""
+ for path in INFOSDIR.glob('*'):
+ info = json.loads(path.read_text(encoding='utf-8'))
+ layer = build_layer(info['name'])
+ layer.lowers = [build_layer(lowername) for lowername in info['lowers']]
+ yield layer
+
+
+def _iter_layers(ordered=None):
+ """Yield all known layers.
+
+ :ordered: if truthy, yield layers by order.
+ """
+ layers = list(_iter_layers_simple())
if not ordered:
- return infos
- infos = {info['name']: info for info in infos}
- lowers = set(chain(*[info['lowers'] for info in infos.values()]))
- heads = list(set(infos) - lowers)
+ return layers
+ lowers = set(chain(*(layer.lowers for layer in layers)))
+ heads = list(set(layers) - lowers)
return (
- infos[name] for name in _get_linearized_lowerdirs(None, heads) if name in infos
+ layer for layer in _get_linearized_lowers(None, heads) if layer in layers
)
-def _iter_tree(info, tree):
- infos = [info]
- seens = {info['name']}
- while infos:
- info = infos.pop()
- for _info in tree[info['name']]:
- if _info['name'] in seens:
- continue
- infos.append(_info)
- seens.add(_info['name'])
- yield _info
-
-
-def _iter_descendants(info):
- tree = defaultdict(list)
- for _info in _iter_overlays_info_simple():
- for lower in _info['lowers']:
- tree[lower].append(_info)
- return _iter_tree(info, tree)
-
-
-def _iter_ascendants(info):
- tree = defaultdict(list)
- infos = {i['name']: i for i in _iter_overlays_info_simple()}
- tree = {n: [infos[x] for x in i['lowers'] if x in infos]
- for n, i in infos.items()}
- return _iter_tree(info, tree)
+def _iter_tree(current, next_layers_getter):
+ """Utility to walk throught the layers tree."""
+ layers = next_layers_getter(current)[:]
+ seens = {current}
+ while layers:
+ layer = layers.pop(0)
+ if layer in seens:
+ continue
+ layers += next_layers_getter(layer) or []
+ seens.add(layer)
+ yield layer
-def _systemctl_status(unit):
- """REturn a disctionary containing data from `systemctl status {unit}`."""
- stdout, dummy_stderr = Popen(
- ['systemctl', 'status', unit], stdout=PIPE
- ).communicate()
- return dict(
- map(str.strip, line.split(':', 1))
- for line in stdout.decode('utf-8').splitlines()
- if ':' in line
- )
-
-
-def _is_mount_active(info):
- return 'active' in _systemctl_status(info['mountpath'].name)['Active'].split()
+def _iter_ascendants(current):
+ """Yield layers that depend on the given `layer`."""
+ return _iter_tree(current, lambda layer: layer.lowers)
-def _is_automount_active(info):
- return 'active' in _systemctl_status(info['automountpath'].name)['Active'].split()
-
-
-def _disable_units(info):
- if _is_automount_active(info):
- sh.systemctl.stop(info['automountpath'].name)
- if _is_mount_active(info):
- sh.systemctl.stop(info['mountpath'].name)
- sh.systemctl.disable(info['automountpath'].name).wait()
- sh.systemctl.disable(info['mountpath'].name).wait()
- logger.debug('Systemd %s disabled', info['automountpath'].name)
-
-
-def _remove_units(info):
- _disable_units(info)
- info['automountpath'].unlink()
- info['mountpath'].unlink()
+def _iter_descendants(current):
+ """Yield layers the `layer` depends on."""
+ tree = defaultdict(list)
+ for layer in _iter_layers_simple():
+ tree[layer]
+ for parent in layer.lowers:
+ tree[parent].append(layer)
+ return _iter_tree(current, tree.get)
def c3_merge(graph):
@@ 334,23 545,23 @@ def c3_linearize(head, graph, results):
return res
-def _get_linearized_lowerdirs(name, lowers):
- """Get lower dirs of the given overlay name stacking lowers. It
+def _get_linearized_lowers(layer, lowers=None):
+ """Get lowers of the given overlay name stacking lowers. It
resolve every {sub}lowers for which overlay information can be retrieved.
"""
- lowers = lowers[:]
- graph = defaultdict(list, {name: lowers[:]})
+ lowers = lowers[:] if lowers else layer.lowers
+ graph = defaultdict(list, {layer: lowers[:]})
while lowers:
lower = lowers.pop(0) # consume
- sublowers = _read_info(lower, missing_ok=True)['lowers']
- graph[lower] = sublowers
- lowers += sublowers # also add sublowers to resolver
+ if lower.lowers:
+ graph[lower] = lower.lowers
+ lowers += lower.lowers # also add sublowers to resolver
result = {}
- c3_linearize(name, graph, result)
- return result[name][1:]
+ c3_linearize(layer, graph, result)
+ return result[layer][1:]
-def creater(name, lowers, start, permanent):
+def creater(name, lowers, start=None, permanent=None):
"""create an overlay at `name` that inherites from `lowerdir`.
:name: the overlay path.
@@ 361,57 572,55 @@ def creater(name, lowers, start, permane
If a lower path doesn't contains a path separator (a.k.a. '/'),
/var/lib/machines/{lower} is used
"""
- info = _compute_info(name, lowers)
- info['upperdir'].mkdir(parents=True, exist_ok=True)
- info['workdir'].mkdir(parents=True, exist_ok=True)
- _write_info(info)
- sh.systemctl('daemon-reload').wait()
- logger.info("%s created as %s", info['name'], info['unitname'])
+ layer = Layer(name)
+ layer.lowers = [build_layer(lowername) for lowername in lowers]
+ layer.dump()
+ systemctl.daemon_reload()
if start:
- sh.systemctl.start(info['automountpath'].name).wait()
- logger.debug("Systemd %s.automount unit started", info['unitname'])
+ layer.start(permanent)
else:
- logger.warning("Use `overlayctl start %s` to start the overlay.", info['name'])
- if permanent:
- sh.systemctl.enable(info['automountpath'].name).wait()
- logger.debug("Systemd %s.automount unit enabled", info['unitname'])
+ logger.warning("Use `overlayctl start %s` to start the overlay.", layer.name)
def deleter(name):
"""Delete a managed overlay and related folder/files"""
- lowers = reduce(
- operator.or_, (set(info['lowers']) for info in _iter_overlays_info()), set()
- )
- lowers = set(_name2unit(lower) for lower in lowers)
- if _name2unit(name) in lowers:
- raise DeletionError("\"%s\" is a lower directory of another overlay" % name)
- info = _read_info(name)
- _remove_units(info)
- sh.systemctl('daemon-reload').wait()
- shutil.rmtree(bytes(info['mountdir']), ignore_errors=True)
- shutil.rmtree(bytes(info['workdir']))
- shutil.rmtree(bytes(info['upperdir']))
- _name2infopath(name).unlink()
- logger.debug("Related file and folder deleted")
- logger.info("\"%s\" deleted", info['name'])
+ lowers = set(chain(*(x.lowers for x in _iter_layers_simple())))
+ layer = build_layer(name)
+ if not layer.is_managed():
+ logger.info("\"%s\" not managed. Nothing to do.", layer.name)
+ return
+ descendants = list(_iter_descendants(layer))
+ if descendants:
+ # XXX allow this with automatic updates.
+ raise DeletionError("\"%s\" is a lower directory of another overlay: %s"
+ % (name, ', '.join(x.name for x in descendants)))
+ layer.delete()
+ systemctl.daemon_reload()
+ logger.info("\"%s\" deleted", layer.name)
-def editer(name, appended=None, prepanded=None, removed=None):
+def _get_editor_on_file(filename):
+ cmd = '%s "%s"' % (os.environ.get('EDITOR', 'vim'), filename)
+ Popen(cmd, shell=True).wait()
+
+
+def editer(name, appended=None, prepended=None, removed=None):
"""Ask user to edit lowers of the given overlay name, rewrite
the metadata and unit files accordingly, finaly retart systemd units
of the edited overlay.
"""
- info = _read_info(name)
- if not appended and not prepanded and not removed:
+ layer = build_layer(name)
+ # XXX check descendants not active
+ if not appended and not prepended and not removed:
with NamedTemporaryFile() as fobj:
fobj.write(b'# -*- encoding: utf-8 -*-\n')
fobj.write(b'# Note: Leave unchanged or fully empty to ignore changes\n')
- fobj.write(b'\n'.join(lower.encode('utf-8') for lower in info['lowers']))
+ fobj.write(b'\n'.join(lower.name.encode('utf-8') for lower in layer.lowers))
fobj.flush()
- Popen(os.environ.get('EDITOR', 'vim') + ' ' + fobj.name, shell=True).wait()
+ _get_editor_on_file(fobj.name)
fobj.seek(0)
lowers = [
- line.strip()
+ build_layer(line.strip())
for line in fobj.read().decode('utf-8').splitlines()
if not line.strip().startswith('#') and line.strip()
]
@@ 419,110 628,102 @@ def editer(name, appended=None, prepande
if not fobj.read(1):
return # Ignore empty file for convenience
else:
- prepanded = prepanded or []
- added = appended or []
- removed = removed or ()
- lowers = prepanded + [
- lower for lower in info['lowers'] if lower not in removed
- ] + added
- if lowers == info['lowers']:
+ prepended = map(build_layer, prepended or ())
+ appended = map(build_layer, appended or ())
+ removed = list(map(build_layer, removed or ()))
+ lowers = chain(
+ prepended,
+ (lower for lower in layer.lowers if lower not in removed),
+ appended)
+ if lowers == layer.lowers:
return
- automounted = _is_automount_active(info)
- mounted = _is_mount_active(info)
- if automounted:
- sh.systemctl.stop(info['automountpath'].name)
- if mounted:
- sh.systemctl.stop(info['mountpath'].name)
- info = _compute_info(name, lowers)
- _write_info(info)
- sh.systemctl('daemon-reload').wait()
- if automounted:
- sh.systemctl.start(info['automountpath'].name)
- if mounted:
- sh.systemctl.start(info['automount'].name)
- logger.debug("Systemd \"%s\" restarted", info['automountpath'])
- logger.info("\"%s\" updated", info['name'])
+ with layer.temporary_stop():
+ layer.lowers[:] = lowers
+ layer.dump()
+ systemctl.daemon_reload()
+ # XXX propagate to descendants
+ logger.debug("Systemd \"%s\" restarted", layer.unitname)
+ logger.info("\"%s\" updated", layer.name)
def starter(name, preserve_others=False, stop_others=False, permanent=False):
- info = _read_info(name)
- if _is_automount_active(info):
- logger.info('%s is already started.', info['name'])
+ layer = build_layer(name)
+ if not layer.is_managed():
+ logger.info("\"%s\" not managed. Nothing to do.", layer.name)
return
if not preserve_others:
to_stop = []
- for _info in chain(_iter_descendants(info), _iter_ascendants(info)):
- if _is_automount_active(_info):
- to_stop.append(_info)
+ for _layer in chain(_iter_descendants(layer), _iter_ascendants(layer)):
+ if not _layer.is_managed():
+ continue
+ if _layer.automountunit.is_active():
+ to_stop.append(_layer)
if to_stop:
if not stop_others:
logger.error(
'You should prefere to stop the following overlays: \n\n%s\n',
- ', '.join(i['name'] for i in to_stop))
+ ', '.join(x.name for x in to_stop))
logger.info(
'You may want to use `--stop-others`. '
'Or see --help for more options.')
return
- for _info in to_stop:
- _stopper(_info, permanent)
+ for _layer in to_stop:
+ _layer.disable() if permanent else _layer.stop()
logger.info(
'The following overlays were stopped: \n\n'
- + ', '.join(i['name'] for i in to_stop))
- sh.systemctl.start(info['unitname'] + '.automount').wait()
- if permanent:
- sh.systemctl.enable(info['unitname'] + '.automount').wait()
- logger.info("Systemd %s.automount unit enabled", info['unitname'])
+ + ', '.join(x.name for x in to_stop))
+ layer.start(permanent)
-def stoper(name, permanent):
- _stopper(_read_info(name), permanent)
-
-
-def _stopper(info, permanent):
- sh.systemctl.stop(info['unitname'] + '.mount').wait()
- sh.systemctl.stop(info['unitname'] + '.automount').wait()
- if permanent:
- sh.systemctl.disable(info['unitname'] + '.automount').wait()
+def stoper(name, permanent=False):
+ layer = build_layer(name)
+ if not layer.is_managed():
+ logger.info("\"%s\" not managed. Nothing to do.", layer.name)
+ return
+ layer.stop(permanent)
def lister(
unit_name, ordered_deps, reversed_deps, no_info, regexp, depends_on, terminal
):
"""List managed overlays and display overlay inheritance."""
- infos = _iter_overlays_info(ordered_deps or reversed_deps)
- infos = reversed(list(infos)) if reversed_deps else infos
+ layers = _iter_layers(ordered_deps or reversed_deps)
+ layers = reversed(list(layers)) if reversed_deps else layers
if terminal:
- infos = list(infos)
- lowers = set(chain(*(info['lowers'] for info in infos)))
- infos = (info for info in infos if info['name'] not in lowers)
+ layers = list(layers)
+ lowers = set(chain(*(layer.lowers for layer in layers)))
+ layers = (layer for layer in layers if layer not in lowers)
+ if regexp:
+ match = re.compile(regexp).match
+ layers = (layer for layer in layers if match(layer.name))
found = False
- for info in infos:
- if regexp and not re.match(regexp, info['name']):
- continue
- alllowers = _get_linearized_lowerdirs(info['name'], info['lowers'])
- otherlowers = tuple(lower for lower in alllowers if lower not in info['lowers'])
- depslowers = set(alllowers) | set(otherlowers)
+ if depends_on:
+ depends_on = [{build_layer(layername) for layername in group} for group in depends_on]
+ for layer in layers:
+ descendants = layer.get_ascendants()
+ grandchildren = tuple(lower for lower in descendants if lower not in layer.lowers)
if depends_on:
- if all((depslowers & deps) != deps for deps in depends_on):
+ descendants = set(descendants)
+ if not any(descendants.issuperset(deps) for deps in depends_on):
continue
found = True
- info['lowers'] = ', '.join(info['lowers'])
- info['others'] = ', '.join(otherlowers) if otherlowers else ''
+ data = {
+ 'lowers': ', '.join(map(str, layer.lowers)) if layer.lowers else '',
+ 'others': ', '.join(map(str, grandchildren)) if grandchildren else '',
+ 'name': layer.name,
+ }
fmt = (
Fore.GREEN
+ Style.BRIGHT
- + ('{unitname}' if unit_name else '{name}')
+ + '{name}'
+ Style.RESET_ALL
)
- try:
- sh.systemctl.status(info['unitname'] + '.mount')
+ if layer.mountunit.is_active():
icon = Fore.GREEN + ' 🖴 ' + Style.RESET_ALL
- except sh.ErrorReturnCode_3:
- try:
- sh.systemctl.status(info['unitname'] + '.automount')
- icon = ' 🖴 '
- except sh.ErrorReturnCode_3:
- icon = ''
+ elif layer.automountunit.is_active():
+ icon = ' 🖴 '
+ else:
+ icon = ''
if not no_info:
fmt += (
icon
@@ 533,45 734,57 @@ def lister(
+ '\N{Rightwards Dashed Arrow} {others}'
+ Style.RESET_ALL
)
- print(fmt.format(**info))
+ print(fmt.format(**data))
if not found:
raise NoResult("No overlay matches.")
-def deplacer(old, new):
+def deplacer(oldname, newname):
# Checks
- if old == new:
- logger.info('Not a new name, nothing to do.')
+ _new = Layer(newname)
+
+ if _new.mountdir.exists():
+ logger.error('"%s" already exists.', newname)
return
- existings = list(_iter_overlays_info())
- names = set(x['name'] for x in existings)
- if new in names:
- logger.error('New name "%s" already exists.', new)
+
+ layer = build_layer(oldname)
+
+ if layer == _new:
+ logger.info('Not a new unit name, nothing to do.')
return
- superseeders = [x for x in existings if old in x['lowers']]
- logger.debug('%s overlays affected: %s', len(superseeders),
- ', '.join(x['name'] for x in superseeders))
- mounted = [info['name'] for info in superseeders if _is_mount_active(info)]
- if mounted:
- logger.error('The following units must be stopped: %s', ', '.join(mounted))
- # Move overlay
- old_info = _read_info(old)
- new_info = _compute_info(new, old_info['lowers'])
- for pathname in ('upperdir', 'workdir'):
- new_info[pathname].parent.mkdir(parents=True, exist_ok=True)
- old_info[pathname].rename(new_info[pathname])
- _write_info(new_info)
- _remove_units(old_info)
- _name2infopath(old).unlink()
- # Update existing overlays that depends on the moved one.
- for info in superseeders:
- lowers = info['lowers']
- lowers[lowers.index(old)] = new
- _write_info(_compute_info(info['name'], lowers))
- sh.systemctl('daemon-reload').wait()
- sh.systemctl.start(new_info['automountpath'].name).wait()
- logger.debug("Systemd %s unit enabled and started", new_info['unitname'])
- logger.info("%s created as %s", new_info['name'], new_info['unitname'])
+
+ descendants = list(_iter_descendants(layer))
+ logger.debug(
+ '%s overlays affected: %s', len(descendants),
+ ', '.join(x.name for x in descendants)
+ )
+ activated = [x.name for x in descendants if x.automountunit.is_active()]
+ if activated:
+ # XXX add --stop
+ logger.error(
+ 'The following units must be stopped: %s', ', '.join(activated))
+
+ is_automonted = layer.automountunit.is_active()
+ if is_automonted:
+ layer.stop()
+
+ # Move overlay dirs.
+ layer.upperdir.rename(_new.upperdir)
+ layer.workdir.rename(_new.workdir)
+
+ layer.delete()
+
+ layer.name = newname
+ layer.dump()
+
+ # Update existing layers that depends on the moved one.
+ for layer in descendants:
+ layer.dump()
+
+ systemctl.daemon_reload()
+
+ if is_automonted:
+ layer.start()
def _setup_logger(level):
@@ 583,13 796,17 @@ def _setup_logger(level):
logging.getLogger('sh').setLevel('WARNING')
+def ensure_directories_exist():
+ """Ensure required folders exist."""
+ for path in (MOUNTDIR, UPPERSDIR, WORKSDIR, INFOSDIR, UNITSDIR):
+ path.mkdir(parents=True, exist_ok=True)
+
+
def main():
"""Main entry point for the CLI"""
from argparse import ArgumentParser
-
- for path in (MOUNTDIR, OVERLAYDIR, UPPERSDIR, WORKSDIR, INFOSDIR, UNITSDIR):
- path.mkdir(parents=True, exist_ok=True)
+ ensure_directories_exist()
parser = ArgumentParser(description="Manage images as overlays for machinectl.")
parser.add_argument(
@@ 786,8 1003,8 @@ def main():
"edit an overlay created by this tool. "
"An editor is started if --add nor --remove provided."),
)
- edit.add_argument('-p', '--prepand', action='store_true',
- help="prepand the following names", )
+ edit.add_argument('-p', '--prepend', action='store_true',
+ help="prepend the following names", )
edit.add_argument(
'-a', '--append', action='append', help="append the following names",
)
@@ 804,7 1021,7 @@ def main():
)
def _editer(args):
- editer(args.mountdir, args.append, args.prepand, args.delete)
+ editer(args.mountdir, args.append, args.prepend, args.delete)
edit.set_defaults(func=_editer)
@@ 0,0 1,563 @@
+import imp
+import io
+import json
+import os
+import re
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from unittest import TestCase, mock
+
+# Like `import overlayctl` but it does not need a real python lib.
+with io.open('overlayctl') as fobj:
+ overlayctl = imp.load_module(
+ 'overlayctl', fobj, 'overlayctl', ('', 'r', imp.PY_SOURCE))
+
+
+class BaseTest(TestCase):
+ """Base TestCase that create temporary to work within."""
+
+ maxDiff = None
+
+ def setUp(self):
+ self._tempdir = TemporaryDirectory()
+ tempdirpath = Path(self._tempdir.__enter__())
+ self.systemctl = overlayctl.systemctl = mock.Mock()
+ self.mountdir = overlayctl.MOUNTDIR = tempdirpath / 'machines'
+ self.unitdir = overlayctl.UNITSDIR = tempdirpath / 'systemd'
+ overlaydir = tempdirpath / 'overlayctl'
+ self.uppersdir = overlayctl.UPPERSDIR = overlaydir / 'upper'
+ self.worksdir = overlayctl.WORKSDIR = overlaydir / 'work'
+ self.infosdir = overlayctl.INFOSDIR = overlaydir / 'info'
+ overlayctl.ensure_directories_exist()
+ overlayctl.build_layer.cache_clear()
+ overlayctl._setup_logger('CRITICAL')
+
+ def tearDown(self):
+ self._tempdir.cleanup()
+
+ def assert_daemon_reload_called(self):
+ self.systemctl.daemon_reload.assert_called()
+
+ def get_unit_name(self, systemd_name):
+ name = '-'.join([
+ str(self.mountdir).replace(os.path.sep, '-'),
+ systemd_name
+ ])
+ return name.lstrip('-')
+
+ def get_mount_dir(self, name):
+ return self.mountdir / name
+
+ def get_upper_dir(self, systemd_name):
+ return self.uppersdir / self.get_unit_name(systemd_name)
+
+ def get_work_dir(self, systemd_name):
+ return self.worksdir / self.get_unit_name(systemd_name)
+
+ def write_info(self, name, lowers):
+ (self.infosdir / self.get_unit_name(name)).write_text(
+ json.dumps({'name': name, 'lowers': lowers}))
+
+ def assert_info_equal(self, name, lowers, systemd_name=''):
+ unitname = self.get_unit_name(systemd_name or name)
+ filepath = self.infosdir / unitname
+ info = json.loads(filepath.read_text())
+ self.assertDictEqual({'name': name, 'lowers': lowers}, info)
+
+ def assert_overlay_dirs_exists(self, systemd_name, name=None):
+ self.assertTrue(self.get_upper_dir(systemd_name).is_dir())
+ self.assertTrue(self.get_work_dir(systemd_name).is_dir())
+ self.assertTrue(self.get_mount_dir(name or systemd_name).is_dir())
+
+ def get_unit_path(self, systemd_name, unittype):
+ return (self.unitdir / self.get_unit_name(systemd_name)).with_suffix('.' + unittype)
+
+ def parse_unit(self, systemd_name, unittype):
+ path = self.get_unit_path(systemd_name, unittype)
+ content = {}
+ target = content
+ sectionmatch = re.compile(r'\[(.*?)\]').match
+ entrymatch = re.compile(r'^\s*(.*?)\s*=\s*(.*?)\s*$').match
+ with path.open(encoding='utf-8') as fobj:
+ for line in fobj:
+ section = sectionmatch(line)
+ if line.lstrip().startswith('#'):
+ continue
+ if section:
+ [key] = section.groups()
+ target = content[key] = {}
+ continue
+ entry = entrymatch(line)
+ if entry:
+ key, value = entry.groups()
+ if key == 'Options':
+ value = {
+ key: list(filter(None, folders.split(':')))
+ for group in value.split(',')
+ for key, folders in [group.split('=', 1)]}
+ target[key] = value
+ return content
+
+ def assert_automount_unit_equal(self, systemd_name, expected):
+ result = self.parse_unit(systemd_name, 'automount')
+ self.assertDictEqual(result, expected)
+
+ def assert_mount_unit_equal(self, systemd_name, expected):
+ result = self.parse_unit(systemd_name, 'mount')
+ self.assertDictEqual(result, expected)
+
+ def check_mount_unit(self, layer_name, lower_names):
+ self.assert_mount_unit_equal(layer_name, {
+ 'Mount': {
+ 'Options': {
+ 'lowerdir': [str(lower_name) for lower_name in lower_names],
+ 'upperdir': [str(self.get_upper_dir(layer_name))],
+ 'workdir': [str(self.get_work_dir(layer_name))]},
+ 'Type': 'overlay',
+ 'What': 'overlay',
+ 'Where': str(self.get_mount_dir(layer_name))},
+ 'Unit': {
+ 'ConditionPathExists': str(self.get_mount_dir(layer_name)),
+ 'Description': '%s Container overlay' % layer_name
+ }})
+
+ def assert_layer_started(self, name):
+ self.systemctl.start.assert_called_once_with(self.get_unit_name(name) + '.automount')
+
+ def assert_layer_enabled(self, name):
+ self.systemctl.enable.assert_called_once_with(self.get_unit_name(name) + '.automount')
+
+ def assert_layer_stopped(self, name):
+ self.systemctl.stop.assert_any_call(self.get_unit_name(name) + '.automount')
+ self.systemctl.stop.assert_any_call(self.get_unit_name(name) + '.mount')
+
+ def assert_layer_disabled(self, name):
+ self.systemctl.disable.assert_called_once_with(self.get_unit_name(name) + '.automount')
+
+
+class TestCreate(BaseTest):
+ """Test the `create` command."""
+
+ def test_creating_a_root_layer(self):
+ # With no dependencies, the layer can be created.
+ # I don't know if it's a good idea for now. But it's allowed.
+ # The layer name is escaped
+ name = 'test-it'
+ systemd_name = r'test\x2dit'
+ overlayctl.creater(name, lowers=(), start=False, permanent=False)
+ self.assert_daemon_reload_called()
+ self.assert_info_equal(name=name, lowers=[], systemd_name=systemd_name)
+ self.assert_overlay_dirs_exists(systemd_name, name)
+ self.assert_automount_unit_equal(systemd_name, {
+ 'Automount': {'Where': str(self.get_mount_dir('test-it'))},
+ 'Install': {'WantedBy': 'local-fs.target'}
+ })
+ self.assert_mount_unit_equal(systemd_name, {
+ 'Mount': {
+ 'Options': {
+ 'lowerdir': [],
+ 'upperdir': [str(self.get_upper_dir(r'test\\x2dit'))],
+ 'workdir': [str(self.get_work_dir(r'test\\x2dit'))]},
+ 'Type': 'overlay',
+ 'What': 'overlay',
+ 'Where': str(self.get_mount_dir('test-it'))},
+ 'Unit': {
+ 'ConditionPathExists': str(self.get_mount_dir('test-it')),
+ 'Description': 'test-it Container overlay'
+ }})
+
+ def test_creating_a_simple_layer(self):
+ # With dependencies, the layer is created. The layer can
+ # depend on another layer, an absolute path to a directory or
+ # a name of a folder in the mount directory.
+ self.get_upper_dir('lower1').mkdir()
+ self.write_info('lower1', [])
+ lower2dir = self.mountdir / 'lower2'
+ lower2dir.mkdir()
+ overlayctl.creater(
+ 'test', lowers=('lower1', str(lower2dir), 'lower3'))
+ self.assert_daemon_reload_called()
+ self.assert_info_equal('test', lowers=[
+ 'lower1', str(lower2dir), 'lower3'])
+ self.assert_overlay_dirs_exists('test')
+ self.assert_automount_unit_equal('test', {
+ 'Automount': {'Where': str(self.get_mount_dir('test'))},
+ 'Install': {'WantedBy': 'local-fs.target'}
+ })
+ self.check_mount_unit('test', [
+ self.get_upper_dir('lower1'),
+ self.get_mount_dir('lower2'),
+ self.get_mount_dir('lower3')])
+
+ def test_automount_started(self):
+ # with --start, the automount unit is automatically started.
+ self.systemctl.status.return_value = {'Active': 'inactive'}
+ self.get_mount_dir('lower').mkdir()
+ overlayctl.creater('test', lowers=('lower',), start=True)
+ self.assert_overlay_dirs_exists('test')
+ self.assert_daemon_reload_called()
+ self.systemctl.start.assert_called_once_with(self.get_unit_name('test') + '.automount')
+
+ def test_automount_enabled(self):
+ # with --start --permanent, the automount unit is automatically started and enabled.
+ self.systemctl.status.return_value = {'Active': 'inactive'}
+ self.get_mount_dir('lower').mkdir()
+ overlayctl.creater('test', lowers=('lower',), start=True, permanent=True)
+ self.assert_overlay_dirs_exists('test')
+ self.assert_daemon_reload_called()
+ unit = self.get_unit_name('test') + '.automount'
+ self.systemctl.start.assert_called_once_with(unit)
+ self.systemctl.enable.assert_called_once_with(unit)
+
+ def test_creating_an_advanced_layer(self):
+ # Let's build an app with the following dependency tree.
+ # app
+ # ┌─────────────┼──────────┐
+ # graphql flask nginx
+ # │ │ │
+ # sqlalchemy │ network
+ # ┌───┴────┐ ┌──────┘
+ # postgresql python
+ # └────┬────┘
+ # archbase
+ #
+ # The lower directories are linearized using the c3 algorithm.
+ self.get_mount_dir('archbase').mkdir()
+ overlayctl.creater('network', [])
+ overlayctl.creater('nginx', ['network'])
+ overlayctl.creater('postgresql', ['archbase'])
+ overlayctl.creater('python', ['archbase'])
+ overlayctl.creater('flask', ['python'])
+ overlayctl.creater('sqlalchemy', ['postgresql', 'python'])
+ overlayctl.creater('graphql', ['sqlalchemy'])
+ overlayctl.creater('app', ['graphql', 'flask', 'nginx', ])
+ self.check_mount_unit('app', [
+ self.get_upper_dir('graphql'),
+ self.get_upper_dir('flask'),
+ self.get_upper_dir('nginx'),
+ self.get_upper_dir('sqlalchemy'),
+ self.get_upper_dir('network'),
+ self.get_upper_dir('postgresql'),
+ self.get_upper_dir('python'),
+ self.get_mount_dir('archbase'),
+ ])
+
+
+class TestDelete(BaseTest):
+ """Test for the `delete` command."""
+
+ def test_delete_a_layer(self):
+ # Deleting a layer cleans up the upper/work directories, the
+ # info file, the .automount and the .mount systemd. unit file.
+ # The systemd units are stopped and disabled.
+ self.systemctl.status.return_value = {'Active': 'inactive'}
+ overlayctl.creater('test', lowers=('lower',), start=True, permanent=True)
+ automountunit = self.get_unit_name('test') + '.automount'
+ mountunit = self.get_unit_name('test') + '.mount'
+ self.assert_overlay_dirs_exists('test')
+ self.systemctl.start.assert_called_once_with(automountunit)
+ self.systemctl.enable.assert_called_once_with(automountunit)
+ self.systemctl.status.return_value = {'Active': 'active'}
+ overlayctl.deleter('test')
+ self.assertFalse(self.get_upper_dir('test').exists())
+ self.assertFalse(self.get_work_dir('test').exists())
+ self.assertFalse(self.get_mount_dir('test').exists())
+ self.assertFalse(self.get_unit_path('test', 'automount').exists())
+ self.assertFalse(self.get_unit_path('test', 'automount').exists())
+ self.systemctl.stop.assert_any_call(mountunit)
+ self.systemctl.stop.assert_any_call(automountunit)
+ self.systemctl.disable.assert_any_call(automountunit)
+
+ def test_cannot_delete_unmanaged_layer(self):
+ # Unmanaged layer cannot be deleted.
+ archbase = self.get_mount_dir('archbase')
+ archbase.mkdir()
+ overlayctl.deleter('archbase')
+ self.assertTrue(archbase.exists())
+
+ def test_cannot_delete_lower_layer(self):
+ # Its not allowed to delete a layer having descendants managed layer.
+ self.systemctl.status.return_value = {'Active': 'inactive'}
+ self.get_mount_dir('archbase').mkdir()
+ overlayctl.creater('layer1', ['archbase'])
+ overlayctl.creater('layer2', ['layer1'])
+ with self.assertRaises(overlayctl.DeletionError):
+ overlayctl.deleter('layer1')
+ self.assertTrue(self.get_unit_path('layer1', 'mount').exists())
+ # But we can delete a layer on top of unmanaged layers.
+ overlayctl.deleter('layer2')
+ overlayctl.deleter('layer1')
+
+
+class TestStart(BaseTest):
+ """Tests for the `start` command."""
+
+ def test_starting_a_layer(self):
+ # Starting a layer starts the automount unit. Mounting is
+ # handled by systemd on first access to the folder (a.k.a `machinectl start`).
+ self.get_mount_dir('archbase').mkdir()
+ overlayctl.creater('layer1', ['archbase'])
+ self.systemctl.reset_mock()
+ overlayctl.starter('layer1')
+ self.assert_layer_started('layer1')
+ self.systemctl.enable.assert_not_called()
+ self.systemctl.stop.assert_not_called()
+
+ def test_enabling_layer(self):
+ # The user can make change permanent after reboot using the
+ # --permanent option.
+ self.get_mount_dir('archbase').mkdir()
+ overlayctl.creater('layer1', ['archbase'])
+ self.systemctl.reset_mock()
+ overlayctl.starter('layer1', permanent=True)
+ self.assert_layer_started('layer1')
+ self.assert_layer_enabled('layer1')
+
+ def test_cannot_start_an_unmanaged_layer(self):
+ # Starting an unmanaged layer does nothing.
+ self.get_mount_dir('archbase').mkdir()
+ overlayctl.starter('archbase')
+ self.systemctl.start.assert_not_called()
+ self.systemctl.enable.assert_not_called()
+
+ def test_cannot_start_a_lower_with_active_descendants(self):
+ # By default, a layer with active_descendants cannot be started
+ # because overlayFS may broke thing.
+ self.systemctl.status.side_effect = [{'Active': 'active'}, {'Active': 'inactive'}]
+ self.get_mount_dir('archbase').mkdir()
+ overlayctl.creater('layer1', ['archbase'])
+ overlayctl.creater('layer2', ['layer1'])
+ self.systemctl.reset_mock()
+ overlayctl.starter('layer1')
+ self.systemctl.start.assert_not_called()
+
+ def test_user_can_force_to_start_a_lower_with_active_descendants(self):
+ # A layer with active_descendants can be started with the
+ # --preserve_others option. This may be dangereous, but the
+ # user is the master.
+ self.systemctl.status.side_effect = [{'Active': 'inactive'}]
+ self.get_mount_dir('archbase').mkdir()
+ overlayctl.creater('layer1', ['archbase'])
+ overlayctl.creater('layer2', ['layer1'])
+ self.systemctl.reset_mock()
+ overlayctl.starter('layer1', preserve_others=True)
+ self.assert_layer_started('layer1')
+
+ def test_starting_stops_descendants_if_needed(self):
+ # Active descendant layers can be stopped automatically before
+ # starting the layer with the --stop-others options.
+ self.systemctl.status.side_effect = [
+ {'Active': 'active'}, {'Active': 'active'},
+ {'Active': 'inactive'}]
+ self.get_mount_dir('archbase').mkdir()
+ overlayctl.creater('layer1', ['archbase'])
+ overlayctl.creater('layer2', ['layer1'])
+ overlayctl.creater('layer3', ['layer2'])
+ self.systemctl.reset_mock()
+ overlayctl.starter('layer1', stop_others=True)
+ self.assert_layer_stopped('layer2')
+ self.assert_layer_stopped('layer3')
+ self.assert_layer_started('layer1')
+
+
+class TestStop(BaseTest):
+ """Tests for the `stop` command."""
+
+ def test_stop_a_layer(self):
+ # Simply use the `stop` command to unmount the overlayfs and
+ # disable the automount unit.
+ self.get_mount_dir('archbase').mkdir()
+ overlayctl.creater('layer1', ['archbase'])
+ self.systemctl.reset_mock()
+ self.systemctl.status.side_effect = [{'Active': 'active'}]
+ overlayctl.stoper('layer1')
+ self.assert_layer_stopped('layer1')
+ self.systemctl.disable.assert_not_called()
+
+ def test_stop_a_layer_permanently(self):
+ # The user can disable automounting on reboot with the --permanent option.
+ self.get_mount_dir('archbase').mkdir()
+ overlayctl.creater('layer1', ['archbase'])
+ self.systemctl.reset_mock()
+ self.systemctl.status.side_effect = [{'Active': 'active'}]
+ overlayctl.stoper('layer1', permanent=True)
+ self.assert_layer_stopped('layer1')
+ self.assert_layer_disabled('layer1')
+
+
+class TestEdit(BaseTest):
+
+ def test_prepending_lowers(self):
+ # User can augment lowers with the --prepend option.
+ self.get_mount_dir('archbase').mkdir()
+ overlayctl.creater('layer0', ['archbase'])
+ overlayctl.creater('layer1', ['archbase'])
+ overlayctl.creater('layer2', ['archbase'])
+ self.systemctl.reset_mock()
+ self.systemctl.status.return_value = {'Active': 'inactive'}
+ overlayctl.editer('layer2', appended=['layer0', 'layer1'])
+ self.assert_daemon_reload_called()
+ self.check_mount_unit('layer2', [
+ self.get_upper_dir('layer0'),
+ self.get_upper_dir('layer1'),
+ self.get_mount_dir('archbase'),
+ ])
+
+ def test_appending_lowers(self):
+ # User can augment the overlay bases with the --append option.
+ self.get_mount_dir('base1').mkdir()
+ self.get_mount_dir('base2').mkdir()
+ overlayctl.creater('layer', [])
+ self.systemctl.reset_mock()
+ self.systemctl.status.return_value = {'Active': 'inactive'}
+ overlayctl.editer('layer', appended=['base1', 'base2'])
+ self.assert_daemon_reload_called()
+ self.check_mount_unit('layer', [
+ self.get_mount_dir('base1'),
+ self.get_mount_dir('base2'),
+ ])
+
+ def test_removing_lowers(self):
+ # User can reduce the lower layers with the --remove option.
+ self.get_mount_dir('base').mkdir()
+ overlayctl.creater('autologin', [])
+ overlayctl.creater('network', [])
+ overlayctl.creater('layer', ['network', 'autologin', 'base'])
+ self.systemctl.reset_mock()
+ self.systemctl.status.return_value = {'Active': 'inactive'}
+ overlayctl.editer('layer', removed=['autologin', 'network'])
+ self.assert_daemon_reload_called()
+ self.check_mount_unit('layer', [self.get_mount_dir('base')])
+
+ def test_moving_a_lower(self):
+ # User can use both use --remove and --append/--prepend.
+ self.get_mount_dir('base1').mkdir()
+ self.get_mount_dir('base2').mkdir()
+ overlayctl.creater('layer', ['base1', 'base2'])
+ self.systemctl.reset_mock()
+ self.systemctl.status.return_value = {'Active': 'inactive'}
+ overlayctl.editer('layer', removed=['base2'], prepended=['base2'])
+ self.assert_daemon_reload_called()
+ self.check_mount_unit('layer', [
+ self.get_mount_dir('base2'),
+ self.get_mount_dir('base1'),
+ ])
+
+ def test_with_editor(self):
+ # The user's editor is started with no options.
+ # In the editor, user can move, add and remove lower freely.
+ self.get_mount_dir('base1').mkdir()
+ self.get_mount_dir('base2').mkdir()
+ overlayctl.creater('lower1', [])
+ overlayctl.creater('lower2', [])
+ overlayctl.creater('lower3', [])
+ overlayctl.creater('layer', ['lower2', 'lower1', 'base1', 'base2'])
+ self.systemctl.reset_mock()
+
+ def check_and_edit_file(filename):
+ filepath = Path(filename)
+ lowers = [line for line in filepath.read_text().splitlines()
+ if not line.startswith('#')]
+ self.assertSequenceEqual(lowers, ['lower2', 'lower1', 'base1', 'base2'])
+ filepath.write_text('\n'.join(['lower1', 'lower2', 'lower3', 'base1']))
+
+ self.systemctl.status.return_value = {'Active': 'inactive'}
+ with mock.patch('overlayctl._get_editor_on_file', new=check_and_edit_file):
+ overlayctl.editer('layer')
+ self.check_mount_unit('layer', [
+ self.get_upper_dir('lower1'),
+ self.get_upper_dir('lower2'),
+ self.get_upper_dir('lower3'),
+ self.get_mount_dir('base1'),
+ ])
+
+
+class TestMove(BaseTest):
+ """Tests for the `move` command."""
+
+ def test_moving_a_terminal_layer(self):
+ # Simple case when the user moves a layer without descendants.
+ # So, we have nothing to propagate.
+ self.get_mount_dir('base').mkdir()
+ overlayctl.creater('lower', ['base'])
+ overlayctl.creater('layer', ['lower'])
+ self.systemctl.reset_mock()
+ self.systemctl.status.return_value = {'Active': 'inactive'}
+ overlayctl.deplacer('layer', 'newlayer')
+ self.assert_layer_not_exist('layer')
+ self.assert_layer_exists('newlayer', [
+ self.get_upper_dir('lower'),
+ self.get_mount_dir('base'),
+ ])
+ self.assert_daemon_reload_called()
+
+ def test_descendant_of_moved_are_updated(self):
+ # When moving a layer, its descendants must be updated accordingly.
+ self.get_mount_dir('base').mkdir()
+ overlayctl.creater('layer1', ['base'])
+ overlayctl.creater('layer2', ['layer1'])
+ overlayctl.creater('layer3', ['layer2'])
+ self.systemctl.reset_mock()
+ self.systemctl.status.return_value = {'Active': 'inactive'}
+ overlayctl.deplacer('layer1', 'renamed')
+ self.assert_layer_not_exist('layer1')
+ self.assert_layer_exists('renamed', [self.get_mount_dir('base')])
+ self.assert_layer_exists('layer2', [
+ self.get_upper_dir('renamed'),
+ self.get_mount_dir('base'),
+ ])
+ self.assert_layer_exists('layer3', [
+ self.get_upper_dir('layer2'),
+ self.get_upper_dir('renamed'),
+ self.get_mount_dir('base'),
+ ])
+ self.assert_daemon_reload_called()
+
+ def test_moved_layer_is_restarted(self):
+ # Moving an activated terminal layer is ok, It's still
+ # activated at the end.
+ self.get_mount_dir('base').mkdir()
+ overlayctl.creater('layer1', ['base'])
+ self.systemctl.reset_mock()
+ self.systemctl.status.return_value = {'Active': 'active'}
+ overlayctl.deplacer('layer1', 'renamed')
+ self.systemctl.stop.assert_any_call(self.get_unit_name('layer1') + '.mount')
+ self.systemctl.stop.assert_any_call(self.get_unit_name('layer1') + '.automount')
+ self.systemctl.start.assert_any_call(self.get_unit_name('renamed') + '.automount')
+
+ def test_cannot_rename_if_new_name_exists(self):
+ # Do nothing if the new name is already known.
+ self.get_mount_dir('lower1').mkdir()
+ overlayctl.creater('lower2', [])
+ self.systemctl.reset_mock()
+ overlayctl.deplacer('lower2', 'lower1')
+ self.assertTrue(self.get_unit_path('lower2', 'mount').exists())
+ self.assertTrue(self.get_mount_dir('lower2').exists())
+ self.assertTrue(self.get_mount_dir('lower1').exists())
+
+ def test_cannot_rename_if_descandant_is_activated(self):
+ # OverlayFS does not like when lower directories moves, so,
+ # user cannot move a layer having an active descandant.
+ self.get_mount_dir('base').mkdir()
+ overlayctl.creater('lower', ['base'])
+ overlayctl.creater('layer', ['lower'])
+ self.systemctl.reset_mock()
+ self.systemctl.status.return_value = {'Active': 'active'}
+ overlayctl.deplacer('lower', 'newlower')
+
+ def assert_layer_not_exist(self, name):
+ self.assertFalse(self.get_unit_path(name, 'mount').exists())
+ self.assertFalse(self.get_unit_path(name, 'automount').exists())
+ self.assertFalse(self.get_mount_dir(name).exists())
+ self.assertFalse(self.get_upper_dir(name).exists())
+ self.assertFalse(self.get_work_dir(name).exists())
+
+ def assert_layer_exists(self, name, lowers):
+ self.assertTrue(self.get_mount_dir(name).exists())
+ self.assertTrue(self.get_upper_dir(name).exists())
+ self.assertTrue(self.get_work_dir(name).exists())
+ self.assert_automount_unit_equal(name, {
+ 'Automount': {'Where': str(self.get_mount_dir(name))},
+ 'Install': {'WantedBy': 'local-fs.target'}
+ })
+ self.check_mount_unit(name, lowers)