mirror of
				https://github.com/eledio-devices/thirdparty-littlefs.git
				synced 2025-10-31 16:14:16 +01:00 
			
		
		
		
	Since this is already going to be a breaking API change, this renames structs/variables named _config -> _cfg. This is in order to be consistent with functions such as lfs_file_opencfg.
		
			
				
	
	
		
			779 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			779 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| 
 | |
| # This script manages littlefs tests, which are configured with
 | |
| # .toml files stored in the tests directory.
 | |
| #
 | |
| 
 | |
| import toml
 | |
| import glob
 | |
| import re
 | |
| import os
 | |
| import io
 | |
| import itertools as it
 | |
| import collections.abc as abc
 | |
| import subprocess as sp
 | |
| import base64
 | |
| import sys
 | |
| import copy
 | |
| import shlex
 | |
| import pty
 | |
| import errno
 | |
| import signal
 | |
| 
 | |
| TESTDIR = 'tests'
 | |
| RULES = """
 | |
| define FLATTEN
 | |
| tests/%$(subst /,.,$(target)): $(target)
 | |
|     ./scripts/explode_asserts.py $$< -o $$@
 | |
| endef
 | |
| $(foreach target,$(SRC),$(eval $(FLATTEN)))
 | |
| 
 | |
| -include tests/*.d
 | |
| 
 | |
| .SECONDARY:
 | |
| %.test: %.test.o $(foreach f,$(subst /,.,$(SRC:.c=.o)),%.$f)
 | |
|     $(CC) $(CFLAGS) $^ $(LFLAGS) -o $@
 | |
| """
 | |
| GLOBALS = """
 | |
| //////////////// AUTOGENERATED TEST ////////////////
 | |
| #include "lfs.h"
 | |
| #include "bd/lfs_testbd.h"
 | |
| #include <stdio.h>
 | |
| extern const char *lfs_testbd_path;
 | |
| extern uint32_t lfs_testbd_cycles;
 | |
| """
 | |
| DEFINES = {
 | |
|     'LFS_READ_SIZE': 16,
 | |
|     'LFS_PROG_SIZE': 'LFS_READ_SIZE',
 | |
|     'LFS_BLOCK_SIZE': 512,
 | |
|     'LFS_BLOCK_COUNT': 1024,
 | |
|     'LFS_BLOCK_CYCLES': -1,
 | |
|     'LFS_CACHE_SIZE': '(64 % LFS_PROG_SIZE == 0 ? 64 : LFS_PROG_SIZE)',
 | |
|     'LFS_LOOKAHEAD_SIZE': 16,
 | |
|     'LFS_ERASE_VALUE': 0xff,
 | |
|     'LFS_ERASE_CYCLES': 0,
 | |
|     'LFS_BADBLOCK_BEHAVIOR': 'LFS_TESTBD_BADBLOCK_PROGERROR',
 | |
| }
 | |
| PROLOGUE = """
 | |
|     // prologue
 | |
|     __attribute__((unused)) lfs_t lfs;
 | |
|     __attribute__((unused)) lfs_testbd_t bd;
 | |
|     __attribute__((unused)) lfs_file_t file;
 | |
|     __attribute__((unused)) lfs_dir_t dir;
 | |
|     __attribute__((unused)) struct lfs_info info;
 | |
|     __attribute__((unused)) char path[1024];
 | |
|     __attribute__((unused)) uint8_t buffer[1024];
 | |
|     __attribute__((unused)) lfs_size_t size;
 | |
|     __attribute__((unused)) int err;
 | |
|     
 | |
|     __attribute__((unused)) const struct lfs_cfg cfg = {
 | |
|         .context        = &bd,
 | |
|         .read           = lfs_testbd_read,
 | |
|         .prog           = lfs_testbd_prog,
 | |
|         .erase          = lfs_testbd_erase,
 | |
|         .sync           = lfs_testbd_sync,
 | |
|         .read_size      = LFS_READ_SIZE,
 | |
|         .prog_size      = LFS_PROG_SIZE,
 | |
|         .block_size     = LFS_BLOCK_SIZE,
 | |
|         .block_count    = LFS_BLOCK_COUNT,
 | |
|         .block_cycles   = LFS_BLOCK_CYCLES,
 | |
|         .cache_size     = LFS_CACHE_SIZE,
 | |
|         .lookahead_size = LFS_LOOKAHEAD_SIZE,
 | |
|     };
 | |
| 
 | |
|     __attribute__((unused)) const struct lfs_testbd_cfg bdcfg = {
 | |
|         .erase_value        = LFS_ERASE_VALUE,
 | |
|         .erase_cycles       = LFS_ERASE_CYCLES,
 | |
|         .badblock_behavior  = LFS_BADBLOCK_BEHAVIOR,
 | |
|         .power_cycles       = lfs_testbd_cycles,
 | |
|     };
 | |
| 
 | |
|     lfs_testbd_createcfg(&cfg, lfs_testbd_path, &bdcfg) => 0;
 | |
| """
 | |
