@@ 1,14 1,14 @@
"This module contains abtractions for managed repositories."
import os
import os.path as osp
-import urlparse
+import urllib.parse
import tarfile
import zipfile
from subprocess import check_output as call
from mercurial import hg, error, commands, archival, scmutil, util
-from hgcompat import pull as hgpull, push as hgpush
-from utils import download_file
+from .hgcompat import pull as hgpull, push as hgpush
+from .utils import download_file, WrappedRepo
class rcbase(object):
@@ 46,10 46,22 @@ class revisioncontext(rcbase):
def __init__(self, cset):
self._cset = cset
# be careful with the 'next commit'
- self.hex = cset.hex() if cset.node() is not None else None
- tags = set(cset.tags()) - set(['tip'])
- self.tag = min(tags, key=len) if tags else None
- self.branch = cset.branch()
+ self._hex = cset.hex() if cset.node() is not None else None
+ tags = set(cset.tags()) - set([b'tip'])
+ self._tag = min(tags, key=len) if tags else None
+ self._branch = cset.branch()
+
+ @property
+ def hex(self):
+ return self._hex and self._hex.decode('utf-8') or None
+
+ @property
+ def tag(self):
+ return self._tag and self._tag.decode('utf-8') or None
+
+ @property
+ def branch(self):
+ return self._branch and self._branch.decode('utf-8') or None
@property
def parents(self):
@@ 58,11 70,14 @@ class revisioncontext(rcbase):
@property
def phase(self):
- return self._cset.phasestr()
+ return self._cset.phasestr().decode('utf-8')
@property
def tags(self):
- return self._cset.tags()
+ return [
+ t.decode('utf-8')
+ for t in self._cset.tags()
+ ]
@property
def revnum(self):
@@ 83,18 98,18 @@ class revisioncontext(rcbase):
class gitrevisioncontext(rcbase):
def _call(self, *args):
- return call(('git',) + args, cwd=self.path).strip()
+ return call(
+ ('git',) + args, cwd=self.path
+ ).strip().decode('utf-8')
def __init__(self, path, hex=None):
-
self.path = path
-
if hex is None:
# No explicit rev: Find the long hash of the current one.
args = ['show', '--no-patch', '--format=format:%H']
try:
out = self._call(*args)
- self.hex = out.strip()
+ self.hex = out
except Exception:
self.hex = None
@@ 118,7 133,7 @@ class gitrevisioncontext(rcbase):
# Get the name of the current branch.
branch = self._call('rev-parse', '--abbrev-ref', 'HEAD')
- self.branch = branch.strip()
+ self.branch = branch
@property
def tags(self):
@@ 151,7 166,7 @@ def repoclassbyconf(conf, path, hggit=Fa
'''introspect the configuration parameters values and the filesystem
and deduct the right repository manager class.
'''
- uri = urlparse.urlparse(conf['pulluri'])
+ uri = urllib.parse.urlparse(conf['pulluri'])
pulluri = conf['pulluri']
# first check the local repo (if it exists)
@@ 295,8 310,10 @@ class gitrepo(managedrepo):
def _call(self, *args):
try:
- return call(('git',) + args, cwd=self.root).strip()
- except Exception, exc:
+ return call(
+ ('git',) + args, cwd=self.root
+ ).strip().decode('utf-8')
+ except Exception as exc:
# stdout will show the shit
pass
@@ 319,7 336,7 @@ class gitrepo(managedrepo):
def revsingle(self, rev, skiperror=False):
try:
out = self._call('rev-parse', rev)
- return gitrevisioncontext(self.root, hex=out.strip())
+ return gitrevisioncontext(self.root, hex=out)
except:
if skiperror:
return None
@@ 333,8 350,12 @@ class gitrepo(managedrepo):
def changestatus(self):
out = self._call('status', '--porcelain')
- stat = ''.join(set([l.strip().split()[0].replace('??', 'M')
- for l in out.splitlines()]))
+ stat = ''.join(
+ sorted(set(
+ [l.strip().split()[0].replace('??', 'M')
+ for l in out.splitlines()]
+ ))
+ )
return stat
def update_or_pull_and_update(self, section, secconf, rev):
@@ 350,7 371,9 @@ class hgrepo(managedrepo):
def __init__(self, conf, path):
super(hgrepo, self).__init__(conf, path)
- self.repo = hg.repository(self.ui, path=path)
+ self.repo = WrappedRepo(
+ hg.repository(self.ui, path=path.encode('utf-8'))
+ )
@property
def root(self):
@@ 361,7 384,7 @@ class hgrepo(managedrepo):
"""Return the highest cset of the revset matching the given revision
expression """
try:
- cset = scmutil.revsingle(self.repo, revexpr)
+ cset = scmutil.revsingle(self.repo, revexpr.encode('utf-8'))
except:
if skiperror:
return None
@@ 395,7 418,7 @@ class hgrepo(managedrepo):
def is_on_descendant(self, rev):
'''return True if the repository is on a descendant of ``rev``'''
- return bool(scmutil.revrange(self.repo, ['%s::.' % rev]))
+ return bool(scmutil.revrange(self.repo, [b'%s::.' % rev.encode('utf-8')]))
def workingctx(self):
"""Return the working/current context context of a repository
@@ 416,12 439,24 @@ class hgrepo(managedrepo):
if newsource != source:
conf.ui.warn('clone: using %r instead of %r\n' % (newsource, source))
source = newsource
- return commands.clone(conf.ui, source=source, dest=dest)
+ return commands.clone(
+ conf.ui,
+ source=source.encode('utf-8'),
+ dest=dest.encode('utf-8')
+ )
target = osp.join(sharepath, secconf['layout'])
if not osp.exists(target):
os.makedirs(target)
- commands.clone(conf.ui, source=source, dest=target)
- return hg.share(conf.ui, target, dest)
+ commands.clone(
+ conf.ui,
+ source=source.encode('utf-8'),
+ dest=target.encode('utf-8')
+ )
+ return hg.share(
+ conf.ui,
+ target.encode('utf-8'),
+ dest.encode('utf-8')
+ )
def pull_repo(self, section, conf):
"""Pull a managed repo from its configuration
@@ 430,16 465,20 @@ class hgrepo(managedrepo):
"""
ui = self.ui
pathname = self.conf.opts.get('use_hgrc_path') or 'default'
- pathuri = self.repo.ui.expandpath(pathname)
- if pathuri == pathname:
- pathuri = conf['pulluri']
+ pathuri = self.repo.ui.expandpath(pathname.encode('utf-8'))
+ if pathuri == pathname.encode('utf-8'):
+ pathuri = conf['pulluri'].encode('utf-8')
ui.warn('%s repo has no %s path, using configuration pulluri %s instead\n' %
- (section, pathname, pathuri))
+ (section, pathname, pathuri.decode('utf-8')))
source, _branches = hg.parseurl(pathuri, None)
newsource = self.conf.rewriteuri(source)
if newsource != source:
- ui.warn('pull: using %r instead of %r\n' % (newsource, source))
+ ui.warn('pull: using %r instead of %r\n' % (
+ newsource.decode('utf-8'),
+ source.decode('utf-8'))
+ )
+ source = newsource
try:
other = hg.peer(self.repo.ui, self.conf.opts, source)
@@ 457,16 496,16 @@ class hgrepo(managedrepo):
def push_repo(self, section, conf):
self.ui.write(section + '\n', label='confman.section')
pathname = self.conf.opts.get('use_hgrc_path', 'default')
- pathuri = self.repo.ui.expandpath(pathname)
+ pathuri = self.repo.ui.expandpath(pathname.encode('utf-8')).decode('utf-8')
if pathuri == pathname:
pathuri = conf['pulluri']
self.ui.warn('%s repo has no %s path, using configuration pulluri %s instead\n' %
(section, pathname, pathuri))
track = conf.get('track')
self.ui.write('pushing %s to %s\n' % (track, pathuri))
- source, __branches = hg.parseurl(pathuri, None)
+ source, __branches = hg.parseurl(pathuri.encode('utf-8'), None)
other = hg.peer(self.repo.ui, self.conf.opts, source)
- hgpush(self.repo, other, track)
+ hgpush(self.repo, other, track.encode('utf-8'))
def unknown_rev(self, rev):
"""Predicate to check if a revision belongs to a repository """
@@ 478,7 517,7 @@ class hgrepo(managedrepo):
def update(self, rev):
"Update the repository to `rev` "
- commands.update(self.ui, self.repo, rev=rev)
+ commands.update(self.ui, self.repo, rev=rev.encode('utf-8'))
def update_or_pull_and_update(self, section, conf, rev):
"""Try hard to update to a specified revision
@@ 499,7 538,10 @@ class hgrepo(managedrepo):
ui.write('updating to %s\n' % rev, label='confman.public-phase')
self.update(rev)
ui.write('updated to %s/%s from %s/%s\n' %
- (targetrev, targetctx.branch, currev, wctx.branch),
+ (targetrev,
+ targetctx.branch,
+ currev,
+ wctx.branch),
label='confman.updated')
return True
@@ 514,22 556,29 @@ class hgrepo(managedrepo):
def archive(self, zippath, prefix, rev, **opts):
"""Add an unversioned zip archive content of configuration repositories
at ``rev`` into ``zippath`` with internal ``prefix``"""
- ctx = scmutil.revsingle(self.repo, rev)
+ ctx = scmutil.revsingle(self.repo._repo, rev.encode('utf-8'))
if not ctx:
raise error.Abort('no working directory: please specify a revision')
matchfn = scmutil.match(ctx, [], opts)
node = ctx.node()
archivers = archival.archivers.copy()
archival.archivers['zip'] = zipit
- archival.archive(self.repo, zippath, node, 'zip',
- not opts.get('no_decode'), matchfn, prefix)
+ archival.archive(
+ self.repo._repo,
+ zippath.encode('utf-8'),
+ node,
+ b'zip',
+ not opts.get('no_decode'),
+ matchfn,
+ prefix.encode('utf-8')
+ )
archival.archivers.update(archivers)
def rewrite_conf(self, conf):
from difflib import unified_diff
from collections import defaultdict
from mercurial.config import config
- from utils import _unflatten
+ from .utils import _unflatten
# build the nested hgrc entries ([section] key value, key value, ...)
entries = _unflatten(conf).get('hgrc', defaultdict(dict))
@@ 539,30 588,32 @@ class hgrepo(managedrepo):
# already exist as is, and separate new entries from updated entries
hgrcpath = osp.join(self.repo.path, 'hgrc')
updated = defaultdict(dict)
- conf = config()
- conf.read(hgrcpath)
+ conf = config() # TODO: implement an str friendly config-like object
+ conf.read(hgrcpath.encode('utf-8'))
for section in conf:
- entry = entries.get(section)
+ usection = section.decode('utf-8')
+ entry = entries.get(usection)
if not entry:
continue
for key in conf[section]:
- value = conf[section][key]
- newvalue = entry.get(key)
+ ukey = key.decode('utf-8')
+ value = conf[section][key].decode('utf-8')
+ newvalue = entry.get(ukey)
if newvalue is not None and newvalue != value:
- updated[section][key] = newvalue
- entry.pop(key, None)
- if not entries[section]: # now empty
- entries.pop(section)
+ updated[usection][ukey] = newvalue
+ entry.pop(ukey, None)
+ if not entries[usection]: # now empty
+ entries.pop(usection)
# at this point entries contains exclusively *new* entries
# rewrite without altering otherwise fine parts of the file
if not osp.exists(hgrcpath):
- with open(hgrcpath, 'wb'):
+ with open(hgrcpath, 'w'):
self.ui.write('creating an hgrc file from scratch\n')
newhgrcpath = os.path.join(self.repo.path, 'hgrc.new')
- with open(newhgrcpath, 'wb') as newhgrc:
- with open(hgrcpath, 'rb') as hgrc:
+ with open(newhgrcpath, 'w') as newhgrc:
+ with open(hgrcpath, 'r') as hgrc:
section = None
for line in hgrc:
sline = line.strip()
@@ 583,7 634,7 @@ class hgrepo(managedrepo):
if sline.startswith('['): # new section
# handle new entries while in the previous section
if entries.get(section):
- for key, val in entries[section].items():
+ for key, val in list(entries[section].items()):
newhgrc.write('%s = %s\n' % (key, val))
newhgrc.write('\n')
entries.pop(section)
@@ 593,13 644,13 @@ class hgrepo(managedrepo):
# unprocessed entries
if entries:
- for key, val in entries[section].items():
+ for key, val in list(entries[section].items()):
newhgrc.write('%s = %s\n' % (key, val))
newhgrc.write('\n')
# show changes
- with open(hgrcpath, 'rb') as hgrc:
- with open(newhgrcpath, 'rb') as newhgrc:
+ with open(hgrcpath, 'r') as hgrc:
+ with open(newhgrcpath, 'r') as newhgrc:
diff = tuple(unified_diff(hgrc.readlines(), newhgrc.readlines(),
hgrcpath, newhgrcpath))
for line in diff:
@@ 616,10 667,10 @@ class hgrepo(managedrepo):
def files(self, opts):
"""return managed files in the working directory"""
- u = self.ui.copy()
+ u = self.ui._ui.copy()
paths = []
u.write = paths.append
- commands.files(self.ui, self.repo, **opts)
+ commands.files(self.ui._ui, self.repo, **opts)
return paths
@@ 1,19 1,17 @@
-# -*- coding: utf-8 -*-
"This module contains useful stuff to play with repository specs"
import os
import errno
import codecs
from collections import defaultdict
-import urlparse
+import urllib.request, urllib.parse, urllib.error
import contextlib
-from itertools import izip
from mercurial import util, hg, error
from mercurial.config import config, _
-from hgcompat import compilere, sortdict
+from .hgcompat import compilere
-from meta import CONFMANENTRIES
+from .meta import CONFMANENTRIES
def ending(line):
"Return the newline character(s) of the line."
@@ 31,6 29,87 @@ def _compilere(pattern):
return compilere(pattern + '$')
+class WrappedRepo:
+
+ def __init__(self, repo):
+ if isinstance(repo, WrappedRepo):
+ repo = repo._repo
+ self._repo = repo
+
+ def __getattr__(self, name):
+ if name in ('root', 'path', 'sharedpath'):
+ thing = getattr(self._repo, name)
+ if isinstance(thing, bytes):
+ return thing.decode('utf-8')
+ return thing
+ return getattr(self._repo, name)
+
+ def __getitem__(self, name):
+ return self._repo[name]
+
+ def __len__(self):
+ return len(self._repo)
+
+
+class WrappedUI:
+
+ def __init__(self, ui):
+ if isinstance(ui, WrappedUI):
+ ui = ui._ui
+ self._ui = ui
+
+ def _output(self, meth, *a, label=b'', **k):
+ if label:
+ label = label.encode('utf-8')
+ return meth(
+ *(elt.encode('utf-8')
+ if isinstance(elt, str) else elt
+ for elt in a),
+ label=label,
+ **k
+ )
+
+ def write(self, *a, label=b'', **k):
+ return self._output(
+ self._ui.write,
+ *a, label=label, **k
+ )
+
+ def status(self, *a, label=b'', **k):
+ return self._output(
+ self._ui.status,
+ *a, label=label, **k
+ )
+
+ def warn(self, *a, label=b'', **k):
+ return self._output(
+ self._ui.warn,
+ *a, label=label, **k
+ )
+
+ def error(self, *a, label=b'', **k):
+ return self._output(
+ self._ui.error,
+ *a, label=label, **k
+ )
+
+ def configpath(self, *args, **kw):
+ args = (
+ elt.encode('utf-8')
+ if isinstance(elt, str) else elt
+ for elt in args
+ )
+ path = self._ui.configpath(*args, **kw)
+ if path is not None:
+ return path.decode('utf-8')
+
+ def write_bytes(self, *a, label=b'', **k):
+ return self._ui.write(*a, label=label, **k)
+
+ def __getattr__(self, name):
+ return getattr(self._ui, name)
+
+
class sectionfilter(object):
"Callable that returns True if the section is included, else False"
rewhitelist = ()
@@ 76,11 155,11 @@ class oconfig(object):
def __init__(self, orig=None, confman=None):
if orig is None:
- self._data = sortdict()
+ self._data = {}
self._source = {}
self._unset = []
else:
- self._data = sortdict(orig._data)
+ self._data = dict(orig._data)
self._source = orig._source.copy()
self._unset = orig._unset[:]
confman = orig.confman
@@ 102,7 181,7 @@ class oconfig(object):
def set(self, section, item, value, source=''):
if section not in self:
- self._data[section] = sortdict()
+ self._data[section] = {}
self._data[section][item] = value
if source:
self._source[(section, item)] = source
@@ 112,14 191,14 @@ class oconfig(object):
def sections(self):
"Return the list of section names."
- return self._data.keys()
+ return list(self._data.keys())
def save(self, hgrcpath):
"Write the .hg/hgrc of a managed repo."
- with open(hgrcpath, 'wb') as hgrc:
+ with open(hgrcpath, 'w') as hgrc:
for section in self:
hgrc.write('[%s]\n' % section)
- for k, v in self[section].iteritems():
+ for k, v in self[section].items():
hgrc.write('%s = %s\n' % (k, v))
hgrc.write('\n')
@@ 169,7 248,7 @@ class oconfig(object):
if include:
try:
include(inc, remap=remap, sections=sections)
- except IOError, inst:
+ except IOError as inst:
if inst.errno != errno.ENOENT:
raise error.ParseError(_("cannot include %s (%s)")
% (inc, inst.strerror),
@@ 188,7 267,7 @@ class oconfig(object):
self.confman.sectionlevels[section].add(level)
# /PATCH
if section not in self:
- self._data[section] = sortdict()
+ self._data[section] = {}
continue
# PATCH: filter section
@@ 217,7 296,7 @@ class oconfig(object):
try:
include(inc, remap=remap, sections=sections, level=level+1,
section_filter=_section_filter)
- except IOError, inst:
+ except IOError as inst:
if inst.errno != errno.ENOENT:
raise error.ParseError(_("cannot expand %s (%s)")
% (inc, inst.strerror),
@@ 242,7 321,10 @@ class oconfig(object):
self._unset.append((section, name))
continue
- raise error.ParseError(l.rstrip(), ("%s:%s" % (src, line)))
+ raise error.ParseError(
+ l.rstrip().encode('utf-8'),
+ ("%s:%s" % (src, line)).encode('utf-8')
+ )
def parse_guestrepo(self, dirpath, level=0, section_filter=None):
"Parse guestrepo files in dirpath"
@@ 252,20 334,32 @@ class oconfig(object):
section_filter = sectionfilter()
mappingpath = os.path.join(dirpath, '.hggrmapping')
mappingconf = config()
- mappingconf.read(mappingpath)
+ mappingconf.read(mappingpath.encode('utf-8'))
section = None
- for section in mappingconf['']:
+ for section in mappingconf[b'']:
if section_filter(section):
- self.set(section, 'pulluri', mappingconf[''][section])
- self.confman.sectionlevels[section].add(level)
+ self.set(
+ section.decode('utf-8'),
+ 'pulluri',
+ mappingconf[b''][section].decode('utf-8')
+ )
+ self.confman.sectionlevels[section.decode('utf-8')].add(level)
guestpath = os.path.join(dirpath, '.hgguestrepo')
guestconf = config()
- guestconf.read(guestpath)
- for layout in guestconf['']:
- section, cset = guestconf[''][layout].split(None, 1)
+ guestconf.read(guestpath.encode('utf-8'))
+ for layout in guestconf[b'']:
+ section, cset = guestconf[b''][layout].split(None, 1)
if section_filter(section):
- self.set(section, 'layout', layout)
- self.set(section, 'track', cset)
+ self.set(
+ section.decode('utf-8'),
+ 'layout',
+ layout.decode('utf-8')
+ )
+ self.set(
+ section.decode('utf-8'),
+ 'track',
+ cset.decode('utf-8')
+ )
def read(self, path, fp=None, sections=None, remap=None, **kwargs):
if os.path.exists(path):
@@ 305,10 399,10 @@ def findrootpath(ui, conffilename, start
def readconf(ui, repo, args, opts):
"Parse the configuration file into a config object."
# prevent cyclic imports
- import gr
- from configuration import configurationmanager
- for cmrootpath, grrootpath in izip(findrootpath(ui, '.hgconf', repo.root),
- findrootpath(ui, '.hgguestrepo', repo.root)):
+ from . import gr
+ from .configuration import configurationmanager
+ for cmrootpath, grrootpath in zip(findrootpath(ui, '.hgconf', repo.root),
+ findrootpath(ui, '.hgguestrepo', repo.root)):
if cmrootpath:
confman = configurationmanager(ui, cmrootpath, args, opts)
break
@@ 316,9 410,14 @@ def readconf(ui, repo, args, opts):
confman = gr.configurationmanager(ui, grrootpath, args, opts)
break
else:
- raise error.Abort('cannot find an .hgconf file in the path and '
- 'parents up to the root', hint='see hg help confman')
- return confman, hg.repository(ui, confman.rootpath)
+ raise error.Abort(
+ b'cannot find an .hgconf file in the path and '
+ b'parents up to the root',
+ hint=b'see hg help confman'
+ )
+ return confman, WrappedRepo(
+ hg.repository(ui, confman.rootpath.encode('utf-8'))
+ )
# dictionnaries operations
@@ 327,7 426,7 @@ def _unflatten(flattened, skipkeys=CONFM
hgrc.path.default-push -> {'hgrc': {'path': {'defaul-push': ...}}}
"""
nested = defaultdict(lambda: defaultdict(dict))
- for key, value in flattened.iteritems():
+ for key, value in flattened.items():
if key in skipkeys:
continue
try:
@@ 345,9 444,9 @@ def _unflatten(flattened, skipkeys=CONFM
@contextlib.contextmanager
def download_file(source):
"""Download file at ``source``. This function manage file:// scheme"""
- u = urlparse.urlparse(source)
+ u = urllib.parse.urlparse(source)
if u.scheme == 'file':
- with open(os.path.join(*source[7:].split('/'))) as fp:
+ with open(os.path.join(*source[7:].split('/')), 'rb') as fp:
yield fp
else:
import requests, tempfile
@@ 366,10 465,10 @@ def _treegraph_unicode_encode_handler(er
"""Unicode error handler for tree graph characters. Shall be given to
codecs.register_error."""
obj = error.object[error.start:error.end + 1]
- obj = obj.replace(u'\N{BOX DRAWINGS LIGHT VERTICAL}', u'|') # │
- obj = obj.replace(u'\N{BOX DRAWINGS LIGHT VERTICAL AND RIGHT}', u'|') # ├
- obj = obj.replace(u'\N{BOX DRAWINGS LIGHT UP AND RIGHT}', u'`') # â””
- obj = obj.replace(u'\N{RIGHTWARDS ARROW}', u'-') # →
+ obj = obj.replace('\N{BOX DRAWINGS LIGHT VERTICAL}', '|') # │
+ obj = obj.replace('\N{BOX DRAWINGS LIGHT VERTICAL AND RIGHT}', '|') # ├
+ obj = obj.replace('\N{BOX DRAWINGS LIGHT UP AND RIGHT}', '`') # â””
+ obj = obj.replace('\N{RIGHTWARDS ARROW}', '-') # →
return obj, error.end + 1
codecs.register_error('treegraph', _treegraph_unicode_encode_handler)
@@ 377,7 476,7 @@ codecs.register_error('treegraph', _tree
def _confman_unicode_encode_handler(error):
obj = error.object[error.start:error.end + 1]
- obj = obj.replace(u'\N{CHECK MARK}', u'ok')
- obj = obj.replace(u'\N{MARRIAGE SYMBOL}', u'[shared]')
+ obj = obj.replace('\N{CHECK MARK}', 'ok')
+ obj = obj.replace('\N{MARRIAGE SYMBOL}', '[shared]')
return obj, error.end
codecs.register_error('confman', _confman_unicode_encode_handler)
@@ 43,13 43,14 @@
# completes fairly quickly, includes both shell and Python scripts, and
# includes some scripts that run daemon processes.)
-from __future__ import absolute_import, print_function
-
+
+
+import argparse
+import collections
import difflib
import distutils.version as version
import errno
import json
-import optparse
import os
import random
import re
@@ 58,43 59,104 @@ import signal
import socket
import subprocess
import sys
-try:
- import sysconfig
-except ImportError:
- # sysconfig doesn't exist in Python 2.6
- sysconfig = None
+import sysconfig
import tempfile
import threading
import time
import unittest
import xml.dom.minidom as minidom
+import importlib
try:
- import Queue as queue
+ import queue as queue
except ImportError:
import queue
+try:
+ import shlex
+ shellquote = shlex.quote
+except (ImportError, AttributeError):
+ import pipes
+ shellquote = pipes.quote
+
if os.environ.get('RTUNICODEPEDANTRY', False):
try:
- reload(sys)
+ importlib.reload(sys)
sys.setdefaultencoding("undefined")
except NameError:
pass
+origenviron = os.environ.copy()
osenvironb = getattr(os, 'environb', os.environ)
+osenvironb[b'ENCODING'] = b'ascii'
processlock = threading.Lock()
+pygmentspresent = False
+# ANSI color is unsupported prior to Windows 10
+if os.name != 'nt':
+ try: # is pygments installed
+ import pygments
+ import pygments.lexers as lexers
+ import pygments.lexer as lexer
+ import pygments.formatters as formatters
+ import pygments.token as token
+ import pygments.style as style
+ pygmentspresent = True
+ difflexer = lexers.DiffLexer()
+ terminal256formatter = formatters.Terminal256Formatter()
+ except ImportError:
+ pass
+
+if pygmentspresent:
+ class TestRunnerStyle(style.Style):
+ default_style = ""
+ skipped = token.string_to_tokentype("Token.Generic.Skipped")
+ failed = token.string_to_tokentype("Token.Generic.Failed")
+ skippedname = token.string_to_tokentype("Token.Generic.SName")
+ failedname = token.string_to_tokentype("Token.Generic.FName")
+ styles = {
+ skipped: '#e5e5e5',
+ skippedname: '#00ffff',
+ failed: '#7f0000',
+ failedname: '#ff0000',
+ }
+
+ class TestRunnerLexer(lexer.RegexLexer):
+ testpattern = r'[\w-]+\.(t|py)( \(case [\w-]+\))?'
+ tokens = {
+ 'root': [
+ (r'^Skipped', token.Generic.Skipped, 'skipped'),
+ (r'^Failed ', token.Generic.Failed, 'failed'),
+ (r'^ERROR: ', token.Generic.Failed, 'failed'),
+ ],
+ 'skipped': [
+ (testpattern, token.Generic.SName),
+ (r':.*', token.Generic.Skipped),
+ ],
+ 'failed': [
+ (testpattern, token.Generic.FName),
+ (r'(:| ).*', token.Generic.Failed),
+ ]
+ }
+
+ runnerformatter = formatters.Terminal256Formatter(style=TestRunnerStyle)
+ runnerlexer = TestRunnerLexer()
+
if sys.version_info > (3, 5, 0):
PYTHON3 = True
xrange = range # we use xrange in one place, and we'd rather not use range
def _bytespath(p):
+ if p is None:
+ return p
return p.encode('utf-8')
def _strpath(p):
+ if p is None:
+ return p
return p.decode('utf-8')
elif sys.version_info >= (3, 0, 0):
- print('%s is only supported on Python 3.5+ and 2.6-2.7, not %s' %
+ print('%s is only supported on Python 3.5+ and 2.7, not %s' %
(sys.argv[0], '.'.join(str(v) for v in sys.version_info[:3])))
sys.exit(70) # EX_SOFTWARE from `man 3 sysexit`
else:
@@ 112,18 174,51 @@ else:
# For Windows support
wifexited = getattr(os, "WIFEXITED", lambda x: False)
-def checkportisavailable(port):
- """return true if a port seems free to bind on localhost"""
+# Whether to use IPv6
+def checksocketfamily(name, port=20058):
+ """return true if we can listen on localhost using family=name
+
+ name should be either 'AF_INET', or 'AF_INET6'.
+ port being used is okay - EADDRINUSE is considered as successful.
+ """
+ family = getattr(socket, name, None)
+ if family is None:
+ return False
try:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s = socket.socket(family, socket.SOCK_STREAM)
s.bind(('localhost', port))
s.close()
return True
except socket.error as exc:
- if not exc.errno == errno.EADDRINUSE:
+ if exc.errno == errno.EADDRINUSE:
+ return True
+ elif exc.errno in (errno.EADDRNOTAVAIL, errno.EPROTONOSUPPORT):
+ return False
+ else:
raise
+ else:
return False
+# useipv6 will be set by parseargs
+useipv6 = None
+
+def checkportisavailable(port):
+ """return true if a port seems free to bind on localhost"""
+ if useipv6:
+ family = socket.AF_INET6
+ else:
+ family = socket.AF_INET
+ try:
+ s = socket.socket(family, socket.SOCK_STREAM)
+ s.bind(('localhost', port))
+ s.close()
+ return True
+ except socket.error as exc:
+ if exc.errno not in (errno.EADDRINUSE, errno.EADDRNOTAVAIL,
+ errno.EPROTONOSUPPORT):
+ raise
+ return False
+
closefds = os.name == 'posix'
def Popen4(cmd, wd, timeout, env=None):
processlock.acquire()
@@ 187,111 282,152 @@ def parselistfiles(files, listtype, warn
f.close()
return entries
+def parsettestcases(path):
+ """read a .t test file, return a set of test case names
+
+ If path does not exist, return an empty set.
+ """
+ cases = set()
+ try:
+ with open(path, 'rb') as f:
+ for l in f:
+ if l.startswith(b'#testcases '):
+ cases.update(l[11:].split())
+ except IOError as ex:
+ if ex.errno != errno.ENOENT:
+ raise
+ return cases
+
def getparser():
"""Obtain the OptionParser used by the CLI."""
- parser = optparse.OptionParser("%prog [options] [tests]")
-
- # keep these sorted
- parser.add_option("--blacklist", action="append",
+ parser = argparse.ArgumentParser(usage='%(prog)s [options] [tests]')
+
+ selection = parser.add_argument_group('Test Selection')
+ selection.add_argument('--allow-slow-tests', action='store_true',
+ help='allow extremely slow tests')
+ selection.add_argument("--blacklist", action="append",
help="skip tests listed in the specified blacklist file")
- parser.add_option("--whitelist", action="append",
+ selection.add_argument("--changed",
+ help="run tests that are changed in parent rev or working directory")
+ selection.add_argument("-k", "--keywords",
+ help="run tests matching keywords")
+ selection.add_argument("-r", "--retest", action="store_true",
+ help = "retest failed tests")
+ selection.add_argument("--test-list", action="append",
+ help="read tests to run from the specified file")
+ selection.add_argument("--whitelist", action="append",
help="always run tests listed in the specified whitelist file")
- parser.add_option("--changed", type="string",
- help="run tests that are changed in parent rev or working directory")
- parser.add_option("-C", "--annotate", action="store_true",
- help="output files annotated with coverage")
- parser.add_option("-c", "--cover", action="store_true",
- help="print a test coverage report")
- parser.add_option("-d", "--debug", action="store_true",
+ selection.add_argument('tests', metavar='TESTS', nargs='*',
+ help='Tests to run')
+
+ harness = parser.add_argument_group('Test Harness Behavior')
+ harness.add_argument('--bisect-repo',
+ metavar='bisect_repo',
+ help=("Path of a repo to bisect. Use together with "
+ "--known-good-rev"))
+ harness.add_argument("-d", "--debug", action="store_true",
help="debug mode: write output of test scripts to console"
" rather than capturing and diffing it (disables timeout)")
- parser.add_option("-f", "--first", action="store_true",
+ harness.add_argument("-f", "--first", action="store_true",
help="exit on the first test failure")
- parser.add_option("-H", "--htmlcov", action="store_true",
- help="create an HTML report of the coverage of the files")
- parser.add_option("-i", "--interactive", action="store_true",
+ harness.add_argument("-i", "--interactive", action="store_true",
help="prompt to accept changed output")
- parser.add_option("-j", "--jobs", type="int",
+ harness.add_argument("-j", "--jobs", type=int,
help="number of jobs to run in parallel"
" (default: $%s or %d)" % defaults['jobs'])
- parser.add_option("--keep-tmpdir", action="store_true",
+ harness.add_argument("--keep-tmpdir", action="store_true",
help="keep temporary directory after running tests")
- parser.add_option("-k", "--keywords",
- help="run tests matching keywords")
- parser.add_option("-l", "--local", action="store_true",
+ harness.add_argument('--known-good-rev',
+ metavar="known_good_rev",
+ help=("Automatically bisect any failures using this "
+ "revision as a known-good revision."))
+ harness.add_argument("--list-tests", action="store_true",
+ help="list tests instead of running them")
+ harness.add_argument("--loop", action="store_true",
+ help="loop tests repeatedly")
+ harness.add_argument('--random', action="store_true",
+ help='run tests in random order')
+ harness.add_argument("-p", "--port", type=int,
+ help="port on which servers should listen"
+ " (default: $%s or %d)" % defaults['port'])
+ harness.add_argument('--profile-runner', action='store_true',
+ help='run statprof on run-tests')
+ harness.add_argument("-R", "--restart", action="store_true",
+ help="restart at last error")
+ harness.add_argument("--runs-per-test", type=int, dest="runs_per_test",
+ help="run each test N times (default=1)", default=1)
+ harness.add_argument("--shell",
+ help="shell to use (default: $%s or %s)" % defaults['shell'])
+ harness.add_argument('--showchannels', action='store_true',
+ help='show scheduling channels')
+ harness.add_argument("--slowtimeout", type=int,
+ help="kill errant slow tests after SLOWTIMEOUT seconds"
+ " (default: $%s or %d)" % defaults['slowtimeout'])
+ harness.add_argument("-t", "--timeout", type=int,
+ help="kill errant tests after TIMEOUT seconds"
+ " (default: $%s or %d)" % defaults['timeout'])
+ harness.add_argument("--tmpdir",
+ help="run tests in the given temporary directory"
+ " (implies --keep-tmpdir)")
+ harness.add_argument("-v", "--verbose", action="store_true",
+ help="output verbose messages")
+
+ hgconf = parser.add_argument_group('Mercurial Configuration')
+ hgconf.add_argument("--chg", action="store_true",
+ help="install and use chg wrapper in place of hg")
+ hgconf.add_argument("--compiler",
+ help="compiler to build with")
+ hgconf.add_argument('--extra-config-opt', action="append", default=[],
+ help='set the given config opt in the test hgrc')
+ hgconf.add_argument("-l", "--local", action="store_true",
help="shortcut for --with-hg=<testdir>/../hg, "
"and --with-chg=<testdir>/../contrib/chg/chg if --chg is set")
- parser.add_option("--loop", action="store_true",
- help="loop tests repeatedly")
- parser.add_option("--runs-per-test", type="int", dest="runs_per_test",
- help="run each test N times (default=1)", default=1)
- parser.add_option("-n", "--nodiff", action="store_true",
- help="skip showing test changes")
- parser.add_option("-p", "--port", type="int",
- help="port on which servers should listen"
- " (default: $%s or %d)" % defaults['port'])
- parser.add_option("--compiler", type="string",
- help="compiler to build with")
- parser.add_option("--pure", action="store_true",
+ hgconf.add_argument("--ipv6", action="store_true",
+ help="prefer IPv6 to IPv4 for network related tests")
+ hgconf.add_argument("--pure", action="store_true",
help="use pure Python code instead of C extensions")
- parser.add_option("-R", "--restart", action="store_true",
- help="restart at last error")
- parser.add_option("-r", "--retest", action="store_true",
- help="retest failed tests")
- parser.add_option("-S", "--noskips", action="store_true",
- help="don't report skip tests verbosely")
- parser.add_option("--shell", type="string",
- help="shell to use (default: $%s or %s)" % defaults['shell'])
- parser.add_option("-t", "--timeout", type="int",
- help="kill errant tests after TIMEOUT seconds"
- " (default: $%s or %d)" % defaults['timeout'])
- parser.add_option("--slowtimeout", type="int",
- help="kill errant slow tests after SLOWTIMEOUT seconds"
- " (default: $%s or %d)" % defaults['slowtimeout'])
- parser.add_option("--time", action="store_true",
- help="time how long each test takes")
- parser.add_option("--json", action="store_true",
- help="store test result data in 'report.json' file")
- parser.add_option("--tmpdir", type="string",
- help="run tests in the given temporary directory"
- " (implies --keep-tmpdir)")
- parser.add_option("-v", "--verbose", action="store_true",
- help="output verbose messages")
- parser.add_option("--xunit", type="string",
- help="record xunit results at specified path")
- parser.add_option("--view", type="string",
- help="external diff viewer")
- parser.add_option("--with-hg", type="string",
+ hgconf.add_argument("-3", "--py3k-warnings", action="store_true",
+ help="enable Py3k warnings on Python 2.7+")
+ hgconf.add_argument("--with-chg", metavar="CHG",
+ help="use specified chg wrapper in place of hg")
+ hgconf.add_argument("--with-hg",
metavar="HG",
help="test using specified hg script rather than a "
"temporary installation")
- parser.add_option("--chg", action="store_true",
- help="install and use chg wrapper in place of hg")
- parser.add_option("--with-chg", metavar="CHG",
- help="use specified chg wrapper in place of hg")
- parser.add_option("-3", "--py3k-warnings", action="store_true",
- help="enable Py3k warnings on Python 2.6+")
# This option should be deleted once test-check-py3-compat.t and other
# Python 3 tests run with Python 3.
- parser.add_option("--with-python3", metavar="PYTHON3",
- help="Python 3 interpreter (if running under Python 2)"
- " (TEMPORARY)")
- parser.add_option('--extra-config-opt', action="append",
- help='set the given config opt in the test hgrc')
- parser.add_option('--random', action="store_true",
- help='run tests in random order')
- parser.add_option('--profile-runner', action='store_true',
- help='run statprof on run-tests')
- parser.add_option('--allow-slow-tests', action='store_true',
- help='allow extremely slow tests')
- parser.add_option('--showchannels', action='store_true',
- help='show scheduling channels')
- parser.add_option('--known-good-rev', type="string",
- metavar="known_good_rev",
- help=("Automatically bisect any failures using this "
- "revision as a known-good revision."))
-
- for option, (envvar, default) in defaults.items():
+ hgconf.add_argument("--with-python3", metavar="PYTHON3",
+ help="Python 3 interpreter (if running under Python 2)"
+ " (TEMPORARY)")
+
+ reporting = parser.add_argument_group('Results Reporting')
+ reporting.add_argument("-C", "--annotate", action="store_true",
+ help="output files annotated with coverage")
+ reporting.add_argument("--color", choices=["always", "auto", "never"],
+ default=os.environ.get('HGRUNTESTSCOLOR', 'auto'),
+ help="colorisation: always|auto|never (default: auto)")
+ reporting.add_argument("-c", "--cover", action="store_true",
+ help="print a test coverage report")
+ reporting.add_argument('--exceptions', action='store_true',
+ help='log all exceptions and generate an exception report')
+ reporting.add_argument("-H", "--htmlcov", action="store_true",
+ help="create an HTML report of the coverage of the files")
+ reporting.add_argument("--json", action="store_true",
+ help="store test result data in 'report.json' file")
+ reporting.add_argument("--outputdir",
+ help="directory to write error logs to (default=test directory)")
+ reporting.add_argument("-n", "--nodiff", action="store_true",
+ help="skip showing test changes")
+ reporting.add_argument("-S", "--noskips", action="store_true",
+ help="don't report skip tests verbosely")
+ reporting.add_argument("--time", action="store_true",
+ help="time how long each test takes")
+ reporting.add_argument("--view",
+ help="external diff viewer")
+ reporting.add_argument("--xunit",
+ help="record xunit results at specified path")
+
+ for option, (envvar, default) in list(defaults.items()):
defaults[option] = type(default)(os.environ.get(envvar, default))
parser.set_defaults(**defaults)
@@ 299,7 435,7 @@ def getparser():
def parseargs(args, parser):
"""Parse arguments with our OptionParser and validate results."""
- (options, args) = parser.parse_args(args)
+ options = parser.parse_args(args)
# jython is always pure
if 'java' in sys.platform or '__pypy__' in sys.modules:
@@ 310,7 446,7 @@ def parseargs(args, parser):
if not (os.path.isfile(options.with_hg) and
os.access(options.with_hg, os.X_OK)):
parser.error('--with-hg must specify an executable hg script')
- if not os.path.basename(options.with_hg) == b'hg':
+ if os.path.basename(options.with_hg) not in [b'hg', b'hg.exe']:
sys.stderr.write('warning: --with-hg should specify an hg script\n')
if options.local:
testdir = os.path.dirname(_bytespath(canonpath(sys.argv[0])))
@@ 338,6 474,21 @@ def parseargs(args, parser):
parser.error('--chg does not work when --with-hg is specified '
'(use --with-chg instead)')
+ if options.color == 'always' and not pygmentspresent:
+ sys.stderr.write('warning: --color=always ignored because '
+ 'pygments is not installed\n')
+
+ if options.bisect_repo and not options.known_good_rev:
+ parser.error("--bisect-repo cannot be used without --known-good-rev")
+
+ global useipv6
+ if options.ipv6:
+ useipv6 = checksocketfamily('AF_INET6')
+ else:
+ # only use IPv6 if IPv4 is unavailable and IPv6 is available
+ useipv6 = ((not checksocketfamily('AF_INET'))
+ and checksocketfamily('AF_INET6'))
+
options.anycoverage = options.cover or options.annotate or options.htmlcov
if options.anycoverage:
try:
@@ 380,7 531,7 @@ def parseargs(args, parser):
if options.py3k_warnings:
if PYTHON3:
parser.error(
- '--py3k-warnings can only be used on Python 2.6 and 2.7')
+ '--py3k-warnings can only be used on Python 2.7')
if options.with_python3:
if PYTHON3:
parser.error('--with-python3 cannot be used when executing with '
@@ 413,7 564,7 @@ def parseargs(args, parser):
if options.showchannels:
options.nodiff = True
- return (options, args)
+ return options
def rename(src, dst):
"""Like os.rename(), trade atomicity and opened files friendliness
@@ 454,6 605,12 @@ def vlog(*msg):
# sans \t, \n and \r
CDATA_EVIL = re.compile(br"[\000-\010\013\014\016-\037]")
+# Match feature conditionalized output lines in the form, capturing the feature
+# list in group 2, and the preceeding line output in group 1:
+#
+# output..output (feature !)\n
+optline = re.compile(b'(.*) \((.+?) !\)\n$')
+
def cdatasafe(data):
"""Make a string safe to include in a CDATA block.
@@ 477,17 634,30 @@ def log(*msg):
print()
sys.stdout.flush()
+def highlightdiff(line, color):
+ if not color:
+ return line
+ assert pygmentspresent
+ return pygments.highlight(line.decode('latin1'), difflexer,
+ terminal256formatter).encode('latin1')
+
+def highlightmsg(msg, color):
+ if not color:
+ return msg
+ assert pygmentspresent
+ return pygments.highlight(msg, runnerlexer, runnerformatter)
+
def terminate(proc):
- """Terminate subprocess (with fallback for Python versions < 2.6)"""
+ """Terminate subprocess"""
vlog('# Terminating process %d' % proc.pid)
try:
- getattr(proc, 'terminate', lambda : os.kill(proc.pid, signal.SIGTERM))()
+ proc.terminate()
except OSError:
pass
def killdaemons(pidfile):
- return _killdaemons(pidfile, tryhard=False, remove=True,
- logfn=vlog)
+ return killdaemons(pidfile, tryhard=False, remove=True,
+ logfn=vlog)
class Test(unittest.TestCase):
"""Encapsulates a single, runnable test.
@@ 500,12 670,14 @@ class Test(unittest.TestCase):
# Status code reserved for skipped tests (used by hghave).
SKIPPED_STATUS = 80
- def __init__(self, path, tmpdir, keeptmpdir=False,
+ def __init__(self, path, outputdir, tmpdir, keeptmpdir=False,
debug=False,
- timeout=defaults['timeout'],
- startport=defaults['port'], extraconfigopts=None,
+ first=False,
+ timeout=None,
+ startport=None, extraconfigopts=None,
py3kwarnings=False, shell=None, hgcommand=None,
- slowtimeout=defaults['slowtimeout'], usechg=False):
+ slowtimeout=None, usechg=False,
+ useipv6=False):
"""Create a test from parameters.
path is the full path to the file defining the test.
@@ 536,15 708,24 @@ class Test(unittest.TestCase):
shell is the shell to execute tests in.
"""
+ if timeout is None:
+ timeout = defaults['timeout']
+ if startport is None:
+ startport = defaults['port']
+ if slowtimeout is None:
+ slowtimeout = defaults['slowtimeout']
self.path = path
self.bname = os.path.basename(path)
self.name = _strpath(self.bname)
self._testdir = os.path.dirname(path)
- self.errpath = os.path.join(self._testdir, b'%s.err' % self.bname)
+ self._outputdir = outputdir
+ self._tmpname = os.path.basename(path)
+ self.errpath = os.path.join(self._outputdir, b'%s.err' % self.bname)
self._threadtmp = tmpdir
self._keeptmpdir = keeptmpdir
self._debug = debug
+ self._first = first
self._timeout = timeout
self._slowtimeout = slowtimeout
self._startport = startport
@@ 553,6 734,7 @@ class Test(unittest.TestCase):
self._shell = _bytespath(shell)
self._hgcommand = hgcommand or b'hg'
self._usechg = usechg
+ self._useipv6 = useipv6
self._aborted = False
self._daemonpids = []
@@ 563,16 745,19 @@ class Test(unittest.TestCase):
self._testtmp = None
self._chgsockdir = None
+ self._refout = self.readrefout()
+
+ def readrefout(self):
+ """read reference output"""
# If we're not in --debug mode and reference output file exists,
# check test output against it.
- if debug:
- self._refout = None # to match "out is None"
+ if self._debug:
+ return None # to match "out is None"
elif os.path.exists(self.refpath):
- f = open(self.refpath, 'rb')
- self._refout = f.read().splitlines(True)
- f.close()
+ with open(self.refpath, 'rb') as f:
+ return f.read().splitlines(True)
else:
- self._refout = []
+ return []
# needed to get base class __repr__ running
@property
@@ 598,7 783,7 @@ class Test(unittest.TestCase):
if e.errno != errno.EEXIST:
raise
- name = os.path.basename(self.path)
+ name = self._tmpname
self._testtmp = os.path.join(self._threadtmp, name)
os.mkdir(self._testtmp)
@@ 641,21 826,12 @@ class Test(unittest.TestCase):
except KeyboardInterrupt:
self._aborted = True
raise
- except SkipTest as e:
+ except unittest.SkipTest as e:
result.addSkip(self, str(e))
# The base class will have already counted this as a
# test we "ran", but we want to exclude skipped tests
# from those we count towards those run.
result.testsRun -= 1
- except IgnoreTest as e:
- result.addIgnore(self, str(e))
- # As with skips, ignores also should be excluded from
- # the number of tests executed.
- result.testsRun -= 1
- except WarnTest as e:
- result.addWarn(self, str(e))
- except ReportedTest as e:
- pass
except self.failureException as e:
# This differs from unittest in that we don't capture
# the stack trace. This is for historical reasons and
@@ 688,6 864,7 @@ class Test(unittest.TestCase):
This will return a tuple describing the result of the test.
"""
env = self._getenv()
+ self._genrestoreenv(env)
self._daemonpids.append(env['DAEMON_PIDS'])
self._createhgrc(env['HGRCPATH'])
@@ 719,24 896,27 @@ class Test(unittest.TestCase):
self.fail('hg have failed checking for %s' % failed[-1])
else:
self._skipped = True
- raise SkipTest(missing[-1])
+ raise unittest.SkipTest(missing[-1])
elif ret == 'timeout':
self.fail('timed out')
elif ret is False:
- raise WarnTest('no result code from test')
+ self.fail('no result code from test')
elif out != self._refout:
# Diff generation may rely on written .err file.
if (ret != 0 or out != self._refout) and not self._skipped \
and not self._debug:
- f = open(self.errpath, 'wb')
- for line in out:
- f.write(line)
- f.close()
+ with open(self.errpath, 'wb') as f:
+ for line in out:
+ f.write(line)
# The result object handles diff calculation for us.
- if self._result.addOutputMismatch(self, ret, out, self._refout):
- # change was accepted, skip failing
- return
+ with firstlock:
+ if self._result.addOutputMismatch(self, ret, out, self._refout):
+ # change was accepted, skip failing
+ return
+ if self._first:
+ global firsterror
+ firsterror = True
if ret:
msg = 'output changed and ' + describe(ret)
@@ 768,16 948,15 @@ class Test(unittest.TestCase):
if (self._ret != 0 or self._out != self._refout) and not self._skipped \
and not self._debug and self._out:
- f = open(self.errpath, 'wb')
- for line in self._out:
- f.write(line)
- f.close()
+ with open(self.errpath, 'wb') as f:
+ for line in self._out:
+ f.write(line)
vlog("# Ret was:", self._ret, '(%s)' % self.name)
def _run(self, env):
# This should be implemented in child classes to run tests.
- raise SkipTest('unknown test type')
+ raise unittest.SkipTest('unknown test type')
def abort(self):
"""Terminate execution of this test."""
@@ 799,11 978,20 @@ class Test(unittest.TestCase):
self._portmap(0),
self._portmap(1),
self._portmap(2),
- (br'(?m)^(saved backup bundle to .*\.hg)( \(glob\))?$',
- br'\1 (glob)'),
+ (br'([^0-9])%s' % re.escape(self._localip()), br'\1$LOCALIP'),
+ (br'\bHG_TXNID=TXN:[a-f0-9]{40}\b', br'HG_TXNID=TXN:$ID$'),
]
r.append((self._escapepath(self._testtmp), b'$TESTTMP'))
+ replacementfile = os.path.join(self._testdir, b'common-pattern.py')
+
+ if os.path.exists(replacementfile):
+ data = {}
+ with open(replacementfile, mode='rb') as source:
+ # the intermediate 'compile' step help with debugging
+ code = compile(source.read(), replacementfile, 'exec')
+ exec(code, data)
+ r.extend(data.get('substitutions', ()))
return r
def _escapepath(self, p):
@@ 816,18 1004,53 @@ class Test(unittest.TestCase):
else:
return re.escape(p)
+ def _localip(self):
+ if self._useipv6:
+ return b'::1'
+ else:
+ return b'127.0.0.1'
+
+ def _genrestoreenv(self, testenv):
+ """Generate a script that can be used by tests to restore the original
+ environment."""
+ # Put the restoreenv script inside self._threadtmp
+ scriptpath = os.path.join(self._threadtmp, b'restoreenv.sh')
+ testenv['HGTEST_RESTOREENV'] = scriptpath
+
+ # Only restore environment variable names that the shell allows
+ # us to export.
+ name_regex = re.compile('^[a-zA-Z][a-zA-Z0-9_]*$')
+
+ # Do not restore these variables; otherwise tests would fail.
+ reqnames = {'PYTHON', 'TESTDIR', 'TESTTMP'}
+
+ with open(scriptpath, 'w') as envf:
+ for name, value in list(origenviron.items()):
+ if not name_regex.match(name):
+ # Skip environment variables with unusual names not
+ # allowed by most shells.
+ continue
+ if name in reqnames:
+ continue
+ envf.write('%s=%s\n' % (name, shellquote(value)))
+
+ for name in testenv:
+ if name in origenviron or name in reqnames:
+ continue
+ envf.write('unset %s\n' % (name,))
+
def _getenv(self):
"""Obtain environment variables to use during test execution."""
def defineport(i):
offset = '' if i == 0 else '%s' % i
env["HGPORT%s" % offset] = '%s' % (self._startport + i)
env = os.environ.copy()
- if sysconfig is not None:
- env['PYTHONUSERBASE'] = sysconfig.get_config_var('userbase')
+ env['PYTHONUSERBASE'] = sysconfig.get_config_var('userbase') or ''
+ env['HGEMITWARNINGS'] = '1'
env['TESTTMP'] = self._testtmp
env['HOME'] = self._testtmp
# This number should match portneeded in _getport
- for port in xrange(3):
+ for port in range(3):
# This list should be parallel to _portmap in _getreplacements
defineport(port)
env["HGRCPATH"] = os.path.join(self._threadtmp, b'.hgrc')
@@ 838,6 1061,11 @@ class Test(unittest.TestCase):
env["HGUSER"] = "test"
env["HGENCODING"] = "ascii"
env["HGENCODINGMODE"] = "strict"
+ env['HGIPV6'] = str(int(self._useipv6))
+
+ # LOCALIP could be ::1 or 127.0.0.1. Useful for tests that require raw
+ # IP addresses.
+ env['LOCALIP'] = self._localip()
# Reset some environment variables to well-known values so that
# the tests produce repeatable output.
@@ 848,12 1076,13 @@ class Test(unittest.TestCase):
env['TERM'] = 'xterm'
for k in ('HG HGPROF CDPATH GREP_OPTIONS http_proxy no_proxy ' +
+ 'HGPLAIN HGPLAINEXCEPT EDITOR VISUAL PAGER ' +
'NO_PROXY CHGDEBUG').split():
if k in env:
del env[k]
# unset env related to hooks
- for k in env.keys():
+ for k in list(env.keys()):
if k.startswith('HG_'):
del env[k]
@@ 864,29 1093,31 @@ class Test(unittest.TestCase):
def _createhgrc(self, path):
"""Create an hgrc file for this test."""
- hgrc = open(path, 'wb')
- hgrc.write(b'[ui]\n')
- hgrc.write(b'slash = True\n')
- hgrc.write(b'interactive = False\n')
- hgrc.write(b'mergemarkers = detailed\n')
- hgrc.write(b'promptecho = True\n')
- hgrc.write(b'[defaults]\n')
- hgrc.write(b'backout = -d "0 0"\n')
- hgrc.write(b'commit = -d "0 0"\n')
- hgrc.write(b'shelve = --date "0 0"\n')
- hgrc.write(b'tag = -d "0 0"\n')
- hgrc.write(b'[devel]\n')
- hgrc.write(b'all-warnings = true\n')
- hgrc.write(b'[largefiles]\n')
- hgrc.write(b'usercache = %s\n' %
- (os.path.join(self._testtmp, b'.cache/largefiles')))
-
- for opt in self._extraconfigopts:
- section, key = opt.split('.', 1)
- assert '=' in key, ('extra config opt %s must '
- 'have an = for assignment' % opt)
- hgrc.write(b'[%s]\n%s\n' % (section, key))
- hgrc.close()
+ with open(path, 'wb') as hgrc:
+ hgrc.write(b'[ui]\n')
+ hgrc.write(b'slash = True\n')
+ hgrc.write(b'interactive = False\n')
+ hgrc.write(b'mergemarkers = detailed\n')
+ hgrc.write(b'promptecho = True\n')
+ hgrc.write(b'[defaults]\n')
+ hgrc.write(b'[devel]\n')
+ hgrc.write(b'all-warnings = true\n')
+ hgrc.write(b'default-date = 0 0\n')
+ hgrc.write(b'[largefiles]\n')
+ hgrc.write(b'usercache = %s\n' %
+ (os.path.join(self._testtmp, b'.cache/largefiles')))
+ hgrc.write(b'[lfs]\n')
+ hgrc.write(b'usercache = %s\n' %
+ (os.path.join(self._testtmp, b'.cache/lfs')))
+ hgrc.write(b'[web]\n')
+ hgrc.write(b'address = localhost\n')
+ hgrc.write(b'ipv6 = %s\n' % str(self._useipv6).encode('ascii'))
+
+ for opt in self._extraconfigopts:
+ section, key = opt.encode('utf-8').split(b'.', 1)
+ assert b'=' in key, ('extra config opt %s must '
+ 'have an = for assignment' % opt)
+ hgrc.write(b'[%s]\n%s\n' % (section, key))
def fail(self, msg):
# unittest differentiates between errored and failed.
@@ 972,7 1203,7 @@ checkcodeglobpats = [
re.compile(br'^pulling from \$TESTTMP/.*[^)]$'),
# Not all platforms have 127.0.0.1 as loopback (though most do),
# so we always glob that too.
- re.compile(br'.*127.0.0.1.*$'),
+ re.compile(br'.*\$LOCALIP.*$'),
]
bchr = chr
@@ 990,23 1221,38 @@ class TTest(Test):
ESCAPEMAP = dict((bchr(i), br'\x%02x' % i) for i in range(256))
ESCAPEMAP.update({b'\\': b'\\\\', b'\r': br'\r'})
+ def __init__(self, path, *args, **kwds):
+ # accept an extra "case" parameter
+ case = kwds.pop('case', None)
+ self._case = case
+ self._allcases = parsettestcases(path)
+ super(TTest, self).__init__(path, *args, **kwds)
+ if case:
+ self.name = '%s (case %s)' % (self.name, _strpath(case))
+ self.errpath = b'%s.%s.err' % (self.errpath[:-4], case)
+ self._tmpname += b'-%s' % case
+
@property
def refpath(self):
return os.path.join(self._testdir, self.bname)
def _run(self, env):
- f = open(self.path, 'rb')
- lines = f.readlines()
- f.close()
+ with open(self.path, 'rb') as f:
+ lines = f.readlines()
+
+ # .t file is both reference output and the test input, keep reference
+ # output updated with the the test input. This avoids some race
+ # conditions where the reference output does not match the actual test.
+ if self._refout is not None:
+ self._refout = lines
salt, script, after, expected = self._parsetest(lines)
# Write out the generated script.
fname = b'%s.sh' % self._testtmp
- f = open(fname, 'wb')
- for l in script:
- f.write(l)
- f.close()
+ with open(fname, 'wb') as f:
+ for l in script:
+ f.write(l)
cmd = b'%s "%s"' % (self._shell, fname)
vlog("# Running", cmd)
@@ 1041,10 1287,24 @@ class TTest(Test):
if ret != 0:
return False, stdout
- if 'slow' in reqs:
+ if b'slow' in reqs:
self._timeout = self._slowtimeout
return True, None
+ def _iftest(self, args):
+ # implements "#if"
+ reqs = []
+ for arg in args:
+ if arg.startswith(b'no-') and arg[3:] in self._allcases:
+ if arg[3:] == self._case:
+ return False
+ elif arg in self._allcases:
+ if arg != self._case:
+ return False
+ else:
+ reqs.append(arg)
+ return self._hghave(reqs)[0]
+
def _parsetest(self, lines):
# We generate a shell script which outputs unique markers to line
# up script results with our source. These markers include input
@@ 1082,6 1342,13 @@ class TTest(Test):
script.append(b'alias hg="%s"\n' % self._hgcommand)
if os.getenv('MSYSTEM'):
script.append(b'alias pwd="pwd -W"\n')
+ if self._case:
+ if isinstance(self._case, str):
+ quoted = shellquote(self._case)
+ else:
+ quoted = shellquote(self._case.decode('utf8')).encode('utf8')
+ script.append(b'TESTCASE=%s\n' % quoted)
+ script.append(b'export TESTCASE\n')
n = 0
for n, l in enumerate(lines):
@@ 1102,7 1369,7 @@ class TTest(Test):
after.setdefault(pos, []).append(' !!! invalid #if\n')
if skipping is not None:
after.setdefault(pos, []).append(' !!! nested #if\n')
- skipping = not self._hghave(lsplit[1:])[0]
+ skipping = not self._iftest(lsplit[1:])
after.setdefault(pos, []).append(l)
elif l.startswith(b'#else'):
if skipping is None:
@@ 1190,12 1457,9 @@ class TTest(Test):
while i < len(els):
el = els[i]
- r = TTest.linematch(el, lout)
+ r = self.linematch(el, lout)
if isinstance(r, str):
- if r == '+glob':
- lout = el[:-1] + ' (glob)\n'
- r = '' # Warn only this line.
- elif r == '-glob':
+ if r == '-glob':
lout = ''.join(el.rsplit(' (glob)', 1))
r = '' # Warn only this line.
elif r == "retry":
@@ 1208,8 1472,18 @@ class TTest(Test):
if r:
els.pop(i)
break
- if el and el.endswith(b" (?)\n"):
- optional.append(i)
+ if el:
+ if el.endswith(b" (?)\n"):
+ optional.append(i)
+ else:
+ m = optline.match(el)
+ if m:
+ conditions = [
+ c for c in m.group(2).split(b' ')]
+
+ if not self._iftest(conditions):
+ optional.append(i)
+
i += 1
if r:
@@ 1235,8 1509,17 @@ class TTest(Test):
# clean up any optional leftovers
while expected.get(pos, None):
el = expected[pos].pop(0)
- if el and not el.endswith(b" (?)\n"):
- break
+ if el:
+ if not el.endswith(b" (?)\n"):
+ m = optline.match(el)
+ if m:
+ conditions = [c for c in m.group(2).split(b' ')]
+
+ if self._iftest(conditions):
+ # Don't append as optional line
+ continue
+ else:
+ continue
postout.append(b' ' + el)
if lcmd:
@@ 1260,6 1543,7 @@ class TTest(Test):
@staticmethod
def rematch(el, l):
try:
+ el = b'(?:' + el + b')'
# use \Z to ensure that the regex matches to the end of the string
if os.name == 'nt':
return re.match(el + br'\r?\n\Z', l)
@@ 1280,7 1564,7 @@ class TTest(Test):
return True
return b'-glob'
return True
- el = el.replace(b'127.0.0.1', b'*')
+ el = el.replace(b'$LOCALIP', b'*')
i, n = 0, len(el)
res = b''
while i < n:
@@ 1299,8 1583,7 @@ class TTest(Test):
res += re.escape(c)
return TTest.rematch(res, l)
- @staticmethod
- def linematch(el, l):
+ def linematch(self, el, l):
retry = False
if el == l: # perfect match (fast)
return True
@@ 1308,6 1591,15 @@ class TTest(Test):
if el.endswith(b" (?)\n"):
retry = "retry"
el = el[:-5] + b"\n"
+ else:
+ m = optline.match(el)
+ if m:
+ conditions = [c for c in m.group(2).split(b' ')]
+
+ el = m.group(1) + b"\n"
+ if not self._iftest(conditions):
+ retry = "retry" # Not required by listed features
+
if el.endswith(b" (esc)\n"):
if PYTHON3:
el = el[:-7].decode('unicode_escape') + '\n'
@@ 1322,9 1614,11 @@ class TTest(Test):
# ignore '(glob)' added to l by 'replacements'
if l.endswith(b" (glob)\n"):
l = l[:-8] + b"\n"
- return TTest.globmatch(el[:-8], l)
- if os.altsep and l.replace(b'\\', b'/') == el:
- return b'+glob'
+ return TTest.globmatch(el[:-8], l) or retry
+ if os.altsep:
+ _l = l.replace(b'\\', b'/')
+ if el == _l or os.name == 'nt' and el[:-1] + b'\r\n' == _l:
+ return True
return retry
@staticmethod
@@ 1355,18 1649,8 @@ class TTest(Test):
return TTest.ESCAPESUB(TTest._escapef, s)
iolock = threading.RLock()
-
-class SkipTest(Exception):
- """Raised to indicate that a test is to be skipped."""
-
-class IgnoreTest(Exception):
- """Raised to indicate that a test is to be ignored."""
-
-class WarnTest(Exception):
- """Raised to indicate that a test warned."""
-
-class ReportedTest(Exception):
- """Raised to indicate that a test already reported."""
+firstlock = threading.RLock()
+firsterror = False
class TestResult(unittest._TextTestResult):
"""Holds results when executing via unittest."""
@@ 1386,17 1670,19 @@ class TestResult(unittest._TextTestResul
# sense to map it into skip some day.
self.ignored = []
- # We have a custom "warned" result that isn't present in any Python
- # unittest implementation. It is very similar to failed. It may make
- # sense to map it into fail some day.
- self.warned = []
-
self.times = []
self._firststarttime = None
# Data stored for the benefit of generating xunit reports.
self.successes = []
self.faildata = {}
+ if options.color == 'auto':
+ self.color = pygmentspresent and self.stream.isatty()
+ elif options.color == 'never':
+ self.color = False
+ else: # 'always', for testing purposes
+ self.color = pygmentspresent
+
def addFailure(self, test, reason):
self.failures.append((test, reason))
@@ 1408,7 1694,10 @@ class TestResult(unittest._TextTestResul
self.stream.write('t')
else:
if not self._options.nodiff:
- self.stream.write('\nERROR: %s output changed\n' % test)
+ self.stream.write('\n')
+ # Exclude the '\n' from highlighting to lex correctly
+ formatted = 'ERROR: %s output changed\n' % test
+ self.stream.write(highlightmsg(formatted, self.color))
self.stream.write('!')
self.stream.flush()
@@ 1445,22 1734,9 @@ class TestResult(unittest._TextTestResul
self.testsRun += 1
self.stream.flush()
- def addWarn(self, test, reason):
- self.warned.append((test, reason))
-
- if self._options.first:
- self.stop()
-
- with iolock:
- if self.showAll:
- self.stream.writeln('warned %s' % reason)
- else:
- self.stream.write('~')
- self.stream.flush()
-
def addOutputMismatch(self, test, ret, got, expected):
"""Record a mismatch in test output for a particular test."""
- if self.shouldStop:
+ if self.shouldStop or firsterror:
# don't print, some other test case already failed and
# printed, we're just stale and probably failed due to our
# temp dir getting cleaned up.
@@ 1482,13 1758,12 @@ class TestResult(unittest._TextTestResul
servefail, lines = getdiff(expected, got,
test.refpath, test.errpath)
if servefail:
- self.addFailure(
- test,
+ raise test.failureException(
'server failed to start (HGPORT=%s)' % test._startport)
- raise ReportedTest('server failed to start')
else:
self.stream.write('\n')
for line in lines:
+ line = highlightdiff(line, self.color)
if PYTHON3:
self.stream.flush()
self.stream.buffer.write(line)
@@ 1499,14 1774,19 @@ class TestResult(unittest._TextTestResul
# handle interactive prompt without releasing iolock
if self._options.interactive:
- self.stream.write('Accept this change? [n] ')
- answer = sys.stdin.readline().strip()
- if answer.lower() in ('y', 'yes'):
- if test.name.endswith('.t'):
- rename(test.errpath, test.path)
- else:
- rename(test.errpath, '%s.out' % test.path)
- accepted = True
+ if test.readrefout() != expected:
+ self.stream.write(
+ 'Reference output has changed (run again to prompt '
+ 'changes)')
+ else:
+ self.stream.write('Accept this change? [n] ')
+ answer = sys.stdin.readline().strip()
+ if answer.lower() in ('y', 'yes'):
+ if test.path.endswith(b'.t'):
+ rename(test.errpath, test.path)
+ else:
+ rename(test.errpath, '%s.out' % test.path)
+ accepted = True
if not accepted:
self.faildata[test.name] = b''.join(lines)
@@ 1600,13 1880,13 @@ class TestSuite(unittest.TestSuite):
def get():
num_tests[0] += 1
if getattr(test, 'should_reload', False):
- return self._loadtest(test.path, num_tests[0])
+ return self._loadtest(test, num_tests[0])
return test
if not os.path.exists(test.path):
result.addSkip(test, "Doesn't exist")
continue
- if not (self._whitelist and test.name in self._whitelist):
+ if not (self._whitelist and test.bname in self._whitelist):
if self._blacklist and test.bname in self._blacklist:
result.addSkip(test, 'blacklisted')
continue
@@ 1616,9 1896,8 @@ class TestSuite(unittest.TestSuite):
continue
if self._keywords:
- f = open(test.path, 'rb')
- t = f.read().lower() + test.bname.lower()
- f.close()
+ with open(test.path, 'rb') as f:
+ t = f.read().lower() + test.bname.lower()
ignored = False
for k in self._keywords.lower().split():
if k not in t:
@@ 1628,7 1907,7 @@ class TestSuite(unittest.TestSuite):
if ignored:
continue
- for _ in xrange(self._runs_per_test):
+ for _ in range(self._runs_per_test):
tests.append(get())
runtests = list(tests)
@@ 1642,6 1921,8 @@ class TestSuite(unittest.TestSuite):
if not v:
channel = n
break
+ else:
+ raise ValueError('Could not find output channel')
channels[channel] = "=" + test.name[5:].split(".")[0]
try:
test(result)
@@ 1651,10 1932,11 @@ class TestSuite(unittest.TestSuite):
except: # re-raises
done.put(('!', test, 'run-test raised an error, see traceback'))
raise
- try:
- channels[channel] = ''
- except IndexError:
- pass
+ finally:
+ try:
+ channels[channel] = ''
+ except IndexError:
+ pass
def stat():
count = 0
@@ 1670,7 1952,7 @@ class TestSuite(unittest.TestSuite):
with iolock:
sys.stdout.write(d + ' ')
sys.stdout.flush()
- for x in xrange(10):
+ for x in range(10):
if channels:
time.sleep(.1)
count += 1
@@ 1698,7 1980,7 @@ class TestSuite(unittest.TestSuite):
if getattr(test, 'should_reload', False):
num_tests[0] += 1
tests.append(
- self._loadtest(test.name, num_tests[0]))
+ self._loadtest(test, num_tests[0]))
else:
tests.append(test)
if self._jobs == 1:
@@ 1733,10 2015,10 @@ class TestSuite(unittest.TestSuite):
# alphabetically, while times for each test are listed from oldest to
# newest.
-def loadtimes(testdir):
+def loadtimes(outputdir):
times = []
try:
- with open(os.path.join(testdir, b'.testtimes-')) as fp:
+ with open(os.path.join(outputdir, b'.testtimes-')) as fp:
for line in fp:
ts = line.split()
times.append((ts[0], [float(t) for t in ts[1:]]))
@@ 1745,8 2027,8 @@ def loadtimes(testdir):
raise
return times
-def savetimes(testdir, result):
- saved = dict(loadtimes(testdir))
+def savetimes(outputdir, result):
+ saved = dict(loadtimes(outputdir))
maxruns = 5
skipped = set([str(t[0]) for t in result.skipped])
for tdata in result.times:
@@ 1757,11 2039,11 @@ def savetimes(testdir, result):
ts[:] = ts[-maxruns:]
fd, tmpname = tempfile.mkstemp(prefix=b'.testtimes',
- dir=testdir, text=True)
+ dir=outputdir, text=True)
with os.fdopen(fd, 'w') as fp:
for name, ts in sorted(saved.items()):
fp.write('%s %s\n' % (name, ' '.join(['%.3f' % (t,) for t in ts])))
- timepath = os.path.join(testdir, b'.testtimes')
+ timepath = os.path.join(outputdir, b'.testtimes')
try:
os.unlink(timepath)
except OSError:
@@ 1779,6 2061,25 @@ class TextTestRunner(unittest.TextTestRu
self._runner = runner
+ def listtests(self, test):
+ result = TestResult(self._runner.options, self.stream,
+ self.descriptions, 0)
+ test = sorted(test, key=lambda t: t.name)
+ for t in test:
+ print(t.name)
+ result.addSuccess(t)
+
+ if self._runner.options.xunit:
+ with open(self._runner.options.xunit, "wb") as xuf:
+ self._writexunit(result, xuf)
+
+ if self._runner.options.json:
+ jsonpath = os.path.join(self._runner._outputdir, b'report.json')
+ with open(jsonpath, 'w') as fp:
+ self._writejson(result, fp)
+
+ return result
+
def run(self, test):
result = TestResult(self._runner.options, self.stream,
self.descriptions, self.verbosity)
@@ 1786,7 2087,6 @@ class TextTestRunner(unittest.TextTestRu
test(result)
failed = len(result.failures)
- warned = len(result.warned)
skipped = len(result.skipped)
ignored = len(result.ignored)
@@ 1795,126 2095,94 @@ class TextTestRunner(unittest.TextTestRu
if not self._runner.options.noskips:
for test, msg in result.skipped:
- self.stream.writeln('Skipped %s: %s' % (test.name, msg))
- for test, msg in result.warned:
- self.stream.writeln('Warned %s: %s' % (test.name, msg))
+ formatted = 'Skipped %s: %s\n' % (test.name, msg)
+ self.stream.write(highlightmsg(formatted, result.color))
for test, msg in result.failures:
- self.stream.writeln('Failed %s: %s' % (test.name, msg))
+ formatted = 'Failed %s: %s\n' % (test.name, msg)
+ self.stream.write(highlightmsg(formatted, result.color))
for test, msg in result.errors:
self.stream.writeln('Errored %s: %s' % (test.name, msg))
if self._runner.options.xunit:
- with open(self._runner.options.xunit, 'wb') as xuf:
- timesd = dict((t[0], t[3]) for t in result.times)
- doc = minidom.Document()
- s = doc.createElement('testsuite')
- s.setAttribute('name', 'run-tests')
- s.setAttribute('tests', str(result.testsRun))
- s.setAttribute('errors', "0") # TODO
- s.setAttribute('failures', str(failed))
- s.setAttribute('skipped', str(skipped + ignored))
- doc.appendChild(s)
- for tc in result.successes:
- t = doc.createElement('testcase')
- t.setAttribute('name', tc.name)
- t.setAttribute('time', '%.3f' % timesd[tc.name])
- s.appendChild(t)
- for tc, err in sorted(result.faildata.items()):
- t = doc.createElement('testcase')
- t.setAttribute('name', tc)
- t.setAttribute('time', '%.3f' % timesd[tc])
- # createCDATASection expects a unicode or it will
- # convert using default conversion rules, which will
- # fail if string isn't ASCII.
- err = cdatasafe(err).decode('utf-8', 'replace')
- cd = doc.createCDATASection(err)
- t.appendChild(cd)
- s.appendChild(t)
- xuf.write(doc.toprettyxml(indent=' ', encoding='utf-8'))
+ with open(self._runner.options.xunit, "wb") as xuf:
+ self._writexunit(result, xuf)
if self._runner.options.json:
- jsonpath = os.path.join(self._runner._testdir, b'report.json')
+ jsonpath = os.path.join(self._runner._outputdir, b'report.json')
with open(jsonpath, 'w') as fp:
- timesd = {}
- for tdata in result.times:
- test = tdata[0]
- timesd[test] = tdata[1:]
-
- outcome = {}
- groups = [('success', ((tc, None)
- for tc in result.successes)),
- ('failure', result.failures),
- ('skip', result.skipped)]
- for res, testcases in groups:
- for tc, __ in testcases:
- if tc.name in timesd:
- diff = result.faildata.get(tc.name, b'')
- tres = {'result': res,
- 'time': ('%0.3f' % timesd[tc.name][2]),
- 'cuser': ('%0.3f' % timesd[tc.name][0]),
- 'csys': ('%0.3f' % timesd[tc.name][1]),
- 'start': ('%0.3f' % timesd[tc.name][3]),
- 'end': ('%0.3f' % timesd[tc.name][4]),
- 'diff': diff.decode('unicode_escape'),
- }
- else:
- # blacklisted test
- tres = {'result': res}
-
- outcome[tc.name] = tres
- jsonout = json.dumps(outcome, sort_keys=True, indent=4,
- separators=(',', ': '))
- fp.writelines(("testreport =", jsonout))
+ self._writejson(result, fp)
self._runner._checkhglib('Tested')
- savetimes(self._runner._testdir, result)
+ savetimes(self._runner._outputdir, result)
if failed and self._runner.options.known_good_rev:
- def nooutput(args):
- p = subprocess.Popen(args, stderr=subprocess.STDOUT,
- stdout=subprocess.PIPE)
- p.stdout.read()
- p.wait()
- for test, msg in result.failures:
- nooutput(['hg', 'bisect', '--reset']),
- nooutput(['hg', 'bisect', '--bad', '.'])
- nooutput(['hg', 'bisect', '--good',
- self._runner.options.known_good_rev])
- # TODO: we probably need to forward some options
- # that alter hg's behavior inside the tests.
- rtc = '%s %s %s' % (sys.executable, sys.argv[0], test)
- sub = subprocess.Popen(['hg', 'bisect', '--command', rtc],
- stderr=subprocess.STDOUT,
- stdout=subprocess.PIPE)
- data = sub.stdout.read()
- sub.wait()
- m = re.search(
- (r'\nThe first (?P<goodbad>bad|good) revision '
- r'is:\nchangeset: +\d+:(?P<node>[a-f0-9]+)\n.*\n'
- r'summary: +(?P<summary>[^\n]+)\n'),
- data, (re.MULTILINE | re.DOTALL))
- if m is None:
- self.stream.writeln(
- 'Failed to identify failure point for %s' % test)
- continue
- dat = m.groupdict()
- verb = 'broken' if dat['goodbad'] == 'bad' else 'fixed'
- self.stream.writeln(
- '%s %s by %s (%s)' % (
- test, verb, dat['node'], dat['summary']))
+ self._bisecttests(t for t, m in result.failures)
self.stream.writeln(
- '# Ran %d tests, %d skipped, %d warned, %d failed.'
- % (result.testsRun,
- skipped + ignored, warned, failed))
+ '# Ran %d tests, %d skipped, %d failed.'
+ % (result.testsRun, skipped + ignored, failed))
if failed:
self.stream.writeln('python hash seed: %s' %
os.environ['PYTHONHASHSEED'])
if self._runner.options.time:
self.printtimes(result.times)
+ if self._runner.options.exceptions:
+ exceptions = aggregateexceptions(
+ os.path.join(self._runner._outputdir, b'exceptions'))
+ total = sum(exceptions.values())
+
+ self.stream.writeln('Exceptions Report:')
+ self.stream.writeln('%d total from %d frames' %
+ (total, len(exceptions)))
+ for (frame, line, exc), count in exceptions.most_common():
+ self.stream.writeln('%d\t%s: %s' % (count, frame, exc))
+
+ self.stream.flush()
+
return result
+ def _bisecttests(self, tests):
+ bisectcmd = ['hg', 'bisect']
+ bisectrepo = self._runner.options.bisect_repo
+ if bisectrepo:
+ bisectcmd.extend(['-R', os.path.abspath(bisectrepo)])
+ def pread(args):
+ env = os.environ.copy()
+ env['HGPLAIN'] = '1'
+ p = subprocess.Popen(args, stderr=subprocess.STDOUT,
+ stdout=subprocess.PIPE, env=env)
+ data = p.stdout.read()
+ p.wait()
+ return data
+ for test in tests:
+ pread(bisectcmd + ['--reset']),
+ pread(bisectcmd + ['--bad', '.'])
+ pread(bisectcmd + ['--good', self._runner.options.known_good_rev])
+ # TODO: we probably need to forward more options
+ # that alter hg's behavior inside the tests.
+ opts = ''
+ withhg = self._runner.options.with_hg
+ if withhg:
+ opts += ' --with-hg=%s ' % shellquote(_strpath(withhg))
+ rtc = '%s %s %s %s' % (sys.executable, sys.argv[0], opts,
+ test)
+ data = pread(bisectcmd + ['--command', rtc])
+ m = re.search(
+ (br'\nThe first (?P<goodbad>bad|good) revision '
+ br'is:\nchangeset: +\d+:(?P<node>[a-f0-9]+)\n.*\n'
+ br'summary: +(?P<summary>[^\n]+)\n'),
+ data, (re.MULTILINE | re.DOTALL))
+ if m is None:
+ self.stream.writeln(
+ 'Failed to identify failure point for %s' % test)
+ continue
+ dat = m.groupdict()
+ verb = 'broken' if dat['goodbad'] == 'bad' else 'fixed'
+ self.stream.writeln(
+ '%s %s by %s (%s)' % (
+ test, verb, dat['node'], dat['summary']))
+
def printtimes(self, times):
# iolock held by run
self.stream.writeln('# Producing time report')
@@ 1927,6 2195,140 @@ class TextTestRunner(unittest.TextTestRu
cuser, csys, real, start, end = tdata[1:6]
self.stream.writeln(cols % (start, end, cuser, csys, real, test))
+ @staticmethod
+ def _writexunit(result, outf):
+ # See http://llg.cubic.org/docs/junit/ for a reference.
+ timesd = dict((t[0], t[3]) for t in result.times)
+ doc = minidom.Document()
+ s = doc.createElement('testsuite')
+ s.setAttribute('name', 'run-tests')
+ s.setAttribute('tests', str(result.testsRun))
+ s.setAttribute('errors', "0") # TODO
+ s.setAttribute('failures', str(len(result.failures)))
+ s.setAttribute('skipped', str(len(result.skipped) +
+ len(result.ignored)))
+ doc.appendChild(s)
+ for tc in result.successes:
+ t = doc.createElement('testcase')
+ t.setAttribute('name', tc.name)
+ tctime = timesd.get(tc.name)
+ if tctime is not None:
+ t.setAttribute('time', '%.3f' % tctime)
+ s.appendChild(t)
+ for tc, err in sorted(result.faildata.items()):
+ t = doc.createElement('testcase')
+ t.setAttribute('name', tc)
+ tctime = timesd.get(tc)
+ if tctime is not None:
+ t.setAttribute('time', '%.3f' % tctime)
+ # createCDATASection expects a unicode or it will
+ # convert using default conversion rules, which will
+ # fail if string isn't ASCII.
+ err = cdatasafe(err).decode('utf-8', 'replace')
+ cd = doc.createCDATASection(err)
+ # Use 'failure' here instead of 'error' to match errors = 0,
+ # failures = len(result.failures) in the testsuite element.
+ failelem = doc.createElement('failure')
+ failelem.setAttribute('message', 'output changed')
+ failelem.setAttribute('type', 'output-mismatch')
+ failelem.appendChild(cd)
+ t.appendChild(failelem)
+ s.appendChild(t)
+ for tc, message in result.skipped:
+ # According to the schema, 'skipped' has no attributes. So store
+ # the skip message as a text node instead.
+ t = doc.createElement('testcase')
+ t.setAttribute('name', tc.name)
+ binmessage = message.encode('utf-8')
+ message = cdatasafe(binmessage).decode('utf-8', 'replace')
+ cd = doc.createCDATASection(message)
+ skipelem = doc.createElement('skipped')
+ skipelem.appendChild(cd)
+ t.appendChild(skipelem)
+ s.appendChild(t)
+ outf.write(doc.toprettyxml(indent=' ', encoding='utf-8'))
+
+ @staticmethod
+ def _writejson(result, outf):
+ timesd = {}
+ for tdata in result.times:
+ test = tdata[0]
+ timesd[test] = tdata[1:]
+
+ outcome = {}
+ groups = [('success', ((tc, None)
+ for tc in result.successes)),
+ ('failure', result.failures),
+ ('skip', result.skipped)]
+ for res, testcases in groups:
+ for tc, __ in testcases:
+ if tc.name in timesd:
+ diff = result.faildata.get(tc.name, b'')
+ try:
+ diff = diff.decode('unicode_escape')
+ except UnicodeDecodeError as e:
+ diff = '%r decoding diff, sorry' % e
+ tres = {'result': res,
+ 'time': ('%0.3f' % timesd[tc.name][2]),
+ 'cuser': ('%0.3f' % timesd[tc.name][0]),
+ 'csys': ('%0.3f' % timesd[tc.name][1]),
+ 'start': ('%0.3f' % timesd[tc.name][3]),
+ 'end': ('%0.3f' % timesd[tc.name][4]),
+ 'diff': diff,
+ }
+ else:
+ # blacklisted test
+ tres = {'result': res}
+
+ outcome[tc.name] = tres
+ jsonout = json.dumps(outcome, sort_keys=True, indent=4,
+ separators=(',', ': '))
+ outf.writelines(("testreport =", jsonout))
+
+def sorttests(testdescs, shuffle=False):
+ """Do an in-place sort of tests."""
+ if shuffle:
+ random.shuffle(testdescs)
+ return
+
+ # keywords for slow tests
+ slow = {b'svn': 10,
+ b'cvs': 10,
+ b'hghave': 10,
+ b'largefiles-update': 10,
+ b'run-tests': 10,
+ b'corruption': 10,
+ b'race': 10,
+ b'i18n': 10,
+ b'check': 100,
+ b'gendoc': 100,
+ b'contrib-perf': 200,
+ }
+ perf = {}
+
+ def sortkey(f):
+ # run largest tests first, as they tend to take the longest
+ f = f['path']
+ try:
+ return perf[f]
+ except KeyError:
+ try:
+ val = -os.stat(f).st_size
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+ perf[f] = -1e9 # file does not exist, tell early
+ return -1e9
+ for kw, mul in list(slow.items()):
+ if kw in f:
+ val *= mul
+ if f.endswith(b'.py'):
+ val /= 10.0
+ perf[f] = val / 1000.0
+ return perf[f]
+
+ testdescs.sort(key=sortkey)
+
class TestRunner(object):
"""Holds context for executing tests.
@@ 1935,7 2337,6 @@ class TestRunner(object):
# Programs required to run tests.
REQUIREDTOOLS = [
- os.path.basename(_bytespath(sys.executable)),
b'diff',
b'grep',
b'unzip',
@@ 1954,6 2355,7 @@ class TestRunner(object):
self.options = None
self._hgroot = None
self._testdir = None
+ self._outputdir = None
self._hgtmp = None
self._installdir = None
self._bindir = None
@@ 1971,18 2373,20 @@ class TestRunner(object):
oldmask = os.umask(0o22)
try:
parser = parser or getparser()
- options, args = parseargs(args, parser)
- # positional arguments are paths to test files to run, so
- # we make sure they're all bytestrings
- args = [_bytespath(a) for a in args]
+ options = parseargs(args, parser)
+ tests = [_bytespath(a) for a in options.tests]
+ if options.test_list is not None:
+ for listfile in options.test_list:
+ with open(listfile, 'rb') as f:
+ tests.extend(t for t in f.read().splitlines() if t)
self.options = options
self._checktools()
- tests = self.findtests(args)
+ testdescs = self.findtests(tests)
if options.profile_runner:
import statprof
statprof.start()
- result = self._run(tests)
+ result = self._run(testdescs)
if options.profile_runner:
statprof.stop()
statprof.display()
@@ 1991,52 2395,28 @@ class TestRunner(object):
finally:
os.umask(oldmask)
- def _run(self, tests):
- if self.options.random:
- random.shuffle(tests)
- else:
- # keywords for slow tests
- slow = {b'svn': 10,
- b'cvs': 10,
- b'hghave': 10,
- b'largefiles-update': 10,
- b'run-tests': 10,
- b'corruption': 10,
- b'race': 10,
- b'i18n': 10,
- b'check': 100,
- b'gendoc': 100,
- b'contrib-perf': 200,
- }
- perf = {}
- def sortkey(f):
- # run largest tests first, as they tend to take the longest
- try:
- return perf[f]
- except KeyError:
- try:
- val = -os.stat(f).st_size
- except OSError as e:
- if e.errno != errno.ENOENT:
- raise
- perf[f] = -1e9 # file does not exist, tell early
- return -1e9
- for kw, mul in slow.items():
- if kw in f:
- val *= mul
- if f.endswith(b'.py'):
- val /= 10.0
- perf[f] = val / 1000.0
- return perf[f]
- tests.sort(key=sortkey)
+ def _run(self, testdescs):
+ sorttests(testdescs, shuffle=self.options.random)
self._testdir = osenvironb[b'TESTDIR'] = getattr(
os, 'getcwdb', os.getcwd)()
+ # assume all tests in same folder for now
+ if testdescs:
+ pathname = os.path.dirname(testdescs[0]['path'])
+ if pathname:
+ osenvironb[b'TESTDIR'] = os.path.join(osenvironb[b'TESTDIR'],
+ pathname)
+ if self.options.outputdir:
+ self._outputdir = canonpath(_bytespath(self.options.outputdir))
+ else:
+ self._outputdir = self._testdir
+ if testdescs and pathname:
+ self._outputdir = os.path.join(self._outputdir, pathname)
if 'PYTHONHASHSEED' not in os.environ:
- # use a NON-RANDOM python hash seed all the time
+ # use a random python hash seed all the time
# we do the randomness ourself to know what seed is used
- os.environ['PYTHONHASHSEED'] = str(666)
+ os.environ['PYTHONHASHSEED'] = str(random.getrandbits(32))
if self.options.tmpdir:
self.options.keep_tmpdir = True
@@ 2048,11 2428,6 @@ class TestRunner(object):
print("error: temp dir %r already exists" % tmpdir)
return 1
- # Automatically removing tmpdir sounds convenient, but could
- # really annoy anyone in the habit of using "--tmpdir=/tmp"
- # or "--tmpdir=$HOME".
- #vlog("# Removing temp dir", tmpdir)
- #shutil.rmtree(tmpdir)
os.makedirs(tmpdir)
else:
d = None
@@ 2074,12 2449,27 @@ class TestRunner(object):
self._tmpbindir = os.path.join(self._hgtmp, b'install', b'bin')
os.makedirs(self._tmpbindir)
- # This looks redundant with how Python initializes sys.path from
- # the location of the script being executed. Needed because the
- # "hg" specified by --with-hg is not the only Python script
- # executed in the test suite that needs to import 'mercurial'
- # ... which means it's not really redundant at all.
- self._pythondir = self._bindir
+ normbin = os.path.normpath(os.path.abspath(whg))
+ normbin = normbin.replace(os.sep.encode('ascii'), b'/')
+
+ # Other Python scripts in the test harness need to
+ # `import mercurial`. If `hg` is a Python script, we assume
+ # the Mercurial modules are relative to its path and tell the tests
+ # to load Python modules from its directory.
+ with open(whg, 'rb') as fh:
+ initial = fh.read(1024)
+
+ if re.match(b'#!.*python', initial):
+ self._pythondir = self._bindir
+ # If it looks like our in-repo Rust binary, use the source root.
+ # This is a bit hacky. But rhg is still not supported outside the
+ # source directory. So until it is, do the simple thing.
+ elif re.search(b'/rust/target/[^/]+/hg', normbin):
+ self._pythondir = os.path.dirname(self._testdir)
+ # Fall back to the legacy behavior.
+ else:
+ self._pythondir = self._bindir
+
else:
self._installdir = os.path.join(self._hgtmp, b"install")
self._bindir = os.path.join(self._installdir, b"bin")
@@ 2151,14 2541,32 @@ class TestRunner(object):
self._coveragefile = os.path.join(self._testdir, b'.coverage')
+ if self.options.exceptions:
+ exceptionsdir = os.path.join(self._outputdir, b'exceptions')
+ try:
+ os.makedirs(exceptionsdir)
+ except OSError as e:
+ if e.errno != errno.EEXIST:
+ raise
+
+ # Remove all existing exception reports.
+ for f in os.listdir(exceptionsdir):
+ os.unlink(os.path.join(exceptionsdir, f))
+
+ osenvironb[b'HGEXCEPTIONSDIR'] = exceptionsdir
+ logexceptions = os.path.join(self._testdir, b'logexceptions.py')
+ self.options.extra_config_opt.append(
+ 'extensions.logexceptions=%s' % logexceptions.decode('utf-8'))
+
vlog("# Using TESTDIR", self._testdir)
vlog("# Using RUNTESTDIR", osenvironb[b'RUNTESTDIR'])
vlog("# Using HGTMP", self._hgtmp)
vlog("# Using PATH", os.environ["PATH"])
vlog("# Using", IMPL_PATH, osenvironb[IMPL_PATH])
+ vlog("# Writing to directory", self._outputdir)
try:
- return self._runtests(tests) or 0
+ return self._runtests(testdescs) or 0
finally:
time.sleep(.1)
self._cleanup()
@@ 2178,35 2586,62 @@ class TestRunner(object):
else:
args = os.listdir(b'.')
- return [t for t in args
- if os.path.basename(t).startswith(b'test-')
- and (t.endswith(b'.py') or t.endswith(b'.t'))]
-
- def _runtests(self, tests):
- try:
- if self._installdir:
- self._installhg()
- self._checkhglib("Testing")
+ expanded_args = []
+ for arg in args:
+ if os.path.isdir(arg):
+ if not arg.endswith(b'/'):
+ arg += b'/'
+ expanded_args.extend([arg + a for a in os.listdir(arg)])
+ else:
+ expanded_args.append(arg)
+ args = expanded_args
+
+ tests = []
+ for t in args:
+ if not (os.path.basename(t).startswith(b'test-')
+ and (t.endswith(b'.py') or t.endswith(b'.t'))):
+ continue
+ if t.endswith(b'.t'):
+ # .t file may contain multiple test cases
+ cases = sorted(parsettestcases(t))
+ if cases:
+ tests += [{'path': t, 'case': c} for c in sorted(cases)]
+ else:
+ tests.append({'path': t})
else:
- self._usecorrectpython()
- if self.options.chg:
- assert self._installdir
- self._installchg()
-
+ tests.append({'path': t})
+ return tests
+
+ def _runtests(self, testdescs):
+ def _reloadtest(test, i):
+ # convert a test back to its description dict
+ desc = {'path': test.path}
+ case = getattr(test, '_case', None)
+ if case:
+ desc['case'] = case
+ return self._gettest(desc, i)
+
+ try:
if self.options.restart:
- orig = list(tests)
- while tests:
- if os.path.exists(tests[0] + ".err"):
+ orig = list(testdescs)
+ while testdescs:
+ desc = testdescs[0]
+ # desc['path'] is a relative path
+ if 'case' in desc:
+ errpath = b'%s.%s.err' % (desc['path'], desc['case'])
+ else:
+ errpath = b'%s.err' % desc['path']
+ errpath = os.path.join(self._outputdir, errpath)
+ if os.path.exists(errpath):
break
- tests.pop(0)
- if not tests:
+ testdescs.pop(0)
+ if not testdescs:
print("running all tests")
- tests = orig
-
- tests = [self._gettest(t, i) for i, t in enumerate(tests)]
+ testdescs = orig
+
+ tests = [self._gettest(d, i) for i, d in enumerate(testdescs)]
failed = False
- warned = False
kws = self.options.keywords
if kws is not None and PYTHON3:
kws = kws.encode('utf-8')
@@ 2220,17 2655,28 @@ class TestRunner(object):
loop=self.options.loop,
runs_per_test=self.options.runs_per_test,
showchannels=self.options.showchannels,
- tests=tests, loadtest=self._gettest)
+ tests=tests, loadtest=_reloadtest)
verbosity = 1
if self.options.verbose:
verbosity = 2
runner = TextTestRunner(self, verbosity=verbosity)
- result = runner.run(suite)
+
+ if self.options.list_tests:
+ result = runner.listtests(suite)
+ else:
+ if self._installdir:
+ self._installhg()
+ self._checkhglib("Testing")
+ else:
+ self._usecorrectpython()
+ if self.options.chg:
+ assert self._installdir
+ self._installchg()
+
+ result = runner.run(suite)
if result.failures:
failed = True
- if result.warned:
- warned = True
if self.options.anycoverage:
self._outputcoverage()
@@ 2240,18 2686,16 @@ class TestRunner(object):
if failed:
return 1
- if warned:
- return 80
def _getport(self, count):
port = self._ports.get(count) # do we have a cached entry?
if port is None:
portneeded = 3
# above 100 tries we just give up and let test reports failure
- for tries in xrange(100):
+ for tries in range(100):
allfree = True
port = self.options.port + self._portoffset
- for idx in xrange(portneeded):
+ for idx in range(portneeded):
if not checkportisavailable(port + idx):
allfree = False
break
@@ 2261,13 2705,14 @@ class TestRunner(object):
self._ports[count] = port
return port
- def _gettest(self, test, count):
+ def _gettest(self, testdesc, count):
"""Obtain a Test by looking at its filename.
Returns a Test instance. The Test may not be runnable if it doesn't
map to a known type.
"""
- lctest = test.lower()
+ path = testdesc['path']
+ lctest = path.lower()
testcls = Test
for ext, cls in self.TESTTYPES:
@@ 2275,19 2720,24 @@ class TestRunner(object):
testcls = cls
break
- refpath = os.path.join(self._testdir, test)
+ refpath = os.path.join(self._testdir, path)
tmpdir = os.path.join(self._hgtmp, b'child%d' % count)
- t = testcls(refpath, tmpdir,
+ # extra keyword parameters. 'case' is used by .t tests
+ kwds = dict((k, testdesc[k]) for k in ['case'] if k in testdesc)
+
+ t = testcls(refpath, self._outputdir, tmpdir,
keeptmpdir=self.options.keep_tmpdir,
debug=self.options.debug,
+ first=self.options.first,
timeout=self.options.timeout,
startport=self._getport(count),
extraconfigopts=self.options.extra_config_opt,
py3kwarnings=self.options.py3k_warnings,
shell=self.options.shell,
hgcommand=self._hgcommand,
- usechg=bool(self.options.with_chg or self.options.chg))