# HG changeset patch # User Christophe de Vienne # Date 1674898809 -3600 # Sat Jan 28 10:40:09 2023 +0100 # Node ID 29cd4a9412bf44c01f772fab062ac3411bf9fc44 # Parent 99f0f081545fbd076a8399996204636eae82f559 ensureconf: implement concurrency diff --git a/hgext3rd/confman/commands.py b/hgext3rd/confman/commands.py --- a/hgext3rd/confman/commands.py +++ b/hgext3rd/confman/commands.py @@ -1,13 +1,14 @@ "This module contains main command actions" import os import os.path as osp +import queue import shutil import sys from mercurial import error, registrar from mercurial.i18n import _ -from . import forge +from . import forge, runner from .managed import gitrepo, hgrepo from .opts import DEFAULTOPTS, EXCLUDEOPT, INCLUDEOPT, PULLURIOPT, REMOTEOPTS from .utils import WrappedRepo, WrappedUI, readconf @@ -59,6 +60,7 @@ "do not update managed if it is on a descendant of track", ), ("", "ci", False, "enable ci mode"), + ("j", "jobs", 1, "number of operations to run in parallel"), ], ) def ensureconf(ui, repo, *args, **opts): @@ -101,52 +103,58 @@ def checkout_repo(section, rev, conf): confman.checkout_section(section, snaps, opts.get("keep_descendant", False)) - # phase 1 - checkout all the repos that may extend the configuration, - # update the configuration, and repeat until no new section appear - confcomplete = False - while not confcomplete: + with runner.Runner(opts.get('jobs', 1)) as r: + + def checkout(need_checkout): + nonlocal ready + + res = r.call(checkout_repo, [(args, {}) for args in need_checkout]) + + has_errors = False + for ((section, rev, _), _, _, exc) in res: + if exc: + ui.write('%s\n' % exc, label='confman.dirty') + has_errors = True + else: + ready[section] = rev + return not has_errors + + # phase 1 - checkout all the repos that may extend the configuration, + # update the configuration, and repeat until no new section appear + confcomplete = False + while not confcomplete: + need_checkout = [] + for section in confman.filtered_sections(): + conf = confman.confs[section] + if confman._check_parameters(section, True): + continue + if not any(s.startswith("expand") for s in conf): + continue + rev = snaps.get(conf["layout"]) or conf.get("track", "default") + if ready.get(section) == rev: + continue + need_checkout.append((section, rev, conf)) + + if not need_checkout: + confcomplete = True + elif not checkout(need_checkout): + return 1 + + confman._readconf() + + # phase 2 - at this point, we know the configuration is stable, we can + # checkout all the remaining repos without reading it. need_checkout = [] for section in confman.filtered_sections(): conf = confman.confs[section] - if confman._check_parameters(section, True): - continue + confman._check_parameters(section, False) rev = snaps.get(conf["layout"]) or conf.get("track", "default") if ready.get(section) == rev: continue - if any(s.startswith("expand") for s in conf): - need_checkout.append((section, rev, conf)) - - if not need_checkout: - confcomplete = True - - for section, rev, conf in need_checkout: - try: - checkout_repo(section, rev, conf) - except Exception as err: - ui.write('%s\n' % err, label='confman.dirty') - return 1 - ready[section] = rev - - confman._readconf() + need_checkout.append((section, rev, conf)) - # phase 2 - at this point, we know the configuration is stable, we can - # checkout all the remaining repos without reading it. - need_checkout = [] - for section in confman.filtered_sections(): - conf = confman.confs[section] - confman._check_parameters(section, False) - rev = snaps.get(conf['layout']) or conf.get('track', 'default') - if section in ready and ready[section] == rev: - continue - need_checkout.append((section, rev, conf)) - - for section, rev, conf in need_checkout: - try: - checkout_repo(section, rev, conf) - except Exception as err: - ui.write('%s\n' % err, label='confman.dirty') + if not checkout(need_checkout): return 1 - ready[section] = rev # baseline diff --git a/hgext3rd/confman/runner.py b/hgext3rd/confman/runner.py new file mode 100644 --- /dev/null +++ b/hgext3rd/confman/runner.py @@ -0,0 +1,64 @@ +import queue +import random +import threading + + +class Runner: + def __init__(self, jobs): + self.jobcount = jobs + self.threads = [] + self.jobqueue = queue.Queue() + self.jobresults = queue.Queue() + + def start(self): + if self.jobcount > 1: + for _ in range(self.jobcount): + self.threads.append(threading.Thread(target=self.run)) + for t in self.threads: + t.start() + + def stop(self): + while not self.jobqueue.empty(): + self.jobqueue.get_nowait() + for _ in self.threads: + self.jobqueue.put(None) + for t in self.threads: + t.join() + self.threads = [] + + def __enter__(self): + self.start() + return self + + def __exit__(self, type, value, traceback): + self.stop() + + def _call(self, fn, args, kwargs): + try: + return (args, kwargs, fn(*args, **kwargs), None) + except Exception as e: + return (args, kwargs, None, e) + + def run(self): + while True: + job = self.jobqueue.get() + if job is None: + return + + index, fn, args, kwargs = job + + self.jobresults.put((index, self._call(fn, args, kwargs))) + + def call(self, fn, calls): + if self.threads: + jobs = [ + (index, fn, args, kwargs) for index, (args, kwargs) in enumerate(calls) + ] + random.shuffle(jobs) + for job in jobs: + self.jobqueue.put(job) + results = [self.jobresults.get() for _ in jobs] + results.sort(key=lambda r: r[0]) + return [res for _, res in results] + else: + return [self._call(fn, args, kwargs) for args, kwargs in calls]