@@ 397,6 397,25 @@ def filterio(specs, operation, domain=No
return out[0]
+def prepared(engine, operation, domain):
+ q = (
+ 'select s.id, s.inputdata, s.metadata, s.rule '
+ 'from rework.sched as s, rework.operation as o '
+ 'where s.domain = %(domain)s and '
+ ' s.operation = o.id and '
+ ' o.name = %(operation)s'
+ )
+ return [
+ (sid, nary_unpack(inp), meta, rule)
+ for sid, inp, meta, rule in
+ engine.execute(
+ q,
+ operation=operation,
+ domain=domain
+ ).fetchall()
+ ]
+
+
# binary serializer
def nary_pack(*bytestr):
@@ 6,6 6,7 @@ from rework.helper import (
filterio,
iospec,
host,
+ prepared,
unpack_io,
unpack_iofile,
unpack_iofiles_length
@@ 402,6 403,38 @@ def test_prepare_with_inputs(engine, cle
unpacked_file = unpack_iofile(spec, inputdata, 'myfile.txt')
assert unpacked_file == b'some file'
+ prep = prepared(engine, 'yummy', 'default')
+ assert prep == [
+ (4,
+ (b'myfile.txt',
+ b'weight',
+ b'birthdate',
+ b'sometime',
+ b'name',
+ b'option',
+ b'some file',
+ b'65',
+ b'1973-05-20T09:00:00',
+ b'(date "1973-5-20")',
+ b'Babar',
+ b'foo'),
+ {'user': 'Babar'},
+ '* * * * * *'),
+ (5,
+ (b'myfile.txt',
+ b'weight',
+ b'birthdate',
+ b'name',
+ b'option',
+ b'some file',
+ b'65',
+ b'1973-05-20T00:00:00',
+ b'Babar',
+ b'foo'),
+ None,
+ '* * * * * *')
+ ]
+
def test_prepare_inputs_nr_domain_mismatch(engine, cleanup):
reset_ops(engine)