scheduler: avoid overlapping scheduling of the same task

For this we fundamentally rely on the `max_instances = 1` default
behaviour of the base apscheduler.

We need to give `add_job` a function that runs until the task completes.
This enables apscheduler to skip scheduling a new run while the previous
one is not finished.
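
To illustrate the mechanism in isolation, here is a minimal standalone
sketch (not part of the change; the job body, the interval trigger and
the durations are placeholders):

    import time
    from apscheduler.schedulers.background import BackgroundScheduler

    def blocking_job():
        # stand-in for a job that waits until its task completes;
        # while it runs, apscheduler (max_instances defaults to 1)
        # refuses to start another instance and skips the trigger
        time.sleep(5)

    sched = BackgroundScheduler()
    sched.add_job(blocking_job, trigger='interval', seconds=1)
    sched.start()
    time.sleep(10)  # the trigger fires every second, overlapping runs are skipped
    sched.shutdown()
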
3 files changed, 184 insertions(+), 16 deletions(-)

M rework/monitor.py
A => rework/sched.py
M tests/test_monitor.py
M rework/monitor.py +33 -10
@@ 8,13 8,17 @@ import traceback as tb
 from pathlib import Path
 import sys
 import pickle
+from threading import (
+    Event,
+    Thread
+)
 
 import tzlocal
 import pytz
 import psutil
-from apscheduler.schedulers.background import BackgroundScheduler
 from sqlhelp import select, insert, update
 
+from rework.sched import schedulerservice
 from rework.helper import (
     BetterCronTrigger,
     cpu_usage,

          
@@ 89,13 93,32 @@ class monstats:
     __repr__ = __str__
 
 
+def _schedule(engine, opname, domain, inputdata, host, meta):
+    def wrapped():
+        print(f'scheduling {opname} {domain}')
+        t = api.schedule(
+            engine,
+            opname,
+            rawinputdata=inputdata if inputdata else None,
+            hostid=host,
+            domain=domain or 'default',
+            metadata=meta
+        )
+        print(f'scheduled {opname} {domain} -> {t}')
+        # we must wait here to make sure
+        # apscheduler knows the task is not yet finished
+        t.join()
+        print(f'scheduled {opname} {domain} -> {t} ended')
+    return wrapped
+
+
 class scheduler:
     __slots__ = ('engine', 'domain', 'sched', 'defs')
 
     def __init__(self, engine, domain):
         self.engine = engine
         self.domain = domain
-        self.sched = BackgroundScheduler()
+        self.sched = schedulerservice()
         self.sched.start()  # start empty
         self.defs = []
 

          
@@ 106,16 129,16 @@ class scheduler:
         self.sched.shutdown()
 
     def schedule(self, opname, domain, inputdata, host, meta, rule):
-        self.sched.add_job(
-            lambda: api.schedule(
+        job = self.sched.add_job(
+            _schedule(
                 self.engine,
                 opname,
-                rawinputdata=inputdata if inputdata else None,
-                hostid=host,
-                domain=domain,
-                metadata=meta
+                domain,
+                inputdata,
+                host,
+                meta
             ),
-            trigger=BetterCronTrigger.from_extended_crontab(rule)
+            trigger=BetterCronTrigger.from_extended_crontab(rule),
         )
 
     def loop(self):

          
@@ 124,7 147,7 @@ class scheduler:
             # reload everything
             self.stop()
             print(f'scheduler: reloading definitions for {self.domain}')
-            self.sched = BackgroundScheduler()
+            self.sched = schedulerservice()
             for operation, domain, inputdata, host, meta, rule in defs:
                 self.schedule(operation, domain, inputdata, host, meta, rule)
             self.defs = defs

          
A => rework/sched.py +128 -0
@@ 0,0 1,128 @@ 
+import queue
+import threading
+from concurrent.futures import _base
+
+from apscheduler.executors import pool
+from apscheduler.schedulers.blocking import (
+    BaseScheduler,
+    BlockingScheduler
+)
+
+# this module contains two parts:
+# a) a vendored, stripped-down, less buggy version of the
+#    stdlib concurrent.futures.ThreadPoolExecutor
+# b) a specialisation of the apscheduler executor and scheduler
+#    using the former (to avoid a deadlock at shutdown time)
+
+
+# this is a)
+
+class Stop:
+    pass
+
+
+class _WorkItem(object):
+    def __init__(self, future, fn, args, kwargs):
+        self.future = future
+        self.fn = fn
+        self.args = args
+        self.kwargs = kwargs
+
+    def run(self):
+        try:
+            result = self.fn(*self.args, **self.kwargs)
+        except BaseException as exc:
+            self.future.set_exception(exc)
+        else:
+            self.future.set_result(result)
+
+
+class _ThreadPoolExecutor:
+
+    def __init__(self, max_workers):
+        self._max_workers = max_workers
+        self._work_queue = queue.SimpleQueue()
+        self._threads = set()
+        self._shutdown = False
+        self._shutdown_lock = threading.Lock()
+
+    def _worker(self):
+        while True:
+            work_item = self._work_queue.get(block=True)
+            if work_item is Stop:
+                # put the sentinel back so the other workers also stop
+                self._work_queue.put(Stop)
+                return
+            work_item.run()
+
+    def submit(self, fn, *args, **kwargs):
+        with self._shutdown_lock:
+            if self._shutdown:
+                raise RuntimeError('cannot schedule new futures after shutdown')
+
+            f = _base.Future()
+
+            self._work_queue.put(_WorkItem(f, fn, args, kwargs))
+            num_threads = len(self._threads)
+            if num_threads < self._max_workers:
+                thread_name = f'{self}_{num_threads}'
+                t = threading.Thread(name=thread_name, target=self._worker)
+                t.start()
+                self._threads.add(t)
+            return f
+
+    def shutdown(self, *a):
+        with self._shutdown_lock:
+            self._shutdown = True
+            self._work_queue.put(Stop)
+            for t in self._threads:
+                t.join(timeout=1)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.shutdown()
+        return False
+
+
+# this is b)
+# here we provide the apscheduler pool executor
+# wrapping the above ThreadPoolExecutor
+
+class threadpoolexecutor(pool.BasePoolExecutor):
+
+    def __init__(self, max_workers=10, pool_kwargs=None):
+        pool_kwargs = pool_kwargs or {}
+        pool = _ThreadPoolExecutor(int(max_workers), **pool_kwargs)
+        super().__init__(pool)
+
+
+# this is an alternative implementation of the BackgroundScheduler
+# using our simplified ThreadPoolExecutor
+
+class schedulerservice(BlockingScheduler):
+
+    def _create_default_executor(self):
+        # plug in our deadlock-free thread pool executor
+        return threadpoolexecutor()
+
+    def start(self, *args, **kwargs):
+        if self._event is None or self._event.is_set():
+            self._event = threading.Event()
+
+        BaseScheduler.start(self, *args, **kwargs)
+        self._thread = threading.Thread(
+            target=self._main_loop,
+            name='ReworkScheduler'
+        )
+        self._thread.daemon = True
+        self._thread.start()
+
+    def shutdown(self, *args, **kwargs):
+        super().shutdown(*args, **kwargs)
+        # join with timeout=1
+        # because we might still have queued / unfinished jobs running
+        # below us (by design, see monitor._schedule)
+        self._thread.join(timeout=1)
+        del self._thread

          
M tests/test_monitor.py +23 -6
@@ 688,13 688,30 @@ def test_scheduled_overlap(engine, clean
         rule='* * * * * *',
         _anyrule=True
     )
-    with workers(engine) as mon:
-        mon.step()
-        time.sleep(1)
+    api.prepare(
+        engine,
+        'infinite_loop_long_timeout',
+        rule='* * * * * *',
+        _anyrule=True
+    )
+    with workers(engine, numworkers=2) as mon:
         mon.step()
-        time.sleep(1)
-        nbtasks = engine.execute('select count (*) from rework.task').scalar()
-        assert nbtasks == 2
+        time.sleep(2)
+
+    nbtasks = engine.execute(
+        'select count(t.id) '
+        'from rework.task as t, rework.operation as o '
+        'where t.operation = o.id and '
+        '      o.name = \'infinite_loop\''
+    ).scalar()
+    assert nbtasks == 1
+    nbtasks = engine.execute(
+        'select count(t.id) '
+        'from rework.task as t, rework.operation as o '
+        'where t.operation = o.id and '
+        '      o.name = \'infinite_loop_long_timeout\''
+    ).scalar()
+    assert nbtasks == 1
 
 
 def test_with_outputs(engine, cleanup):