| EPILOGUE = """
 | |
|     // epilogue
 | |
|     lfs_testbd_destroy(&cfg) => 0;
 | |
| """
 | |
| PASS = '\033[32m✓\033[0m'
 | |
| FAIL = '\033[31m✗\033[0m'
 | |
| 
 | |
| class TestFailure(Exception):
 | |
|     def __init__(self, case, returncode=None, stdout=None, assert_=None):
 | |
|         self.case = case
 | |
|         self.returncode = returncode
 | |
|         self.stdout = stdout
 | |
|         self.assert_ = assert_
 | |
| 
 | |
| class TestCase:
 | |
|     def __init__(self, config, filter=filter,
 | |
|             suite=None, caseno=None, lineno=None, **_):
 | |
|         self.config = config
 | |
|         self.filter = filter
 | |
|         self.suite = suite
 | |
|         self.caseno = caseno
 | |
|         self.lineno = lineno
 | |
| 
 | |
|         self.code = config['code']
 | |
|         self.code_lineno = config['code_lineno']
 | |
|         self.defines = config.get('define', {})
 | |
|         self.if_ = config.get('if', None)
 | |
|         self.in_ = config.get('in', None)
 | |
| 
 | |
|     def __str__(self):
 | |
|         if hasattr(self, 'permno'):
 | |
|             if any(k not in self.case.defines for k in self.defines):
 | |
|                 return '%s#%d#%d (%s)' % (
 | |
|                     self.suite.name, self.caseno, self.permno, ', '.join(
 | |
|                         '%s=%s' % (k, v) for k, v in self.defines.items()
 | |
|                         if k not in self.case.defines))
 | |
|             else:
 | |
|                 return '%s#%d#%d' % (
 | |
|                     self.suite.name, self.caseno, self.permno)
 | |
|         else:
 | |
|             return '%s#%d' % (
 | |
|                 self.suite.name, self.caseno)
 | |
| 
 | |
|     def permute(self, class_=None, defines={}, permno=None, **_):
 | |
|         ncase = (class_ or type(self))(self.config)
 | |
|         for k, v in self.__dict__.items():
 | |
|             setattr(ncase, k, v)
 | |
|         ncase.case = self
 | |
|         ncase.perms = [ncase]
 | |
|         ncase.permno = permno
 | |
|         ncase.defines = defines
 | |
|         return ncase
 | |
| 
 | |
|     def build(self, f, **_):
 | |
|         # prologue
 | |
|         for k, v in sorted(self.defines.items()):
 | |
|             if k not in self.suite.defines:
 | |
|                 f.write('#define %s %s\n' % (k, v))
 | |
| 
 | |
|         f.write('void test_case%d(%s) {' % (self.caseno, ','.join(
 | |
|             '\n'+8*' '+'__attribute__((unused)) intmax_t %s' % k
 | |
|             for k in sorted(self.perms[0].defines)
 | |
|             if k not in self.defines)))
 | |
| 
 | |
|         f.write(PROLOGUE)
 | |
|         f.write('\n')
 | |
|         f.write(4*' '+'// test case %d\n' % self.caseno)
 | |
|         f.write(4*' '+'#line %d "%s"\n' % (self.code_lineno, self.suite.path))
 | |
| 
 | |
|         # test case goes here
 | |
|         f.write(self.code)
 | |
| 
 | |
|         # epilogue
 | |
|         f.write(EPILOGUE)
 | |
|         f.write('}\n')
 | |
| 
 | |
|         for k, v in sorted(self.defines.items()):
 | |
|             if k not in self.suite.defines:
 | |
|                 f.write('#undef %s\n' % k)
 | |
| 
 | |
|     def shouldtest(self, **args):
 | |
|         if (self.filter is not None and
 | |
|                 len(self.filter) >= 1 and
 | |
|                 self.filter[0] != self.caseno):
 | |
|             return False
 | |
|         elif (self.filter is not None and
 | |
|                 len(self.filter) >= 2 and
 | |
|                 self.filter[1] != self.permno):
 | |
|             return False
 | |
|         elif args.get('no_internal', False) and self.in_ is not None:
 | |
|             return False
 | |
|         elif self.if_ is not None:
 | |
