M pyproject.toml +4 -1
@@ 1,2 1,5 @@
[tool.pytype]
-inputs = ['rework']
No newline at end of file
+inputs = ['rework']
+
+[tool.ruff]
+ignore = ["E501", "E702", "E722", "E731"]
M rework/cli.py +2 -1
@@ 56,7 56,8 @@ def register_operations(dburi, module, d
The registered operations will be relative to the current host.
"""
- mod = imp.load_source('operations', module)
+ # load the module
+ imp.load_source('operations', module)
engine = create_engine(find_dburi(dburi))
ok, ko = api.freeze_operations(engine, domain)
M rework/io.py +0 -1
@@ 1,4 1,3 @@
-import json
import calendar as cal
from datetime import datetime as dt
M rework/monitor.py +3 -8
@@ 7,11 7,6 @@ from datetime import datetime
import traceback as tb
from pathlib import Path
import sys
-import pickle
-from threading import (
- Event,
- Thread
-)
import tzlocal
import pytz
@@ 129,7 124,7 @@ class scheduler:
self.sched.shutdown()
def schedule(self, opname, domain, inputdata, host, meta, rule):
- job = self.sched.add_job(
+ self.sched.add_job(
_schedule(
self.engine,
opname,
@@ 148,8 143,8 @@ class scheduler:
self.stop()
print(f'scheduler: reloading definitions for {self.domain}')
self.sched = schedulerservice()
- for operation, domain, inputdata, host, meta, rule in defs:
- self.schedule(operation, domain, inputdata, host, meta, rule)
+ for operation, domain, inputdata, hostid, meta, rule in defs:
+ self.schedule(operation, domain, inputdata, hostid, meta, rule)
self.defs = defs
print(f'scheduler: starting with definitions:\n{self.defs}')
self.sched.start()
M rework/sched.py +0 -1
@@ 65,7 65,6 @@ class _ThreadPoolExecutor:
self._work_queue.put(_WorkItem(f, fn, args, kwargs))
num_threads = len(self._threads)
if num_threads < self._max_workers:
- thread_name = f'{self}_{num_threads}'
t = threading.Thread(target=self._worker)
t.start()
self._threads.add(t)
M rework/worker.py +1 -1
@@ 97,7 97,7 @@ class Worker:
traceback=traceback.format_exc()
).do(cn)
raise
- except SystemExit as exit:
+ except SystemExit:
raise
def heartbeat(self):
M tests/conftest.py +1 -1
@@ 9,7 9,7 @@ from pytest_sa_pg import db
from rework import api, cli as rcli, schema
# our test tasks
-from . import tasks
+from . import tasks as _tasks
DATADIR = Path(__file__).parent / 'data'
M tests/test_cli.py +2 -5
@@ 1,5 1,4 @@
import os
-import time
from pathlib import Path
import pytest
@@ 110,8 109,6 @@ def test_register_operations(engine, cli
def test_unregister_operations(engine, cli, cleanup):
- r0 = cli('list-operations', engine.url)
-
r = cli('unregister-operation', engine.url, 'no_such_op', '--no-confirm')
assert r.output.startswith('Nothing to unregister')
@@ 674,6 671,6 @@ def test_scheduler_export_import(engine,
r = cli('list-scheduled', engine.url)
assert scrub(r.output).strip() == (
- f'<X> `print_sleep_and_go_away` default `no host` `no meta` "* * * * * *"\n'
- f'<X> `fancy_inputs_outputs` default `no host` `no meta` "* * * * * *"'
+ '<X> `print_sleep_and_go_away` default `no host` `no meta` "* * * * * *"\n'
+ '<X> `fancy_inputs_outputs` default `no host` `no meta` "* * * * * *"'
)
M tests/test_monitor.py +12 -12
@@ 158,13 158,13 @@ def test_failed_pending_start(engine, cl
from rework.api import workers
with workers(engine, maxworkers=2, minworkers=2, start_timeout=0) as mon:
- stat1 = mon.step()
+ mon.step()
pending = mon.pending_start.copy()
assert len(pending) == 2
assert len(mon.wids) == 2
time.sleep(.1)
- stat2 = mon.step()
+ mon.step()
assert len(mon.pending_start) == 2
assert len(mon.wids) == 2
@@ 238,7 238,7 @@ def test_domain(engine, cleanup):
t2 = api.schedule(engine, 'print_sleep_and_go_away', 1)
guard(engine, f'select running from rework.worker where id = {wid}',
- lambda r: r.scalar() == False)
+ lambda r: r.scalar() is False)
assert t1.status == 'queued'
assert t2.status == 'done'
@@ 249,14 249,14 @@ def test_domain(engine, cleanup):
t2 = api.schedule(engine, 'print_sleep_and_go_away', 1)
guard(engine, f'select running from rework.worker where id = {wid}',
- lambda r: r.scalar() == False)
+ lambda r: r.scalar() is False)
assert t1.status == 'done'
assert t2.status == 'queued'
def test_worker_two_runs_nondfefault_domain(engine, cleanup):
- with workers(engine, maxruns=2, domain='nondefault') as mon:
+ with workers(engine, maxruns=2, domain='nondefault'):
t1 = api.schedule(engine, 'run_in_non_default_domain')
t2 = api.schedule(engine, 'run_in_non_default_domain')
t3 = api.schedule(engine, 'run_in_non_default_domain')
@@ 301,7 301,7 @@ def test_worker_shutdown(engine, cleanup
).do(cn)
assert worker.shutdown_asked()
guard(engine, f'select shutdown from rework.worker where id = {wid}',
- lambda r: r.scalar() == True)
+ lambda r: r.scalar() is True)
guard(engine, 'select count(id) from rework.worker where running = true',
lambda r: r.scalar() == 0)
@@ 320,7 320,7 @@ def test_worker_kill(engine, cleanup):
kill=True
).do(cn)
guard(engine, f'select kill from rework.worker where id = {wid}',
- lambda r: r.scalar() == True)
+ lambda r: r.scalar() is True)
mon.preemptive_kill()
@@ 347,7 347,7 @@ def test_worker_max_runs(engine, cleanup
t.join()
guard(engine, f'select running from rework.worker where id = {wid}',
- lambda r: r.scalar() == False)
+ lambda r: r.scalar() is False)
with workers(engine, maxruns=1) as mon:
wid = mon.wids[0]
@@ 358,7 358,7 @@ def test_worker_max_runs(engine, cleanup
t1.join()
guard(engine, f'select running from rework.worker where id = {wid}',
- lambda r: r.scalar() == False)
+ lambda r: r.scalar() is False)
assert t2.state == 'queued'
assert t2.worker is None
@@ 384,7 384,7 @@ def test_worker_max_mem(engine, cleanup)
t2 = api.schedule(engine, 'allocate_and_leak_mbytes', 100)
t2.join()
guard(engine, f'select shutdown from rework.worker where id = {wid}',
- lambda r: r.scalar() == True)
+ lambda r: r.scalar() is True)
worker = Worker(engine.url, wid)
assert worker.shutdown_asked()
@@ 457,7 457,7 @@ def test_worker_unplanned_death(engine,
def test_killed_task(engine, cleanup):
- with workers(engine) as mon:
+ with workers(engine):
t = api.schedule(engine, 'infinite_loop')
t.join('running')
@@ 465,7 465,7 @@ def test_killed_task(engine, cleanup):
assert t.traceback is None
try:
- with workers(engine) as mon:
+ with workers(engine):
t = api.schedule(engine, 'infinite_loop')
t.join('running')
raise Exception('kill the monitor')