From dd37a782a8637bdee703a13c949b222b9ba8b95d Mon Sep 17 00:00:00 2001 From: Uwe Hermann Date: Mon, 1 Sep 2014 18:23:54 +0200 Subject: Add initial set of PD tests. This is a slightly modified version of the PD test suite that was part of libsigrokdecode previously. --- decoder/pdtest | 574 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 574 insertions(+) create mode 100755 decoder/pdtest (limited to 'decoder/pdtest') diff --git a/decoder/pdtest b/decoder/pdtest new file mode 100755 index 0000000..fa72934 --- /dev/null +++ b/decoder/pdtest @@ -0,0 +1,574 @@ +#!/usr/bin/env python3 +## +## This file is part of the sigrok-test project. +## +## Copyright (C) 2013 Bert Vermeulen +## +## This program is free software: you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation, either version 3 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program. If not, see . +## + +import os +import sys +import re +from getopt import getopt +from tempfile import mkstemp +from subprocess import Popen, PIPE +from difflib import Differ +from hashlib import md5 +from shutil import copy + +DEBUG = 0 +VERBOSE = False + + +class E_syntax(Exception): + pass +class E_badline(Exception): + pass + +def INFO(msg, end='\n'): + if VERBOSE: + print(msg, end=end) + sys.stdout.flush() + + +def DBG(msg): + if DEBUG: + print(msg) + + +def ERR(msg): + print(msg, file=sys.stderr) + + +def usage(msg=None): + if msg: + print(msg.strip() + '\n') + print("""Usage: testpd [-dvarslR] [test, ...] + -d Turn on debugging + -v Verbose + -a All tests + -l List all tests + -s Show test(s) + -r Run test(s) + -f Fix failed test(s) + -c Report decoder code coverage + -R Save test reports to + Protocol decoder name ("i2c") and optionally test name ("i2c/icc")""") + sys.exit() + + +def check_tclist(tc): + if 'pdlist' not in tc or not tc['pdlist']: + return("No protocol decoders") + if 'input' not in tc or not tc['input']: + return("No input") + if 'output' not in tc or not tc['output']: + return("No output") + for op in tc['output']: + if 'match' not in op: + return("No match in output") + + return None + + +def parse_testfile(path, pd, tc, op_type, op_class): + DBG("Opening '%s'" % path) + tclist = [] + for line in open(path).read().split('\n'): + try: + line = line.strip() + if len(line) == 0 or line[0] == "#": + continue + f = line.split() + if not tclist and f[0] != "test": + # That can't be good. + raise E_badline + key = f.pop(0) + if key == 'test': + if len(f) != 1: + raise E_syntax + # new testcase + tclist.append({ + 'pd': pd, + 'name': f[0], + 'pdlist': [], + 'output': [], + }) + elif key == 'protocol-decoder': + if len(f) < 1: + raise E_syntax + pd_spec = { + 'name': f.pop(0), + 'channels': [], + 'options': [], + } + while len(f): + if len(f) == 1: + # Always needs + raise E_syntax + a, b = f[:2] + f = f[2:] + if '=' not in b: + raise E_syntax + opt, val = b.split('=') + if a == 'channel': + try: + val = int(val) + except: + raise E_syntax + pd_spec['channels'].append([opt, val]) + elif a == 'option': + pd_spec['options'].append([opt, val]) + else: + raise E_syntax + tclist[-1]['pdlist'].append(pd_spec) + elif key == 'stack': + if len(f) < 2: + raise E_syntax + tclist[-1]['stack'] = f + elif key == 'input': + if len(f) != 1: + raise E_syntax + tclist[-1]['input'] = f[0] + elif key == 'output': + op_spec = { + 'pd': f.pop(0), + 'type': f.pop(0), + } + while len(f): + if len(f) == 1: + # Always needs + raise E_syntax + a, b = f[:2] + f = f[2:] + if a == 'class': + op_spec['class'] = b + elif a == 'match': + op_spec['match'] = b + else: + raise E_syntax + tclist[-1]['output'].append(op_spec) + else: + raise E_badline + except E_badline as e: + ERR("Invalid syntax in %s: line '%s'" % (path, line)) + return [] + except E_syntax as e: + ERR("Unable to parse %s: unknown line '%s'" % (path, line)) + return [] + + # If a specific testcase was requested, keep only that one. + if tc is not None: + target_tc = None + for t in tclist: + if t['name'] == tc: + target_tc = t + break + # ...and a specific output type + if op_type is not None: + target_oplist = [] + for op in target_tc['output']: + if op['type'] == op_type: + # ...and a specific output class + if op_class is None or ('class' in op and op['class'] == op_class): + target_oplist.append(op) + DBG("match on [%s]" % str(op)) + target_tc['output'] = target_oplist + if target_tc is None: + tclist = [] + else: + tclist = [target_tc] + for t in tclist: + error = check_tclist(t) + if error: + ERR("Error in %s: %s" % (path, error)) + return [] + + return tclist + + +def get_tests(testnames): + tests = {} + for testspec in testnames: + # Optional testspec in the form pd/testcase/type/class + tc = op_type = op_class = None + ts = testspec.strip("/").split("/") + pd = ts.pop(0) + tests[pd] = [] + if ts: + tc = ts.pop(0) + if ts: + op_type = ts.pop(0) + if ts: + op_class = ts.pop(0) + path = os.path.join(tests_dir, pd) + if not os.path.isdir(path): + # User specified non-existent PD + raise Exception("%s not found." % path) + path = os.path.join(tests_dir, pd, "test.conf") + if not os.path.exists(path): + # PD doesn't have any tests yet + continue + tests[pd].append(parse_testfile(path, pd, tc, op_type, op_class)) + + return tests + + +def diff_text(f1, f2): + t1 = open(f1).readlines() + t2 = open(f2).readlines() + diff = [] + d = Differ() + for line in d.compare(t1, t2): + if line[:2] in ('- ', '+ '): + diff.append(line.strip()) + + return diff + + +def compare_binary(f1, f2): + h1 = md5() + h1.update(open(f1, 'rb').read()) + h2 = md5() + h2.update(open(f2, 'rb').read()) + if h1.digest() == h2.digest(): + result = None + else: + result = ["Binary output does not match."] + + return result + + +# runtc's stdout can have lines like: +# coverage: lines=161 missed=2 coverage=99% +def parse_stats(text): + stats = {} + for line in text.strip().split('\n'): + fields = line.split() + key = fields.pop(0).strip(':') + if key not in stats: + stats[key] = [] + stats[key].append({}) + for f in fields: + k, v = f.split('=') + stats[key][-1][k] = v + + return stats + + +# take result set of all tests in a PD, and summarize which lines +# were not covered by any of the tests. +def coverage_sum(cvglist): + lines = 0 + missed = 0 + missed_lines = {} + for record in cvglist: + lines = int(record['lines']) + missed += int(record['missed']) + if 'missed_lines' not in record: + continue + for linespec in record['missed_lines'].split(','): + if linespec not in missed_lines: + missed_lines[linespec] = 1 + else: + missed_lines[linespec] += 1 + + # keep only those lines that didn't show up in every non-summary record + final_missed = [] + for linespec in missed_lines: + if missed_lines[linespec] != len(cvglist): + continue + final_missed.append(linespec) + + return lines, final_missed + + +def run_tests(tests, fix=False): + errors = 0 + results = [] + cmd = [os.path.join(runtc_dir, 'runtc')] + if opt_coverage: + fd, coverage = mkstemp() + os.close(fd) + cmd.extend(['-c', coverage]) + else: + coverage = None + for pd in sorted(tests.keys()): + pd_cvg = [] + for tclist in tests[pd]: + for tc in tclist: + args = cmd[:] + if DEBUG > 1: + args.append('-d') + # Set up PD stack for this test. + for spd in tc['pdlist']: + args.extend(['-P', spd['name']]) + for label, channel in spd['channels']: + args.extend(['-p', "%s=%d" % (label, channel)]) + for option, value in spd['options']: + args.extend(['-o', "%s=%s" % (option, value)]) + args.extend(['-i', os.path.join(dumps_dir, tc['input'])]) + for op in tc['output']: + name = "%s/%s/%s" % (pd, tc['name'], op['type']) + opargs = ['-O', "%s:%s" % (op['pd'], op['type'])] + if 'class' in op: + opargs[-1] += ":%s" % op['class'] + name += "/%s" % op['class'] + if VERBOSE: + dots = '.' * (60 - len(name) - 2) + INFO("%s %s " % (name, dots), end='') + results.append({ + 'testcase': name, + }) + try: + fd, outfile = mkstemp() + os.close(fd) + opargs.extend(['-f', outfile]) + DBG("Running %s" % (' '.join(args + opargs))) + p = Popen(args + opargs, stdout=PIPE, stderr=PIPE) + stdout, stderr = p.communicate() + if stdout: + # statistics and coverage data on stdout + results[-1].update(parse_stats(stdout.decode('utf-8'))) + if stderr: + results[-1]['error'] = stderr.decode('utf-8').strip() + errors += 1 + elif p.returncode != 0: + # runtc indicated an error, but didn't output a + # message on stderr about it + results[-1]['error'] = "Unknown error: runtc %d" % p.returncode + if 'error' not in results[-1]: + matchfile = os.path.join(tests_dir, op['pd'], op['match']) + DBG("Comparing with %s" % matchfile) + try: + diff = diff_error = None + if op['type'] in ('annotation', 'python'): + diff = diff_text(matchfile, outfile) + elif op['type'] == 'binary': + diff = compare_binary(matchfile, outfile) + else: + diff = ["Unsupported output type '%s'." % op['type']] + except Exception as e: + diff_error = e + if fix: + if diff or diff_error: + copy(outfile, matchfile) + DBG("Wrote %s" % matchfile) + else: + if diff: + results[-1]['diff'] = diff + elif diff_error is not None: + raise diff_error + except Exception as e: + results[-1]['error'] = str(e) + finally: + if coverage: + results[-1]['coverage_report'] = coverage + os.unlink(outfile) + if op['type'] == 'exception' and 'error' in results[-1]: + # filter out the exception we were looking for + reg = "^Error: srd: %s:" % op['match'] + if re.match(reg, results[-1]['error']): + # found it, not an error + results[-1].pop('error') + if VERBOSE: + if 'diff' in results[-1]: + INFO("Output mismatch") + elif 'error' in results[-1]: + error = results[-1]['error'] + if len(error) > 20: + error = error[:17] + '...' + INFO(error) + elif 'coverage' in results[-1]: + # report coverage of this PD + for record in results[-1]['coverage']: + # but not others used in the stack + # as part of the test. + if record['scope'] == pd: + INFO(record['coverage']) + break + else: + INFO("OK") + gen_report(results[-1]) + if coverage: + os.unlink(coverage) + # only keep track of coverage records for this PD, + # not others in the stack just used for testing. + for cvg in results[-1]['coverage']: + if cvg['scope'] == pd: + pd_cvg.append(cvg) + if opt_coverage and len(pd_cvg) > 1: + # report total coverage of this PD, across all the tests + # that were done on it. + total_lines, missed_lines = coverage_sum(pd_cvg) + pd_coverage = 100 - (float(len(missed_lines)) / total_lines * 100) + if VERBOSE: + dots = '.' * (54 - len(pd) - 2) + INFO("%s total %s %d%%" % (pd, dots, pd_coverage)) + if report_dir: + # generate a missing lines list across all the files in + # the PD + files = {} + for entry in missed_lines: + filename, line = entry.split(':') + if filename not in files: + files[filename] = [] + files[filename].append(line) + text = '' + for filename in sorted(files.keys()): + line_list = ','.join(sorted(files[filename], key=int)) + text += "%s: %s\n" % (filename, line_list) + open(os.path.join(report_dir, pd + "_total"), 'w').write(text) + + + return results, errors + + +def gen_report(result): + out = [] + if 'error' in result: + out.append("Error:") + out.append(result['error']) + out.append('') + if 'diff' in result: + out.append("Test output mismatch:") + out.extend(result['diff']) + out.append('') + if 'coverage_report' in result: + out.append(open(result['coverage_report'], 'r').read()) + out.append('') + + if out: + text = "Testcase: %s\n" % result['testcase'] + text += '\n'.join(out) + else: + return + + if report_dir: + filename = result['testcase'].replace('/', '_') + open(os.path.join(report_dir, filename), 'w').write(text) + else: + print(text) + + +def show_tests(tests): + for pd in sorted(tests.keys()): + for tclist in tests[pd]: + for tc in tclist: + print("Testcase: %s/%s" % (tc['pd'], tc['name'])) + for pd in tc['pdlist']: + print(" Protocol decoder: %s" % pd['name']) + for label, channel in pd['channels']: + print(" Channel %s=%d" % (label, channel)) + for option, value in pd['options']: + print(" Option %s=%d" % (option, value)) + if 'stack' in tc: + print(" Stack: %s" % ' '.join(tc['stack'])) + print(" Input: %s" % tc['input']) + for op in tc['output']: + print(" Output:\n Protocol decoder: %s" % op['pd']) + print(" Type: %s" % op['type']) + if 'class' in op: + print(" Class: %s" % op['class']) + print(" Match: %s" % op['match']) + print() + + +def list_tests(tests): + for pd in sorted(tests.keys()): + for tclist in tests[pd]: + for tc in tclist: + for op in tc['output']: + line = "%s/%s/%s" % (tc['pd'], tc['name'], op['type']) + if 'class' in op: + line += "/%s" % op['class'] + print(line) + + +# +# main +# + +# project root +runtc_dir = os.path.abspath(os.path.dirname(sys.argv[0])) +base_dir = os.path.abspath(os.path.join(os.curdir, runtc_dir, os.path.pardir)) +dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps')) +tests_dir = os.path.abspath(os.path.join(runtc_dir, 'test')) + +if len(sys.argv) == 1: + usage() + +opt_all = opt_run = opt_show = opt_list = opt_fix = opt_coverage = False +report_dir = None +opts, args = getopt(sys.argv[1:], "dvarslfcR:S:") +for opt, arg in opts: + if opt == '-d': + DEBUG += 1 + if opt == '-v': + VERBOSE = True + elif opt == '-a': + opt_all = True + elif opt == '-r': + opt_run = True + elif opt == '-s': + opt_show = True + elif opt == '-l': + opt_list = True + elif opt == '-f': + opt_fix = True + elif opt == '-c': + opt_coverage = True + elif opt == '-R': + report_dir = arg + elif opt == '-S': + dumps_dir = arg + +if opt_run and opt_show: + usage("Use either -s or -r, not both.") +if args and opt_all: + usage("Specify either -a or tests, not both.") +if report_dir is not None and not os.path.isdir(report_dir): + usage("%s is not a directory" % report_dir) + +ret = 0 +try: + if args: + testlist = get_tests(args) + elif opt_all: + testlist = get_tests(os.listdir(tests_dir)) + else: + usage("Specify either -a or tests.") + + if opt_run: + if not os.path.isdir(dumps_dir): + ERR("Could not find sigrok-dumps repository at %s" % dumps_dir) + sys.exit(1) + results, errors = run_tests(testlist, fix=opt_fix) + ret = errors + elif opt_show: + show_tests(testlist) + elif opt_list: + list_tests(testlist) + elif opt_fix: + run_tests(testlist, fix=True) + else: + usage() +except Exception as e: + print("Error: %s" % str(e)) + if DEBUG: + raise + +sys.exit(ret) + -- cgit v1.2.3-54-g00ecf