# HG changeset patch # User Christophe de Vienne # Date 1674830271 -3600 # Fri Jan 27 15:37:51 2023 +0100 # Node ID de707a9a4870071232414783638ed96460c821aa # Parent 1eb79be7030da95fe174c3fe85e32c2ea50a6933 Initial implementation of 'cfensureconf --ci' diff --git a/hgext3rd/confman/commands.py b/hgext3rd/confman/commands.py --- a/hgext3rd/confman/commands.py +++ b/hgext3rd/confman/commands.py @@ -7,33 +7,34 @@ from mercurial import error, registrar from mercurial.i18n import _ +from . import forge from .managed import gitrepo, hgrepo from .opts import DEFAULTOPTS, EXCLUDEOPT, INCLUDEOPT, PULLURIOPT, REMOTEOPTS from .utils import WrappedRepo, WrappedUI, readconf -ENC = os.environ.get('ENCODING') +ENC = os.environ.get("ENCODING") cmdtable = {} _command = registrar.command(cmdtable) def command(name, opts): - name = name.encode('utf-8') + name = name.encode("utf-8") newopts = [ - tuple(item.encode('utf-8') if isinstance(item, str) else item for item in elt) + tuple(item.encode("utf-8") if isinstance(item, str) else item for item in elt) for elt in opts ] def wrap_command(func): def wrapped_func(ui, repo, *args, **kw): - newargs = tuple(elt.decode('utf-8') for elt in args) + newargs = tuple(elt.decode("utf-8") for elt in args) newk = {} for k, v in kw.items(): if isinstance(v, bytes): - v = v.decode('utf-8') + v = v.decode("utf-8") elif isinstance(v, list): v = [ - elt.decode('utf-8') if isinstance(elt, bytes) else elt + elt.decode("utf-8") if isinstance(elt, bytes) else elt for elt in v ] newk[k] = v @@ -46,17 +47,18 @@ @command( - 'cfensureconf', + "cfensureconf", DEFAULTOPTS + REMOTEOPTS + [ - ('s', 'share-path', '', 'specify share path'), + ("s", "share-path", "", "specify share path"), ( - '', - 'keep-descendant', + "", + "keep-descendant", False, - 'do not update managed if it is on a descendant of track', + "do not update managed if it is on a descendant of track", ), + ("", "ci", False, "enable ci mode"), ], ) def ensureconf(ui, repo, *args, **opts): @@ -83,6 +85,22 @@ snaps = confman.readsnapshot() + if opts.get("ci", False): + + def checkout_repo(section, rev, conf): + pulluri = conf["pulluri"] + if pulluri.endswith(".tar.gz") or pulluri.endswith(".zip"): + confman.checkout_section( + section, snaps, opts.get("keep_descendant", False) + ) + else: + forge.checkout(ui, section, rev, conf) + + else: + + def checkout_repo(section, rev, conf): + confman.checkout_section(section, snaps, opts.get("keep_descendant", False)) + # phase 1 - checkout all the repos that may extend the configuration, # update the configuration, and repeat until no new section appear confcomplete = False @@ -92,7 +110,7 @@ conf = confman.confs[section] if confman._check_parameters(section, True): continue - rev = snaps.get(conf['layout']) or conf.get('track', 'default') + rev = snaps.get(conf["layout"]) or conf.get("track", "default") if ready.get(section) == rev: continue if any(s.startswith("expand") for s in conf): @@ -103,9 +121,7 @@ for section, rev, conf in need_checkout: try: - confman.checkout_section( - section, snaps, opts.get('keep_descendant', False) - ) + checkout_repo(section, rev, conf) except Exception as err: ui.write('%s\n' % err, label='confman.dirty') return 1 @@ -126,7 +142,7 @@ for section, rev, conf in need_checkout: try: - confman.checkout_section(section, snaps, opts.get('keep_descendant', False)) + checkout_repo(section, rev, conf) except Exception as err: ui.write('%s\n' % err, label='confman.dirty') return 1 @@ -137,12 +153,12 @@ @command( - 'cfbaseline', + "cfbaseline", DEFAULTOPTS + [ - ('', 'force-hgconf', False, 'force the generation of an .hgconf file'), - ('', 'propagate', False, 'edit inner configuration also'), - ('Z', 'ignoremaster', True, 'ignore the /master tag'), + ("", "force-hgconf", False, "force the generation of an .hgconf file"), + ("", "propagate", False, "edit inner configuration also"), + ("Z", "ignoremaster", True, "ignore the /master tag"), ], ) def baseline(ui, repo, *args, **opts): @@ -165,17 +181,17 @@ tagmap = {} untagged = [] for section, conf, managed in confman.iterrepos(): - track = conf.get('track') + track = conf.get("track") # we don't record these - if opts.get('propagate') and 'expand' in conf: - if not opts.get('root_path'): - opts['root_path'] = confman.rootpath + if opts.get("propagate") and "expand" in conf: + if not opts.get("root_path"): + opts["root_path"] = confman.rootpath baseline(ui, managed.repo, *args, **opts) - if track is None and not opts.get('force_hgconf'): + if track is None and not opts.get("force_hgconf"): continue ctx = managed.workingctx() - ignoretag = ctx.tag and opts.get('ignoremaster') and ctx.tag.endswith('/master') + ignoretag = ctx.tag and opts.get("ignoremaster") and ctx.tag.endswith("/master") if ctx.tag and not ignoretag: if track != ctx.tag: tagmap[section] = ctx.tag @@ -188,57 +204,57 @@ tagmap[section] = ctx.hex if untagged: - ui.write('The following repositories are not tag/branch aligned:\n') + ui.write("The following repositories are not tag/branch aligned:\n") for section in untagged: - ui.write('%s\n' % section, label='confman.dirty') + ui.write("%s\n" % section, label="confman.dirty") if not tagmap: - ui.write('Nothing to do.\n') + ui.write("Nothing to do.\n") return - hgconfpath = osp.join(confman.rootpath, '.hgconf') - if opts.get('force_hgconf'): + hgconfpath = osp.join(confman.rootpath, ".hgconf") + if opts.get("force_hgconf"): confman.save(hgconfpath) ui.write('\nA fresh ".hgconf" has been created\n') confman, repo = readconf(ui, repo, args, opts) rewritten = confman.save_gently(tagmap) if rewritten: - ui.write('The following entries have been updated:\n') + ui.write("The following entries have been updated:\n") for section, tag in rewritten: - ui.write('%s : ' % section, label='confman.section') - ui.write('%s\n' % tag, label='confman.tagaligned') + ui.write("%s : " % section, label="confman.section") + ui.write("%s\n" % tag, label="confman.tagaligned") # pull -@command('cfpull', DEFAULTOPTS + REMOTEOPTS) +@command("cfpull", DEFAULTOPTS + REMOTEOPTS) def pull(ui, repo, *args, **opts): """pull managed repositories""" confman, repo = readconf(ui, repo, args, opts) for section, conf, managed in confman.iterrepos(): - ui.write(section + '\n', label='confman.section') - ui.status('pulling repo %s\n' % section) + ui.write(section + "\n", label="confman.section") + ui.status("pulling repo %s\n" % section) try: managed.pull_repo(section, conf) except error.RepoError: - ui.write('unable to pull from ', label='confman.dirty') - ui.write(conf['pulluri']) - ui.write('\n') + ui.write("unable to pull from ", label="confman.dirty") + ui.write(conf["pulluri"]) + ui.write("\n") continue # push -@command('cfpush', DEFAULTOPTS + REMOTEOPTS) +@command("cfpush", DEFAULTOPTS + REMOTEOPTS) def push(ui, repo, *args, **opts): """push managed repositories up to their tracked rev""" confman, repo = readconf(ui, repo, args, opts) for section, conf, managed in confman.iterrepos(): - track = conf.get('track') + track = conf.get("track") if track is None: continue managed.push_repo(section, conf) @@ -247,7 +263,7 @@ # summary -@command('cfsummary', DEFAULTOPTS) +@command("cfsummary", DEFAULTOPTS) def summary(ui, repo, *args, **opts): """print a summary of the managed repositories @@ -260,106 +276,106 @@ snaps = confman.readsnapshot() def obs(ctx): - return ' obsolete' if ctx.obsolete() else '' + return " obsolete" if ctx.obsolete() else "" def summary(managed, conf, rctx): """write an advanced summary of managed repo at changeset rctx. it helps when there are two parents.""" branch = rctx.branch - ui.write('(%s' % branch) + ui.write("(%s" % branch) if rctx.obsolete(): - ui.write(' obsolete') + ui.write(" obsolete") phase = rctx.phase if phase: - ui.write(' ') - ui.write('%s' % phase, label='confman.%s-phase' % phase) - ui.write(') ') + ui.write(" ") + ui.write("%s" % phase, label="confman.%s-phase" % phase) + ui.write(") ") tags = rctx.tags if tags: - ui.write(min(tags, key=len), ' ') + ui.write(min(tags, key=len), " ") if snaps: - snapshot = snaps.get(conf.get('layout')) + snapshot = snaps.get(conf.get("layout")) showsnapshotstate(managed, ui, snapshot, rctx) - ui.write(' ') + ui.write(" ") # baseline state - track = conf.get('track') + track = conf.get("track") trackctx = managed.revsingle(track, skiperror=True) if track is None: - ui.write('[no baseline]', label='confman.nobaseline') + ui.write("[no baseline]", label="confman.nobaseline") elif track == branch: - ui.write('[baseline aligned with branch]', label='confman.branchaligned') + ui.write("[baseline aligned with branch]", label="confman.branchaligned") elif track in tags: ui.write_bytes( - '\N{CHECK MARK}'.encode(ENC or sys.stdout.encoding, 'confman'), - label='confman.tagaligned', + "\N{CHECK MARK}".encode(ENC or sys.stdout.encoding, "confman"), + label="confman.tagaligned", ) elif track == str(rctx.revnum) or rctx.hex.startswith(track): ui.write( - '[baseline aligned with%s cset %s]' % (obs(trackctx), track[:12]), - label='confman.csetaligned', + "[baseline aligned with%s cset %s]" % (obs(trackctx), track[:12]), + label="confman.csetaligned", ) elif trackctx == rctx: ui.write( - '[baseline aligned with revset %r]' % track, - label='confman.revsetaligned', + "[baseline aligned with revset %r]" % track, + label="confman.revsetaligned", ) elif trackctx in rctx.ancestors(): ui.write( - '[at descendant of%s %r]' % (obs(trackctx), track), - label='confman.snapolder', + "[at descendant of%s %r]" % (obs(trackctx), track), + label="confman.snapolder", ) elif trackctx in rctx.descendants(): ui.write( - '[at parent of%s %r]' % (obs(trackctx), track), - label='confman.snapnewer', + "[at parent of%s %r]" % (obs(trackctx), track), + label="confman.snapnewer", ) elif trackctx: ui.write( - '[baseline%s %r in a parallel branch]' % (obs(trackctx), track), - label='confman.snapparallel', + "[baseline%s %r in a parallel branch]" % (obs(trackctx), track), + label="confman.snapparallel", ) else: - ui.write('[baseline says %r]' % track, label='confman.unaligned') + ui.write("[baseline says %r]" % track, label="confman.unaligned") # show a pseudo-root - ui.write('%s\n' % osp.basename(confman.rootpath), label='confman.section') + ui.write("%s\n" % osp.basename(confman.rootpath), label="confman.section") # start it for section, conf, managed in confman.iterrepos(): node = confman.unicodetreenode(section) - ui.write_bytes(node.encode(ENC or sys.stdout.encoding, 'treegraph')) - ui.write(section, label='confman.section') + ui.write_bytes(node.encode(ENC or sys.stdout.encoding, "treegraph")) + ui.write(section, label="confman.section") if managed.isshared(): - ui.write(' ') - char = '\N{MARRIAGE SYMBOL}'.encode(ENC or sys.stdout.encoding, 'confman') - ui.write_bytes(char, label='confman.shared') - ui.write(' ') + ui.write(" ") + char = "\N{MARRIAGE SYMBOL}".encode(ENC or sys.stdout.encoding, "confman") + ui.write_bytes(char, label="confman.shared") + ui.write(" ") rctx = managed.currentctx(allow_p2=True) parents = rctx.parents nbparents = len(parents) if not parents: - ui.write('\n') + ui.write("\n") continue for parent in parents: if nbparents > 1: - ui.write(' ') + ui.write(" ") summary(managed, conf, parent) stat = managed.changestatus() if stat and nbparents <= 1: - ui.write(' ') - ui.write(stat, label='confman.dirty') - ui.write('\n') + ui.write(" ") + ui.write(stat, label="confman.dirty") + ui.write("\n") # broadcast -@command('cfbroadcast', DEFAULTOPTS + [('e', 'execute', [], 'execute command')]) +@command("cfbroadcast", DEFAULTOPTS + [("e", "execute", [], "execute command")]) def broadcast(ui, repo, *args, **opts): """execute a shell command in the context of managed repositories @@ -378,23 +394,23 @@ """ confman, repo = readconf(ui, repo, args, opts) - commands = opts.get('execute') + commands = opts.get("execute") if not commands: - ui.write('nothing to execute\n') + ui.write("nothing to execute\n") return import subprocess for section, conf, managed in confman.iterrepos(): - ui.write('%s\n' % section, label='confman.section') - params = dict(list(conf.items()) + [('section', section)]) + ui.write("%s\n" % section, label="confman.section") + params = dict(list(conf.items()) + [("section", section)]) for command in commands: try: command = command % params except KeyError as err: ui.write( - 'skip %s: unknown parameter %s\n' % (section, err), - label='confman.dirty', + "skip %s: unknown parameter %s\n" % (section, err), + label="confman.dirty", ) continue proc = subprocess.Popen( @@ -409,17 +425,17 @@ ui.write_bytes(out) if proc.returncode != 0: ui.write( - 'finished with return code %s\n' % proc.returncode, - label='confman.dirty', + "finished with return code %s\n" % proc.returncode, + label="confman.dirty", ) @command( - 'cffiles', + "cffiles", DEFAULTOPTS + [ - ('n', 'no-section', False, 'do not display section name'), - ('0', 'print0', False, 'end filenames with NUL, for use with xargs'), + ("n", "no-section", False, "do not display section name"), + ("0", "print0", False, "end filenames with NUL, for use with xargs"), ], ) def files(ui, repo, *args, **opts): @@ -435,8 +451,8 @@ """ confman, repo = readconf(ui, repo, args, opts) for section, conf, managed in confman.iterrepos(): - if not opts.get('no_section'): - ui.write(section + '\n', label='confman.section') + if not opts.get("no_section"): + ui.write(section + "\n", label="confman.section") for path in managed.files(opts): ui.write(path) @@ -445,11 +461,11 @@ @command( - 'cfupdateconf', + "cfupdateconf", DEFAULTOPTS + [ - ('', 'difftool', 'diff', 'diff command'), - ('a', 'apply', False, 'really apply the changes (irreversible)'), + ("", "difftool", "diff", "diff command"), + ("a", "apply", False, "really apply the changes (irreversible)"), ], ) def updateconf(ui, repo, *args, **opts): @@ -469,13 +485,13 @@ rewrites = {} for section, conf, managed in confman.iterrepos(): - ui.write('%s\n' % section, label='confman.section') + ui.write("%s\n" % section, label="confman.section") writtendiff = managed.rewrite_conf(conf) if writtendiff: - ui.write(' ... updated\n') + ui.write(" ... updated\n") -@command('cfclear', DEFAULTOPTS) +@command("cfclear", DEFAULTOPTS) def clear(ui, repo, *args, **opts): """Delete all managed directories""" confman, repo = readconf(ui, repo, args, opts) @@ -491,41 +507,41 @@ @command( - 'debugcfrequirements', + "debugcfrequirements", [ INCLUDEOPT, EXCLUDEOPT, PULLURIOPT, - ('e', 'editable', False, 'use local project path or and develop mode'), + ("e", "editable", False, "use local project path or and develop mode"), ], ) def requirements(ui, repo, *args, **opts): """generate a requirements.txt file from the .hgconf specification""" confman, repo = readconf(ui, repo, args, opts) - with open('requirements.txt', 'wb') as req: + with open("requirements.txt", "wb") as req: for section, conf, managed in confman.iterrepos(): - if opts.get('editable'): - req.write('-e %s\n' % (conf['layout'],)) + if opts.get("editable"): + req.write("-e %s\n" % (conf["layout"],)) else: # base case: exists on pypi if isinstance(managed, hgrepo): - prefix, suffix = 'hg+', '@' + conf.get('track', 'default') + prefix, suffix = "hg+", "@" + conf.get("track", "default") elif isinstance(managed, gitrepo): - prefix, suffix = 'git+', '@' + conf.get('track', 'default') + prefix, suffix = "git+", "@" + conf.get("track", "default") else: - prefix, suffix = '', '' + prefix, suffix = "", "" uri = ( - conf.get('hgrc.paths.%s' % opts.get('use_hgrc_path')) - or conf['pulluri'] + conf.get("hgrc.paths.%s" % opts.get("use_hgrc_path")) + or conf["pulluri"] ) - req.write('%s%s%s\n' % (prefix, uri.format(**conf), suffix)) + req.write("%s%s%s\n" % (prefix, uri.format(**conf), suffix)) # DEPRECATED -@command('debugsnapshot', DEFAULTOPTS) +@command("debugsnapshot", DEFAULTOPTS) def snapshot(ui, repo, *args, **opts): """record changeset ids of the managed repositories into the `.hgsnap` file @@ -548,39 +564,39 @@ def showsnapshotstate(self, ui, snapshot, rctx): snaprctx = self.revsingle(snapshot, skiperror=True) if snapshot is None: - ui.write('[no snapshot]', label='confman.nosnap') + ui.write("[no snapshot]", label="confman.nosnap") elif self.unknown_rev(snapshot): - ui.write('[unknown snapshot]', label='confman.snapunknown') + ui.write("[unknown snapshot]", label="confman.snapunknown") elif rctx == snaprctx: - ui.write('[snapshot aligned]', label='confman.snapaligned') + ui.write("[snapshot aligned]", label="confman.snapaligned") elif snaprctx in rctx.ancestors(): - ui.write('[at descendant of snapshot]', label='confman.snapolder') + ui.write("[at descendant of snapshot]", label="confman.snapolder") elif snaprctx in rctx.descendants(): - ui.write('[at parent of snapshot]', label='confman.snapnewer') + ui.write("[at parent of snapshot]", label="confman.snapnewer") else: - ui.write('[snapshot in parallel branch]', label='confman.snapparallel') + ui.write("[snapshot in parallel branch]", label="confman.snapparallel") -@command('debugwritegrfiles', DEFAULTOPTS + [PULLURIOPT]) +@command("debugwritegrfiles", DEFAULTOPTS + [PULLURIOPT]) def writegrfiles(ui, repo, *args, **opts): "write guestrepo files from configuration.." confman, repo = readconf(ui, repo, args, opts) - with open(os.path.join(confman.rootpath, '.hgguestrepo'), 'w') as gr: - with open(os.path.join(confman.rootpath, '.hggrmapping'), 'w') as mp: + with open(os.path.join(confman.rootpath, ".hgguestrepo"), "w") as gr: + with open(os.path.join(confman.rootpath, ".hggrmapping"), "w") as mp: for section, secconf, managed in confman.iterrepos(): - if 'expand' in secconf: + if "expand" in secconf: continue - layout = secconf.get('layout') - track = secconf.get('track') - pulluri = secconf.get('pulluri') - if opts.get('use_hgrc_path'): + layout = secconf.get("layout") + track = secconf.get("track") + pulluri = secconf.get("pulluri") + if opts.get("use_hgrc_path"): pulluri = secconf.get( - 'hgrc.paths.' + opts.get('use_hgrc_path'), pulluri + "hgrc.paths." + opts.get("use_hgrc_path"), pulluri ) try: ctx = managed.revsingle(track) track = ctx.tag or ctx.hex except: pass - gr.write('%s = %s %s\n' % (layout, section, track)) - mp.write('%s = %s\n' % (section, pulluri)) + gr.write("%s = %s %s\n" % (layout, section, track)) + mp.write("%s = %s\n" % (section, pulluri)) diff --git a/hgext3rd/confman/forge.py b/hgext3rd/confman/forge.py new file mode 100644 --- /dev/null +++ b/hgext3rd/confman/forge.py @@ -0,0 +1,180 @@ +import os +import tempfile +import urllib.request +import zipfile + +# https://github.com/orus-io/elm-spa/archive/refs/heads/master.zip +# https://github.com/orus-io/elm-spa/archive/refs/tags/1.2.0.zip +# https://github.com/orus-io/elm-spa/archive/refs/heads/example-cleanup.zip +# https://github.com/orus-io/elm-spa/archive/8a97e89fbc2933f3f53037bae53d730d7e496df2.zip + + +TRACK_CSET = "track-cset" +TRACK_BRANCH = "track-branch" +TRACK_TAG = "track-tag" + + +def match_github(pulluri, track): + if pulluri.startswith("https://github.com") or pulluri.startswith( + "git@github.com:" + ): + path = ( + pulluri.removeprefix("https://github.com/") + .removeprefix("git@github.com:") + .removesuffix(".git") + ) + return { + "track": track, + "path": path, + "name": path.split("/")[-1], + } + return None + + +def match_orus_io_hg(pulluri, track): + if pulluri.endswith(".git"): + return None + if pulluri.startswith("https://orus.io/") or pulluri.startswith( + "ssh://hg@orus.io/" + ): + path = pulluri.removeprefix("https://orus.io/").removeprefix( + "ssh://hg@orus.io/" + ) + return { + "track": track, + "path": path, + "name": path.split("/")[-1], + "token": "?private_token=" + os.environ.get("CI_JOB_TOKEN") + if "CI_JOB_TOKEN" in os.environ + else "", + } + return None + + +def match_orus_io_git(pulluri, track): + if not pulluri.endswith(".git"): + return None + if pulluri.startswith("https://orus.io/") or pulluri.startswith( + "ssh://hg@orus.io/" + ): + path = ( + pulluri.removeprefix("https://orus.io/") + .removeprefix("ssh://hg@orus.io/") + .removesuffix(".git") + ) + return { + "track": track, + "path": path, + "name": path.split("/")[-1], + "token": "?private_token=" + os.environ.get("CI_JOB_TOKEN") + if "CI_JOB_TOKEN" in os.environ + else "", + } + return None + + +known_forges = { + "github.com": { + "match": match_github, + TRACK_CSET: { + "url": "https://github.com/%(path)s/archive/%(track)s.zip", + "prefix": "%(name)s-%(track)s", + }, + TRACK_BRANCH: { + "url": "https://github.com/%(path)s/archive/refs/heads/%(track)s.zip", + "prefix": "%(name)s-%(track)s", + }, + TRACK_TAG: { + "url": "https://github.com/%(path)s/archive/refs/tags/%(track)s.zip", + "prefix": "%(name)s-%(track)s", + }, + }, + "orus.io-hg": { + "match": match_orus_io_hg, + TRACK_CSET: { + "url": "https://orus.io/%(path)s/-/archive/%(track)s/%(name)s-%(track)s.zip%(token)s", + "prefix": "%(name)s-%(track)s", + }, + TRACK_BRANCH: { + "url": "https://orus.io/%(path)s/-/archive/branch/%(track)s/%(name)s-branch-%(track)s.zip%(token)s", + "prefix": "%(name)s-branch-%(track)s", + }, + TRACK_TAG: { + "url": "https://orus.io/%(path)s/-/archive/%(track)s/%(name)s-%(track)s.zip%(token)s", + "prefix": "%(name)s-%(track)s", + }, + }, + "orus.io-git": { + "match": match_orus_io_git, + TRACK_CSET: { + "url": "https://orus.io/%(path)s/-/archive/%(track)s/%(name)s-%(track)s.zip%(token)s", + "prefix": "%(name)s-%(track)s", + }, + TRACK_BRANCH: { + "url": "https://orus.io/%(path)s/-/archive/%(track)s/%(name)s-%(track)s.zip%(token)s", + "prefix": "%(name)s-%(track)s", + }, + TRACK_TAG: { + "url": "https://orus.io/%(path)s/-/archive/%(track)s/%(name)s-%(track)s.zip%(token)s", + "prefix": "%(name)s-%(track)s", + }, + }, +} + + +def download_url(pulluri, rev, kind=TRACK_CSET): + for forge, settings in known_forges.items(): + m = settings["match"](pulluri, rev) + if m is not None: + return ( + settings[kind]["url"] % m, + settings[kind]["prefix"] % m, + ) + return None, None + + +def checkout(ui, section, rev, secconf): + # poke the forge type (bb, gh, gl, heptapod) + for k in (TRACK_CSET, TRACK_TAG, TRACK_BRANCH): + url, prefix = download_url(secconf["pulluri"], rev, k) + if url is None: + continue + try: + ui.write("fetching %s...\n" % (url)) + req = urllib.request.Request( + url, + ) + f = urllib.request.urlopen(req) + if f.headers.get("Content-Type") != "application/zip": + ui.write("not a zipfile (%s)\n" % (f.headers.get("Content-Type"))) + continue + with tempfile.NamedTemporaryFile( + "w+b", + prefix="confman-" + section.replace("/", "-") + "-", + suffix=".zip", + delete=False, + ) as t: + while True: + b = f.read(1024 * 1024) + if len(b) == 0: + break + t.write(b) + t.flush() + + z = zipfile.ZipFile(t.name) + for info in z.infolist(): + if info.is_dir(): + continue + path = os.path.join( + secconf["layout"], + info.filename.removeprefix(prefix).strip("/"), + ) + + if not os.path.exists(os.path.dirname(path)): + os.makedirs(os.path.dirname(path)) + with open(path, "wb") as out: + out.write(z.read(info)) + ui.write(" -> extracted to %s\n" % (secconf["layout"])) + break + except urllib.error.URLError as e: + ui.write("failed: error was: %s\n" % (e)) diff --git a/tests/test_forge.py b/tests/test_forge.py new file mode 100644 --- /dev/null +++ b/tests/test_forge.py @@ -0,0 +1,72 @@ +import pytest + +from hgext3rd.confman import forge + + +@pytest.mark.parametrize( + "args,expected", + [ + ( + ( + "https://github.com/orus-io/elm-spa", + "8a97e89fbc2933f3f53037bae53d730d7e496df2", + forge.TRACK_CSET, + ), + ( + "https://github.com/orus-io/elm-spa/archive/8a97e89fbc2933f3f53037bae53d730d7e496df2.zip", + "elm-spa-8a97e89fbc2933f3f53037bae53d730d7e496df2", + ), + ), + ( + ("https://github.com/orus-io/elm-spa", "master", forge.TRACK_BRANCH), + ( + "https://github.com/orus-io/elm-spa/archive/refs/heads/master.zip", + "elm-spa-master", + ), + ), + ( + ("git@github.com:orus-io/elm-spa", "1.2.0", forge.TRACK_TAG), + ( + "https://github.com/orus-io/elm-spa/archive/refs/tags/1.2.0.zip", + "elm-spa-1.2.0", + ), + ), + ( + ( + "https://orus.io/orus-io/go-orusapi", + "7a8082df2d8e4e34b3375ed56899522746ac9d05", + forge.TRACK_CSET, + ), + ( + "https://orus.io/orus-io/go-orusapi/-/archive/7a8082df2d8e4e34b3375ed56899522746ac9d05/go-orusapi-7a8082df2d8e4e34b3375ed56899522746ac9d05.zip", + "go-orusapi-7a8082df2d8e4e34b3375ed56899522746ac9d05", + ), + ), + ( + ("https://orus.io/orus-io/go-orusapi", "default", forge.TRACK_BRANCH), + ( + "https://orus.io/orus-io/go-orusapi/-/archive/branch/default/go-orusapi-branch-default.zip", + "go-orusapi-branch-default", + ), + ), + ( + ("https://orus.io/orus-io/rednerd", "v0.5.0", forge.TRACK_TAG), + ( + "https://orus.io/orus-io/rednerd/-/archive/v0.5.0/rednerd-v0.5.0.zip", + "rednerd-v0.5.0", + ), + ), + ], +) +def test_forge_download_url(args, expected): + assert forge.download_url(*args) == expected + + +# https://github.com/orus-io/elm-spa/archive/refs/heads/master.zip +# https://github.com/orus-io/elm-spa/archive/refs/tags/1.2.0.zip +# https://github.com/orus-io/elm-spa/archive/refs/heads/example-cleanup.zip +# https://github.com/orus-io/elm-spa/archive/8a97e89fbc2933f3f53037bae53d730d7e496df2.zip + +# https://orus.io/orus-io/go-orusapi/-/archive/branch/default/go-orusapi-branch-default.zip +# https://orus.io/orus-io/go-orusapi/-/archive/7a8082df2d8e4e34b3375ed56899522746ac9d05/go-orusapi-7a8082df2d8e4e34b3375ed56899522746ac9d05.zip +# https://orus.io/orus-io/rednerd/-/archive/v0.5.0/rednerd-v0.5.0.zip