200ba2114979 — Alain Leufroy 3 years ago
rewrite code to make it a bit more easy to improve and add tests

Tests for list are still missing. But it's late.
3 files changed, 1146 insertions(+), 360 deletions(-)

M Makefile
M overlayctl
A => test_overlayctl.py
M Makefile +6 -0
@@ 1,2 1,8 @@ 
 normalize:
 	black -S overlayctl
+
+test:
+	python -m unittest test_overlayctl
+
+devtest:
+	hg manifest | entr -s 'pytest test_overlayctl.py --failed-first --exitfirst'

          
M overlayctl +577 -360
@@ 17,13 17,13 @@ we stack the upper dir of lowered overla
 
 import json
 import logging
-import operator
 import os
 import re
 import shutil
 import sys
 from collections import OrderedDict, defaultdict
-from functools import reduce
+from contextlib import contextmanager
+from functools import lru_cache
 from itertools import chain
 from pathlib import Path
 from subprocess import PIPE, Popen

          
@@ 52,25 52,6 @@ INFOSDIR = OVERLAYDIR / 'info'
 UNITSDIR = Path(os.environ.get('UNITSDIR', Path('/') / 'etc' / 'systemd' / 'system'))
 
 
-MOUNTTMPL = '''
-[Unit]
-Description = {name} Container overlay
-ConditionPathExists = {mountdir}
-[Mount]
-What = overlay
-Where = {mountdir}
-Type = overlay
-Options = lowerdir={lowerdir},upperdir={upperdir},workdir={workdir}
-'''
-
-AUTOMOUNTTMPL = '''
-[Automount]
-Where={mountdir}
-[Install]
-WantedBy=local-fs.target
-'''
-
-
 class OverlayctlError(SystemExit):
     """Raised on exception."""
 

          
@@ 85,222 66,452 @@ class DeletionError(OSError):
     """raised when an overlay can not be deleted."""
 
 
-def _name2mountdir(name):
-    """return the mount point path for the given overlay name.
+class BaseLayer:
+    def __init__(self, name):
+        self.name = name
 
-    If the name contains a path separator just return it, otherwise
-    return a folder named name as a subfolder of MOUNTDIR.
-    """
-    if os.path.sep in name:
-        return Path(name)
-    return MOUNTDIR / name
-
+    def __eq__(self, layer):
+        return isinstance(layer, BaseLayer) and self.mountdir == layer.mountdir
 
-def _name2unit(name, _cache={}):
-    """return systemd Unit file name for the given overlay name"""
-    if name in _cache:
-        return _cache[name]
-    mountdir = _name2mountdir(name)
-    _cache[name] = str(sh.systemd_escape(mountdir, path=True)).strip()
-    return _cache[name]
-
+    def __hash__(self):
+        return hash(str(self.mountdir))
 
-def _name2upperdir(name):
-    """return upperdir path for the given overlay name"""
-    return UPPERSDIR / _name2unit(name)
-
-
-def _name2workdir(name):
-    """return workdir path for the given overlay name"""
-    return WORKSDIR / _name2unit(name)
-
-
-def _name2lowerdir(name):
-    """return path for the given overlay name to be used as a lowerdir
-    for another overlays.
+    @property
+    def mountdir(self):
+        """return the mount point path for the given overlay name.
 