|             if_ = self.if_
 | |
|             while True:
 | |
|                 for k, v in sorted(self.defines.items(),
 | |
|                         key=lambda x: len(x[0]), reverse=True):
 | |
|                     if k in if_:
 | |
|                         if_ = if_.replace(k, '(%s)' % v)
 | |
|                         break
 | |
|                 else:
 | |
|                     break
 | |
|             if_ = (
 | |
|                 re.sub('(\&\&|\?)', ' and ',
 | |
|                 re.sub('(\|\||:)', ' or ',
 | |
|                 re.sub('!(?!=)', ' not ', if_))))
 | |
|             return eval(if_)
 | |
|         else:
 | |
|             return True
 | |
| 
 | |
|     def test(self, exec=[], persist=False, cycles=None,
 | |
|             gdb=False, failure=None, disk=None, **args):
 | |
|         # build command
 | |
|         cmd = exec + ['./%s.test' % self.suite.path,
 | |
|             repr(self.caseno), repr(self.permno)]
 | |
| 
 | |
|         # persist disk or keep in RAM for speed?
 | |
|         if persist:
 | |
|             if not disk:
 | |
|                 disk = self.suite.path + '.disk'
 | |
|             if persist != 'noerase':
 | |
|                 try:
 | |
|                     with open(disk, 'w') as f:
 | |
|                         f.truncate(0)
 | |
|                     if args.get('verbose', False):
 | |
|                         print('truncate --size=0', disk)
 | |
|                 except FileNotFoundError:
 | |
|                     pass
 | |
| 
 | |
|             cmd.append(disk)
 | |
| 
 | |
|         # simulate power-loss after n cycles?
 | |
|         if cycles:
 | |
|             cmd.append(str(cycles))
 | |
| 
 | |
|         # failed? drop into debugger?
 | |
|         if gdb and failure:
 | |
|             ncmd = ['gdb']
 | |
|             if gdb == 'assert':
 | |
|                 ncmd.extend(['-ex', 'r'])
 | |
|                 if failure.assert_:
 | |
|                     ncmd.extend(['-ex', 'up 2'])
 | |
|             elif gdb == 'main':
 | |
|                 ncmd.extend([
 | |
|                     '-ex', 'b %s:%d' % (self.suite.path, self.code_lineno),
 | |
|                     '-ex', 'r'])
 | |
|             ncmd.extend(['--args'] + cmd)
 | |
| 
 | |
|             if args.get('verbose', False):
 | |
|                 print(' '.join(shlex.quote(c) for c in ncmd))
 | |
|             signal.signal(signal.SIGINT, signal.SIG_IGN)
 | |
|             sys.exit(sp.call(ncmd))
 | |
| 
 | |
|         # run test case!
 | |
|         mpty, spty = pty.openpty()
 | |
|         if args.get('verbose', False):
 | |
|             print(' '.join(shlex.quote(c) for c in cmd))
 | |
|         proc = sp.Popen(cmd, stdout=spty, stderr=spty)
 | |
|         os.close(spty)
 | |
|         mpty = os.fdopen(mpty, 'r', 1)
 | |
|         stdout = []
 | |
|         assert_ = None
 | |
|         try:
 | |
|             while True:
 | |
|                 try:
 | |
|                     line = mpty.readline()
 | |
|                 except OSError as e:
 | |
|                     if e.errno == errno.EIO:
 | |
|                         break
 | |
|                     raise
 | |
|                 stdout.append(line)
 | |
|                 if args.get('verbose', False):
 | |
|                     sys.stdout.write(line)
 | |
|                 # intercept asserts
 | |
|                 m = re.match(
 | |
|                     '^{0}([^:]+):(\d+):(?:\d+:)?{0}{1}:{0}(.*)$'
 | |
|                     .format('(?:\033\[[\d;]*.| )*', 'assert'),
 | |
|                     line)
 | |
|                 if m and assert_ is None:
 | |
|                     try:
 | |
|                         with open(m.group(1)) as f:
 | |
|                             lineno = int(m.group(2))
 | |
|                             line = (next(it.islice(f, lineno-1, None))
 | |
|                                 .strip('\n'))
 | |
|                         assert_ = {
 | |
|                             'path': m.group(1),
 | |
|                             'line': line,
 | |
|                             'lineno': lineno,
 | |
|                             'message': m.group(3)}
 | |
|                     except:
 | |
|                         pass
 | |
|         except KeyboardInterrupt:
 | |
|             raise TestFailure(self, 1, stdout, None)
 | |
|         proc.wait()
 | |
| 
 | |
|         # did we pass?
 | |
