cli/monitor: add a --vacuum parameter

The vaccuming of old tasks is now managed by the monitor itself.
Hence there is no need for a cron job to manage it.
2 files changed, 20 insertions(+), 3 deletions(-)

M rework/cli.py
M rework/monitor.py
M rework/cli.py +1 -0
@@ 151,6 151,7 @@ def new_worker(**config):
 @click.option('--maxmem', type=int, default=0,
               help='shutdown on Mb consummed')
 @click.option('--domain', default='default')
+@click.option('--vacuum', default='P1D')
 @click.option('--debug', is_flag=True, default=False)
 @click.option('--debug-port', type=int, default=0)
 @click.option('--start-timeout', type=int, default=30)

          
M rework/monitor.py +19 -3
@@ 18,6 18,8 @@ import psutil
 from sqlhelp import select, insert, update
 
 from rework.helper import (
+    cleanup_tasks,
+    cleanup_workers,
     cpu_usage,
     host,
     kill_process_tree,

          
@@ 216,12 218,13 @@ class Monitor:
                  'maxruns', 'maxmem', 'debugport',
                  'workers', 'host', 'monid',
                  'start_timeout', 'debugfile',
-                 'pending_start', 'scheduler')
+                 'pending_start', 'scheduler',
+                 'vacuum', 'lastvacuum')
 
     def __init__(self, engine, logger=None, domain='default',
                  minworkers=None, maxworkers=2,
                  maxruns=0, maxmem=0, debug=False,
-                 debug_port=0,
+                 debug_port=0, vacuum='P1D',
                  start_timeout=30, debugfile=None):
         self.engine = engine
         self.logger = logger or setuplogger()

          
@@ 232,6 235,8 @@ class Monitor:
         self.maxruns = maxruns
         self.maxmem = maxmem
         self.debugport = 6666 or debug_port if debug else 0
+        self.lastvacuum = datetime.now()
+        self.vacuum = isodate.parse_duration(vacuum)
         self.workers = {}
         self.host = host()
         self.start_timeout = start_timeout

          
@@ 582,7 587,17 @@ class Monitor:
             update('rework.monitor').where(id=self.monid).values(
                 lastseen=utcnow().astimezone(TZ)
             ).do(cn)
-

+
+    def autovacuum(self):
+        now = datetime.now()
+        if (now - self.lastvacuum) < timedelta(hours=1):
+            return
+        finished = now - self.vacuum
+        tasks = cleanup_tasks(self.engine, finished, self.domain)
+        workers = cleanup_workers(self.engine, finished, self.domain)
+        self.logger.info(f'mon: vacuumed {tasks} tasks and {workers} workers')
+        self.lastvacuum = now
+
     def unregister(self):
         assert self.monid
         with self.engine.begin() as cn:

          
@@ 639,6 654,7 @@ class Monitor:
             self.logger.info(f'mon: reaped {len(dead)} dead workers')
         stats = self.ensure_workers()
         self.scheduler.step()
+        self.autovacuum()
         self.dead_man_switch()
         return stats