Files
thirdparty-littlefs/scripts/test_.py
Christopher Haster ecc2857c0e Migrated bad-block tests
Even with adding better reentrance testing, the bad-block tests are
still very useful at isolating the block eviction logic.

This also required rewriting a bit of the internal testing wirework
to allow custom block devices which opens up quite a bit more straegies
for testing.
2020-01-14 12:04:20 -06:00

807 lines
27 KiB
Python
Executable File

#!/usr/bin/env python3
# This script manages littlefs tests, which are configured with
# .toml files stored in the tests directory.
#
# TODO
# x nargs > 1?
# x show perm config on failure
# x filtering
# n show perm config on verbose?
# x better lineno tracking for cases?
# n non-int perms?
# x different path format?
# - suite.prologue, suite.epilogue
# x in
# x change BLOCK_CYCLES to -1 by default
# x change persist behaviour
# x config chaining correct
# - why can't gdb see my defines?
# - say no to internal?
# - buffering stdout issues?
import toml
import glob
import re
import os
import io
import itertools as it
import collections.abc as abc
import subprocess as sp
import base64
import sys
import copy
import shlex
TESTDIR = 'tests_'
RULES = """
define FLATTEN
tests_/%$(subst /,.,$(target)): $(target)
./scripts/explode_asserts.py $$< -o $$@
endef
$(foreach target,$(SRC),$(eval $(FLATTEN)))
-include tests_/*.d
.SECONDARY:
%.test: override CFLAGS += -fdiagnostics-color=always
%.test: override CFLAGS += -ggdb
%.test: %.test.o $(foreach f,$(subst /,.,$(SRC:.c=.o)),%.$f)
$(CC) $(CFLAGS) $^ $(LFLAGS) -o $@
"""
GLOBALS = """
//////////////// AUTOGENERATED TEST ////////////////
#include "lfs.h"
#include "filebd/lfs_filebd.h"
#include "rambd/lfs_rambd.h"
#include <stdio.h>
extern const char *lfs_testbd_disk;
typedef union {
lfs_filebd_t filebd;
lfs_rambd_t rambd;
} lfs_testbd_t;
struct lfs_testbd_config {
struct lfs_filebd_config filecfg;
struct lfs_rambd_config ramcfg;
};
__attribute__((unused))
static int lfs_testbd_createcfg(const struct lfs_config *cfg,
const struct lfs_testbd_config *bdcfg) {
if (lfs_testbd_disk) {
return lfs_filebd_createcfg(cfg, lfs_testbd_disk, &bdcfg->filecfg);
} else {
return lfs_rambd_createcfg(cfg, &bdcfg->ramcfg);
}
}
__attribute__((unused))
static void lfs_testbd_destroy(const struct lfs_config *cfg) {
if (lfs_testbd_disk) {
lfs_filebd_destroy(cfg);
} else {
lfs_rambd_destroy(cfg);
}
}
__attribute__((unused))
static int lfs_testbd_read(const struct lfs_config *cfg, lfs_block_t block,
lfs_off_t off, void *buffer, lfs_size_t size) {
if (lfs_testbd_disk) {
return lfs_filebd_read(cfg, block, off, buffer, size);
} else {
return lfs_rambd_read(cfg, block, off, buffer, size);
}
}
__attribute__((unused))
static int lfs_testbd_prog(const struct lfs_config *cfg, lfs_block_t block,
lfs_off_t off, const void *buffer, lfs_size_t size) {
if (lfs_testbd_disk) {
return lfs_filebd_prog(cfg, block, off, buffer, size);
} else {
return lfs_rambd_prog(cfg, block, off, buffer, size);
}
}
__attribute__((unused))
static int lfs_testbd_erase(const struct lfs_config *cfg, lfs_block_t block) {
if (lfs_testbd_disk) {
return lfs_filebd_erase(cfg, block);
} else {
return lfs_rambd_erase(cfg, block);
}
}
__attribute__((unused))
static int lfs_testbd_sync(const struct lfs_config *cfg) {
if (lfs_testbd_disk) {
return lfs_filebd_sync(cfg);
} else {
return lfs_rambd_sync(cfg);
}
}
"""
DEFINES = {
"LFS_BD_READ": "lfs_testbd_read",
"LFS_BD_PROG": "lfs_testbd_prog",
"LFS_BD_ERASE": "lfs_testbd_erase",
"LFS_BD_SYNC": "lfs_testbd_sync",
"LFS_READ_SIZE": 16,
"LFS_PROG_SIZE": "LFS_READ_SIZE",
"LFS_BLOCK_SIZE": 512,
"LFS_BLOCK_COUNT": 1024,
"LFS_BLOCK_CYCLES": -1,
"LFS_CACHE_SIZE": "(64 % LFS_PROG_SIZE == 0 ? 64 : LFS_PROG_SIZE)",
"LFS_LOOKAHEAD_SIZE": 16,
"LFS_ERASE_VALUE": 0xff,
}
PROLOGUE = """
// prologue
__attribute__((unused)) lfs_t lfs;
__attribute__((unused)) lfs_testbd_t bd;
__attribute__((unused)) lfs_file_t file;
__attribute__((unused)) lfs_dir_t dir;
__attribute__((unused)) struct lfs_info info;
__attribute__((unused)) char path[1024];
__attribute__((unused)) uint8_t buffer[1024];
__attribute__((unused)) lfs_size_t size;
__attribute__((unused)) int err;
__attribute__((unused)) const struct lfs_config cfg = {
.context = &bd,
.read = LFS_BD_READ,
.prog = LFS_BD_PROG,
.erase = LFS_BD_ERASE,
.sync = LFS_BD_SYNC,
.read_size = LFS_READ_SIZE,
.prog_size = LFS_PROG_SIZE,
.block_size = LFS_BLOCK_SIZE,
.block_count = LFS_BLOCK_COUNT,
.block_cycles = LFS_BLOCK_CYCLES,
.cache_size = LFS_CACHE_SIZE,
.lookahead_size = LFS_LOOKAHEAD_SIZE,
};
__attribute__((unused)) const struct lfs_testbd_config bdcfg = {
.filecfg.erase_value = LFS_ERASE_VALUE,
.ramcfg.erase_value = LFS_ERASE_VALUE,
};
lfs_testbd_createcfg(&cfg, &bdcfg) => 0;
"""
EPILOGUE = """
// epilogue
lfs_testbd_destroy(&cfg);
"""
PASS = '\033[32m✓\033[0m'
FAIL = '\033[31m✗\033[0m'
class TestFailure(Exception):
def __init__(self, case, returncode=None, stdout=None, assert_=None):
self.case = case
self.returncode = returncode
self.stdout = stdout
self.assert_ = assert_
class TestCase:
def __init__(self, config, filter=filter,
suite=None, caseno=None, lineno=None, **_):
self.filter = filter
self.suite = suite
self.caseno = caseno
self.lineno = lineno
self.code = config['code']
self.code_lineno = config['code_lineno']
self.defines = config.get('define', {})
self.if_ = config.get('if', None)
self.in_ = config.get('in', None)
def __str__(self):
if hasattr(self, 'permno'):
if any(k not in self.case.defines for k in self.defines):
return '%s#%d#%d (%s)' % (
self.suite.name, self.caseno, self.permno, ', '.join(
'%s=%s' % (k, v) for k, v in self.defines.items()
if k not in self.case.defines))
else:
return '%s#%d#%d' % (
self.suite.name, self.caseno, self.permno)
else:
return '%s#%d' % (
self.suite.name, self.caseno)
def permute(self, defines, permno=None, **_):
ncase = copy.copy(self)
ncase.case = self
ncase.perms = [ncase]
ncase.permno = permno
ncase.defines = defines
return ncase
def build(self, f, **_):
# prologue
f.write('void test_case%d(%s) {\n' % (self.caseno, ','.join(
'\n'+8*' '+'__attribute__((unused)) intmax_t %s' % k
for k in sorted(self.perms[0].defines)
if k not in self.defines)))
for k, v in sorted(self.defines.items()):
if k not in self.suite.defines:
f.write(4*' '+'#define %s %s\n' % (k, v))
f.write(PROLOGUE)
f.write('\n')
f.write(4*' '+'// test case %d\n' % self.caseno)
f.write(4*' '+'#line %d "%s"\n' % (self.code_lineno, self.suite.path))
# test case goes here
f.write(self.code)
# epilogue
f.write(EPILOGUE)
f.write('\n')
for k, v in sorted(self.defines.items()):
if k not in self.suite.defines:
f.write(4*' '+'#undef %s\n' % k)
f.write('}\n')
def shouldtest(self, **args):
if (self.filter is not None and
len(self.filter) >= 1 and
self.filter[0] != self.caseno):
return False
elif (self.filter is not None and
len(self.filter) >= 2 and
self.filter[1] != self.permno):
return False
elif self.if_ is not None:
return eval(self.if_, None, self.defines.copy())
else:
return True
def test(self, exec=[], persist=False, gdb=False, failure=None, **args):
# build command
cmd = exec + ['./%s.test' % self.suite.path,
repr(self.caseno), repr(self.permno)]
# persist disk or keep in RAM for speed?
if persist:
if persist != 'noerase':
try:
os.remove(self.suite.path + '.disk')
except FileNotFoundError:
pass
cmd.append(self.suite.path + '.disk')
# failed? drop into debugger?
if gdb and failure:
ncmd = ['gdb']
if gdb == 'assert':
ncmd.extend(['-ex', 'r'])
if failure.assert_:
ncmd.extend(['-ex', 'up'])
elif gdb == 'start':
ncmd.extend([
'-ex', 'b %s:%d' % (self.suite.path, self.code_lineno),
'-ex', 'r'])
ncmd.extend(['--args'] + cmd)
if args.get('verbose', False):
print(' '.join(shlex.quote(c) for c in ncmd))
sys.exit(sp.call(ncmd))
# run test case!
stdout = []
assert_ = None
if args.get('verbose', False):
print(' '.join(shlex.quote(c) for c in cmd))
proc = sp.Popen(cmd,
universal_newlines=True,
bufsize=1,
stdout=sp.PIPE,
stderr=sp.STDOUT)
for line in iter(proc.stdout.readline, ''):
stdout.append(line)
if args.get('verbose', False):
sys.stdout.write(line)
# intercept asserts
m = re.match(
'^{0}([^:]+):(\d+):(?:\d+:)?{0}{1}:{0}(.*)$'
.format('(?:\033\[[\d;]*.| )*', 'assert'),
line)
if m and assert_ is None:
try:
with open(m.group(1)) as f:
lineno = int(m.group(2))
line = next(it.islice(f, lineno-1, None)).strip('\n')
assert_ = {
'path': m.group(1),
'line': line,
'lineno': lineno,
'message': m.group(3)}
except:
pass
proc.wait()
# did we pass?
if proc.returncode != 0:
raise TestFailure(self, proc.returncode, stdout, assert_)
else:
return PASS
class ValgrindTestCase(TestCase):
def __init__(self, config, **args):
self.leaky = config.get('leaky', False)
super().__init__(config, **args)
def shouldtest(self, **args):
return not self.leaky and super().shouldtest(**args)
def test(self, exec=[], **args):
exec = exec + [
'valgrind',
'--leak-check=full',
'--error-exitcode=4',
'-q']
return super().test(exec=exec, **args)
class ReentrantTestCase(TestCase):
def __init__(self, config, **args):
self.reentrant = config.get('reentrant', False)
super().__init__(config, **args)
def shouldtest(self, **args):
return self.reentrant and super().shouldtest(**args)
def test(self, exec=[], persist=False, gdb=False, failure=None, **args):
# clear disk first?
if persist != 'noerase':
try:
os.remove(self.suite.path + '.disk')
except FileNotFoundError:
pass
for cycles in it.count(1):
# exact cycle we should drop into debugger?
if gdb and failure and failure.cycleno == cycles:
return super().test(exec=exec, persist='noerase',
gdb=gdb, failure=failure, **args)
# run tests, but kill the program after prog/erase has
# been hit n cycles. We exit with a special return code if the
# program has not finished, since this isn't a test failure.
nexec = exec + [
'gdb', '-batch-silent',
'-ex', 'handle all nostop',
'-ex', 'b lfs_filebd_prog',
'-ex', 'b lfs_filebd_erase',
'-ex', 'r',
] + cycles*['-ex', 'c'] + [
'-ex', 'q '
'!$_isvoid($_exitsignal) ? $_exitsignal : '
'!$_isvoid($_exitcode) ? $_exitcode : '
'33',
'--args']
try:
return super().test(exec=nexec, persist='noerase', **args)
except TestFailure as nfailure:
if nfailure.returncode == 33:
continue
else:
nfailure.cycleno = cycles
raise
class TestSuite:
def __init__(self, path, filter=None, TestCase=TestCase, **args):
self.name = os.path.basename(path)
if self.name.endswith('.toml'):
self.name = self.name[:-len('.toml')]
self.path = path
self.filter = filter
self.TestCase = TestCase
with open(path) as f:
# load tests
config = toml.load(f)
# find line numbers
f.seek(0)
linenos = []
code_linenos = []
for i, line in enumerate(f):
if re.match(r'\[\[\s*case\s*\]\]', line):
linenos.append(i+1)
if re.match(r'code\s*=\s*(\'\'\'|""")', line):
code_linenos.append(i+2)
code_linenos.reverse()
# grab global config
self.defines = config.get('define', {})
self.code = config.get('code', None)
if self.code is not None:
self.code_lineno = code_linenos.pop()
# create initial test cases
self.cases = []
for i, (case, lineno) in enumerate(zip(config['case'], linenos)):
# code lineno?
if 'code' in case:
case['code_lineno'] = code_linenos.pop()
# give our case's config a copy of our "global" config
for k, v in config.items():
if k not in case:
case[k] = v
# initialize test case
self.cases.append(self.TestCase(case, filter=filter,
suite=self, caseno=i, lineno=lineno, **args))
def __str__(self):
return self.name
def __lt__(self, other):
return self.name < other.name
def permute(self, defines={}, **args):
for case in self.cases:
# lets find all parameterized definitions, in one of [args.D,
# suite.defines, case.defines, DEFINES]. Note that each of these
# can be either a dict of defines, or a list of dicts, expressing
# an initial set of permutations.
pending = [{}]
for inits in [defines, self.defines, case.defines, DEFINES]:
if not isinstance(inits, list):
inits = [inits]
npending = []
for init, pinit in it.product(inits, pending):
ninit = pinit.copy()
for k, v in init.items():
if k not in ninit:
try:
ninit[k] = eval(v)
except:
ninit[k] = v
npending.append(ninit)
pending = npending
# expand permutations
pending = list(reversed(pending))
expanded = []
while pending:
perm = pending.pop()
for k, v in sorted(perm.items()):
if not isinstance(v, str) and isinstance(v, abc.Iterable):
for nv in reversed(v):
nperm = perm.copy()
nperm[k] = nv
pending.append(nperm)
break
else:
expanded.append(perm)
# generate permutations
case.perms = []
for i, perm in enumerate(expanded):
case.perms.append(case.permute(perm, permno=i, **args))
# also track non-unique defines
case.defines = {}
for k, v in case.perms[0].defines.items():
if all(perm.defines[k] == v for perm in case.perms):
case.defines[k] = v
# track all perms and non-unique defines
self.perms = []
for case in self.cases:
self.perms.extend(case.perms)
self.defines = {}
for k, v in self.perms[0].defines.items():
if all(perm.defines.get(k, None) == v for perm in self.perms):
self.defines[k] = v
return self.perms
def build(self, **args):
# build test files
tf = open(self.path + '.test.c.t', 'w')
tf.write(GLOBALS)
if self.code is not None:
tf.write('#line %d "%s"\n' % (self.code_lineno, self.path))
tf.write(self.code)
tfs = {None: tf}
for case in self.cases:
if case.in_ not in tfs:
tfs[case.in_] = open(self.path+'.'+
case.in_.replace('/', '.')+'.t', 'w')
tfs[case.in_].write('#line 1 "%s"\n' % case.in_)
with open(case.in_) as f:
for line in f:
tfs[case.in_].write(line)
tfs[case.in_].write('\n')
tfs[case.in_].write(GLOBALS)
tfs[case.in_].write('\n')
case.build(tfs[case.in_], **args)
tf.write('\n')
tf.write('const char *lfs_testbd_disk;\n')
tf.write('int main(int argc, char **argv) {\n')
tf.write(4*' '+'int case_ = (argc >= 2) ? atoi(argv[1]) : 0;\n')
tf.write(4*' '+'int perm = (argc >= 3) ? atoi(argv[2]) : 0;\n')
tf.write(4*' '+'lfs_testbd_disk = (argc >= 4) ? argv[3] : NULL;\n')
for perm in self.perms:
# test declaration
tf.write(4*' '+'extern void test_case%d(%s);\n' % (
perm.caseno, ', '.join(
'intmax_t %s' % k for k in sorted(perm.defines)
if k not in perm.case.defines)))
# test call
tf.write(4*' '+
'if (argc < 3 || (case_ == %d && perm == %d)) {'
' test_case%d(%s); '
'}\n' % (perm.caseno, perm.permno, perm.caseno, ', '.join(
str(v) for k, v in sorted(perm.defines.items())
if k not in perm.case.defines)))
tf.write('}\n')
for tf in tfs.values():
tf.close()
# write makefiles
with open(self.path + '.mk', 'w') as mk:
mk.write(RULES.replace(4*' ', '\t'))
mk.write('\n')
# add truely global defines globally
for k, v in sorted(self.defines.items()):
mk.write('%s: override CFLAGS += -D%s=%r\n' % (
self.path+'.test', k, v))
for path in tfs:
if path is None:
mk.write('%s: %s | %s\n' % (
self.path+'.test.c',
self.path,
self.path+'.test.c.t'))
else:
mk.write('%s: %s %s | %s\n' % (
self.path+'.'+path.replace('/', '.'),
self.path, path,
self.path+'.'+path.replace('/', '.')+'.t'))
mk.write('\t./scripts/explode_asserts.py $| -o $@\n')
self.makefile = self.path + '.mk'
self.target = self.path + '.test'
return self.makefile, self.target
def test(self, caseno=None, permno=None, **args):
# run test suite!
if not args.get('verbose', True):
sys.stdout.write(self.name + ' ')
sys.stdout.flush()
for perm in self.perms:
if caseno is not None and perm.caseno != caseno:
continue
if permno is not None and perm.permno != permno:
continue
if not perm.shouldtest(**args):
continue
try:
result = perm.test(**args)
except TestFailure as failure:
perm.result = failure
if not args.get('verbose', True):
sys.stdout.write(FAIL)
sys.stdout.flush()
if not args.get('keep_going', False):
if not args.get('verbose', True):
sys.stdout.write('\n')
raise
else:
perm.result = PASS
if not args.get('verbose', True):
sys.stdout.write(PASS)
sys.stdout.flush()
if not args.get('verbose', True):
sys.stdout.write('\n')
def main(**args):
suites = []
for testpath in args['testpaths']:
# optionally specified test case/perm
testpath, *filter = testpath.split('#')
filter = [int(f) for f in filter]
# figure out the suite's toml file
if os.path.isdir(testpath):
testpath = testpath + '/test_*.toml'
elif os.path.isfile(testpath):
testpath = testpath
elif testpath.endswith('.toml'):
testpath = TESTDIR + '/' + testpath
else:
testpath = TESTDIR + '/' + testpath + '.toml'
# find tests
for path in glob.glob(testpath):
if args.get('valgrind', False):
TestCase_ = ValgrindTestCase
elif args.get('reentrant', False):
TestCase_ = ReentrantTestCase
else:
TestCase_ = TestCase
suites.append(TestSuite(path,
filter=filter, TestCase=TestCase_, **args))
# sort for reproducability
suites = sorted(suites)
# generate permutations
defines = {}
for define in args['D']:
k, v, *_ = define.split('=', 2) + ['']
defines[k] = v
for suite in suites:
suite.permute(defines, **args)
# build tests in parallel
print('====== building ======')
makefiles = []
targets = []
for suite in suites:
makefile, target = suite.build(**args)
makefiles.append(makefile)
targets.append(target)
cmd = (['make', '-f', 'Makefile'] +
list(it.chain.from_iterable(['-f', m] for m in makefiles)) +
[target for target in targets])
stdout = []
if args.get('verbose', False):
print(' '.join(shlex.quote(c) for c in cmd))
proc = sp.Popen(cmd,
universal_newlines=True,
bufsize=1,
stdout=sp.PIPE,
stderr=sp.STDOUT)
for line in iter(proc.stdout.readline, ''):
stdout.append(line)
if args.get('verbose', False):
sys.stdout.write(line)
# intercept warnings
m = re.match(
'^{0}([^:]+):(\d+):(?:\d+:)?{0}{1}:{0}(.*)$'
.format('(?:\033\[[\d;]*.| )*', 'warning'),
line)
if m and not args.get('verbose', False):
try:
with open(m.group(1)) as f:
lineno = int(m.group(2))
line = next(it.islice(f, lineno-1, None)).strip('\n')
sys.stdout.write(
"\033[01m{path}:{lineno}:\033[01;35mwarning:\033[m "
"{message}\n{line}\n\n".format(
path=m.group(1), line=line, lineno=lineno,
message=m.group(3)))
except:
pass
proc.wait()
if proc.returncode != 0:
if not args.get('verbose', False):
for line in stdout:
sys.stdout.write(line)
sys.exit(-3)
print('built %d test suites, %d test cases, %d permutations' % (
len(suites),
sum(len(suite.cases) for suite in suites),
sum(len(suite.perms) for suite in suites)))
filtered = 0
for suite in suites:
for perm in suite.perms:
filtered += perm.shouldtest(**args)
if filtered != sum(len(suite.perms) for suite in suites):
print('filtered down to %d permutations' % filtered)
print('====== testing ======')
try:
for suite in suites:
suite.test(**args)
except TestFailure:
pass
print('====== results ======')
passed = 0
failed = 0
for suite in suites:
for perm in suite.perms:
if not hasattr(perm, 'result'):
continue
if perm.result == PASS:
passed += 1
else:
sys.stdout.write(
"\033[01m{path}:{lineno}:\033[01;31mfailure:\033[m "
"{perm} failed with {returncode}\n".format(
perm=perm, path=perm.suite.path, lineno=perm.lineno,
returncode=perm.result.returncode or 0))
if perm.result.stdout:
for line in (perm.result.stdout
if not perm.result.assert_
else perm.result.stdout[:-1]):
sys.stdout.write(line)
if perm.result.assert_:
sys.stdout.write(
"\033[01m{path}:{lineno}:\033[01;31massert:\033[m "
"{message}\n{line}\n".format(
**perm.result.assert_))
else:
for line in perm.result.stdout:
sys.stdout.write(line)
sys.stdout.write('\n')
failed += 1
if args.get('gdb', False):
failure = None
for suite in suites:
for perm in suite.perms:
if getattr(perm, 'result', PASS) != PASS:
failure = perm.result
if failure is not None:
print('======= gdb ======')
# drop into gdb
failure.case.test(failure=failure, **args)
sys.exit(0)
print('tests passed: %d' % passed)
print('tests failed: %d' % failed)
return 1 if failed > 0 else 0
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Run parameterized tests in various configurations.")
parser.add_argument('testpaths', nargs='*', default=[TESTDIR],
help="Description of test(s) to run. By default, this is all tests \
found in the \"{0}\" directory. Here, you can specify a different \
directory of tests, a specific file, a suite by name, and even a \
specific test case by adding brackets. For example \
\"test_dirs[0]\" or \"{0}/test_dirs.toml[0]\".".format(TESTDIR))
parser.add_argument('-D', action='append', default=[],
help="Overriding parameter definitions.")
parser.add_argument('-v', '--verbose', action='store_true',
help="Output everything that is happening.")
parser.add_argument('-k', '--keep-going', action='store_true',
help="Run all tests instead of stopping on first error. Useful for CI.")
parser.add_argument('-p', '--persist', choices=['erase', 'noerase'],
nargs='?', const='erase',
help="Store disk image in a file.")
parser.add_argument('-g', '--gdb', choices=['init', 'start', 'assert'],
nargs='?', const='assert',
help="Drop into gdb on test failure.")
parser.add_argument('--valgrind', action='store_true',
help="Run non-leaky tests under valgrind to check for memory leaks.")
parser.add_argument('--reentrant', action='store_true',
help="Run reentrant tests with simulated power-loss.")
parser.add_argument('-e', '--exec', default=[], type=lambda e: e.split(' '),
help="Run tests with another executable prefixed on the command line.")
sys.exit(main(**vars(parser.parse_args())))