M rework/monitor.py +33 -10
@@ 8,13 8,17 @@ import traceback as tb
from pathlib import Path
import sys
import pickle
+from threading import (
+ Event,
+ Thread
+)
import tzlocal
import pytz
import psutil
-from apscheduler.schedulers.background import BackgroundScheduler
from sqlhelp import select, insert, update
+from rework.sched import schedulerservice
from rework.helper import (
BetterCronTrigger,
cpu_usage,
@@ 89,13 93,32 @@ class monstats:
__repr__ = __str__
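+# job factory for apscheduler: binds the task parameters once and
+# returns a closure that schedules the rework task, then waits on it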
+def _schedule(engine, opname, domain, inputdata, host, meta):
+ def wrapped():
+ print(f'scheduling {opname} {domain}')
+ t = api.schedule(
+ engine,
+ opname,
+ rawinputdata=inputdata if inputdata else None,
+ hostid=None,
+ domain=domain or 'default',
+ metadata=meta
+ )
+ print(f'scheduled {opname} {domain} -> {t}')
+ # we must wait here to make sure
+ # apscheduler sees the job as still running until the task ends
+ t.join()
+ print(f'scheduled {opname} {domain} -> {t} ended')
+ return wrapped
+
+
class scheduler:
__slots__ = ('engine', 'domain', 'sched', 'defs')
def __init__(self, engine, domain):
self.engine = engine
self.domain = domain
- self.sched = BackgroundScheduler()
+ self.sched = schedulerservice()
self.sched.start() # start empty
self.defs = []
@@ 106,16 129,16 @@ class scheduler:
self.sched.shutdown()
def schedule(self, opname, domain, inputdata, host, meta, rule):
- self.sched.add_job(
- lambda: api.schedule(
+ job = self.sched.add_job(
+ _schedule(
self.engine,
opname,
- rawinputdata=inputdata if inputdata else None,
- hostid=host,
- domain=domain,
- metadata=meta
+ domain,
+ inputdata,
+ host,
+ meta
),
- trigger=BetterCronTrigger.from_extended_crontab(rule)
+ trigger=BetterCronTrigger.from_extended_crontab(rule),
)
def loop(self):
@@ 124,7 147,7 @@ class scheduler:
# reload everything
self.stop()
print(f'scheduler: reloading definitions for {self.domain}')
- self.sched = BackgroundScheduler()
+ self.sched = schedulerservice()
for operation, domain, inputdata, host, meta, rule in defs:
self.schedule(operation, domain, inputdata, host, meta, rule)
self.defs = defs
A => rework/sched.py +128 -0
@@ 0,0 1,128 @@
+import queue
+import threading
+from concurrent.futures import Future
+
+from apscheduler.executors import pool
+from apscheduler.schedulers.base import BaseScheduler
+from apscheduler.schedulers.blocking import BlockingScheduler
+
+# this module contains two parts
+# a) a vendored, stripped-down version of the stdlib
+# concurrent.futures.ThreadPoolExecutor, minus its shutdown hang
+# b) a specialisation of the apscheduler executor and scheduler
+# using the former (to avoid a deadlock at shutdown time)
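+# the hang in question: the stdlib pool joins its worker threads
+# without a timeout (in shutdown and in its atexit hook), which
+# never returns once a work item blocks forever; here the joins
+# are bounded, and monitor._schedule blocks by design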
+
+
+# this is a)
+
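+# sentinel pushed on the work queue to tell the workers to exit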
+class Stop:
+ pass
+
+
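+# trimmed-down stdlib work item: run the callable and store its
+# outcome (result or exception) on the associated future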
+class _WorkItem:
+ def __init__(self, future, fn, args, kwargs):
+ self.future = future
+ self.fn = fn
+ self.args = args
+ self.kwargs = kwargs
+
+ def run(self):
+ try:
+ result = self.fn(*self.args, **self.kwargs)
+ except BaseException as exc:
+ self.future.set_exception(exc)
+ else:
+ self.future.set_result(result)
+
+
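+# minimal pool: spawns up to max_workers threads lazily, keeps no
+# idle-thread bookkeeping and registers no atexit join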
+class _ThreadPoolExecutor:
+
+ def __init__(self, max_workers):
+ self._max_workers = max_workers
+ self._work_queue = queue.SimpleQueue()
+ self._threads = set()
+ self._shutdown = False
+ self._shutdown_lock = threading.Lock()
+
+ def _worker(self):
+ while True:
+ work_item = self._work_queue.get(block=True)
+ if work_item is Stop:
+ # allow the other workers to get it
+ self._work_queue.put(Stop)
+ return
+ work_item.run()
+
+ def submit(self, fn, *args, **kwargs):
+ with self._shutdown_lock:
+ if self._shutdown:
+ raise RuntimeError('cannot schedule new futures after shutdown')
+
+ f = Future()
+
+ self._work_queue.put(_WorkItem(f, fn, args, kwargs))
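+ # grow the pool lazily, one thread per submit, up to max_workers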
+ num_threads = len(self._threads)
+ if num_threads < self._max_workers:
+ thread_name = f'{self}_{num_threads}'
+ t = threading.Thread(name=thread_name, target=self._worker)
+ t.start()
+ self._threads.add(t)
+ return f
+
+ def shutdown(self, *a):
+ with self._shutdown_lock:
+ self._shutdown = True
+ self._work_queue.put(Stop)
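+ # bounded joins: a worker may still be executing a job (by
+ # design, see monitor._schedule); do not hang the shutdown on it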
+ for t in self._threads:
+ t.join(timeout=1)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.shutdown()
+ return False
+
+
+# this is b)
+# here we provide the apscheduler pool executor
+# wrapping the above _ThreadPoolExecutor
+
+class threadpoolexecutor(pool.BasePoolExecutor):
+
+ def __init__(self, max_workers=10, pool_kwargs=None):
+ pool_kwargs = pool_kwargs or {}
+ executor = _ThreadPoolExecutor(int(max_workers), **pool_kwargs)
+ super().__init__(executor)
+
+
+# this is an alternative implementation of the BackgroundScheduler
+# using our simplified ThreadPoolExecutor
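+# (structurally: a BlockingScheduler whose main loop runs in a
+# daemon thread we keep a handle on)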
+
+class schedulerservice(BlockingScheduler):
+
+ def _create_default_executor(self):
+ # swap in our deadlock-avoiding pool as the default executor
+ return threadpoolexecutor()
+
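+ # same dance as BackgroundScheduler.start; the real difference
+ # lies in shutdown below, which joins with a timeout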
+ def start(self, *args, **kwargs):
+ if self._event is None or self._event.is_set():
+ self._event = threading.Event()
+
+ BaseScheduler.start(self, *args, **kwargs)
+ self._thread = threading.Thread(
+ target=self._main_loop,
+ name='ReworkScheduler'
+ )
+ self._thread.daemon = True
+ self._thread.start()
+
+ def shutdown(self, *args, **kwargs):
+ super().shutdown(*args, **kwargs)
+ # join with timeout=1: there might be queued / unfinished work
+ # items still running below us (by design, see monitor._schedule)
+ self._thread.join(timeout=1)
+ del self._thread
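+
+# usage sketch (jobfunc and sometrigger are placeholders for any
+# zero-argument callable and any apscheduler trigger):
+#
+# sched = schedulerservice()
+# sched.start() # returns immediately, the loop runs in a thread
+# sched.add_job(jobfunc, trigger=sometrigger)
+# ...
+# sched.shutdown() # returns promptly even with jobs still running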
M tests/test_monitor.py +23 -6
@@ 688,13 688,30 @@ def test_scheduled_overlap(engine, clean
rule='* * * * * *',
_anyrule=True
)
- with workers(engine) as mon:
- mon.step()
- time.sleep(1)
+ api.prepare(
+ engine,
+ 'infinite_loop_long_timeout',
+ rule='* * * * * *',
+ _anyrule=True
+ )
+ with workers(engine, numworkers=2) as mon:
mon.step()
- time.sleep(1)
- nbtasks = engine.execute('select count (*) from rework.task').scalar()
- assert nbtasks == 2
+ time.sleep(2)
+
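+ # the rule fires every second, but the scheduler joins each task,
+ # so a still-running operation is never scheduled a second time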
+ nbtasks = engine.execute(
+ "select count(t.id) "
+ "from rework.task as t, rework.operation as o "
+ "where t.operation = o.id and "
+ " o.name = 'infinite_loop'"
+ ).scalar()
+ assert nbtasks == 1
+ nbtasks = engine.execute(
+ "select count(t.id) "
+ "from rework.task as t, rework.operation as o "
+ "where t.operation = o.id and "
+ " o.name = 'infinite_loop_long_timeout'"
+ ).scalar()
+ assert nbtasks == 1
def test_with_outputs(engine, cleanup):