-    - If name is not a managed overlay, return the corresponding mount point path.
-    - If name is a managed overlay, return the corresponding name to get around overlayfs
-      limitation (https://lists.ubuntu.com/archives/kernel-team/2013-March/025775.html)
-    """
-    upper = _name2upperdir(name)
-    if upper.exists():
-        return upper
-    return _name2mountdir(name)
+        If the name contains a path separator just return it, otherwise
+        return a folder named name as a subfolder of MOUNTDIR.
+        """
+        if os.path.sep in self.name:
+            return Path(self.name)
+        return MOUNTDIR / self.name
 
-
-def _name2infopath(name):
-    """return the metadata file path for the given overlay name"""
-    return INFOSDIR / _name2unit(name)
-
+    def __str__(self):
+        return self.name
 
-def _compute_info(name, lowers):
-    """return a dictionary containing metadata for the given overlay name
-    stacking the given lower directories.
-    Existing overlay metadata are ignored.
-    """
-    info = {}
-    info['name'] = name
-    info['lowers'] = lowers
-    info['unitname'] = _name2unit(name)
-    info['mountdir'] = _name2mountdir(name)
-    info['lowerdir'] = [
-        _name2lowerdir(lower) for lower in _get_linearized_lowerdirs(name, lowers)
-    ]
-    info['upperdir'] = _name2upperdir(name)
-    info['workdir'] = _name2workdir(name)
-    info['mountpath'] = (UNITSDIR / info['unitname']).with_suffix('.mount')
-    info['automountpath'] = (UNITSDIR / info['unitname']).with_suffix('.automount')
-    return info
+    @property
+    def lowerdir(self):
+        raise NotImplementedError
+
+    @property
+    def lowers(self):
+        """Lower layers."""
+        raise NotImplementedError
+
+    def is_managed(self):
+        raise NotImplementedError
 
 
-def _read_info(name, missing_ok=False):
-    """return saved metadata of an existing overlay name.
+class Layer(BaseLayer):
+    """An overlay layer."""
+
+    def __init__(self, name):
+        super(Layer, self).__init__(name)
+        self._lowers = None
+        #XXX cache properties
+
+    @property
+    def lowers(self):
+        """Lower layers."""
+        if self._lowers is None:
+            self.load()
+        return self._lowers
+
+    @lowers.setter
+    def lowers(self, lowers):
+        # XXX detect cyclic dependencies.
+        self._lowers = lowers
+
+    @property
+    def upperdir(self):
+        """return upperdir path for the given overlay name"""
+        return UPPERSDIR / self.unitname
+
+    @property
+    def workdir(self):
+        """return workdir path for the given overlay name"""
+        return WORKSDIR / self.unitname
+
+    @property
+    def lowerdir(self):
+        """return path for the given overlay name to be used as a lowerdir
+        for another overlays.
 
-    If missing_ok is True, brand new overlay metadata is computed with
-    no lowers.
+        - If name is not a managed overlay, return the corresponding mount point path.
+        - If name is a managed overlay, return the corresponding name to get around overlayfs
+          limitation (https://lists.ubuntu.com/archives/kernel-team/2013-March/025775.html)
+        """
+        if self.upperdir.exists():
+            return self.upperdir
+        return self.mountdir
+
+    @property
+    def unitname(self):
+        """return systemd Unit file name for the given overlay name"""
+        return systemd_escape(self.mountdir)
+
+    @property
+    def infopath(self):
+        """return the metadata file path for the given overlay name"""
+        return INFOSDIR / self.unitname
+
+    @property
+    def automountunit(self):
+        """Systeme automount unit."""
+        return AutomountUnit(self.unitname)
+
+    @property
+    def mountunit(self):
+        """Systeme mount unit."""
+        return MountUnit(self.unitname)
+
+    def __repr__(self):
+        return f'<Layer name="{self.name}" unitname="{self.unitname}">'
+
+    def __str__(self):
+        return self.name
+
+    def is_managed(self):
+        return self.infopath.exists()
+
+    def get_ascendants(self):
+        return _get_linearized_lowers(self, self.lowers)
+
+    def load(self, missing_ok=True):
+        """return saved metadata of an existing overlay name.
+
+        If missing_ok is True, brand new overlay metadata is computed with
+        no lowers.
 
-    If a systemd Unit file is found, it is parsed to retrieve real metadata.
-    """
-    info = _compute_info(name, [])
-    infopath = _name2infopath(name)
-    if infopath.exists():
-        with infopath.open(encoding='utf-8') as fobj:
-            info = json.load(fobj)
-        assert name == info['name']
-        info['mountpath'] = Path(info['mountpath'])
-        info['automountpath'] = Path(info['automountpath'])
-    elif not missing_ok:
-        raise OSError('%s is not a managed overlay: %s not found' % (name, infopath))
-    if info['mountpath'].exists():
-        logger.debug('Metadata found for %s', name)
-        content = info['mountpath'].read_text(encoding='utf-8')
-        info['upperdir'] = Path(
-            re.search('upperdir=(.*?)[,$]', content).groups()[0].replace('\\\\', '\\')
+        If a systemd Unit file is found, it is parsed to retrieve real metadata.
+        """
+        infopath = self.infopath
+        if infopath.exists():
+            with infopath.open(encoding='utf-8') as fobj:
+                info = json.load(fobj)
+            assert info['name'] == self.name
+            if self._lowers is None:
+                self._lowers = [build_layer(lower) for lower in info['lowers']]
+        else:
+            self._lowers = []
+        self.check_consistency()
+
+    def check_consistency(self):
+        """Check that info and units are consistents."""
+        if self.mountunit.path.exists():
+            logger.debug('Metadata found for %s', self.name)
+            content = self.mountunit.path.read_text(encoding='utf-8')
+            assert self.upperdir == Path(
+                re.search('upperdir=(.*?)[,$]', content).groups()[0].replace('\\\\', '\\')
+            )
+            lowerdirs = re.search('lowerdir=(.*?)[,$]', content).groups()[0].split(':')
+            assert [lower.lowerdir for lower in self.get_ascendants()] \
+                == [Path(p.replace('\\\\', '\\')) for p in lowerdirs if p]
+            assert self.workdir == Path(
+                re.search('workdir=(.*?)[,\n]', content).groups()[0].replace('\\\\', '\\')
+            )
+            assert self.mountdir == Path(
+                re.search('[Ww]here *=(.*?)\n', content).groups()[0].strip()
+            )
+        else:
+            logger.debug("No metadata found for %s", self.name)
+
+    def dump(self):
+        """Write the given overlay metadata to disk."""
+        data = {
+            'name': self.name,
+            'lowers': [lower.name for lower in self.lowers]
+        }
+        self.infopath.write_text(
+            json.dumps(data, indent=2), encoding='utf-8'
+        )
+        logger.debug('%s updated', self.infopath)
+        data['lowerdir'] = ':'.join(
+            str(lower.lowerdir).replace('\\', '\\\\')
+            for lower in self.get_ascendants()
         )
-        info['lowerdir'] = [
-            Path(p.replace('\\\\', '\\'))
-            for p in re.search('lowerdir=(.*?)[,$]', content).groups()[0].split(':')
-        ]
-        info['workdir'] = Path(
-            re.search('workdir=(.*?)[,\n]', content).groups()[0].replace('\\\\', '\\')
-        )
-        info['mountdir'] = Path(
-            re.search('[Ww]here *=(.*?)\n', content).groups()[0].strip()
-        )
-    else:
-        logger.debug("No metadata found for %s", name)
-    return info
+        data['upperdir'] = str(self.upperdir).replace('\\', '\\\\')
+        data['workdir'] = str(self.workdir).replace('\\', '\\\\')
+        data['mountdir'] = str(self.mountdir).replace('\\', '\\\\')
+        self.upperdir.mkdir(parents=True, exist_ok=True)
+        self.workdir.mkdir(parents=True, exist_ok=True)
+        self.mountdir.mkdir(parents=True, exist_ok=True)
+        self.mountunit.dump(**data)
+        self.automountunit.dump(**data)
+
+    def start(self, permanent=False):
+        """Start the automount systemd  unit.
+
+        Enable the automount unit if permanent is truthy.
+        """
+        self.automountunit.start()
+        if permanent:
+            self.automountunit.enable()
+
+    def stop(self, permanent=False):
+        """Start the automount and mount systemd  units.
+
+        Disable the automount unit if permanent is truthy.
+        """
+        self.automountunit.stop()
+        self.mountunit.stop()
+        if permanent:
+            self.automountunit.disable()
+
+    @contextmanager
+    def temporary_stop(self):
+        """Temporary ensure that the systemd units are stopped.
+
+        Restart the automount unit if it was preciously started.
+        """
+        is_automonted = self.automountunit.is_active()
+        is_mounted = self.mountunit.is_active()
+        self.automountunit.stop()
+        self.mountunit.stop()
+        yield
+        if is_automonted:
+            self.automountunit.start()
+        if is_mounted:
+            self.mountunit.start()
+
+    def delete(self):
+        """Delete the overlay.
+
+        Stop, disable and delete the systemd units.
+        Delete the overlay info file and upper/work directories.
+        """
+        self.stop(permanent=True)
+        self.automountunit.unlink()
+        self.mountunit.unlink()
+        systemctl.daemon_reload()
+        shutil.rmtree(bytes(self.mountdir), ignore_errors=True)
+        shutil.rmtree(bytes(self.workdir), ignore_errors=True)
+        shutil.rmtree(bytes(self.upperdir), ignore_errors=True)
+        self.infopath.unlink()
 
 
-def _write_info(info):
-    """Write the given overlay metadata to disk."""
-    data = {
-        'name': info['name'],
-        'lowers': info['lowers'],
-        'unitname': info['unitname'],
-        'mountpath': str(info['mountpath']),
-        'automountpath': str(info['automountpath']),
-    }
-    _name2infopath(info['name']).write_text(
-        json.dumps(data, indent=2), encoding='utf-8'
-    )
-    logger.debug('%s updated', _name2infopath(info['name']))
-    data = info.copy()
-    data['lowerdir'] = ':'.join(
-        str(path).replace('\\', '\\\\') for path in data['lowerdir']
-    )
-    data['upperdir'] = str(data['upperdir']).replace('\\', '\\\\')
-    data['workdir'] = str(data['workdir']).replace('\\', '\\\\')
-    data['mountpath'] = str(info['mountpath']).replace('\\', '\\\\')
-    info['mountpath'].write_text(MOUNTTMPL.format(**data), encoding='utf-8')
-    logger.debug('%s updated', info['mountpath'])
-    info['automountpath'].write_text(AUTOMOUNTTMPL.format(**info), encoding='utf-8')
-    logger.debug('%s updated', info['automountpath'])
+class UnmanagedLayer(BaseLayer):
+
+    @property
+    def lowers(self):
+        """Lower layers."""
+        return ()
+
+    @property
+    def lowerdir(self):
+        """return path for the given overlay name to be used as a lowerdir
+        for another overlays.
+
+        - If name is not a managed overlay, return the corresponding mount point path.
+        - If name is a managed overlay, return the corresponding name to get around overlayfs
+          limitation (https://lists.ubuntu.com/archives/kernel-team/2013-March/025775.html)
+        """
+        return self.mountdir
+
+    def __repr__(self):
+        return f'<UnmanagedLayer name="{self.name}" mountdir="{self.mountdir}">'
+
+    def is_managed(self):
+        return False
+
+
+class GenericUnit:
+    """Generic systemd Unit interface."""
+
+    @property
+    def unittype(self):
+        """Systemd unit type (a.k.a "automount" or "mount")."""
+        raise NotImplementedError
+
+    @property
+    def template(self):
+        """Template of the systemd unit content."""
+        raise NotImplementedError
+
+    def __init__(self, unitname):
+        self.unitname = unitname
+
+    @property
+    def path(self):
+        """File system path of the systemd unit file."""
+        return (UNITSDIR / self.unitname).with_suffix('.' + self.unittype)
+
+    def dump(self, **data):
+        """Write the definition of the systemd unit into the disk."""
+        self.path.write_text(self.template.format(**data), encoding='utf-8')
+        logger.debug('Systemd unit %s updated', self.path.name)
+
+    def start(self):
+        """Start the systemd unit."""
+        systemctl.start(self.path.name)
+        logger.info('Systemd unit %s started', self.path.name)
+
+    def stop(self):
+        """Stop the systemd unit."""
+        systemctl.stop(self.path.name)
+        logger.info('Systemd unit %s stopped', self.path.name)
+
+    def enable(self):
+        """Enable the systemd unit."""
+        systemctl.enable(self.path.name)
+        logger.info('Systemd unit %s enabled', self.path.name)
+
+    def disable(self):
+        """Disable the systemd unit."""
+        systemctl.disable(self.path.name)
+        logger.info('Systemd unit %s disabled', self.path.name)
+
+    def unlink(self):
+        """Remove the systemd unit file."""
+        self.path.unlink()
+
+    def is_active(self):
+        """Return True if the unit is active."""
+        return 'active' in systemctl.status(self.path.name)['Active'].split()
+
 
 
-def _iter_overlays_info_simple():
-    for path in INFOSDIR.glob('*'):
-        info = json.loads(path.read_text(encoding='utf-8'))
-        yield _read_info(info['name'])
+class MountUnit(GenericUnit):
+    """Systemd mount unit."""
+
+    unittype = 'mount'
+    template = '''
+[Unit]
+Description = {name} Container overlay
+ConditionPathExists = {mountdir}
+[Mount]
+What = overlay
+Where = {mountdir}
+Type = overlay
+Options = lowerdir={lowerdir},upperdir={upperdir},workdir={workdir}
+'''
+
+
+class AutomountUnit(GenericUnit):
+    """Systemd automount unit."""
+
+    unittype = 'automount'
+    template = '''
+[Automount]
+Where={mountdir}
+[Install]
+WantedBy=local-fs.target
+'''
+
+
+class systemctl:
 
 
-def _iter_overlays_info(ordered=None):
-    """Yield metadata for each managed overlay"""
-    infos = _iter_overlays_info_simple()
+    @classmethod
+    def start(cls, unitname):
+        cls._execute('start', unitname)
+
+    @classmethod
+    def stop(cls, unitname):
+        cls._execute('stop', unitname)
+
+    @classmethod
+    def enable(cls, unitname):
+        cls._execute('enable', unitname)
+
+    @classmethod
+    def disable(cls, unitname):
+        cls._execute('enable', unitname)
+
+    @classmethod
+    def daemon_reload(cls):
+        cls._execute('daemon-reload')
+
+    @classmethod
+    def status(cls, unit):
+        """Return a disctionary containing data from `systemctl status {unit}`."""
+        stdout, dummy_stderr = Popen(
+            ['systemctl', 'status', unit], stdout=PIPE
+        ).communicate()
+        return dict(
+            map(str.strip, line.split(':', 1))
+            for line in stdout.decode('utf-8').splitlines()
+            if ':' in line
+        )
+
+    @staticmethod
+    def _execute(command, *args):
+        process = Popen(['systemctl', command] + list(args), stdout=PIPE)
+        stdout, dummy_stderr = process.communicate()
+        return stdout
+
+
+def systemd_escape(path):
+    process = Popen(['systemd-escape', '--path', str(path)], stdout=PIPE)
+    stdout, dummy_stderr = process.communicate()
+    return stdout.strip().decode(sys.getdefaultencoding())
+
+
+@lru_cache
+def build_layer(name):
+    """A registry to instanciate Layer object.
+
+    The main idea is to ensure that only one object is instanciated
+    per layer. In fact each layer may have lower layers that can be
+    instanciated at any time.
+    """
+    layer = Layer(name)
+    if layer.is_managed():
+        return layer
+    return UnmanagedLayer(name)
+
+
+def _iter_layers_simple():
+    """Yield all known layers."""
+    for path in INFOSDIR.glob('*'):
+        info = json.loads(path.read_text(encoding='utf-8'))
+        layer = build_layer(info['name'])
+        layer.lowers = [build_layer(lowername) for lowername in info['lowers']]
+        yield layer
+
+
+def _iter_layers(ordered=None):
+    """Yield all known layers.
+
+    :ordered: if truthy, yield layers by order.
+    """
+    layers = list(_iter_layers_simple())
     if not ordered:
-        return infos
-    infos = {info['name']: info for info in infos}
-    lowers = set(chain(*[info['lowers'] for info in infos.values()]))
-    heads = list(set(infos) - lowers)
+        return layers
+    lowers = set(chain(*(layer.lowers for layer in layers)))
+    heads = list(set(layers) - lowers)
     return (
-        infos[name] for name in _get_linearized_lowerdirs(None, heads) if name in infos
+        layer for layer in _get_linearized_lowers(None, heads) if layer in layers
     )
 
 
-def _iter_tree(info, tree):
-    infos = [info]
-    seens = {info['name']}
-    while infos:
-        info = infos.pop()
-        for _info in tree[info['name']]:
-            if _info['name'] in seens:
-                continue
-            infos.append(_info)
-            seens.add(_info['name'])
-            yield _info
-
-
-def _iter_descendants(info):
-    tree = defaultdict(list)
-    for _info in _iter_overlays_info_simple():
-        for lower in _info['lowers']:
-            tree[lower].append(_info)
-    return _iter_tree(info, tree)
-
-
-def _iter_ascendants(info):
-    tree = defaultdict(list)
-    infos = {i['name']: i for i in _iter_overlays_info_simple()}
-    tree = {n: [infos[x] for x in i['lowers'] if x in infos]
-            for n, i in infos.items()}
-    return _iter_tree(info, tree)
+def _iter_tree(current, next_layers_getter):
+    """Utility to walk throught the layers tree."""
+    layers = next_layers_getter(current)[:]
+    seens = {current}
+    while layers:
+        layer = layers.pop(0)
+        if layer in seens:
+            continue
+        layers += next_layers_getter(layer) or []
+        seens.add(layer)
+        yield layer
 
 
-def _systemctl_status(unit):
-    """REturn a disctionary containing data from `systemctl status {unit}`."""
-    stdout, dummy_stderr = Popen(
-        ['systemctl', 'status', unit], stdout=PIPE
-    ).communicate()
-    return dict(
-        map(str.strip, line.split(':', 1))
-        for line in stdout.decode('utf-8').splitlines()
-        if ':' in line
-    )
-
-
-def _is_mount_active(info):
-    return 'active' in _systemctl_status(info['mountpath'].name)['Active'].split()
+def _iter_ascendants(current):
+    """Yield  layers that depend on the given `layer`."""
+    return _iter_tree(current, lambda layer: layer.lowers)
 
 
-def _is_automount_active(info):
-    return 'active' in _systemctl_status(info['automountpath'].name)['Active'].split()
-
-
-def _disable_units(info):
-    if _is_automount_active(info):
-        sh.systemctl.stop(info['automountpath'].name)
-    if _is_mount_active(info):
-        sh.systemctl.stop(info['mountpath'].name)
-    sh.systemctl.disable(info['automountpath'].name).wait()
-    sh.systemctl.disable(info['mountpath'].name).wait()
-    logger.debug('Systemd %s disabled', info['automountpath'].name)
-
-
-def _remove_units(info):
-    _disable_units(info)
-    info['automountpath'].unlink()
-    info['mountpath'].unlink()
+def _iter_descendants(current):
+    """Yield layers the `layer` depends on."""
+    tree = defaultdict(list)
+    for layer in _iter_layers_simple():
+        tree[layer]
+        for parent in layer.lowers:
+            tree[parent].append(layer)
+    return _iter_tree(current, tree.get)
 
 
 def c3_merge(graph):

          
@@ 334,23 545,23 @@ def c3_linearize(head, graph, results):
     return res
 
 
-def _get_linearized_lowerdirs(name, lowers):
-    """Get lower dirs of the given overlay name stacking lowers. It
+def _get_linearized_lowers(layer, lowers=None):
+    """Get lowers of the given overlay name stacking lowers. It
     resolve every {sub}lowers for which overlay information can be retrieved.
     """
-    lowers = lowers[:]
-    graph = defaultdict(list, {name: lowers[:]})
+    lowers = lowers[:] if lowers else layer.lowers
+    graph = defaultdict(list, {layer: lowers[:]})
     while lowers:
         lower = lowers.pop(0)  # consume
-        sublowers = _read_info(lower, missing_ok=True)['lowers']
-        graph[lower] = sublowers
-        lowers += sublowers  # also add sublowers to resolver
+        if lower.lowers:
+            graph[lower] = lower.lowers
+            lowers += lower.lowers  # also add sublowers to resolver
     result = {}
-    c3_linearize(name, graph, result)
-    return result[name][1:]
+    c3_linearize(layer, graph, result)
+    return result[layer][1:]
 
 
-def creater(name, lowers, start, permanent):
+def creater(name, lowers, start=None, permanent=None):
     """create an overlay at `name` that inherites from `lowerdir`.
 
     :name: the overlay path.

          
@@ 361,57 572,55 @@ def creater(name, lowers, start, permane
            If a lower path doesn't contains a path separator (a.k.a. '/'),
            /var/lib/machines/{lower} is used
     """
-    info = _compute_info(name, lowers)
-    info['upperdir'].mkdir(parents=True, exist_ok=True)
-    info['workdir'].mkdir(parents=True, exist_ok=True)
-    _write_info(info)
-    sh.systemctl('daemon-reload').wait()
-    logger.info("%s created as %s", info['name'], info['unitname'])
+    layer = Layer(name)
+    layer.lowers = [build_layer(lowername) for lowername in lowers]
+    layer.dump()
+    systemctl.daemon_reload()
     if start:
-        sh.systemctl.start(info['automountpath'].name).wait()
-        logger.debug("Systemd %s.automount unit started", info['unitname'])
+        layer.start(permanent)
     else:
-        logger.warning("Use `overlayctl start %s` to start the overlay.", info['name'])
-    if permanent:
-        sh.systemctl.enable(info['automountpath'].name).wait()
-        logger.debug("Systemd %s.automount unit enabled", info['unitname'])
+        logger.warning("Use `overlayctl start %s` to start the overlay.", layer.name)
 
 
 def deleter(name):
     """Delete a managed overlay and related folder/files"""
-    lowers = reduce(
-        operator.or_, (set(info['lowers']) for info in _iter_overlays_info()), set()
-    )
-    lowers = set(_name2unit(lower) for lower in lowers)
-    if _name2unit(name) in lowers:
-        raise DeletionError("\"%s\" is a lower directory of another overlay" % name)
-    info = _read_info(name)
-    _remove_units(info)
-    sh.systemctl('daemon-reload').wait()
-    shutil.rmtree(bytes(info['mountdir']), ignore_errors=True)
-    shutil.rmtree(bytes(info['workdir']))
-    shutil.rmtree(bytes(info['upperdir']))
-    _name2infopath(name).unlink()
-    logger.debug("Related file and folder deleted")
-    logger.info("\"%s\" deleted", info['name'])
+    lowers = set(chain(*(x.lowers for x in _iter_layers_simple())))
+    layer = build_layer(name)
+    if not layer.is_managed():
+        logger.info("\"%s\" not managed. Nothing to do.", layer.name)
+        return
+    descendants = list(_iter_descendants(layer))
+    if descendants:
+        # XXX allow this with automatic updates.
+        raise DeletionError("\"%s\" is a lower directory of another overlay: %s"
+                            % (name, ', '.join(x.name for x in descendants)))
+    layer.delete()
+    systemctl.daemon_reload()
+    logger.info("\"%s\" deleted", layer.name)
 
 
-def editer(name, appended=None, prepanded=None, removed=None):
+def _get_editor_on_file(filename):
+    cmd = '%s "%s"' % (os.environ.get('EDITOR', 'vim'), filename)
+    Popen(cmd, shell=True).wait()
+
+
+def editer(name, appended=None, prepended=None, removed=None):
     """Ask user to edit lowers of the given overlay name, rewrite
     the metadata and unit files accordingly, finaly retart systemd units
     of the edited overlay.
     """
-    info = _read_info(name)
-    if not appended and not prepanded and not removed:
+    layer = build_layer(name)
+    # XXX check descendants not active
+    if not appended and not prepended and not removed:
         with NamedTemporaryFile() as fobj:
             fobj.write(b'# -*- encoding: utf-8 -*-\n')
             fobj.write(b'# Note: Leave unchanged or fully empty to ignore changes\n')
-            fobj.write(b'\n'.join(lower.encode('utf-8') for lower in info['lowers']))
+            fobj.write(b'\n'.join(lower.name.encode('utf-8') for lower in layer.lowers))
             fobj.flush()
-            Popen(os.environ.get('EDITOR', 'vim') + ' ' + fobj.name, shell=True).wait()
+            _get_editor_on_file(fobj.name)
             fobj.seek(0)
             lowers = [
-                line.strip()
+                build_layer(line.strip())
                 for line in fobj.read().decode('utf-8').splitlines()
                 if not line.strip().startswith('#') and line.strip()
             ]

          
@@ 419,110 628,102 @@ def editer(name, appended=None, prepande
             if not fobj.read(1):
                 return  # Ignore empty file for convenience
     else:
-        prepanded = prepanded or []
-        added = appended or []
-        removed = removed or ()
-        lowers = prepanded + [
-            lower for lower in info['lowers'] if lower not in removed
-        ] + added
-    if lowers == info['lowers']:
+        prepended = map(build_layer, prepended or ())
+        appended = map(build_layer, appended or ())
+        removed = list(map(build_layer, removed or ()))
+        lowers = chain(
+            prepended,
+            (lower for lower in layer.lowers if lower not in removed),
+            appended)
+    if lowers == layer.lowers:
         return
-    automounted = _is_automount_active(info)
-    mounted = _is_mount_active(info)
-    if automounted:
-        sh.systemctl.stop(info['automountpath'].name)
-    if mounted:
-        sh.systemctl.stop(info['mountpath'].name)
-    info = _compute_info(name, lowers)
-    _write_info(info)
-    sh.systemctl('daemon-reload').wait()
-    if automounted:
-        sh.systemctl.start(info['automountpath'].name)
-    if mounted:
-        sh.systemctl.start(info['automount'].name)
-    logger.debug("Systemd \"%s\" restarted", info['automountpath'])
-    logger.info("\"%s\" updated", info['name'])
+    with layer.temporary_stop():
+        layer.lowers[:] = lowers
+        layer.dump()
+    systemctl.daemon_reload()
+    # XXX propagate to descendants
+    logger.debug("Systemd \"%s\" restarted", layer.unitname)
+    logger.info("\"%s\" updated", layer.name)
 
 
 def starter(name, preserve_others=False, stop_others=False, permanent=False):
-    info = _read_info(name)
-    if _is_automount_active(info):
-        logger.info('%s is already started.', info['name'])
+    layer = build_layer(name)
+    if not layer.is_managed():
+        logger.info("\"%s\" not managed. Nothing to do.", layer.name)
         return
     if not preserve_others:
         to_stop = []
-        for _info in chain(_iter_descendants(info), _iter_ascendants(info)):
-            if _is_automount_active(_info):
-                to_stop.append(_info)
+        for _layer in chain(_iter_descendants(layer), _iter_ascendants(layer)):
+            if not _layer.is_managed():
+                continue
+            if _layer.automountunit.is_active():
+                to_stop.append(_layer)
         if to_stop:
             if not stop_others:
                 logger.error(
                     'You should prefere to stop the following overlays: \n\n%s\n',
-                    ', '.join(i['name'] for i in to_stop))
+                    ', '.join(x.name for x in to_stop))
                 logger.info(
                     'You may want to use `--stop-others`. '
                     'Or see --help for more options.')
                 return
-            for _info in to_stop:
-                _stopper(_info, permanent)
+            for _layer in to_stop:
+                _layer.disable() if permanent else _layer.stop()
             logger.info(
                 'The following overlays were stopped: \n\n'
-                + ', '.join(i['name'] for i in to_stop))
-    sh.systemctl.start(info['unitname'] + '.automount').wait()
-    if permanent:
-        sh.systemctl.enable(info['unitname'] + '.automount').wait()
-        logger.info("Systemd %s.automount unit enabled", info['unitname'])
+                + ', '.join(x.name for x in to_stop))
+    layer.start(permanent)
 
 
-def stoper(name, permanent):
-    _stopper(_read_info(name), permanent)
-
-
-def _stopper(info, permanent):
-    sh.systemctl.stop(info['unitname'] + '.mount').wait()
-    sh.systemctl.stop(info['unitname'] + '.automount').wait()
-    if permanent:
-        sh.systemctl.disable(info['unitname'] + '.automount').wait()
+def stoper(name, permanent=False):
+    layer = build_layer(name)
+    if not layer.is_managed():
+        logger.info("\"%s\" not managed. Nothing to do.", layer.name)
+        return
+    layer.stop(permanent)
 
 
 def lister(
     unit_name, ordered_deps, reversed_deps, no_info, regexp, depends_on, terminal
 ):
     """List managed overlays and display overlay inheritance."""
-    infos = _iter_overlays_info(ordered_deps or reversed_deps)
-    infos = reversed(list(infos)) if reversed_deps else infos
+    layers = _iter_layers(ordered_deps or reversed_deps)
+    layers = reversed(list(layers)) if reversed_deps else layers
     if terminal:
-        infos = list(infos)
-        lowers = set(chain(*(info['lowers'] for info in infos)))
-        infos = (info for info in infos if info['name'] not in lowers)
+        layers = list(layers)
+        lowers = set(chain(*(layer.lowers for layer in layers)))
+        layers = (layer for layer in layers if layer not in lowers)
+    if regexp:
+        match = re.compile(regexp).match
+        layers = (layer for layer in layers if match(layer.name))
     found = False
-    for info in infos:
-        if regexp and not re.match(regexp, info['name']):
-            continue
-        alllowers = _get_linearized_lowerdirs(info['name'], info['lowers'])
-        otherlowers = tuple(lower for lower in alllowers if lower not in info['lowers'])
-        depslowers = set(alllowers) | set(otherlowers)
+    if depends_on:
+        depends_on = [{build_layer(layername) for layername in group} for group in depends_on]
+    for layer in layers:
+        descendants = layer.get_ascendants()
+        grandchildren = tuple(lower for lower in descendants if lower not in layer.lowers)
         if depends_on:
-            if all((depslowers & deps) != deps for deps in depends_on):
+            descendants = set(descendants)
+            if not any(descendants.issuperset(deps) for deps in depends_on):
                 continue
         found = True
-        info['lowers'] = ', '.join(info['lowers'])
-        info['others'] = ', '.join(otherlowers) if otherlowers else ''
+        data = {
+            'lowers': ', '.join(map(str, layer.lowers)) if layer.lowers else '',
+            'others': ', '.join(map(str, grandchildren)) if grandchildren else '',
+            'name': layer.name,
+        }
         fmt = (
             Fore.GREEN
             + Style.BRIGHT
-            + ('{unitname}' if unit_name else '{name}')
+            + '{name}'
             + Style.RESET_ALL
         )
-        try:
-            sh.systemctl.status(info['unitname'] + '.mount')
+        if layer.mountunit.is_active():
             icon = Fore.GREEN + ' 🖴 ' + Style.RESET_ALL
-        except sh.ErrorReturnCode_3:
-            try:
-                sh.systemctl.status(info['unitname'] + '.automount')
-                icon = ' 🖴 '
-            except sh.ErrorReturnCode_3:
-                icon = ''
+        elif layer.automountunit.is_active():
+            icon = ' 🖴 '
+        else:
+            icon = ''
         if not no_info:
             fmt += (
                 icon

          
@@ 533,45 734,57 @@ def lister(
                 + '\N{Rightwards Dashed Arrow} {others}'
                 + Style.RESET_ALL
             )
-        print(fmt.format(**info))
+        print(fmt.format(**data))
     if not found:
         raise NoResult("No overlay matches.")
 
 
-def deplacer(old, new):
+def deplacer(oldname, newname):
     # Checks
-    if old == new:
-        logger.info('Not a new name, nothing to do.')
+    _new = Layer(newname)
+
+    if _new.mountdir.exists():
+        logger.error('"%s" already exists.', newname)
         return
-    existings = list(_iter_overlays_info())
-    names = set(x['name'] for x in existings)
-    if new in names:
-        logger.error('New name "%s" already exists.', new)
+
+    layer = build_layer(oldname)
+
+    if layer == _new:
+        logger.info('Not a new unit name, nothing to do.')
         return
-    superseeders = [x for x in existings if old in x['lowers']]
-    logger.debug('%s overlays affected: %s', len(superseeders),
-                 ', '.join(x['name'] for x in superseeders))
-    mounted = [info['name'] for info in superseeders if _is_mount_active(info)]
-    if mounted:
-        logger.error('The following units must be stopped: %s', ', '.join(mounted))
-    # Move overlay
-    old_info = _read_info(old)
-    new_info = _compute_info(new, old_info['lowers'])
-    for pathname in ('upperdir', 'workdir'):
-        new_info[pathname].parent.mkdir(parents=True, exist_ok=True)
-        old_info[pathname].rename(new_info[pathname])
-    _write_info(new_info)
-    _remove_units(old_info)
-    _name2infopath(old).unlink()
-    # Update existing overlays that depends on the moved one.
-    for info in superseeders:
-        lowers = info['lowers']
-        lowers[lowers.index(old)] = new
-        _write_info(_compute_info(info['name'], lowers))
-    sh.systemctl('daemon-reload').wait()
-    sh.systemctl.start(new_info['automountpath'].name).wait()
-    logger.debug("Systemd %s unit enabled and started", new_info['unitname'])
-    logger.info("%s created as %s", new_info['name'], new_info['unitname'])
+
+    descendants = list(_iter_descendants(layer))
+    logger.debug(
+        '%s overlays affected: %s', len(descendants),
+        ', '.join(x.name for x in descendants)
+    )
+    activated = [x.name for x in descendants if x.automountunit.is_active()]
+    if activated:
+        # XXX add --stop
+        logger.error(
+            'The following units must be stopped: %s', ', '.join(activated))
+
+    is_automonted = layer.automountunit.is_active()
+    if is_automonted:
+        layer.stop()
+
+    # Move overlay dirs.
+    layer.upperdir.rename(_new.upperdir)
+    layer.workdir.rename(_new.workdir)
+
+    layer.delete()
+
+    layer.name = newname
+    layer.dump()
+
+    # Update existing layers that depends on the moved one.
+    for layer in descendants:
+        layer.dump()
+
+    systemctl.daemon_reload()
+
+    if is_automonted:
+        layer.start()
 
 
 def _setup_logger(level):

          
@@ 583,13 796,17 @@ def _setup_logger(level):
     logging.getLogger('sh').setLevel('WARNING')
 
 
+def ensure_directories_exist():
+    """Ensure required folders exist."""
+    for path in (MOUNTDIR, UPPERSDIR, WORKSDIR, INFOSDIR, UNITSDIR):
+        path.mkdir(parents=True, exist_ok=True)
+
+
 def main():
     """Main entry point for the CLI"""
 
     from argparse import ArgumentParser
-
-    for path in (MOUNTDIR, OVERLAYDIR, UPPERSDIR, WORKSDIR, INFOSDIR, UNITSDIR):
-        path.mkdir(parents=True, exist_ok=True)
+    ensure_directories_exist()
 
     parser = ArgumentParser(description="Manage images as overlays for machinectl.")
     parser.add_argument(

          
@@ 786,8 1003,8 @@ def main():
             "edit an overlay created by this tool. "
             "An editor is started if --add nor --remove provided."),
     )
-    edit.add_argument('-p', '--prepand', action='store_true',
-                      help="prepand the following names", )
+    edit.add_argument('-p', '--prepend', action='store_true',
+                      help="prepend the following names", )
     edit.add_argument(
         '-a', '--append', action='append', help="append the following names",
     )

          
@@ 804,7 1021,7 @@ def main():
     )
 
     def _editer(args):
-        editer(args.mountdir, args.append, args.prepand, args.delete)
+        editer(args.mountdir, args.append, args.prepend, args.delete)
 
     edit.set_defaults(func=_editer)
 

          
A => test_overlayctl.py +563 -0
@@ 0,0 1,563 @@ 
+import imp
+import io
+import json
+import os
+import re
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from unittest import TestCase, mock
+
+# Like `import overlayctl` but it does not need a real python lib.
+with io.open('overlayctl') as fobj:
+    overlayctl = imp.load_module(
+        'overlayctl', fobj, 'overlayctl', ('', 'r', imp.PY_SOURCE))
+
+
+class BaseTest(TestCase):
+    """Base TestCase that create temporary to work within."""
+
+    maxDiff = None
+
+    def setUp(self):
+        self._tempdir = TemporaryDirectory()
+        tempdirpath = Path(self._tempdir.__enter__())
+        self.systemctl = overlayctl.systemctl = mock.Mock()
+        self.mountdir = overlayctl.MOUNTDIR = tempdirpath / 'machines'
+        self.unitdir = overlayctl.UNITSDIR = tempdirpath / 'systemd'
+        overlaydir = tempdirpath / 'overlayctl'
+        self.uppersdir = overlayctl.UPPERSDIR = overlaydir / 'upper'
+        self.worksdir = overlayctl.WORKSDIR = overlaydir / 'work'
+        self.infosdir = overlayctl.INFOSDIR = overlaydir / 'info'
+        overlayctl.ensure_directories_exist()
+        overlayctl.build_layer.cache_clear()
+        overlayctl._setup_logger('CRITICAL')
+
+    def tearDown(self):
+        self._tempdir.cleanup()
+
+    def assert_daemon_reload_called(self):
+        self.systemctl.daemon_reload.assert_called()
+
+    def get_unit_name(self, systemd_name):
+        name = '-'.join([
+            str(self.mountdir).replace(os.path.sep, '-'),
+            systemd_name
+        ])
+        return name.lstrip('-')
+
+    def get_mount_dir(self, name):
+        return self.mountdir / name
+
+    def get_upper_dir(self, systemd_name):
+        return self.uppersdir / self.get_unit_name(systemd_name)
+
+    def get_work_dir(self, systemd_name):
+        return self.worksdir / self.get_unit_name(systemd_name)
+
+    def write_info(self, name, lowers):
+        (self.infosdir / self.get_unit_name(name)).write_text(
+            json.dumps({'name': name, 'lowers': lowers}))
+
+    def assert_info_equal(self, name, lowers, systemd_name=''):
+        unitname = self.get_unit_name(systemd_name or name)
+        filepath = self.infosdir / unitname
+        info = json.loads(filepath.read_text())
+        self.assertDictEqual({'name': name, 'lowers': lowers}, info)
+
+    def assert_overlay_dirs_exists(self, systemd_name, name=None):
+        self.assertTrue(self.get_upper_dir(systemd_name).is_dir())
+        self.assertTrue(self.get_work_dir(systemd_name).is_dir())
+        self.assertTrue(self.get_mount_dir(name or systemd_name).is_dir())
+
+    def get_unit_path(self, systemd_name, unittype):
+        return (self.unitdir / self.get_unit_name(systemd_name)).with_suffix('.' + unittype)
+
+    def parse_unit(self, systemd_name, unittype):
+        path = self.get_unit_path(systemd_name, unittype)
+        content = {}
+        target = content
+        sectionmatch = re.compile(r'\[(.*?)\]').match
+        entrymatch = re.compile(r'^\s*(.*?)\s*=\s*(.*?)\s*$').match
+        with path.open(encoding='utf-8') as fobj:
+            for line in fobj:
+                section = sectionmatch(line)
+                if line.lstrip().startswith('#'):
+                    continue
+                if section:
+                    [key] = section.groups()
+                    target = content[key] = {}
+                    continue
+                entry = entrymatch(line)
+                if entry:
+                    key, value = entry.groups()
+                    if key == 'Options':
+                        value = {
+                            key: list(filter(None, folders.split(':')))
+                            for group in value.split(',')
+                            for key, folders in [group.split('=', 1)]}
+                    target[key] = value
+        return content
+
+    def assert_automount_unit_equal(self, systemd_name, expected):
+        result = self.parse_unit(systemd_name, 'automount')
+        self.assertDictEqual(result, expected)
+
+    def assert_mount_unit_equal(self, systemd_name, expected):
+        result = self.parse_unit(systemd_name, 'mount')
+        self.assertDictEqual(result, expected)
+
+    def check_mount_unit(self, layer_name, lower_names):
+        self.assert_mount_unit_equal(layer_name, {
+            'Mount': {
+                'Options': {
+                    'lowerdir': [str(lower_name) for lower_name in lower_names],
+                    'upperdir': [str(self.get_upper_dir(layer_name))],
+                    'workdir': [str(self.get_work_dir(layer_name))]},
+                'Type': 'overlay',
+                'What': 'overlay',
+                'Where': str(self.get_mount_dir(layer_name))},
+            'Unit': {
+                'ConditionPathExists': str(self.get_mount_dir(layer_name)),
+                'Description': '%s Container overlay' % layer_name
+            }})
+
+    def assert_layer_started(self, name):
+        self.systemctl.start.assert_called_once_with(self.get_unit_name(name) + '.automount')
+
+    def assert_layer_enabled(self, name):
+        self.systemctl.enable.assert_called_once_with(self.get_unit_name(name) + '.automount')
+
+    def assert_layer_stopped(self, name):
+        self.systemctl.stop.assert_any_call(self.get_unit_name(name) + '.automount')
+        self.systemctl.stop.assert_any_call(self.get_unit_name(name) + '.mount')
+
+    def assert_layer_disabled(self, name):
+        self.systemctl.disable.assert_called_once_with(self.get_unit_name(name) + '.automount')
+
+
+class TestCreate(BaseTest):
+    """Test the `create` command."""
+
+    def test_creating_a_root_layer(self):
+        # With no dependencies, the layer can be created.
+        # I don't know if it's a good idea for now. But it's allowed.
+        # The layer name is escaped
+        name = 'test-it'
+        systemd_name = r'test\x2dit'
+        overlayctl.creater(name, lowers=(), start=False, permanent=False)
+        self.assert_daemon_reload_called()
+        self.assert_info_equal(name=name, lowers=[], systemd_name=systemd_name)
+        self.assert_overlay_dirs_exists(systemd_name, name)
+        self.assert_automount_unit_equal(systemd_name, {
+            'Automount': {'Where': str(self.get_mount_dir('test-it'))},
+            'Install': {'WantedBy': 'local-fs.target'}
+        })
+        self.assert_mount_unit_equal(systemd_name, {
+            'Mount': {
+                'Options': {
+                    'lowerdir': [],
+                    'upperdir': [str(self.get_upper_dir(r'test\\x2dit'))],
+                    'workdir': [str(self.get_work_dir(r'test\\x2dit'))]},
+                'Type': 'overlay',
+                'What': 'overlay',
+                'Where': str(self.get_mount_dir('test-it'))},
+            'Unit': {
+                'ConditionPathExists': str(self.get_mount_dir('test-it')),
+                'Description': 'test-it Container overlay'
+            }})
+
+    def test_creating_a_simple_layer(self):
+        # With dependencies, the layer is created. The layer can
+        # depend on another layer, an absolute path to a directory or
+        # a name of a folder in the mount directory.
+        self.get_upper_dir('lower1').mkdir()
+        self.write_info('lower1', [])
+        lower2dir = self.mountdir / 'lower2'
+        lower2dir.mkdir()
+        overlayctl.creater(
+            'test', lowers=('lower1', str(lower2dir), 'lower3'))
+        self.assert_daemon_reload_called()
+        self.assert_info_equal('test', lowers=[
+            'lower1', str(lower2dir), 'lower3'])
+        self.assert_overlay_dirs_exists('test')
+        self.assert_automount_unit_equal('test', {
+            'Automount': {'Where': str(self.get_mount_dir('test'))},
+            'Install': {'WantedBy': 'local-fs.target'}
+        })
+        self.check_mount_unit('test', [
+            self.get_upper_dir('lower1'),
+            self.get_mount_dir('lower2'),
+            self.get_mount_dir('lower3')])
+
+    def test_automount_started(self):
+        # with --start, the automount unit is automatically started.
+        self.systemctl.status.return_value = {'Active': 'inactive'}
+        self.get_mount_dir('lower').mkdir()
+        overlayctl.creater('test', lowers=('lower',), start=True)
+        self.assert_overlay_dirs_exists('test')
+        self.assert_daemon_reload_called()
+        self.systemctl.start.assert_called_once_with(self.get_unit_name('test') + '.automount')
+
+    def test_automount_enabled(self):
+        # with --start --permanent, the automount unit is automatically started and enabled.
+        self.systemctl.status.return_value = {'Active': 'inactive'}
+        self.get_mount_dir('lower').mkdir()
+        overlayctl.creater('test', lowers=('lower',), start=True, permanent=True)
+        self.assert_overlay_dirs_exists('test')
+        self.assert_daemon_reload_called()
+        unit = self.get_unit_name('test') + '.automount'
+        self.systemctl.start.assert_called_once_with(unit)
+        self.systemctl.enable.assert_called_once_with(unit)
+
+    def test_creating_an_advanced_layer(self):
+        # Let's build an app with the following dependency tree.
+        #                        app
+        #           ┌─────────────┼──────────┐
+        #        graphql        flask      nginx
+        #           │             │          │
+        #      sqlalchemy         │       network
+        #       ┌───┴────┐ ┌──────┘
+        # postgresql   python
+        #       └────┬────┘
+        #         archbase
+        #
+        # The lower directories are linearized using the c3 algorithm.
+        self.get_mount_dir('archbase').mkdir()
+        overlayctl.creater('network', [])
+        overlayctl.creater('nginx', ['network'])
+        overlayctl.creater('postgresql', ['archbase'])
+        overlayctl.creater('python', ['archbase'])
+        overlayctl.creater('flask', ['python'])
+        overlayctl.creater('sqlalchemy', ['postgresql', 'python'])
+        overlayctl.creater('graphql', ['sqlalchemy'])
+        overlayctl.creater('app', ['graphql', 'flask', 'nginx', ])
+        self.check_mount_unit('app', [
+            self.get_upper_dir('graphql'),
+            self.get_upper_dir('flask'),
+            self.get_upper_dir('nginx'),
+            self.get_upper_dir('sqlalchemy'),
+            self.get_upper_dir('network'),
+            self.get_upper_dir('postgresql'),
+            self.get_upper_dir('python'),
+            self.get_mount_dir('archbase'),
+        ])
+
+
+class TestDelete(BaseTest):
+    """Test for the `delete` command."""
+
+    def test_delete_a_layer(self):
+        # Deleting a layer cleans up the upper/work directories, the
+        # info file, the .automount and the .mount systemd. unit file.
+        # The systemd units are stopped and disabled.
+        self.systemctl.status.return_value = {'Active': 'inactive'}
+        overlayctl.creater('test', lowers=('lower',), start=True, permanent=True)
+        automountunit = self.get_unit_name('test') + '.automount'
+        mountunit = self.get_unit_name('test') + '.mount'
+        self.assert_overlay_dirs_exists('test')
+        self.systemctl.start.assert_called_once_with(automountunit)
+        self.systemctl.enable.assert_called_once_with(automountunit)
+        self.systemctl.status.return_value = {'Active': 'active'}
+        overlayctl.deleter('test')
+        self.assertFalse(self.get_upper_dir('test').exists())
+        self.assertFalse(self.get_work_dir('test').exists())
+        self.assertFalse(self.get_mount_dir('test').exists())
+        self.assertFalse(self.get_unit_path('test', 'automount').exists())
+        self.assertFalse(self.get_unit_path('test', 'automount').exists())
+        self.systemctl.stop.assert_any_call(mountunit)
+        self.systemctl.stop.assert_any_call(automountunit)
+        self.systemctl.disable.assert_any_call(automountunit)
+
+    def test_cannot_delete_unmanaged_layer(self):
+        # Unmanaged layer cannot be deleted.
+        archbase = self.get_mount_dir('archbase')
+        archbase.mkdir()
+        overlayctl.deleter('archbase')
+        self.assertTrue(archbase.exists())
+
+    def test_cannot_delete_lower_layer(self):
+        # Its not allowed to delete a layer having descendants managed layer.
+        self.systemctl.status.return_value = {'Active': 'inactive'}
+        self.get_mount_dir('archbase').mkdir()
+        overlayctl.creater('layer1', ['archbase'])
+        overlayctl.creater('layer2', ['layer1'])
+        with self.assertRaises(overlayctl.DeletionError):
+            overlayctl.deleter('layer1')
+        self.assertTrue(self.get_unit_path('layer1', 'mount').exists())
+        # But we can delete a layer on top of unmanaged layers.
+        overlayctl.deleter('layer2')
+        overlayctl.deleter('layer1')
+
+
+class TestStart(BaseTest):
+    """Tests for the `start` command."""
+
+    def test_starting_a_layer(self):
+        # Starting a layer starts the automount unit. Mounting is
+        # handled by systemd on first access to the folder (a.k.a `machinectl start`).
+        self.get_mount_dir('archbase').mkdir()
+        overlayctl.creater('layer1', ['archbase'])
+        self.systemctl.reset_mock()
+        overlayctl.starter('layer1')
+        self.assert_layer_started('layer1')
+        self.systemctl.enable.assert_not_called()
+        self.systemctl.stop.assert_not_called()
+
+    def test_enabling_layer(self):
+        # The user can make change permanent after reboot using the
+        # --permanent option.
+        self.get_mount_dir('archbase').mkdir()
+        overlayctl.creater('layer1', ['archbase'])
+        self.systemctl.reset_mock()
+        overlayctl.starter('layer1', permanent=True)
+        self.assert_layer_started('layer1')
+        self.assert_layer_enabled('layer1')
+
+    def test_cannot_start_an_unmanaged_layer(self):
+        # Starting an unmanaged layer does nothing.
+        self.get_mount_dir('archbase').mkdir()
+        overlayctl.starter('archbase')
+        self.systemctl.start.assert_not_called()
+        self.systemctl.enable.assert_not_called()
+
+    def test_cannot_start_a_lower_with_active_descendants(self):
+        # By default, a layer with active_descendants cannot be started
+        # because overlayFS may broke thing.
+        self.systemctl.status.side_effect = [{'Active': 'active'}, {'Active': 'inactive'}]
+        self.get_mount_dir('archbase').mkdir()
+        overlayctl.creater('layer1', ['archbase'])
+        overlayctl.creater('layer2', ['layer1'])
+        self.systemctl.reset_mock()
+        overlayctl.starter('layer1')
+        self.systemctl.start.assert_not_called()
+
+    def test_user_can_force_to_start_a_lower_with_active_descendants(self):
+        # A layer with active_descendants can be started with the
+        # --preserve_others option. This may be dangereous, but the
+        # user is the master.
+        self.systemctl.status.side_effect = [{'Active': 'inactive'}]
+        self.get_mount_dir('archbase').mkdir()
+        overlayctl.creater('layer1', ['archbase'])
+        overlayctl.creater('layer2', ['layer1'])
+        self.systemctl.reset_mock()
+        overlayctl.starter('layer1', preserve_others=True)
+        self.assert_layer_started('layer1')
+
+    def test_starting_stops_descendants_if_needed(self):
+        # Active descendant layers can be stopped automatically before
+        # starting the layer with the --stop-others options.
+        self.systemctl.status.side_effect = [
+            {'Active': 'active'}, {'Active': 'active'},
+            {'Active': 'inactive'}]
+        self.get_mount_dir('archbase').mkdir()
+        overlayctl.creater('layer1', ['archbase'])
+        overlayctl.creater('layer2', ['layer1'])
+        overlayctl.creater('layer3', ['layer2'])
+        self.systemctl.reset_mock()
+        overlayctl.starter('layer1', stop_others=True)
+        self.assert_layer_stopped('layer2')
+        self.assert_layer_stopped('layer3')
+        self.assert_layer_started('layer1')
+
+
+class TestStop(BaseTest):
+    """Tests for the `stop` command."""
+
+    def test_stop_a_layer(self):
+        # Simply use the `stop` command to unmount the overlayfs and
+        # disable the automount unit.
+        self.get_mount_dir('archbase').mkdir()
+        overlayctl.creater('layer1', ['archbase'])
+        self.systemctl.reset_mock()
+        self.systemctl.status.side_effect = [{'Active': 'active'}]
+        overlayctl.stoper('layer1')
+        self.assert_layer_stopped('layer1')
+        self.systemctl.disable.assert_not_called()
+
+    def test_stop_a_layer_permanently(self):
+        # The user can disable automounting on reboot with the --permanent option.
+        self.get_mount_dir('archbase').mkdir()
+        overlayctl.creater('layer1', ['archbase'])
+        self.systemctl.reset_mock()
+        self.systemctl.status.side_effect = [{'Active': 'active'}]
+        overlayctl.stoper('layer1', permanent=True)
+        self.assert_layer_stopped('layer1')
+        self.assert_layer_disabled('layer1')
+
+
+class TestEdit(BaseTest):
+
+    def test_prepending_lowers(self):
+        # User can augment lowers with the --prepend option.
+        self.get_mount_dir('archbase').mkdir()
+        overlayctl.creater('layer0', ['archbase'])
+        overlayctl.creater('layer1', ['archbase'])
+        overlayctl.creater('layer2', ['archbase'])
+        self.systemctl.reset_mock()
+        self.systemctl.status.return_value = {'Active': 'inactive'}
+        overlayctl.editer('layer2', appended=['layer0', 'layer1'])
+        self.assert_daemon_reload_called()
+        self.check_mount_unit('layer2', [
+            self.get_upper_dir('layer0'),
+            self.get_upper_dir('layer1'),
+            self.get_mount_dir('archbase'),
+        ])
+
+    def test_appending_lowers(self):
+        # User can augment the overlay bases with the --append option.
+        self.get_mount_dir('base1').mkdir()
+        self.get_mount_dir('base2').mkdir()
+        overlayctl.creater('layer', [])
+        self.systemctl.reset_mock()
+        self.systemctl.status.return_value = {'Active': 'inactive'}
+        overlayctl.editer('layer', appended=['base1', 'base2'])
+        self.assert_daemon_reload_called()
+        self.check_mount_unit('layer', [
+            self.get_mount_dir('base1'),
+            self.get_mount_dir('base2'),
+        ])
+
+    def test_removing_lowers(self):
+        # User can reduce the lower layers with the --remove option.
+        self.get_mount_dir('base').mkdir()
+        overlayctl.creater('autologin', [])
+        overlayctl.creater('network', [])
+        overlayctl.creater('layer', ['network', 'autologin', 'base'])
+        self.systemctl.reset_mock()
+        self.systemctl.status.return_value = {'Active': 'inactive'}
+        overlayctl.editer('layer', removed=['autologin', 'network'])
+        self.assert_daemon_reload_called()
+        self.check_mount_unit('layer', [self.get_mount_dir('base')])
+
+    def test_moving_a_lower(self):
+        # User can use both use --remove and --append/--prepend.
+        self.get_mount_dir('base1').mkdir()
+        self.get_mount_dir('base2').mkdir()
+        overlayctl.creater('layer', ['base1', 'base2'])
+        self.systemctl.reset_mock()
+        self.systemctl.status.return_value = {'Active': 'inactive'}
+        overlayctl.editer('layer', removed=['base2'], prepended=['base2'])
+        self.assert_daemon_reload_called()
+        self.check_mount_unit('layer', [
+            self.get_mount_dir('base2'),
+            self.get_mount_dir('base1'),
+        ])
+
+    def test_with_editor(self):
+        # The user's editor is started with no options.
+        # In the editor, user can move, add and remove lower freely.
+        self.get_mount_dir('base1').mkdir()
+        self.get_mount_dir('base2').mkdir()
+        overlayctl.creater('lower1', [])
+        overlayctl.creater('lower2', [])
+        overlayctl.creater('lower3', [])
+        overlayctl.creater('layer', ['lower2', 'lower1', 'base1', 'base2'])
+        self.systemctl.reset_mock()
+
+        def check_and_edit_file(filename):
+            filepath = Path(filename)
+            lowers = [line for line in filepath.read_text().splitlines()
+                      if not line.startswith('#')]
+            self.assertSequenceEqual(lowers, ['lower2', 'lower1', 'base1', 'base2'])
+            filepath.write_text('\n'.join(['lower1', 'lower2', 'lower3', 'base1']))
+
+        self.systemctl.status.return_value = {'Active': 'inactive'}
+        with mock.patch('overlayctl._get_editor_on_file', new=check_and_edit_file):
+            overlayctl.editer('layer')
+        self.check_mount_unit('layer', [
+            self.get_upper_dir('lower1'),
+            self.get_upper_dir('lower2'),
+            self.get_upper_dir('lower3'),
+            self.get_mount_dir('base1'),
+        ])
+
+
+class TestMove(BaseTest):
+    """Tests for the `move` command."""
+
+    def test_moving_a_terminal_layer(self):
+        # Simple case when the user moves a layer without descendants.
+        # So, we have nothing to propagate.
+        self.get_mount_dir('base').mkdir()
+        overlayctl.creater('lower', ['base'])
+        overlayctl.creater('layer', ['lower'])
+        self.systemctl.reset_mock()
+        self.systemctl.status.return_value = {'Active': 'inactive'}
+        overlayctl.deplacer('layer', 'newlayer')
+        self.assert_layer_not_exist('layer')
+        self.assert_layer_exists('newlayer', [
+            self.get_upper_dir('lower'),
+            self.get_mount_dir('base'),
+        ])
+        self.assert_daemon_reload_called()
+
+    def test_descendant_of_moved_are_updated(self):
+        # When moving a layer, its descendants must be updated accordingly.
+        self.get_mount_dir('base').mkdir()
+        overlayctl.creater('layer1', ['base'])
+        overlayctl.creater('layer2', ['layer1'])
+        overlayctl.creater('layer3', ['layer2'])
+        self.systemctl.reset_mock()
+        self.systemctl.status.return_value = {'Active': 'inactive'}
+        overlayctl.deplacer('layer1', 'renamed')
+        self.assert_layer_not_exist('layer1')
+        self.assert_layer_exists('renamed', [self.get_mount_dir('base')])
+        self.assert_layer_exists('layer2', [
+            self.get_upper_dir('renamed'),
+            self.get_mount_dir('base'),
+        ])
+        self.assert_layer_exists('layer3', [
+            self.get_upper_dir('layer2'),
+            self.get_upper_dir('renamed'),
+            self.get_mount_dir('base'),
+        ])
+        self.assert_daemon_reload_called()
+
+    def test_moved_layer_is_restarted(self):
+        # Moving an activated terminal layer is ok, It's still
+        # activated at the end.
+        self.get_mount_dir('base').mkdir()
+        overlayctl.creater('layer1', ['base'])
+        self.systemctl.reset_mock()
+        self.systemctl.status.return_value = {'Active': 'active'}
+        overlayctl.deplacer('layer1', 'renamed')
+        self.systemctl.stop.assert_any_call(self.get_unit_name('layer1') + '.mount')
+        self.systemctl.stop.assert_any_call(self.get_unit_name('layer1') + '.automount')
+        self.systemctl.start.assert_any_call(self.get_unit_name('renamed') + '.automount')
+
+    def test_cannot_rename_if_new_name_exists(self):
+        # Do nothing if the new name is already known.
+        self.get_mount_dir('lower1').mkdir()
+        overlayctl.creater('lower2', [])
+        self.systemctl.reset_mock()
+        overlayctl.deplacer('lower2', 'lower1')
+        self.assertTrue(self.get_unit_path('lower2', 'mount').exists())
+        self.assertTrue(self.get_mount_dir('lower2').exists())
+        self.assertTrue(self.get_mount_dir('lower1').exists())
+
+    def test_cannot_rename_if_descandant_is_activated(self):
+        # OverlayFS does not like when lower directories moves, so,
+        # user cannot move a layer having an active descandant.
+        self.get_mount_dir('base').mkdir()
+        overlayctl.creater('lower', ['base'])
+        overlayctl.creater('layer', ['lower'])
+        self.systemctl.reset_mock()
+        self.systemctl.status.return_value = {'Active': 'active'}
+        overlayctl.deplacer('lower', 'newlower')
+
+    def assert_layer_not_exist(self, name):
+        self.assertFalse(self.get_unit_path(name, 'mount').exists())
+        self.assertFalse(self.get_unit_path(name, 'automount').exists())
+        self.assertFalse(self.get_mount_dir(name).exists())
+        self.assertFalse(self.get_upper_dir(name).exists())
+        self.assertFalse(self.get_work_dir(name).exists())
+
+    def assert_layer_exists(self, name, lowers):
+        self.assertTrue(self.get_mount_dir(name).exists())
+        self.assertTrue(self.get_upper_dir(name).exists())
+        self.assertTrue(self.get_work_dir(name).exists())
+        self.assert_automount_unit_equal(name, {
+            'Automount': {'Where': str(self.get_mount_dir(name))},
+            'Install': {'WantedBy': 'local-fs.target'}
+        })
+        self.check_mount_unit(name, lowers)