|         if proc.returncode != 0:
 | |
|             raise TestFailure(self, proc.returncode, stdout, assert_)
 | |
|         else:
 | |
|             return PASS
 | |
| 
 | |
| class ValgrindTestCase(TestCase):
 | |
|     def __init__(self, config, **args):
 | |
|         self.leaky = config.get('leaky', False)
 | |
|         super().__init__(config, **args)
 | |
| 
 | |
|     def shouldtest(self, **args):
 | |
|         return not self.leaky and super().shouldtest(**args)
 | |
| 
 | |
|     def test(self, exec=[], **args):
 | |
|         verbose = args.get('verbose', False)
 | |
|         uninit = (self.defines.get('LFS_ERASE_VALUE', None) == -1)
 | |
|         exec = [
 | |
|             'valgrind',
 | |
|             '--leak-check=full',
 | |
|             ] + (['--undef-value-errors=no'] if uninit else []) + [
 | |
|             ] + (['--track-origins=yes'] if not uninit else []) + [
 | |
|             '--error-exitcode=4',
 | |
|             '--error-limit=no',
 | |
|             ] + (['--num-callers=1'] if not verbose else []) + [
 | |
|             '-q'] + exec
 | |
|         return super().test(exec=exec, **args)
 | |
| 
 | |
| class ReentrantTestCase(TestCase):
 | |
|     def __init__(self, config, **args):
 | |
|         self.reentrant = config.get('reentrant', False)
 | |
|         super().__init__(config, **args)
 | |
| 
 | |
|     def shouldtest(self, **args):
 | |
|         return self.reentrant and super().shouldtest(**args)
 | |
| 
 | |
|     def test(self, persist=False, gdb=False, failure=None, **args):
 | |
|         for cycles in it.count(1):
 | |
|             # clear disk first?
 | |
|             if cycles == 1 and persist != 'noerase':
 | |
|                 persist = 'erase'
 | |
|             else:
 | |
|                 persist = 'noerase'
 | |
| 
 | |
|             # exact cycle we should drop into debugger?
 | |
|             if gdb and failure and failure.cycleno == cycles:
 | |
|                 return super().test(gdb=gdb, persist=persist, cycles=cycles,
 | |
|                     failure=failure, **args)
 | |
| 
 | |
|             # run tests, but kill the program after prog/erase has
 | |
|             # been hit n cycles. We exit with a special return code if the
 | |
|             # program has not finished, since this isn't a test failure.
 | |
|             try:
 | |
|                 return super().test(persist=persist, cycles=cycles, **args)
 | |
|             except TestFailure as nfailure:
 | |
|                 if nfailure.returncode == 33:
 | |
|                     continue
 | |
|                 else:
 | |
|                     nfailure.cycleno = cycles
 | |
|                     raise
 | |
| 
 | |
| class TestSuite:
 | |
|     def __init__(self, path, classes=[TestCase], defines={},
 | |
|             filter=None, **args):
 | |
|         self.name = os.path.basename(path)
 | |
|         if self.name.endswith('.toml'):
 | |
|             self.name = self.name[:-len('.toml')]
 | |
|         self.path = path
 | |
|         self.classes = classes
 | |
|         self.defines = defines.copy()
 | |
|         self.filter = filter
 | |
| 
 | |
|         with open(path) as f:
 | |
|             # load tests
 | |
|             config = toml.load(f)
 | |
| 
 | |
|             # find line numbers
 | |
|             f.seek(0)
 | |
|             linenos = []
 | |
|             code_linenos = []
 | |
|             for i, line in enumerate(f):
 | |
|                 if re.match(r'\[\[\s*case\s*\]\]', line):
 | |
|                     linenos.append(i+1)
 | |
|                 if re.match(r'code\s*=\s*(\'\'\'|""")', line):
 | |
|                     code_linenos.append(i+2)
 | |
| 
 | |
|             code_linenos.reverse()
 | |
| 
 | |
|         # grab global config
 | |
|         for k, v in config.get('define', {}).items():
 | |
|             if k not in self.defines:
 | |
|                 self.defines[k] = v
 | |
|         self.code = config.get('code', None)
 | |
|         if self.code is not None:
 | |
|             self.code_lineno = code_linenos.pop()
 | |
| 
 | |
|         # create initial test cases
 | |
|         self.cases = []
 | |
|         for i, (case, lineno) in enumerate(zip(config['case'], linenos)):
 | |
|             # code lineno?
 | |
|             if 'code' in case:
 | |
|                 case['code_lineno'] = code_linenos.pop()
 | |
|             # merge conditions if necessary
 | |
