ensureconf: implement concurrency
2 files changed, 111 insertions(+), 39 deletions(-)

M hgext3rd/confman/commands.py
A => hgext3rd/confman/runner.py
M hgext3rd/confman/commands.py +47 -39
@@ 1,13 1,14 @@ 
 "This module contains main command actions"
 import os
 import os.path as osp
+import queue
 import shutil
 import sys
 
 from mercurial import error, registrar
 from mercurial.i18n import _
 
-from . import forge
+from . import forge, runner
 from .managed import gitrepo, hgrepo
 from .opts import DEFAULTOPTS, EXCLUDEOPT, INCLUDEOPT, PULLURIOPT, REMOTEOPTS
 from .utils import WrappedRepo, WrappedUI, readconf

          
@@ 59,6 60,7 @@ def command(name, opts):
             "do not update managed if it is on a descendant of track",
         ),
         ("", "ci", False, "enable ci mode"),
+        ("j", "jobs", 1, "number of operations to run in parallel"),
     ],
 )
 def ensureconf(ui, repo, *args, **opts):

          
@@ 101,52 103,58 @@ def ensureconf(ui, repo, *args, **opts):
         def checkout_repo(section, rev, conf):
             confman.checkout_section(section, snaps, opts.get("keep_descendant", False))
 
-    # phase 1 - checkout all the repos that may extend the configuration,
-    # update the configuration, and repeat until no new section appear
-    confcomplete = False
-    while not confcomplete:
+    with runner.Runner(opts.get('jobs', 1)) as r:
+
+        def checkout(need_checkout):
+            nonlocal ready
+
+            res = r.call(checkout_repo, [(args, {}) for args in need_checkout])
+
+            has_errors = False
+            for ((section, rev, _), _, _, exc) in res:
+                if exc:
+                    ui.write('%s\n' % exc, label='confman.dirty')
+                    has_errors = True
+                else:
+                    ready[section] = rev
+            return not has_errors
+
+        # phase 1 - checkout all the repos that may extend the configuration,
+        # update the configuration, and repeat until no new section appear
+        confcomplete = False
+        while not confcomplete:
+            need_checkout = []
+            for section in confman.filtered_sections():
+                conf = confman.confs[section]
+                if confman._check_parameters(section, True):
+                    continue
+                if not any(s.startswith("expand") for s in conf):
+                    continue
+                rev = snaps.get(conf["layout"]) or conf.get("track", "default")
+                if ready.get(section) == rev:
+                    continue
+                need_checkout.append((section, rev, conf))
+
+            if not need_checkout:
+                confcomplete = True
+            elif not checkout(need_checkout):
+                return 1
+
+            confman._readconf()
+
+        # phase 2 - at this point, we know the configuration is stable, we can
+        # checkout all the remaining repos without reading it.
         need_checkout = []
         for section in confman.filtered_sections():
             conf = confman.confs[section]
-            if confman._check_parameters(section, True):
-                continue
+            confman._check_parameters(section, False)
             rev = snaps.get(conf["layout"]) or conf.get("track", "default")
             if ready.get(section) == rev:
                 continue
-            if any(s.startswith("expand") for s in conf):
-                need_checkout.append((section, rev, conf))
-
-        if not need_checkout:
-            confcomplete = True
-
-        for section, rev, conf in need_checkout:
-            try:
-                checkout_repo(section, rev, conf)
-            except Exception as err:
-                ui.write('%s\n' % err, label='confman.dirty')
-                return 1
-            ready[section] = rev
-
-        confman._readconf()
+            need_checkout.append((section, rev, conf))
 
-    # phase 2 - at this point, we know the configuration is stable, we can
-    # checkout all the remaining repos without reading it.
-    need_checkout = []
-    for section in confman.filtered_sections():
-        conf = confman.confs[section]
-        confman._check_parameters(section, False)
-        rev = snaps.get(conf['layout']) or conf.get('track', 'default')
-        if section in ready and ready[section] == rev:
-            continue
-        need_checkout.append((section, rev, conf))
-
-    for section, rev, conf in need_checkout:
-        try:
-            checkout_repo(section, rev, conf)
-        except Exception as err:
-            ui.write('%s\n' % err, label='confman.dirty')
+        if not checkout(need_checkout):
             return 1
-        ready[section] = rev
 
 
 # baseline

          
A => hgext3rd/confman/runner.py +64 -0
@@ 0,0 1,64 @@ 
+import queue
+import random
+import threading
+
+
+class Runner:
+    def __init__(self, jobs):
+        self.jobcount = jobs
+        self.threads = []
+        self.jobqueue = queue.Queue()
+        self.jobresults = queue.Queue()
+
+    def start(self):
+        if self.jobcount > 1:
+            for _ in range(self.jobcount):
+                self.threads.append(threading.Thread(target=self.run))
+            for t in self.threads:
+                t.start()
+
+    def stop(self):
+        while not self.jobqueue.empty():
+            self.jobqueue.get_nowait()
+        for _ in self.threads:
+            self.jobqueue.put(None)
+        for t in self.threads:
+            t.join()
+        self.threads = []
+
+    def __enter__(self):
+        self.start()
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self.stop()
+
+    def _call(self, fn, args, kwargs):
+        try:
+            return (args, kwargs, fn(*args, **kwargs), None)
+        except Exception as e:
+            return (args, kwargs, None, e)
+
+    def run(self):
+        while True:
+            job = self.jobqueue.get()
+            if job is None:
+                return
+
+            index, fn, args, kwargs = job
+
+            self.jobresults.put((index, self._call(fn, args, kwargs)))
+
+    def call(self, fn, calls):
+        if self.threads:
+            jobs = [
+                (index, fn, args, kwargs) for index, (args, kwargs) in enumerate(calls)
+            ]
+            random.shuffle(jobs)
+            for job in jobs:
+                self.jobqueue.put(job)
+            results = [self.jobresults.get() for _ in jobs]
+            results.sort(key=lambda r: r[0])
+            return [res for _, res in results]
+        else:
+            return [self._call(fn, args, kwargs) for args, kwargs in calls]