@@ -1,13 +1,14 @@
"This module contains main command actions"
import os
import os.path as osp
+import queue
import shutil
import sys
from mercurial import error, registrar
from mercurial.i18n import _
-from . import forge
+from . import forge, runner
from .managed import gitrepo, hgrepo
from .opts import DEFAULTOPTS, EXCLUDEOPT, INCLUDEOPT, PULLURIOPT, REMOTEOPTS
from .utils import WrappedRepo, WrappedUI, readconf
@@ -59,6 +60,7 @@ def command(name, opts):
"do not update managed if it is on a descendant of track",
),
("", "ci", False, "enable ci mode"),
+ ("j", "jobs", 1, "number of operations to run in parallel"),
],
)
def ensureconf(ui, repo, *args, **opts):
@@ -101,52 +103,61 @@ def ensureconf(ui, repo, *args, **opts):
def checkout_repo(section, rev, conf):
confman.checkout_section(section, snaps, opts.get("keep_descendant", False))
- # phase 1 - checkout all the repos that may extend the configuration,
- # update the configuration, and repeat until no new section appear
- confcomplete = False
- while not confcomplete:
+    with runner.Runner(opts.get("jobs", 1)) as r:
+
+ def checkout(need_checkout):
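+            # check out every (section, rev, conf) tuple through the runner
+            # and mark the sections that completed cleanly as ready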
+ nonlocal ready
+
+ res = r.call(checkout_repo, [(args, {}) for args in need_checkout])
+
+ has_errors = False
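+            # each result is (args, kwargs, return value, exception)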
+ for ((section, rev, _), _, _, exc) in res:
+ if exc:
+ ui.write('%s\n' % exc, label='confman.dirty')
+ has_errors = True
+ else:
+ ready[section] = rev
+ return not has_errors
+
+        # phase 1 - check out all the repos that may extend the configuration,
+        # update the configuration, and repeat until no new section appears
+ confcomplete = False
+ while not confcomplete:
+ need_checkout = []
+ for section in confman.filtered_sections():
+ conf = confman.confs[section]
+ if confman._check_parameters(section, True):
+ continue
+ if not any(s.startswith("expand") for s in conf):
+ continue
+ rev = snaps.get(conf["layout"]) or conf.get("track", "default")
+ if ready.get(section) == rev:
+ continue
+ need_checkout.append((section, rev, conf))
+
+ if not need_checkout:
+ confcomplete = True
+ elif not checkout(need_checkout):
+ return 1
+
+ confman._readconf()
+
+        # phase 2 - at this point the configuration is stable, so we can check out
+        # all the remaining repos without re-reading it.
need_checkout = []
for section in confman.filtered_sections():
conf = confman.confs[section]
- if confman._check_parameters(section, True):
- continue
+ confman._check_parameters(section, False)
rev = snaps.get(conf["layout"]) or conf.get("track", "default")
if ready.get(section) == rev:
continue
- if any(s.startswith("expand") for s in conf):
- need_checkout.append((section, rev, conf))
-
- if not need_checkout:
- confcomplete = True
-
- for section, rev, conf in need_checkout:
- try:
- checkout_repo(section, rev, conf)
- except Exception as err:
- ui.write('%s\n' % err, label='confman.dirty')
- return 1
- ready[section] = rev
-
- confman._readconf()
+ need_checkout.append((section, rev, conf))
- # phase 2 - at this point, we know the configuration is stable, we can
- # checkout all the remaining repos without reading it.
- need_checkout = []
- for section in confman.filtered_sections():
- conf = confman.confs[section]
- confman._check_parameters(section, False)
- rev = snaps.get(conf['layout']) or conf.get('track', 'default')
- if section in ready and ready[section] == rev:
- continue
- need_checkout.append((section, rev, conf))
-
- for section, rev, conf in need_checkout:
- try:
- checkout_repo(section, rev, conf)
- except Exception as err:
- ui.write('%s\n' % err, label='confman.dirty')
+ if not checkout(need_checkout):
return 1
- ready[section] = rev
# baseline
@@ -0,0 +1,82 @@
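+"""Run jobs on a small pool of worker threads (used by the command actions)."""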
+import queue
+import random
+import threading
+
+
+class Runner:
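+    """Run callables either inline or on a pool of worker threads.
+
+    With ``jobs <= 1`` no thread is started and ``call`` simply runs
+    every call sequentially in the current thread.
+    """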
+ def __init__(self, jobs):
+ self.jobcount = jobs
+ self.threads = []
+ self.jobqueue = queue.Queue()
+ self.jobresults = queue.Queue()
+
+ def start(self):
+ if self.jobcount > 1:
+ for _ in range(self.jobcount):
+ self.threads.append(threading.Thread(target=self.run))
+ for t in self.threads:
+ t.start()
+
+ def stop(self):
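+        # drop whatever is still queued, then hand one None sentinel to
+        # each worker so every thread exits its loop before being joined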
+ while not self.jobqueue.empty():
+ self.jobqueue.get_nowait()
+ for _ in self.threads:
+ self.jobqueue.put(None)
+ for t in self.threads:
+ t.join()
+ self.threads = []
+
+ def __enter__(self):
+ self.start()
+ return self
+
+ def __exit__(self, type, value, traceback):
+ self.stop()
+
+ def _call(self, fn, args, kwargs):
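+        # exceptions are captured and returned alongside the call
+        # arguments so that one failing job does not kill a worker thread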
+ try:
+ return (args, kwargs, fn(*args, **kwargs), None)
+ except Exception as e:
+ return (args, kwargs, None, e)
+
+ def run(self):
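+        # worker loop: block on the job queue until a None sentinel arrives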
+ while True:
+ job = self.jobqueue.get()
+ if job is None:
+ return
+
+ index, fn, args, kwargs = job
+
+ self.jobresults.put((index, self._call(fn, args, kwargs)))
+
+ def call(self, fn, calls):
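+        """Apply ``fn`` to every ``(args, kwargs)`` pair in ``calls``.
+
+        Return one ``(args, kwargs, result, exception)`` tuple per call,
+        in the same order as ``calls``.
+        """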
+ if self.threads:
+ jobs = [
+ (index, fn, args, kwargs) for index, (args, kwargs) in enumerate(calls)
+ ]
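+            # submission order does not matter: each result carries its
+            # index and the list is re-sorted before being returned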
+ random.shuffle(jobs)
+ for job in jobs:
+ self.jobqueue.put(job)
+ results = [self.jobresults.get() for _ in jobs]
+ results.sort(key=lambda r: r[0])
+ return [res for _, res in results]
+ else:
+ return [self._call(fn, args, kwargs) for args, kwargs in calls]