|             if 'if' in config and 'if' in case:
 | |
|                 case['if'] = '(%s) && (%s)' % (config['if'], case['if'])
 | |
|             elif 'if' in config:
 | |
|                 case['if'] = config['if']
 | |
|             # initialize test case
 | |
|             self.cases.append(TestCase(case, filter=filter,
 | |
|                 suite=self, caseno=i+1, lineno=lineno, **args))
 | |
| 
 | |
|     def __str__(self):
 | |
|         return self.name
 | |
| 
 | |
|     def __lt__(self, other):
 | |
|         return self.name < other.name
 | |
| 
 | |
|     def permute(self, **args):
 | |
|         for case in self.cases:
 | |
|             # lets find all parameterized definitions, in one of [args.D,
 | |
|             # suite.defines, case.defines, DEFINES]. Note that each of these
 | |
|             # can be either a dict of defines, or a list of dicts, expressing
 | |
|             # an initial set of permutations.
 | |
|             pending = [{}]
 | |
|             for inits in [self.defines, case.defines, DEFINES]:
 | |
|                 if not isinstance(inits, list):
 | |
|                     inits = [inits]
 | |
| 
 | |
|                 npending = []
 | |
|                 for init, pinit in it.product(inits, pending):
 | |
|                     ninit = pinit.copy()
 | |
|                     for k, v in init.items():
 | |
|                         if k not in ninit:
 | |
|                             try:
 | |
|                                 ninit[k] = eval(v)
 | |
|                             except:
 | |
|                                 ninit[k] = v
 | |
|                     npending.append(ninit)
 | |
| 
 | |
|                 pending = npending
 | |
| 
 | |
|             # expand permutations
 | |
|             pending = list(reversed(pending))
 | |
|             expanded = []
 | |
|             while pending:
 | |
|                 perm = pending.pop()
 | |
|                 for k, v in sorted(perm.items()):
 | |
|                     if not isinstance(v, str) and isinstance(v, abc.Iterable):
 | |
|                         for nv in reversed(v):
 | |
|                             nperm = perm.copy()
 | |
|                             nperm[k] = nv
 | |
|                             pending.append(nperm)
 | |
|                         break
 | |
|                 else:
 | |
|                     expanded.append(perm)
 | |
| 
 | |
|             # generate permutations
 | |
|             case.perms = []
 | |
|             for i, (class_, defines) in enumerate(
 | |
|                     it.product(self.classes, expanded)):
 | |
|                 case.perms.append(case.permute(
 | |
|                     class_, defines, permno=i+1, **args))
 | |
| 
 | |
|             # also track non-unique defines
 | |
|             case.defines = {}
 | |
|             for k, v in case.perms[0].defines.items():
 | |
|                 if all(perm.defines[k] == v for perm in case.perms):
 | |
|                     case.defines[k] = v
 | |
| 
 | |
|         # track all perms and non-unique defines
 | |
|         self.perms = []
 | |
|         for case in self.cases:
 | |
|             self.perms.extend(case.perms)
 | |
| 
 | |
|         self.defines = {}
 | |
|         for k, v in self.perms[0].defines.items():
 | |
|             if all(perm.defines.get(k, None) == v for perm in self.perms):
 | |
|                 self.defines[k] = v
 | |
| 
 | |
|         return self.perms
 | |
| 
 | |
|     def build(self, **args):
 | |
|         # build test files
 | |
|         tf = open(self.path + '.test.c.t', 'w')
 | |
|         tf.write(GLOBALS)
 | |
|         if self.code is not None:
 | |
|             tf.write('#line %d "%s"\n' % (self.code_lineno, self.path))
 | |
|             tf.write(self.code)
 | |
| 
 | |
|         tfs = {None: tf}
 | |
|         for case in self.cases:
 | |
|             if case.in_ not in tfs:
 | |
|                 tfs[case.in_] = open(self.path+'.'+
 | |
|                     case.in_.replace('/', '.')+'.t', 'w')
 | |
|                 tfs[case.in_].write('#line 1 "%s"\n' % case.in_)
 | |
|                 with open(case.in_) as f:
 | |
|                     for line in f:
 | |
|                         tfs[case.in_].write(line)
 | |
|                 tfs[case.in_].write('\n')
 | |
|                 tfs[case.in_].write(GLOBALS)
 | |
| 
 | |
|             tfs[case.in_].write('\n')
 | |
|             case.build(tfs[case.in_], **args)
 | |
| 
 | |
|         tf.write('\n')
 | |
|         tf.write('const char *lfs_testbd_path;\n')
 | |
|         tf.write('uint32_t lfs_testbd_cycles;\n')
 | |
