cli/scheduler: allow exporting/importing schedule lists

This is done with pickles; the purpose is to ease
migrations, not to provide long-term archival.
4 files changed, 114 insertions(+), 4 deletions(-)

M rework/api.py
M rework/cli.py
M rework/testutils.py
M tests/test_cli.py
M rework/api.py +3 -3
@@ 175,7 175,8 @@ def prepare(engine,
             domain='default',
             inputdata=None,
             host=None,
-            metadata=None):
+            metadata=None,
+            rawinputdata=None):
     if metadata:
         assert isinstance(metadata, dict)
 

          
@@ 183,8 184,7 @@ def prepare(engine,
     BetterCronTrigger.from_extended_crontab(rule)
 
     spec = filterinput(inputspec(engine), opname, domain, host)
-    rawinputdata = None
-    if inputdata:
+    if rawinputdata is None and inputdata:
         if spec is not None:
             rawinputdata = pack_inputs(spec, inputdata)
         else:

          
M rework/cli.py +52 -0
@@ 2,10 2,12 @@ import imp
 from time import sleep
 import tzlocal
 from pathlib import Path
+import pickle
 
 import click
 from colorama import init, Fore, Style
 from pkg_resources import iter_entry_points
+import zstd
 
 from sqlalchemy import create_engine
 from sqlhelp import update, select

          
@@ 329,6 331,56 @@ def list_scheduled(dburi):
     print(Style.RESET_ALL)
 
 
+@rework.command(name='export-scheduled')
+@click.argument('dburi')
+@click.option('--path', default='rework.sched')
+def export_scheduled(dburi, path):
+    engine = create_engine(find_dburi(dburi))
+    sql = (
+        'select op.name, s.domain, s.inputdata, s.host, s.metadata, s.rule '
+        'from rework.sched as s, rework.operation as op '
+        'where s.operation = op.id'
+    )
+    inputs = []
+    for row in engine.execute(sql):
+        inp = {}
+        for k, v in row.items():
+            if isinstance(v, memoryview):
+                v = v.tobytes()
+            inp[k] = v
+        inputs.append(inp)
+
+    Path(path).write_bytes(
+        zstd.compress(
+            pickle.dumps(inputs)
+        )
+    )
+    print(f'saved {len(inputs)} entries into {path}')
+
+
+@rework.command(name='import-scheduled')
+@click.argument('dburi')
+@click.option('--path', default='rework.sched')
+def import_scheduled(dburi, path):
+    engine = create_engine(find_dburi(dburi))
+    inputs = pickle.loads(
+        zstd.decompress(
+            Path(path).read_bytes()
+        )
+    )
+    for row in inputs:
+        api.prepare(
+            engine,
+            row['name'],
+            row['rule'],
+            row['domain'],
+            host=row['host'],
+            metadata=row['metadata'],
+            rawinputdata=row['inputdata']
+        )
+    print(f'loaded {len(inputs)} entries from {path}')
+
+
 @rework.command('vacuum')
 @click.argument('dburi')
 @click.option('--workers', is_flag=True, default=False)

          
M rework/testutils.py +12 -0
@@ 1,4 1,7 @@ 
 from contextlib import contextmanager
+from pathlib import Path
+import tempfile
+import shutil
 
 from rework import api
 

          
@@ 43,3 46,12 @@ def scrub(anstr, subst='X'):
 
 def tasks(engine):
     return engine.execute('select * from rework.task').fetchall()
+
+
+@contextmanager
+def tempdir(suffix='', prefix='tmp'):
+    tmp = tempfile.mkdtemp(suffix=suffix, prefix=prefix)
+    try:
+        yield Path(tmp)
+    finally:
+        shutil.rmtree(tmp)

          
M tests/test_cli.py +47 -1
@@ 5,7 5,11 @@ from pathlib import Path
 import pytest
 from sqlhelp import insert
 from rework import api
-from rework.testutils import scrub, workers
+from rework.testutils import (
+    scrub,
+    tempdir,
+    workers
+)
 from rework.helper import guard, wait_true
 from rework.task import Task
 

          
@@ 599,3 603,45 @@ def test_scheduler_with_inputs(engine, c
         '[<X>-<X>-<X> <X>:<X>:<X>.<X>+<X>] → '
         '[<X>-<X>-<X> <X>:<X>:<X>.<X>+<X>]'
     )
+
+
+def test_scheduler_export_import(engine, cli, cleanup):
+    r = cli('list-scheduled', engine.url)
+    assert len(r.output.strip()) == 0
+
+    sid = api.prepare(
+        engine,
+        'print_sleep_and_go_away',
+        inputdata='HELLO'
+    )
+    sid2 = api.prepare(
+        engine,
+        'fancy_inputs',
+        inputdata={
+            'myfile': b'file contents',
+            'foo': 42,
+            'bar': 'Hello'
+        }
+    )
+
+    r = cli('list-scheduled', engine.url)
+    assert r.output.strip() == (
+        f'{sid} `print_sleep_and_go_away` default `no host` `no meta` "* * * * * *"\n'
+        f'{sid2} `fancy_inputs` default `no host` `no meta` "* * * * * *"'
+    )
+
+    with tempdir() as path:
+        r0 = cli('export-scheduled', engine.url, path=str(path / 'rework.sched'))
+        assert r0.output.startswith('saved 2 entries into')
+
+        with engine.begin() as cn:
+            cn.execute('delete from rework.sched')
+
+        r1 = cli('import-scheduled', engine.url, path=str(path / 'rework.sched'))
+        assert r1.output.startswith('loaded 2 entries from')
+
+    r = cli('list-scheduled', engine.url)
+    assert scrub(r.output).strip() == (
+        f'<X> `print_sleep_and_go_away` default `no host` `no meta` "* * * * * *"\n'
+        f'<X> `fancy_inputs` default `no host` `no meta` "* * * * * *"'
+    )