cleanups after a linter pass
M pyproject.toml +4 -1
@@ 1,2 1,5 @@ 
 [tool.pytype]
-inputs = ['rework']
\ No newline at end of file
+inputs = ['rework']
+
+[tool.ruff]
+ignore = ["E501", "E702", "E722", "E731"]

          
M rework/cli.py +2 -1
@@ 56,7 56,8 @@ def register_operations(dburi, module, d
 
     The registered operations will be relative to the current host.
     """
-    mod = imp.load_source('operations', module)
+    # load the module for its registration side effects
+    imp.load_source('operations', module)
     engine = create_engine(find_dburi(dburi))
     ok, ko = api.freeze_operations(engine, domain)
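
The module is imported purely for its side effect: rework task modules
register their operations at import time (via the @api.task decorator),
and freeze_operations then persists them. Since imp has been deprecated
since Python 3.4, a sketch of the importlib equivalent, should it ever
be swapped in:

    import importlib.util

    spec = importlib.util.spec_from_file_location('operations', module)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)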
 

          
M rework/io.py +0 -1
@@ 1,4 1,3 @@ 
-import json
 import calendar as cal
 from datetime import datetime as dt
 

          
M rework/monitor.py +3 -8
@@ 7,11 7,6 @@ from datetime import datetime
 import traceback as tb
 from pathlib import Path
 import sys
-import pickle
-from threading import (
-    Event,
-    Thread
-)
 
 import tzlocal
 import pytz

          
@@ 129,7 124,7 @@ class scheduler:
         self.sched.shutdown()
 
     def schedule(self, opname, domain, inputdata, host, meta, rule):
-        job = self.sched.add_job(
+        self.sched.add_job(
             _schedule(
                 self.engine,
                 opname,
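
The add_job return value was bound but never read, which is exactly
what F841 (local variable assigned but never used) reports:

    job = self.sched.add_job(...)   # F841: 'job' is never read
    self.sched.add_job(...)         # kept for its side effect only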

          
@@ 148,8 143,8 @@ class scheduler:
             self.stop()
             print(f'scheduler: reloading definitions for {self.domain}')
             self.sched = schedulerservice()
-            for operation, domain, inputdata, host, meta, rule in defs:
-                self.schedule(operation, domain, inputdata, host, meta, rule)
+            for operation, domain, inputdata, hostid, meta, rule in defs:
+                self.schedule(operation, domain, inputdata, hostid, meta, rule)
             self.defs = defs
             print(f'scheduler: starting with definitions:\n{self.defs}')
             self.sched.start()
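
Renaming the loop variable from host to hostid presumably avoids
shadowing another name in scope (rework exposes a host() helper);
schedule() itself is unchanged, its parameter is still called host.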

          
M rework/sched.py +0 -1
@@ 65,7 65,6 @@ class _ThreadPoolExecutor:
             self._work_queue.put(_WorkItem(f, fn, args, kwargs))
             num_threads = len(self._threads)
             if num_threads < self._max_workers:
-                thread_name = f'{self}_{num_threads}'
                 t = threading.Thread(target=self._worker)
                 t.start()
                 self._threads.add(t)
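
The dropped thread_name was computed but never handed to the Thread
constructor. Had named threads been the intent, the fix would have gone
the other way, e.g.:

    t = threading.Thread(target=self._worker, name=f'{self}_{num_threads}')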

          
M rework/worker.py +1 -1
@@ 97,7 97,7 @@ class Worker:
                     traceback=traceback.format_exc()
                 ).do(cn)
             raise
-        except SystemExit as exit:
+        except SystemExit:
             raise
 
     def heartbeat(self):
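
Binding the exception with "as exit" shadowed the exit() builtin and
the name was never read; a bare raise re-raises the in-flight
SystemExit with its original traceback, so no binding is needed.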

          
M tests/conftest.py +1 -1
@@ 9,7 9,7 @@ from pytest_sa_pg import db
 from rework import api, cli as rcli, schema
 
 # our test tasks
-from . import tasks
+from . import tasks as _tasks
 
 
 DATADIR = Path(__file__).parent / 'data'
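
The tasks module is imported only for its registration side effects;
the underscore alias tells the linter the name is deliberately unused.
An equivalent spelling would keep the name and silence F401 (unused
import) explicitly:

    from . import tasks  # noqa: F401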

          
M tests/test_cli.py +2 -5
@@ 1,5 1,4 @@ 
 import os
-import time
 from pathlib import Path
 
 import pytest

          
@@ 110,8 109,6 @@ def test_register_operations(engine, cli
 
 
 def test_unregister_operations(engine, cli, cleanup):
-    r0 = cli('list-operations', engine.url)
-
     r = cli('unregister-operation', engine.url, 'no_such_op', '--no-confirm')
     assert r.output.startswith('Nothing to unregister')
 

          
@@ 674,6 671,6 @@ def test_scheduler_export_import(engine,
 
     r = cli('list-scheduled', engine.url)
     assert scrub(r.output).strip() == (
-        f'<X> `print_sleep_and_go_away` default `no host` `no meta` "* * * * * *"\n'
-        f'<X> `fancy_inputs_outputs` default `no host` `no meta` "* * * * * *"'
+        '<X> `print_sleep_and_go_away` default `no host` `no meta` "* * * * * *"\n'
+        '<X> `fancy_inputs_outputs` default `no host` `no meta` "* * * * * *"'
     )
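
The f prefixes go away because neither string interpolates anything;
ruff flags that as F541 (f-string without any placeholders). A plain
literal is equivalent:

    s = f'static text'  # F541
    s = 'static text'   # same value, no prefix needed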

          
M tests/test_monitor.py +12 -12
@@ 158,13 158,13 @@ def test_failed_pending_start(engine, cl
     from rework.api import workers
 
     with workers(engine, maxworkers=2, minworkers=2, start_timeout=0) as mon:
-        stat1 = mon.step()
+        mon.step()
         pending = mon.pending_start.copy()
         assert len(pending) == 2
         assert len(mon.wids) == 2
 
         time.sleep(.1)
-        stat2 = mon.step()
+        mon.step()
         assert len(mon.pending_start) == 2
         assert len(mon.wids) == 2
 

          
@@ 238,7 238,7 @@ def test_domain(engine, cleanup):
         t2 = api.schedule(engine, 'print_sleep_and_go_away', 1)
 
         guard(engine, f'select running from rework.worker where id = {wid}',
-              lambda r: r.scalar() == False)
+              lambda r: r.scalar() is False)
 
         assert t1.status == 'queued'
         assert t2.status == 'done'
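
The == False comparisons trip E712 (comparison to False). Switching to
"is False" keeps the tests exact and is safe here on the assumption
that scalar() returns a real Python bool for a Postgres boolean column
(psycopg does):

    lambda r: r.scalar() == False   # E712
    lambda r: r.scalar() is False   # identity check against the bool singleton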

          
@@ 249,14 249,14 @@ def test_domain(engine, cleanup):
         t2 = api.schedule(engine, 'print_sleep_and_go_away', 1)
 
         guard(engine, f'select running from rework.worker where id = {wid}',
-              lambda r: r.scalar() == False)
+              lambda r: r.scalar() is False)
 
         assert t1.status == 'done'
         assert t2.status == 'queued'
 
 
 def test_worker_two_runs_nondfefault_domain(engine, cleanup):
-    with workers(engine, maxruns=2, domain='nondefault') as mon:
+    with workers(engine, maxruns=2, domain='nondefault'):
         t1 = api.schedule(engine, 'run_in_non_default_domain')
         t2 = api.schedule(engine, 'run_in_non_default_domain')
         t3 = api.schedule(engine, 'run_in_non_default_domain')

          
@@ 301,7 301,7 @@ def test_worker_shutdown(engine, cleanup
             ).do(cn)
         assert worker.shutdown_asked()
         guard(engine, f'select shutdown from rework.worker where id = {wid}',
-              lambda r: r.scalar() == True)
+              lambda r: r.scalar() is True)
 
         guard(engine, 'select count(id) from rework.worker where running = true',
               lambda r: r.scalar() == 0)

          
@@ 320,7 320,7 @@ def test_worker_kill(engine, cleanup):
                 kill=True
             ).do(cn)
         guard(engine, f'select kill from rework.worker where id = {wid}',
-              lambda r: r.scalar() == True)
+              lambda r: r.scalar() is True)
 
         mon.preemptive_kill()
 

          
@@ 347,7 347,7 @@ def test_worker_max_runs(engine, cleanup
         t.join()
 
         guard(engine, f'select running from rework.worker where id = {wid}',
-              lambda r: r.scalar() == False)
+              lambda r: r.scalar() is False)
 
     with workers(engine, maxruns=1) as mon:
         wid = mon.wids[0]

          
@@ 358,7 358,7 @@ def test_worker_max_runs(engine, cleanup
         t1.join()
 
         guard(engine, f'select running from rework.worker where id = {wid}',
-              lambda r: r.scalar() == False)
+              lambda r: r.scalar() is False)
 
         assert t2.state == 'queued'
         assert t2.worker is None

          
@@ 384,7 384,7 @@ def test_worker_max_mem(engine, cleanup)
         t2 = api.schedule(engine, 'allocate_and_leak_mbytes', 100)
         t2.join()
         guard(engine, f'select shutdown from rework.worker where id = {wid}',
-              lambda r: r.scalar() == True)
+              lambda r: r.scalar() is True)
         worker = Worker(engine.url, wid)
         assert worker.shutdown_asked()
 

          
@@ 457,7 457,7 @@ def test_worker_unplanned_death(engine, 
 
 
 def test_killed_task(engine, cleanup):
-    with workers(engine) as mon:
+    with workers(engine):
         t = api.schedule(engine, 'infinite_loop')
         t.join('running')
 

          
@@ 465,7 465,7 @@ def test_killed_task(engine, cleanup):
     assert t.traceback is None
 
     try:
-        with workers(engine) as mon:
+        with workers(engine):
             t = api.schedule(engine, 'infinite_loop')
             t.join('running')
             raise Exception('kill the monitor')
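
The "as mon" targets go away wherever the monitor object is never
referenced inside the with block; the context manager still runs, only
the unused binding is dropped.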