|         tf.write('int main(int argc, char **argv) {\n')
 | |
|         tf.write(4*' '+'int case_         = (argc > 1) ? atoi(argv[1]) : 0;\n')
 | |
|         tf.write(4*' '+'int perm          = (argc > 2) ? atoi(argv[2]) : 0;\n')
 | |
|         tf.write(4*' '+'lfs_testbd_path   = (argc > 3) ? argv[3] : NULL;\n')
 | |
|         tf.write(4*' '+'lfs_testbd_cycles = (argc > 4) ? atoi(argv[4]) : 0;\n')
 | |
|         for perm in self.perms:
 | |
|             # test declaration
 | |
|             tf.write(4*' '+'extern void test_case%d(%s);\n' % (
 | |
|                 perm.caseno, ', '.join(
 | |
|                     'intmax_t %s' % k for k in sorted(perm.defines)
 | |
|                     if k not in perm.case.defines)))
 | |
|             # test call
 | |
|             tf.write(4*' '+
 | |
|                 'if (argc < 3 || (case_ == %d && perm == %d)) {'
 | |
|                 ' test_case%d(%s); '
 | |
|                 '}\n' % (perm.caseno, perm.permno, perm.caseno, ', '.join(
 | |
|                     str(v) for k, v in sorted(perm.defines.items())
 | |
|                     if k not in perm.case.defines)))
 | |
|         tf.write('}\n')
 | |
| 
 | |
|         for tf in tfs.values():
 | |
|             tf.close()
 | |
| 
 | |
|         # write makefiles
 | |
|         with open(self.path + '.mk', 'w') as mk:
 | |
|             mk.write(RULES.replace(4*' ', '\t'))
 | |
|             mk.write('\n')
 | |
| 
 | |
|             # add truely global defines globally
 | |
|             for k, v in sorted(self.defines.items()):
 | |
|                 mk.write('%s: override CFLAGS += -D%s=%r\n' % (
 | |
|                     self.path+'.test', k, v))
 | |
| 
 | |
|             for path in tfs:
 | |
|                 if path is None:
 | |
|                     mk.write('%s: %s | %s\n' % (
 | |
|                         self.path+'.test.c',
 | |
|                         self.path,
 | |
|                         self.path+'.test.c.t'))
 | |
|                 else:
 | |
|                     mk.write('%s: %s %s | %s\n' % (
 | |
|                         self.path+'.'+path.replace('/', '.'),
 | |
|                         self.path, path,
 | |
|                         self.path+'.'+path.replace('/', '.')+'.t'))
 | |
|                 mk.write('\t./scripts/explode_asserts.py $| -o $@\n')
 | |
| 
 | |
|         self.makefile = self.path + '.mk'
 | |
|         self.target = self.path + '.test'
 | |
|         return self.makefile, self.target
 | |
| 
 | |
|     def test(self, **args):
 | |
|         # run test suite!
 | |
|         if not args.get('verbose', True):
 | |
|             sys.stdout.write(self.name + ' ')
 | |
|             sys.stdout.flush()
 | |
|         for perm in self.perms:
 | |
|             if not perm.shouldtest(**args):
 | |
|                 continue
 | |
| 
 | |
|             try:
 | |
|                 result = perm.test(**args)
 | |
|             except TestFailure as failure:
 | |
|                 perm.result = failure
 | |
|                 if not args.get('verbose', True):
 | |
|                     sys.stdout.write(FAIL)
 | |
|                     sys.stdout.flush()
 | |
|                 if not args.get('keep_going', False):
 | |
|                     if not args.get('verbose', True):
 | |
|                         sys.stdout.write('\n')
 | |
|                     raise
 | |
|             else:
 | |
|                 perm.result = PASS
 | |
|                 if not args.get('verbose', True):
 | |
|                     sys.stdout.write(PASS)
 | |
|                     sys.stdout.flush()
 | |
| 
 | |
|         if not args.get('verbose', True):
 | |
|             sys.stdout.write('\n')
 | |
| 
 | |
| def main(**args):
 | |
|     # figure out explicit defines
 | |
|     defines = {}
 | |
|     for define in args['D']:
 | |
|         k, v, *_ = define.split('=', 2) + ['']
 | |
|         defines[k] = v
 | |
| 
 | |
|     # and what class of TestCase to run
 | |
|     classes = []
 | |
|     if args.get('normal', False):
 | |
|         classes.append(TestCase)
 | |
|     if args.get('reentrant', False):
 | |
|         classes.append(ReentrantTestCase)
 | |
|     if args.get('valgrind', False):
 | |
