#!/usr/bin/env python3 # TODO # -v --verbose # --color # --gdb # --reentrant import toml import glob import re import os import io import itertools as it import collections.abc as abc import subprocess as sp import base64 import sys import copy TEST_DIR = 'tests_' RULES = """ define FLATTEN %$(subst /,.,$(target:.c=.t.c)): $(target) cat <(echo '#line 1 "$$<"') $$< > $$@ endef $(foreach target,$(SRC),$(eval $(FLATTEN))) -include tests_/*.d %.c: %.t.c ./scripts/explode_asserts.py $< -o $@ %.test: %.test.o $(foreach f,$(subst /,.,$(SRC:.c=.o)),%.test.$f) $(CC) $(CFLAGS) $^ $(LFLAGS) -o $@ """ GLOBALS = """ //////////////// AUTOGENERATED TEST //////////////// #include "lfs.h" #include "emubd/lfs_emubd.h" #include """ DEFINES = { "LFS_READ_SIZE": 16, "LFS_PROG_SIZE": "LFS_READ_SIZE", "LFS_BLOCK_SIZE": 512, "LFS_BLOCK_COUNT": 1024, "LFS_BLOCK_CYCLES": 1024, "LFS_CACHE_SIZE": "(64 % LFS_PROG_SIZE == 0 ? 64 : LFS_PROG_SIZE)", "LFS_LOOKAHEAD_SIZE": 16, } PROLOGUE = """ // prologue __attribute__((unused)) lfs_t lfs; __attribute__((unused)) lfs_emubd_t bd; __attribute__((unused)) lfs_file_t file; __attribute__((unused)) lfs_dir_t dir; __attribute__((unused)) struct lfs_info info; __attribute__((unused)) uint8_t buffer[1024]; __attribute__((unused)) char path[1024]; __attribute__((unused)) const struct lfs_config cfg = { .context = &bd, .read = &lfs_emubd_read, .prog = &lfs_emubd_prog, .erase = &lfs_emubd_erase, .sync = &lfs_emubd_sync, .read_size = LFS_READ_SIZE, .prog_size = LFS_PROG_SIZE, .block_size = LFS_BLOCK_SIZE, .block_count = LFS_BLOCK_COUNT, .block_cycles = LFS_BLOCK_CYCLES, .cache_size = LFS_CACHE_SIZE, .lookahead_size = LFS_LOOKAHEAD_SIZE, }; lfs_emubd_create(&cfg, "blocks"); """ EPILOGUE = """ // epilogue lfs_emubd_destroy(&cfg); """ PASS = '\033[32m✓\033[0m' FAIL = '\033[31m✗\033[0m' class TestFailure(Exception): def __init__(self, case, stdout=None, assert_=None): self.case = case self.stdout = stdout self.assert_ = assert_ class TestCase: def __init__(self, suite, config, caseno=None, lineno=None, **_): self.suite = suite self.caseno = caseno self.lineno = lineno self.code = config['code'] self.defines = config.get('define', {}) self.leaky = config.get('leaky', False) def __str__(self): if hasattr(self, 'permno'): return '%s[%d,%d]' % (self.suite.name, self.caseno, self.permno) else: return '%s[%d]' % (self.suite.name, self.caseno) def permute(self, defines, permno=None, **_): ncase = copy.copy(self) ncase.case = self ncase.perms = [ncase] ncase.permno = permno ncase.defines = defines return ncase def build(self, f, **_): # prologue f.write('void test_case%d(' % self.caseno) defines = self.perms[0].defines first = True for k, v in sorted(defines.items()): if not all(perm.defines[k] == v for perm in self.perms): if not first: f.write(',') else: first = False f.write('\n') f.write(8*' '+'int %s' % k) f.write(') {\n') defines = self.perms[0].defines for k, v in sorted(defines.items()): if all(perm.defines[k] == v for perm in self.perms): f.write(4*' '+'#define %s %s\n' % (k, v)) f.write(PROLOGUE) f.write('\n') f.write(4*' '+'// test case %d\n' % self.caseno) f.write(4*' '+'#line %d "%s"\n' % (self.lineno, self.suite.path)) # test case goes here f.write(self.code) # epilogue f.write(EPILOGUE) f.write('\n') defines = self.perms[0].defines for k, v in sorted(defines.items()): if all(perm.defines[k] == v for perm in self.perms): f.write(4*' '+'#undef %s\n' % k) f.write('}\n') def test(self, **args): cmd = ['./%s.test' % self.suite.path, repr(self.caseno), repr(self.permno)] # run in valgrind? if args.get('valgrind', False) and not self.leaky: cmd = ['valgrind', '--leak-check=full', '--error-exitcode=4', '-q'] + cmd # run test case! stdout = [] if args.get('verbose', False): print(' '.join(cmd)) proc = sp.Popen(cmd, universal_newlines=True, bufsize=1, stdout=sp.PIPE, stderr=sp.STDOUT) for line in iter(proc.stdout.readline, ''): stdout.append(line) if args.get('verbose', False): sys.stdout.write(line) proc.wait() if proc.returncode != 0: # failed, try to parse assert? assert_ = None for line in stdout: try: m = re.match('^([^:\\n]+):([0-9]+):assert: (.*)$', line) # found an assert, print info from file with open(m.group(1)) as f: lineno = int(m.group(2)) line = next(it.islice(f, lineno-1, None)).strip('\n') assert_ = { 'path': m.group(1), 'lineno': lineno, 'line': line, 'message': m.group(3), } except: pass self.result = TestFailure(self, stdout, assert_) raise self.result else: self.result = PASS return self.result class TestSuite: def __init__(self, path, TestCase=TestCase, **args): self.name = os.path.basename(path) if self.name.endswith('.toml'): self.name = self.name[:-len('.toml')] self.path = path self.TestCase = TestCase with open(path) as f: # load tests config = toml.load(f) # find line numbers f.seek(0) linenos = [] for i, line in enumerate(f): if re.match(r'^\s*code\s*=\s*(\'\'\'|""")', line): linenos.append(i + 2) # grab global config self.defines = config.get('define', {}) # create initial test cases self.cases = [] for i, (case, lineno) in enumerate(zip(config['case'], linenos)): self.cases.append(self.TestCase( self, case, caseno=i, lineno=lineno, **args)) def __str__(self): return self.name def __lt__(self, other): return self.name < other.name def permute(self, defines={}, **args): for case in self.cases: # lets find all parameterized definitions, in one of # - args.D (defines) # - suite.defines # - case.defines # - DEFINES initial = {} for define in it.chain( defines.items(), self.defines.items(), case.defines.items(), DEFINES.items()): if define[0] not in initial: try: initial[define[0]] = eval(define[1]) except: initial[define[0]] = define[1] # expand permutations expanded = [] pending = [initial] while pending: perm = pending.pop() for k, v in sorted(perm.items()): if not isinstance(v, str) and isinstance(v, abc.Iterable): for nv in reversed(v): nperm = perm.copy() nperm[k] = nv pending.append(nperm) break else: expanded.append(perm) case.perms = [] for i, defines in enumerate(expanded): case.perms.append(case.permute(defines, permno=i, **args)) self.perms = [perm for case in self.cases for perm in case.perms] return self.perms def build(self, **args): # build test.c f = io.StringIO() f.write(GLOBALS) for case in self.cases: f.write('\n') case.build(f, **args) f.write('\n') f.write('int main(int argc, char **argv) {\n') f.write(4*' '+'int case_ = (argc == 3) ? atoi(argv[1]) : 0;\n') f.write(4*' '+'int perm = (argc == 3) ? atoi(argv[2]) : 0;\n') for perm in self.perms: f.write(4*' '+'if (argc != 3 || ' '(case_ == %d && perm == %d)) { ' % ( perm.caseno, perm.permno)) f.write('test_case%d(' % perm.caseno) first = True for k, v in sorted(perm.defines.items()): if not all(perm.defines[k] == v for perm in perm.case.perms): if not first: f.write(', ') else: first = False f.write(str(v)) f.write('); }\n') f.write('}\n') # add test-related rules rules = RULES rules = rules.replace(' ', '\t') with open(self.path + '.test.mk', 'w') as mk: mk.write(rules) mk.write('\n') mk.write('%s: %s\n' % (self.path+'.test.t.c', self.path)) mk.write('\tbase64 -d <<< ') mk.write(base64.b64encode( f.getvalue().encode('utf8')).decode('utf8')) mk.write(' > $@\n') self.makefile = self.path + '.test.mk' self.target = self.path + '.test' return self.makefile, self.target def test(self, caseno=None, permno=None, **args): # run test suite! if not args.get('verbose', True): sys.stdout.write(self.name + ' ') sys.stdout.flush() for perm in self.perms: if caseno is not None and perm.caseno != caseno: continue if permno is not None and perm.permno != permno: continue try: perm.test(**args) except TestFailure as failure: if not args.get('verbose', True): sys.stdout.write(FAIL) sys.stdout.flush() if not args.get('keep_going', False): if not args.get('verbose', True): sys.stdout.write('\n') raise else: if not args.get('verbose', True): sys.stdout.write(PASS) sys.stdout.flush() if not args.get('verbose', True): sys.stdout.write('\n') def main(**args): testpath = args['testpath'] # optional brackets for specific test m = re.search(r'\[(\d+)(?:,(\d+))?\]$', testpath) if m: caseno = int(m.group(1)) permno = int(m.group(2)) if m.group(2) is not None else None testpath = testpath[:m.start()] else: caseno = None permno = None # figure out the suite's toml file if os.path.isdir(testpath): testpath = testpath + '/test_*.toml' elif os.path.isfile(testpath): testpath = testpath elif testpath.endswith('.toml'): testpath = TEST_DIR + '/' + testpath else: testpath = TEST_DIR + '/' + testpath + '.toml' # find tests suites = [] for path in glob.glob(testpath): suites.append(TestSuite(path, **args)) # sort for reproducability suites = sorted(suites) # generate permutations defines = {} for define in args['D']: k, v, *_ = define.split('=', 2) + [''] defines[k] = v for suite in suites: suite.permute(defines, **args) # build tests in parallel print('====== building ======') makefiles = [] targets = [] for suite in suites: makefile, target = suite.build(**args) makefiles.append(makefile) targets.append(target) cmd = (['make', '-f', 'Makefile'] + list(it.chain.from_iterable(['-f', m] for m in makefiles)) + ['CFLAGS+=-fdiagnostics-color=always'] + [target for target in targets]) stdout = [] if args.get('verbose', False): print(' '.join(cmd)) proc = sp.Popen(cmd, universal_newlines=True, bufsize=1, stdout=sp.PIPE, stderr=sp.STDOUT) for line in iter(proc.stdout.readline, ''): stdout.append(line) if args.get('verbose', False): sys.stdout.write(line) proc.wait() if proc.returncode != 0: if not args.get('verbose', False): for line in stdout: sys.stdout.write(line) sys.exit(-3) print('built %d test suites, %d test cases, %d permutations' % ( len(suites), sum(len(suite.cases) for suite in suites), sum(len(suite.perms) for suite in suites))) print('====== testing ======') try: for suite in suites: suite.test(caseno, permno, **args) except TestFailure: pass print('====== results ======') passed = 0 failed = 0 for suite in suites: for perm in suite.perms: if not hasattr(perm, 'result'): continue if perm.result == PASS: passed += 1 else: sys.stdout.write("--- %s ---\n" % perm) if perm.result.assert_: for line in perm.result.stdout[:-1]: sys.stdout.write(line) sys.stdout.write( "\033[97m{path}:{lineno}:\033[91massert:\033[0m " "{message}\n{line}\n".format( **perm.result.assert_)) else: for line in perm.result.stdout: sys.stdout.write(line) sys.stdout.write('\n') failed += 1 print('tests passed: %d' % passed) print('tests failed: %d' % failed) if __name__ == "__main__": import argparse parser = argparse.ArgumentParser( description="Run parameterized tests in various configurations.") parser.add_argument('testpath', nargs='?', default=TEST_DIR, help="Description of test(s) to run. By default, this is all tests \ found in the \"{0}\" directory. Here, you can specify a different \ directory of tests, a specific file, a suite by name, and even a \ specific test case by adding brackets. For example \ \"test_dirs[0]\" or \"{0}/test_dirs.toml[0]\".".format(TEST_DIR)) parser.add_argument('-D', action='append', default=[], help="Overriding parameter definitions.") parser.add_argument('-v', '--verbose', action='store_true', help="Output everything that is happening.") parser.add_argument('-k', '--keep-going', action='store_true', help="Run all tests instead of stopping on first error. Useful for CI.") # parser.add_argument('--gdb', action='store_true', # help="Run tests under gdb. Useful for debugging failures.") parser.add_argument('--valgrind', action='store_true', help="Run non-leaky tests under valgrind to check for memory leaks. \ Tests marked as \"leaky = true\" run normally.") main(**vars(parser.parse_args()))