Mirror of https://github.com/eledio-devices/thirdparty-littlefs.git, synced 2025-10-31 00:32:38 +01:00
The idea behind emubd (a file per block) was neat, but it doesn't add much value over a block device that just operates on a single linear file, other than adding a significant amount of overhead. Initially it helped with debugging, but once the metadata format became more complex in v2, most debugging ends up going through the debug.py script anyway. Aside from being simpler, moving to filebd also makes it possible to mount disk images directly.

Also introduced rambd, which keeps the disk contents in RAM. This is very useful for testing, where it increases the speed _significantly_:

- test_dirs w/ filebd: 0m7.170s
- test_dirs w/ rambd: 0m0.966s

These follow the emubd model of using the lfs_config for geometry. I'm not convinced this is the best approach, but it gets the job done. I've also added lfs_rambd_createcfg to pass additional config, similar to lfs_file_opencfg. This is useful for specifying erase_value, which tells the block device to simulate erases the way flash devices do. Note that the default (-1) meets the minimum block device requirements and is the most performant.
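For illustration, here is roughly what a generated test sets up when no disk image is passed, so the RAM block device backs the filesystem. This is only a sketch modeled on the PROLOGUE/EPILOGUE templates in the script below: the rambd calls and lfs_config fields come from those templates, the geometry values echo the script's defaults, and lfs_format/lfs_mount/lfs_unmount are the usual littlefs entry points.

#include "lfs.h"
#include "rambd/lfs_rambd.h"

int main(void) {
    lfs_t lfs;
    lfs_rambd_t rambd;

    // extra block device config, e.g. simulate flash-style erases to 0xff
    // (the default erase_value of -1 skips the erase fill and is fastest)
    const struct lfs_rambd_config ramcfg = {
        .erase_value = 0xff,
    };

    const struct lfs_config cfg = {
        .context = &rambd,
        .read  = &lfs_rambd_read,
        .prog  = &lfs_rambd_prog,
        .erase = &lfs_rambd_erase,
        .sync  = &lfs_rambd_sync,

        // geometry taken from lfs_config, following the emubd model
        .read_size      = 16,
        .prog_size      = 16,
        .block_size     = 512,
        .block_count    = 1024,
        .block_cycles   = 1024,
        .cache_size     = 64,
        .lookahead_size = 16,
    };

    // create the RAM-backed disk with the additional config
    lfs_rambd_createcfg(&cfg, &ramcfg);

    // use littlefs as usual (error handling omitted for brevity)
    lfs_format(&lfs, &cfg);
    lfs_mount(&lfs, &cfg);
    // ... test body ...
    lfs_unmount(&lfs);

    lfs_rambd_destroy(&cfg);
    return 0;
}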
		
			
				
	
	
		
Python · Executable File · 635 lines · 21 KiB
#!/usr/bin/env python3

# This script manages littlefs tests, which are configured with
# .toml files stored in the tests directory.
#

import toml
import glob
import re
import os
import io
import itertools as it
import collections.abc as abc
import subprocess as sp
import base64
import sys
import copy
import shlex

TESTDIR = 'tests_'
RULES = """
define FLATTEN
%$(subst /,.,$(target:.c=.t.c)): $(target)
    cat <(echo '#line 1 "$$<"') $$< > $$@
endef
$(foreach target,$(SRC),$(eval $(FLATTEN)))

-include tests_/*.d

.SECONDARY:
%.c: %.t.c
    ./scripts/explode_asserts.py $< -o $@

%.test: override CFLAGS += -fdiagnostics-color=always
%.test: override CFLAGS += -ggdb
%.test: %.test.o $(foreach f,$(subst /,.,$(SRC:.c=.o)),%.test.$f)
    $(CC) $(CFLAGS) $^ $(LFLAGS) -o $@
"""
GLOBALS = """
//////////////// AUTOGENERATED TEST ////////////////
#include "lfs.h"
#include "filebd/lfs_filebd.h"
#include "rambd/lfs_rambd.h"
#include <stdio.h>
const char *LFS_DISK = NULL;
"""
DEFINES = {
    "LFS_READ_SIZE": 16,
    "LFS_PROG_SIZE": "LFS_READ_SIZE",
    "LFS_BLOCK_SIZE": 512,
    "LFS_BLOCK_COUNT": 1024,
    "LFS_BLOCK_CYCLES": 1024,
    "LFS_CACHE_SIZE": "(64 % LFS_PROG_SIZE == 0 ? 64 : LFS_PROG_SIZE)",
    "LFS_LOOKAHEAD_SIZE": 16,
    "LFS_ERASE_VALUE": 0xff,
}
PROLOGUE = """
    // prologue
    __attribute__((unused)) lfs_t lfs;
    __attribute__((unused)) lfs_filebd_t filebd;
    __attribute__((unused)) lfs_rambd_t rambd;
    __attribute__((unused)) lfs_file_t file;
    __attribute__((unused)) lfs_dir_t dir;
    __attribute__((unused)) struct lfs_info info;
    __attribute__((unused)) uint8_t buffer[1024];
    __attribute__((unused)) char path[1024];

    __attribute__((unused)) const struct lfs_config cfg = {
        .context = LFS_DISK ? (void*)&filebd : (void*)&rambd,
        .read  = LFS_DISK ? &lfs_filebd_read  : &lfs_rambd_read,
        .prog  = LFS_DISK ? &lfs_filebd_prog  : &lfs_rambd_prog,
        .erase = LFS_DISK ? &lfs_filebd_erase : &lfs_rambd_erase,
        .sync  = LFS_DISK ? &lfs_filebd_sync  : &lfs_rambd_sync,

        .read_size      = LFS_READ_SIZE,
        .prog_size      = LFS_PROG_SIZE,
        .block_size     = LFS_BLOCK_SIZE,
        .block_count    = LFS_BLOCK_COUNT,
        .block_cycles   = LFS_BLOCK_CYCLES,
        .cache_size     = LFS_CACHE_SIZE,
        .lookahead_size = LFS_LOOKAHEAD_SIZE,
    };

    __attribute__((unused)) const struct lfs_filebd_config filecfg = {
        .erase_value = LFS_ERASE_VALUE,
    };
    __attribute__((unused)) const struct lfs_rambd_config ramcfg = {
        .erase_value = LFS_ERASE_VALUE,
    };

    if (LFS_DISK) {
        lfs_filebd_createcfg(&cfg, LFS_DISK, &filecfg);
    } else {
        lfs_rambd_createcfg(&cfg, &ramcfg);
    }
"""
EPILOGUE = """
    // epilogue
    if (LFS_DISK) {
        lfs_filebd_destroy(&cfg);
    } else {
        lfs_rambd_destroy(&cfg);
    }
"""
PASS = '\033[32m✓\033[0m'
FAIL = '\033[31m✗\033[0m'

class TestFailure(Exception):
    def __init__(self, case, returncode=None, stdout=None, assert_=None):
        self.case = case
        self.returncode = returncode
        self.stdout = stdout
        self.assert_ = assert_

class TestCase:
    def __init__(self, config, suite=None, caseno=None, lineno=None, **_):
        self.suite = suite
        self.caseno = caseno
        self.lineno = lineno

        self.code = config['code']
        self.defines = config.get('define', {})
        self.leaky = config.get('leaky', False)

    def __str__(self):
        if hasattr(self, 'permno'):
            return '%s[%d,%d]' % (self.suite.name, self.caseno, self.permno)
        else:
            return '%s[%d]' % (self.suite.name, self.caseno)

    def permute(self, defines, permno=None, **_):
        ncase = copy.copy(self)
        ncase.case = self
        ncase.perms = [ncase]
        ncase.permno = permno
        ncase.defines = defines
        return ncase

    def build(self, f, **_):
        # prologue
        f.write('void test_case%d(' % self.caseno)
        first = True
        for k, v in sorted(self.perms[0].defines.items()):
            if k not in self.defines:
                if not first:
                    f.write(',')
                else:
                    first = False
                f.write('\n')
                f.write(8*' '+'__attribute__((unused)) intmax_t %s' % k)
        f.write(') {\n')

        for k, v in sorted(self.defines.items()):
            if k not in self.suite.defines:
                f.write(4*' '+'#define %s %s\n' % (k, v))

        f.write(PROLOGUE)
        f.write('\n')
        f.write(4*' '+'// test case %d\n' % self.caseno)
        f.write(4*' '+'#line %d "%s"\n' % (self.lineno, self.suite.path))

        # test case goes here
        f.write(self.code)

        # epilogue
        f.write(EPILOGUE)
        f.write('\n')

        for k, v in sorted(self.defines.items()):
            if k not in self.suite.defines:
                f.write(4*' '+'#undef %s\n' % k)

        f.write('}\n')

    def test(self, exec=[], persist=False, gdb=False, failure=None, **args):
        # build command
        cmd = exec + ['./%s.test' % self.suite.path,
            repr(self.caseno), repr(self.permno)]
        if persist:
            cmd.append(self.suite.path + '.test.disk')

        # failed? drop into debugger?
        if gdb and failure:
            ncmd = ['gdb']
            if gdb == 'assert':
                ncmd.extend(['-ex', 'r'])
                if failure.assert_:
                    ncmd.extend(['-ex', 'up'])
            elif gdb == 'start':
                ncmd.extend([
                    '-ex', 'b %s:%d' % (self.suite.path, self.lineno),
                    '-ex', 'r'])
            ncmd.extend(['--args'] + cmd)

            if args.get('verbose', False):
                print(' '.join(shlex.quote(c) for c in ncmd))
            sys.exit(sp.call(ncmd))

        # run test case!
        stdout = []
        assert_ = None
        if args.get('verbose', False):
            print(' '.join(shlex.quote(c) for c in cmd))
        proc = sp.Popen(cmd,
            universal_newlines=True,
            bufsize=1,
            stdout=sp.PIPE,
            stderr=sp.STDOUT)
        for line in iter(proc.stdout.readline, ''):
            stdout.append(line)
            if args.get('verbose', False):
                sys.stdout.write(line)
            # intercept asserts
            m = re.match('^([^:]+):([0-9]+):(assert): (.*)$', line)
            if m and assert_ is None:
                try:
                    with open(m.group(1)) as f:
                        lineno = int(m.group(2))
                        line = next(it.islice(f, lineno-1, None)).strip('\n')
                    assert_ = {
                        'path': m.group(1),
                        'line': line,
                        'lineno': lineno,
                        'message': m.group(4)}
                except:
                    pass
        proc.wait()

        # did we pass?
        if proc.returncode != 0:
            raise TestFailure(self, proc.returncode, stdout, assert_)
        else:
            return PASS

class ValgrindTestCase(TestCase):
    def __init__(self, config, **args):
        self.leaky = config.get('leaky', False)
        super().__init__(config, **args)

    def test(self, exec=[], **args):
        if self.leaky:
            return

        exec = exec + [
            'valgrind',
            '--leak-check=full',
            '--error-exitcode=4',
            '-q']
        return super().test(exec=exec, **args)

class ReentrantTestCase(TestCase):
    def __init__(self, config, **args):
        self.reentrant = config.get('reentrant', False)
        super().__init__(config, **args)

    def test(self, exec=[], persist=False, gdb=False, failure=None, **args):
        if not self.reentrant:
            return

        # clear disk first?
        if not persist:
            try:
                os.remove(self.suite.path + '.test.disk')
            except FileNotFoundError:
                pass

        for cycles in it.count(1):
            # exact cycle we should drop into debugger?
            if gdb and failure and failure.cycleno == cycles:
                return super().test(exec=exec, persist=True,
                    gdb=gdb, failure=failure, **args)

            # run tests, but kill the program after prog/erase has
            # been hit n cycles. We exit with a special return code if the
            # program has not finished, since this isn't a test failure.
            nexec = exec + [
                'gdb', '-batch-silent',
                '-ex', 'handle all nostop',
                '-ex', 'b lfs_filebd_prog',
                '-ex', 'b lfs_filebd_erase',
                '-ex', 'r',
                ] + cycles*['-ex', 'c'] + [
                '-ex', 'q '
                    '!$_isvoid($_exitsignal) ? $_exitsignal : '
                    '!$_isvoid($_exitcode) ? $_exitcode : '
                    '33',
                '--args']
            try:
                return super().test(exec=nexec, persist=True, **args)
            except TestFailure as nfailure:
                if nfailure.returncode == 33:
                    continue
                else:
                    nfailure.cycleno = cycles
                    raise

class TestSuite:
    def __init__(self, path, TestCase=TestCase, **args):
        self.name = os.path.basename(path)
        if self.name.endswith('.toml'):
            self.name = self.name[:-len('.toml')]
        self.path = path
        self.TestCase = TestCase

        with open(path) as f:
            # load tests
            config = toml.load(f)

            # find line numbers
            f.seek(0)
            linenos = []
            for i, line in enumerate(f):
                if re.match(r'^\s*code\s*=\s*(\'\'\'|""")', line):
                    linenos.append(i + 2)

        # grab global config
        self.defines = config.get('define', {})

        # create initial test cases
        self.cases = []
        for i, (case, lineno) in enumerate(zip(config['case'], linenos)):
            self.cases.append(self.TestCase(case,
                suite=self, caseno=i, lineno=lineno, **args))

    def __str__(self):
        return self.name

    def __lt__(self, other):
        return self.name < other.name

    def permute(self, defines={}, **args):
        for case in self.cases:
            # lets find all parameterized definitions, in one of [args.D,
            # suite.defines, case.defines, DEFINES]. Note that each of these
            # can be either a dict of defines, or a list of dicts, expressing
            # an initial set of permutations.
            pending = [{}]
            for inits in [defines, self.defines, case.defines, DEFINES]:
                if not isinstance(inits, list):
                    inits = [inits]

                npending = []
                for init, pinit in it.product(inits, pending):
                    ninit = pinit.copy()
                    for k, v in init.items():
                        if k not in ninit:
                            try:
                                ninit[k] = eval(v)
                            except:
                                ninit[k] = v
                    npending.append(ninit)

                pending = npending

            # expand permutations
            pending = list(reversed(pending))
            expanded = []
            while pending:
                perm = pending.pop()
                for k, v in sorted(perm.items()):
                    if not isinstance(v, str) and isinstance(v, abc.Iterable):
                        for nv in reversed(v):
                            nperm = perm.copy()
                            nperm[k] = nv
                            pending.append(nperm)
                        break
                else:
                    expanded.append(perm)

            # generate permutations
            case.perms = []
            for i, perm in enumerate(expanded):
                case.perms.append(case.permute(perm, permno=i, **args))

            # also track non-unique defines
            case.defines = {}
            for k, v in case.perms[0].defines.items():
                if all(perm.defines[k] == v for perm in case.perms):
                    case.defines[k] = v

        # track all perms and non-unique defines
        self.perms = []
        for case in self.cases:
            self.perms.extend(case.perms)

        self.defines = {}
        for k, v in self.perms[0].defines.items():
            if all(perm.defines[k] == v for perm in self.perms):
                self.defines[k] = v

        return self.perms

    def build(self, **args):
        # build test.c
        f = io.StringIO()
        f.write(GLOBALS)

        for case in self.cases:
            f.write('\n')
            case.build(f, **args)

        f.write('\n')
        f.write('int main(int argc, char **argv) {\n')
        f.write(4*' '+'int case_ = (argc >= 3) ? atoi(argv[1]) : 0;\n')
        f.write(4*' '+'int perm = (argc >= 3) ? atoi(argv[2]) : 0;\n')
        f.write(4*' '+'LFS_DISK = (argc >= 4) ? argv[3] : NULL;\n')
        for perm in self.perms:
            f.write(4*' '+'if (argc < 3 || '
                '(case_ == %d && perm == %d)) { ' % (
                    perm.caseno, perm.permno))
            f.write('test_case%d(' % perm.caseno)
            first = True
            for k, v in sorted(perm.defines.items()):
                if k not in perm.case.defines:
                    if not first:
                        f.write(', ')
                    else:
                        first = False
                    f.write(str(v))
            f.write('); }\n')
        f.write('}\n')

        # add test-related rules
        rules = RULES.replace(4*' ', '\t')

        with open(self.path + '.test.mk', 'w') as mk:
            mk.write(rules)
            mk.write('\n')

            # add truely global defines globally
            for k, v in sorted(self.defines.items()):
                mk.write('%s: override CFLAGS += -D%s=%r\n' % (
                    self.path+'.test', k, v))

            # write test.c in base64 so make can decide when to rebuild
            mk.write('%s: %s\n' % (self.path+'.test.t.c', self.path))
            mk.write('\t@base64 -d <<< ')
            mk.write(base64.b64encode(
                f.getvalue().encode('utf8')).decode('utf8'))
            mk.write(' > $@\n')

        self.makefile = self.path + '.test.mk'
        self.target = self.path + '.test'
        return self.makefile, self.target

    def test(self, caseno=None, permno=None, **args):
        # run test suite!
        if not args.get('verbose', True):
            sys.stdout.write(self.name + ' ')
            sys.stdout.flush()
        for perm in self.perms:
            if caseno is not None and perm.caseno != caseno:
                continue
            if permno is not None and perm.permno != permno:
                continue

            try:
                result = perm.test(**args)
            except TestFailure as failure:
                perm.result = failure
                if not args.get('verbose', True):
                    sys.stdout.write(FAIL)
                    sys.stdout.flush()
                if not args.get('keep_going', False):
                    if not args.get('verbose', True):
                        sys.stdout.write('\n')
                    raise
            else:
                if result == PASS:
                    perm.result = PASS
                    if not args.get('verbose', True):
                        sys.stdout.write(PASS)
                        sys.stdout.flush()

        if not args.get('verbose', True):
            sys.stdout.write('\n')

def main(**args):
    testpath = args['testpath']

    # optional brackets for specific test
    m = re.search(r'\[(\d+)(?:,(\d+))?\]$', testpath)
    if m:
        caseno = int(m.group(1))
        permno = int(m.group(2)) if m.group(2) is not None else None
        testpath = testpath[:m.start()]
    else:
        caseno = None
        permno = None

    # figure out the suite's toml file
    if os.path.isdir(testpath):
        testpath = testpath + '/test_*.toml'
    elif os.path.isfile(testpath):
        testpath = testpath
    elif testpath.endswith('.toml'):
        testpath = TESTDIR + '/' + testpath
    else:
        testpath = TESTDIR + '/' + testpath + '.toml'

    # find tests
    suites = []
    for path in glob.glob(testpath):
        if args.get('valgrind', False):
            suites.append(TestSuite(path, TestCase=ValgrindTestCase, **args))
        elif args.get('reentrant', False):
            suites.append(TestSuite(path, TestCase=ReentrantTestCase, **args))
        else:
            suites.append(TestSuite(path, **args))

    # sort for reproducability
    suites = sorted(suites)

    # generate permutations
    defines = {}
    for define in args['D']:
        k, v, *_ = define.split('=', 2) + ['']
        defines[k] = v

    for suite in suites:
        suite.permute(defines, **args)

    # build tests in parallel
    print('====== building ======')
    makefiles = []
    targets = []
    for suite in suites:
        makefile, target = suite.build(**args)
        makefiles.append(makefile)
        targets.append(target)

    cmd = (['make', '-f', 'Makefile'] +
        list(it.chain.from_iterable(['-f', m] for m in makefiles)) +
        [target for target in targets])
    stdout = []
    if args.get('verbose', False):
        print(' '.join(shlex.quote(c) for c in cmd))
    proc = sp.Popen(cmd,
        universal_newlines=True,
        bufsize=1,
        stdout=sp.PIPE,
        stderr=sp.STDOUT)
    for line in iter(proc.stdout.readline, ''):
        stdout.append(line)
        if args.get('verbose', False):
            sys.stdout.write(line)
    proc.wait()

    if proc.returncode != 0:
        if not args.get('verbose', False):
            for line in stdout:
                sys.stdout.write(line)
        sys.exit(-3)

    print('built %d test suites, %d test cases, %d permutations' % (
        len(suites),
        sum(len(suite.cases) for suite in suites),
        sum(len(suite.perms) for suite in suites)))

    print('====== testing ======')
    try:
        for suite in suites:
            suite.test(caseno, permno, **args)
    except TestFailure:
        pass

    if args.get('gdb', False):
        failure = None
        for suite in suites:
            for perm in suite.perms:
                if getattr(perm, 'result', PASS) != PASS:
                    failure = perm.result
        if failure is not None:
            print('======= gdb ======')
            # drop into gdb
            failure.case.test(failure=failure, **args)
            sys.exit(0)

    print('====== results ======')
    passed = 0
    failed = 0
    for suite in suites:
        for perm in suite.perms:
            if not hasattr(perm, 'result'):
                continue

            if perm.result == PASS:
                passed += 1
            else:
                sys.stdout.write("--- %s ---\n" % perm)
                if perm.result.assert_:
                    for line in perm.result.stdout[:-1]:
                        sys.stdout.write(line)
                    sys.stdout.write(
                        "\033[97m{path}:{lineno}:\033[91massert:\033[0m "
                        "{message}\n{line}\n".format(
                            **perm.result.assert_))
                else:
                    for line in perm.result.stdout:
                        sys.stdout.write(line)
                sys.stdout.write('\n')
                failed += 1

    print('tests passed: %d' % passed)
    print('tests failed: %d' % failed)

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(
        description="Run parameterized tests in various configurations.")
    parser.add_argument('testpath', nargs='?', default=TESTDIR,
        help="Description of test(s) to run. By default, this is all tests \
            found in the \"{0}\" directory. Here, you can specify a different \
            directory of tests, a specific file, a suite by name, and even a \
            specific test case by adding brackets. For example \
            \"test_dirs[0]\" or \"{0}/test_dirs.toml[0]\".".format(TESTDIR))
    parser.add_argument('-D', action='append', default=[],
        help="Overriding parameter definitions.")
    parser.add_argument('-v', '--verbose', action='store_true',
        help="Output everything that is happening.")
    parser.add_argument('-k', '--keep-going', action='store_true',
        help="Run all tests instead of stopping on first error. Useful for CI.")
    parser.add_argument('-p', '--persist', action='store_true',
        help="Don't reset the tests disk before each test.")
    parser.add_argument('-g', '--gdb', choices=['init', 'start', 'assert'],
        nargs='?', const='assert',
        help="Drop into gdb on test failure.")
    parser.add_argument('--valgrind', action='store_true',
        help="Run non-leaky tests under valgrind to check for memory leaks.")
    parser.add_argument('--reentrant', action='store_true',
        help="Run reentrant tests with simulated power-loss.")
    parser.add_argument('-e', '--exec', default=[], type=lambda e: e.split(' '),
        help="Run tests with another executable prefixed on the command line.")
    main(**vars(parser.parse_args()))