|         classes.append(ValgrindTestCase)
 | |
|     if not classes:
 | |
|         classes = [TestCase]
 | |
| 
 | |
|     suites = []
 | |
|     for testpath in args['testpaths']:
 | |
|         # optionally specified test case/perm
 | |
|         testpath, *filter = testpath.split('#')
 | |
|         filter = [int(f) for f in filter]
 | |
| 
 | |
|         # figure out the suite's toml file
 | |
|         if os.path.isdir(testpath):
 | |
|             testpath = testpath + '/test_*.toml'
 | |
|         elif os.path.isfile(testpath):
 | |
|             testpath = testpath
 | |
|         elif testpath.endswith('.toml'):
 | |
|             testpath = TESTDIR + '/' + testpath
 | |
|         else:
 | |
|             testpath = TESTDIR + '/' + testpath + '.toml'
 | |
| 
 | |
|         # find tests
 | |
|         for path in glob.glob(testpath):
 | |
|             suites.append(TestSuite(path, classes, defines, filter, **args))
 | |
| 
 | |
|     # sort for reproducability
 | |
|     suites = sorted(suites)
 | |
| 
 | |
|     # generate permutations
 | |
|     for suite in suites:
 | |
|         suite.permute(**args)
 | |
| 
 | |
|     # build tests in parallel
 | |
|     print('====== building ======')
 | |
|     makefiles = []
 | |
|     targets = []
 | |
|     for suite in suites:
 | |
|         makefile, target = suite.build(**args)
 | |
|         makefiles.append(makefile)
 | |
|         targets.append(target)
 | |
| 
 | |
|     cmd = (['make', '-f', 'Makefile'] +
 | |
|         list(it.chain.from_iterable(['-f', m] for m in makefiles)) +
 | |
|         [target for target in targets])
 | |
|     mpty, spty = pty.openpty()
 | |
|     if args.get('verbose', False):
 | |
|         print(' '.join(shlex.quote(c) for c in cmd))
 | |
|     proc = sp.Popen(cmd, stdout=spty, stderr=spty)
 | |
|     os.close(spty)
 | |
|     mpty = os.fdopen(mpty, 'r', 1)
 | |
|     stdout = []
 | |
|     while True:
 | |
|         try:
 | |
|             line = mpty.readline()
 | |
|         except OSError as e:
 | |
|             if e.errno == errno.EIO:
 | |
|                 break
 | |
|             raise
 | |
|         stdout.append(line)
 | |
|         if args.get('verbose', False):
 | |
|             sys.stdout.write(line)
 | |
|         # intercept warnings
 | |
|         m = re.match(
 | |
|             '^{0}([^:]+):(\d+):(?:\d+:)?{0}{1}:{0}(.*)$'
 | |
|             .format('(?:\033\[[\d;]*.| )*', 'warning'),
 | |
|             line)
 | |
|         if m and not args.get('verbose', False):
 | |
|             try:
 | |
|                 with open(m.group(1)) as f:
 | |
|                     lineno = int(m.group(2))
 | |
|                     line = next(it.islice(f, lineno-1, None)).strip('\n')
 | |
|                 sys.stdout.write(
 | |
|                     "\033[01m{path}:{lineno}:\033[01;35mwarning:\033[m "
 | |
|                     "{message}\n{line}\n\n".format(
 | |
|                         path=m.group(1), line=line, lineno=lineno,
 | |
|                         message=m.group(3)))
 | |
|             except:
 | |
|                 pass
 | |
|     proc.wait()
 | |
| 
 | |
|     if proc.returncode != 0:
 | |
|         if not args.get('verbose', False):
 | |
|             for line in stdout:
 | |
|                 sys.stdout.write(line)
 | |
|         sys.exit(-3)
 | |
| 
 | |
|     print('built %d test suites, %d test cases, %d permutations' % (
 | |
|         len(suites),
 | |
|         sum(len(suite.cases) for suite in suites),
 | |
|         sum(len(suite.perms) for suite in suites)))
 | |
| 
 | |
|     filtered = 0
 | |
|     for suite in suites:
 | |
|         for perm in suite.perms:
 | |
|             filtered += perm.shouldtest(**args)
 | |
|     if filtered != sum(len(suite.perms) for suite in suites):
 | |
|         print('filtered down to %d permutations' % filtered)
 | |
| 
 | |
|     # only requested to build?
 | |
|     if args.get('build', False):
 | |
|         return 0
 | |
| 
 | |
|     print('====== testing ======')
 | |
|     try:
 | |
|         for suite in suites:
 | |
|             suite.test(**args)
 | |
|     except TestFailure:
 | |
|         pass
 | |
| 
 | |
|     print('====== results ======')
 | |
|     passed = 0
 | |
|     failed = 0
 | |
|     for suite in suites:
 | |
|         for perm in suite.perms:
 | |
|             if not hasattr(perm, 'result'):
 | |
|                 continue
 | |
| 
 | |
|             if perm.result == PASS:
 | |
|                 passed += 1
 | |
|             else:
 | |
|                 sys.stdout.write(
 | |
|                     "\033[01m{path}:{lineno}:\033[01;31mfailure:\033[m "
 | |
|                     "{perm} failed with {returncode}\n".format(
 | |
|                         perm=perm, path=perm.suite.path, lineno=perm.lineno,
 | |
|                         returncode=perm.result.returncode or 0))
 | |
|                 if perm.result.stdout:
 | |
|                     if perm.result.assert_:
 | |
|                         stdout = perm.result.stdout[:-1]
 | |
|                     else:
 | |
|                         stdout = perm.result.stdout
 | |
|                     for line in stdout[-5:]:
 | |
|                         sys.stdout.write(line)
 | |
|                 if perm.result.assert_:
 | |
|                     sys.stdout.write(
 | |
|                         "\033[01m{path}:{lineno}:\033[01;31massert:\033[m "
 | |
|                         "{message}\n{line}\n".format(
 | |
|                             **perm.result.assert_))
 | |
|                 sys.stdout.write('\n')
 | |
|                 failed += 1
 | |
| 
 | |
|     if args.get('gdb', False):
 | |
|         failure = None
 | |
|         for suite in suites:
 | |
|             for perm in suite.perms:
 | |
|                 if getattr(perm, 'result', PASS) != PASS:
 | |
|                     failure = perm.result
 | |
|         if failure is not None:
 | |
|             print('======= gdb ======')
 | |
|             # drop into gdb
 | |
|             failure.case.test(failure=failure, **args)
 | |
|             sys.exit(0)
 | |
| 
 | |
|     print('tests passed: %d' % passed)
 | |
|     print('tests failed: %d' % failed)
 | |
|     return 1 if failed > 0 else 0
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     import argparse
 | |
|     parser = argparse.ArgumentParser(
 | |
|         description="Run parameterized tests in various configurations.")
 | |
|     parser.add_argument('testpaths', nargs='*', default=[TESTDIR],
 | |
|         help="Description of test(s) to run. By default, this is all tests \
 | |
|             found in the \"{0}\" directory. Here, you can specify a different \
 | |
|             directory of tests, a specific file, a suite by name, and even a \
 | |
|             specific test case by adding brackets. For example \
 | |
|             \"test_dirs[0]\" or \"{0}/test_dirs.toml[0]\".".format(TESTDIR))
 | |
|     parser.add_argument('-D', action='append', default=[],
 | |
|         help="Overriding parameter definitions.")
 | |
|     parser.add_argument('-v', '--verbose', action='store_true',
 | |
|         help="Output everything that is happening.")
 | |
|     parser.add_argument('-k', '--keep-going', action='store_true',
 | |
|         help="Run all tests instead of stopping on first error. Useful for CI.")
 | |
|     parser.add_argument('-p', '--persist', choices=['erase', 'noerase'],
 | |
|         nargs='?', const='erase',
 | |
|         help="Store disk image in a file.")
 | |
|     parser.add_argument('-b', '--build', action='store_true',
 | |
|         help="Only build the tests, do not execute.")
 | |
|     parser.add_argument('-g', '--gdb', choices=['init', 'main', 'assert'],
 | |
|         nargs='?', const='assert',
 | |
|         help="Drop into gdb on test failure.")
 | |
|     parser.add_argument('--no-internal', action='store_true',
 | |
|         help="Don't run tests that require internal knowledge.")
 | |
|     parser.add_argument('-n', '--normal', action='store_true',
 | |
|         help="Run tests normally.")
 | |
|     parser.add_argument('-r', '--reentrant', action='store_true',
 | |
|         help="Run reentrant tests with simulated power-loss.")
 | |
|     parser.add_argument('-V', '--valgrind', action='store_true',
 | |
|         help="Run non-leaky tests under valgrind to check for memory leaks.")
 | |
|     parser.add_argument('-e', '--exec', default=[], type=lambda e: e.split(' '),
 | |
|         help="Run tests with another executable prefixed on the command line.")
 | |
|     parser.add_argument('-d', '--disk',
 | |
|         help="Specify a file to use for persistent/reentrant tests.")
 | |
|     sys.exit(main(**vars(parser.parse_args())))
 |