| #!/usr/bin/env python3 |
| # |
| # Copyright 2018 The Hafnium Authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Script which drives invocation of tests and parsing their output to produce |
| a results report. |
| """ |
| |
| from __future__ import print_function |
| |
| import xml.etree.ElementTree as ET |
| |
| import argparse |
| import collections |
| import datetime |
| import importlib |
| import json |
| import os |
| import re |
| import subprocess |
| import sys |
| |
| HFTEST_LOG_PREFIX = "[hftest] " |
| HFTEST_LOG_FAILURE_PREFIX = "Failure:" |
| HFTEST_LOG_FINISHED = "FINISHED" |
| |
| HFTEST_CTRL_GET_COMMAND_LINE = "[hftest_ctrl:get_command_line]" |
| HFTEST_CTRL_FINISHED = "[hftest_ctrl:finished]" |
| |
| HF_ROOT = os.path.dirname(os.path.dirname(os.path.dirname( |
| os.path.abspath(__file__)))) |
| DTC_SCRIPT = os.path.join(HF_ROOT, "build", "image", "dtc.py") |
| FVP_BINARY = os.path.join( |
| os.path.dirname(HF_ROOT), "fvp", "Base_RevC_AEMv8A_pkg", "models", |
| "Linux64_GCC-4.9", "FVP_Base_RevC-2xAEMv8A") |
| FVP_PREBUILTS_ROOT = os.path.join( |
| HF_ROOT, "prebuilts", "linux-aarch64", "arm-trusted-firmware", "fvp") |
| FVP_PREBUILT_DTS = os.path.join( |
| FVP_PREBUILTS_ROOT, "fvp-base-gicv3-psci-1t.dts") |
| FVP_PREBUILT_BL31 = os.path.join(FVP_PREBUILTS_ROOT, "bl31.bin") |
| |
| def read_file(path): |
| with open(path, "r") as f: |
| return f.read() |
| |
| def write_file(path, to_write, append=False): |
| with open(path, "a" if append else "w") as f: |
| f.write(to_write) |
| |
| def append_file(path, to_write): |
| write_file(path, to_write, append=True) |
| |
| def join_if_not_None(*args): |
| return " ".join(filter(lambda x: x, args)) |
| |
| class ArtifactsManager: |
| """Class which manages folder with test artifacts.""" |
| |
| def __init__(self, log_dir): |
| self.created_files = [] |
| self.log_dir = log_dir |
| |
| # Create directory. |
| try: |
| os.makedirs(self.log_dir) |
| except OSError: |
| if not os.path.isdir(self.log_dir): |
| raise |
| print("Logs saved under", log_dir) |
| |
| # Create files expected by the Sponge test result parser. |
| self.sponge_log_path = self.create_file("sponge_log", ".log") |
| self.sponge_xml_path = self.create_file("sponge_log", ".xml") |
| |
| def gen_file_path(self, basename, extension): |
| """Generate path to a file in the log directory.""" |
| return os.path.join(self.log_dir, basename + extension) |
| |
| def create_file(self, basename, extension): |
| """Create and touch a new file in the log folder. Ensure that no other |
| file of the same name was created by this instance of ArtifactsManager. |
| """ |
| # Determine the path of the file. |
| path = self.gen_file_path(basename, extension) |
| |
| # Check that the path is unique. |
| assert(path not in self.created_files) |
| self.created_files += [ path ] |
| |
| # Touch file. |
| with open(path, "w") as f: |
| pass |
| |
| return path |
| |
| def get_file(self, basename, extension): |
| """Return path to a file in the log folder. Assert that it was created |
| by this instance of ArtifactsManager.""" |
| path = self.gen_file_path(basename, extension) |
| assert(path in self.created_files) |
| return path |
| |
| |
| # Tuple holding the arguments common to all driver constructors. |
| # This is to avoid having to pass arguments from subclasses to superclasses. |
| DriverArgs = collections.namedtuple("DriverArgs", [ |
| "artifacts", |
| "kernel", |
| "initrd", |
| "vm_args", |
| "cpu", |
| ]) |
| |
| |
| # State shared between the common Driver class and its subclasses during |
| # a single invocation of the target platform. |
| class DriverRunState: |
| def __init__(self, log_path): |
| self.log_path = log_path |
| self.ret_code = 0 |
| |
| def set_ret_code(self, ret_code): |
| self.ret_code = ret_code |
| |
| class DriverRunException(Exception): |
| """Exception thrown if subprocess invoked by a driver returned non-zero |
| status code. Used to fast-exit from a driver command sequence.""" |
| pass |
| |
| |
| class Driver: |
| """Parent class of drivers for all testable platforms.""" |
| |
| def __init__(self, args): |
| self.args = args |
| |
| def get_run_log(self, run_name): |
| """Return path to the main log of a given test run.""" |
| return self.args.artifacts.get_file(run_name, ".log") |
| |
| def start_run(self, run_name): |
| """Hook called by Driver subclasses before they invoke the target |
| platform.""" |
| return DriverRunState(self.args.artifacts.create_file(run_name, ".log")) |
| |
| def exec_logged(self, run_state, exec_args, cwd=None): |
| """Run a subprocess on behalf of a Driver subclass and append its |
| stdout and stderr to the main log.""" |
| assert(run_state.ret_code == 0) |
| with open(run_state.log_path, "a") as f: |
| f.write("$ {}\r\n".format(" ".join(exec_args))) |
| f.flush() |
| ret_code = subprocess.call(exec_args, stdout=f, stderr=f, cwd=cwd) |
| if ret_code != 0: |
| run_state.set_ret_code(ret_code) |
| raise DriverRunException() |
| |
| def finish_run(self, run_state): |
| """Hook called by Driver subclasses after they finished running the |
| target platform. `ret_code` argument is the return code of the main |
| command run by the driver. A corresponding log message is printed.""" |
| # Decode return code and add a message to the log. |
| with open(run_state.log_path, "a") as f: |
| if run_state.ret_code == 124: |
| f.write("\r\n{}{} timed out\r\n".format( |
| HFTEST_LOG_PREFIX, HFTEST_LOG_FAILURE_PREFIX)) |
| elif run_state.ret_code != 0: |
| f.write("\r\n{}{} process return code {}\r\n".format( |
| HFTEST_LOG_PREFIX, HFTEST_LOG_FAILURE_PREFIX, |
| run_state.ret_code)) |
| |
| # Append log of this run to full test log. |
| log_content = read_file(run_state.log_path) |
| append_file( |
| self.args.artifacts.sponge_log_path, |
| log_content + "\r\n\r\n") |
| return log_content |
| |
| |
| class QemuDriver(Driver): |
| """Driver which runs tests in QEMU.""" |
| |
| def __init__(self, args, qemu_wd, tfa): |
| Driver.__init__(self, args) |
| self.qemu_wd = qemu_wd |
| self.tfa = tfa |
| |
| def gen_exec_args(self, test_args, is_long_running): |
| """Generate command line arguments for QEMU.""" |
| time_limit = "120s" if is_long_running else "10s" |
| # If no CPU configuration is selected, then test against the maximum |
| # configuration, "max", supported by QEMU. |
| cpu = self.args.cpu or "max" |
| exec_args = [ |
| "timeout", "--foreground", time_limit, |
| os.path.abspath("prebuilts/linux-x64/qemu/qemu-system-aarch64"), |
| "-machine", "virt,virtualization=on,gic_version=3", |
| "-cpu", cpu, "-smp", "4", "-m", "1G", |
| "-nographic", "-nodefaults", "-serial", "stdio", |
| "-d", "unimp", "-kernel", os.path.abspath(self.args.kernel), |
| ] |
| |
| if self.tfa: |
| exec_args += ["-bios", |
| os.path.abspath( |
| "prebuilts/linux-aarch64/arm-trusted-firmware/qemu/bl1.bin" |
| ), "-machine", "secure=on", "-semihosting-config", |
| "enable,target=native"] |
| |
| if self.args.initrd: |
| exec_args += ["-initrd", os.path.abspath(self.args.initrd)] |
| |
| vm_args = join_if_not_None(self.args.vm_args, test_args) |
| if vm_args: |
| exec_args += ["-append", vm_args] |
| |
| return exec_args |
| |
| def run(self, run_name, test_args, is_long_running): |
| """Run test given by `test_args` in QEMU.""" |
| run_state = self.start_run(run_name) |
| |
| try: |
| # Execute test in QEMU.. |
| exec_args = self.gen_exec_args(test_args, is_long_running) |
| self.exec_logged(run_state, exec_args, |
| cwd=self.qemu_wd) |
| except DriverRunException: |
| pass |
| |
| return self.finish_run(run_state) |
| |
| def finish(self): |
| """Clean up after running tests.""" |
| pass |
| |
| |
| class FvpDriver(Driver): |
| """Driver which runs tests in Arm FVP emulator.""" |
| |
| def __init__(self, args): |
| if args.cpu: |
| raise ValueError("FVP emulator does not support the --cpu option.") |
| Driver.__init__(self, args) |
| |
| def gen_dts(self, dts_path, test_args, initrd_start, initrd_end): |
| """Create a DeviceTree source which will be compiled into a DTB and |
| passed to FVP for a test run.""" |
| vm_args = join_if_not_None(self.args.vm_args, test_args) |
| write_file(dts_path, read_file(FVP_PREBUILT_DTS)) |
| append_file(dts_path, """ |
| / {{ |
| chosen {{ |
| bootargs = "{}"; |
| stdout-path = "serial0:115200n8"; |
| linux,initrd-start = <{}>; |
| linux,initrd-end = <{}>; |
| }}; |
| }}; |
| """.format(vm_args, initrd_start, initrd_end)) |
| |
| def gen_fvp_args( |
| self, is_long_running, initrd_start, uart0_log_path, uart1_log_path, |
| dtb_path): |
| """Generate command line arguments for FVP.""" |
| time_limit = "80s" if is_long_running else "40s" |
| fvp_args = [ |
| "timeout", "--foreground", time_limit, |
| FVP_BINARY, |
| "-C", "pctl.startup=0.0.0.0", |
| "-C", "bp.secure_memory=0", |
| "-C", "cluster0.NUM_CORES=4", |
| "-C", "cluster1.NUM_CORES=4", |
| "-C", "cache_state_modelled=0", |
| "-C", "bp.vis.disable_visualisation=true", |
| "-C", "bp.vis.rate_limit-enable=false", |
| "-C", "bp.terminal_0.start_telnet=false", |
| "-C", "bp.terminal_1.start_telnet=false", |
| "-C", "bp.terminal_2.start_telnet=false", |
| "-C", "bp.terminal_3.start_telnet=false", |
| "-C", "bp.pl011_uart0.untimed_fifos=1", |
| "-C", "bp.pl011_uart0.unbuffered_output=1", |
| "-C", "bp.pl011_uart0.out_file=" + uart0_log_path, |
| "-C", "bp.pl011_uart1.out_file=" + uart1_log_path, |
| "-C", "cluster0.cpu0.RVBAR=0x04020000", |
| "-C", "cluster0.cpu1.RVBAR=0x04020000", |
| "-C", "cluster0.cpu2.RVBAR=0x04020000", |
| "-C", "cluster0.cpu3.RVBAR=0x04020000", |
| "-C", "cluster1.cpu0.RVBAR=0x04020000", |
| "-C", "cluster1.cpu1.RVBAR=0x04020000", |
| "-C", "cluster1.cpu2.RVBAR=0x04020000", |
| "-C", "cluster1.cpu3.RVBAR=0x04020000", |
| "--data", "cluster0.cpu0=" + FVP_PREBUILT_BL31 + "@0x04020000", |
| "--data", "cluster0.cpu0=" + dtb_path + "@0x82000000", |
| "--data", "cluster0.cpu0=" + self.args.kernel + "@0x80000000", |
| "-C", "bp.ve_sysregs.mmbSiteDefault=0", |
| "-C", "bp.ve_sysregs.exit_on_shutdown=1", |
| ] |
| |
| if self.args.initrd: |
| fvp_args += [ |
| "--data", |
| "cluster0.cpu0={}@{}".format( |
| self.args.initrd, hex(initrd_start)) |
| ] |
| |
| return fvp_args |
| |
| def run(self, run_name, test_args, is_long_running): |
| run_state = self.start_run(run_name) |
| |
| dts_path = self.args.artifacts.create_file(run_name, ".dts") |
| dtb_path = self.args.artifacts.create_file(run_name, ".dtb") |
| uart0_log_path = self.args.artifacts.create_file(run_name, ".uart0.log") |
| uart1_log_path = self.args.artifacts.create_file(run_name, ".uart1.log") |
| |
| initrd_start = 0x84000000 |
| if self.args.initrd: |
| initrd_end = initrd_start + os.path.getsize(self.args.initrd) |
| else: |
| initrd_end = 0x85000000 # Default value |
| |
| try: |
| # Create a DT to pass to FVP. |
| self.gen_dts(dts_path, test_args, initrd_start, initrd_end) |
| |
| # Compile DTS to DTB. |
| dtc_args = [ |
| DTC_SCRIPT, "compile", "-i", dts_path, "-o", dtb_path, |
| ] |
| self.exec_logged(run_state, dtc_args) |
| |
| # Run FVP. |
| fvp_args = self.gen_fvp_args( |
| is_long_running, initrd_start, uart0_log_path, uart1_log_path, |
| dtb_path) |
| self.exec_logged(run_state, fvp_args) |
| except DriverRunException: |
| pass |
| |
| # Append UART0 output to main log. |
| append_file(run_state.log_path, read_file(uart0_log_path)) |
| return self.finish_run(run_state) |
| |
| def finish(self): |
| """Clean up after running tests.""" |
| pass |
| |
| |
| class SerialDriver(Driver): |
| """Driver which communicates with a device over the serial port.""" |
| |
| def __init__(self, args, tty_file, baudrate, init_wait): |
| Driver.__init__(self, args) |
| self.tty_file = tty_file |
| self.baudrate = baudrate |
| self.pyserial = importlib.import_module("serial") |
| |
| if init_wait: |
| input("Press ENTER and then reset the device...") |
| |
| def connect(self): |
| return self.pyserial.Serial(self.tty_file, self.baudrate, timeout=10) |
| |
| def run(self, run_name, test_args, is_long_running): |
| """Communicate `test_args` to the device over the serial port.""" |
| run_state = self.start_run(run_name) |
| |
| with self.connect() as ser: |
| with open(run_state.log_path, "a") as f: |
| while True: |
| # Read one line from the serial port. |
| line = ser.readline().decode('utf-8') |
| if len(line) == 0: |
| # Timeout |
| run_state.set_ret_code(124) |
| input("Timeout. " + |
| "Press ENTER and then reset the device...") |
| break |
| # Write the line to the log file. |
| f.write(line) |
| if HFTEST_CTRL_GET_COMMAND_LINE in line: |
| # Device is waiting for `test_args`. |
| ser.write(test_args.encode('ascii')) |
| ser.write(b'\r') |
| elif HFTEST_CTRL_FINISHED in line: |
| # Device has finished running this test and will reboot. |
| break |
| return self.finish_run(run_state) |
| |
| def finish(self): |
| """Clean up after running tests.""" |
| with self.connect() as ser: |
| while True: |
| line = ser.readline().decode('utf-8') |
| if len(line) == 0: |
| input("Timeout. Press ENTER and then reset the device...") |
| elif HFTEST_CTRL_GET_COMMAND_LINE in line: |
| # Device is waiting for a command. Instruct it to exit |
| # the test environment. |
| ser.write("exit".encode('ascii')) |
| ser.write(b'\r') |
| break |
| |
| |
| # Tuple used to return information about the results of running a set of tests. |
| TestRunnerResult = collections.namedtuple("TestRunnerResult", [ |
| "tests_run", |
| "tests_failed", |
| ]) |
| |
| |
| class TestRunner: |
| """Class which communicates with a test platform to obtain a list of |
| available tests and driving their execution.""" |
| |
| def __init__(self, artifacts, driver, image_name, suite_regex, test_regex, |
| skip_long_running_tests): |
| self.artifacts = artifacts |
| self.driver = driver |
| self.image_name = image_name |
| self.skip_long_running_tests = skip_long_running_tests |
| |
| self.suite_re = re.compile(suite_regex or ".*") |
| self.test_re = re.compile(test_regex or ".*") |
| |
| def extract_hftest_lines(self, raw): |
| """Extract hftest-specific lines from a raw output from an invocation |
| of the test platform.""" |
| lines = [] |
| for line in raw.splitlines(): |
| if line.startswith("VM "): |
| line = line[len("VM 0: "):] |
| if line.startswith(HFTEST_LOG_PREFIX): |
| lines.append(line[len(HFTEST_LOG_PREFIX):]) |
| return lines |
| |
| def get_test_json(self): |
| """Invoke the test platform and request a JSON of available test and |
| test suites.""" |
| out = self.driver.run("json", "json", False) |
| hf_out = "\n".join(self.extract_hftest_lines(out)) |
| try: |
| return json.loads(hf_out) |
| except ValueError as e: |
| print(out) |
| raise e |
| |
| def collect_results(self, fn, it, xml_node): |
| """Run `fn` on every entry in `it` and collect their TestRunnerResults. |
| Insert "tests" and "failures" nodes to `xml_node`.""" |
| tests_run = 0 |
| tests_failed = 0 |
| for i in it: |
| sub_result = fn(i) |
| assert(sub_result.tests_run >= sub_result.tests_failed) |
| tests_run += sub_result.tests_run |
| tests_failed += sub_result.tests_failed |
| |
| xml_node.set("tests", str(tests_run)) |
| xml_node.set("failures", str(tests_failed)) |
| return TestRunnerResult(tests_run, tests_failed) |
| |
| def is_passed_test(self, test_out): |
| """Parse the output of a test and return True if it passed.""" |
| return \ |
| len(test_out) > 0 and \ |
| test_out[-1] == HFTEST_LOG_FINISHED and \ |
| not any(l.startswith(HFTEST_LOG_FAILURE_PREFIX) for l in test_out) |
| |
| def get_log_name(self, suite, test): |
| """Returns a string with a generated log name for the test.""" |
| log_name = "" |
| |
| cpu = self.driver.args.cpu |
| if cpu: |
| log_name += cpu + "." |
| |
| log_name += suite["name"] + "." + test["name"] |
| |
| return log_name |
| |
| def run_test(self, suite, test, suite_xml): |
| """Invoke the test platform and request to run a given `test` in given |
| `suite`. Create a new XML node with results under `suite_xml`. |
| Test only invoked if it matches the regex given to constructor.""" |
| if not self.test_re.match(test["name"]): |
| return TestRunnerResult(tests_run=0, tests_failed=0) |
| |
| if self.skip_long_running_tests and test["is_long_running"]: |
| print(" SKIP", test["name"]) |
| return TestRunnerResult(tests_run=0, tests_failed=0) |
| |
| print(" RUN", test["name"]) |
| log_name = self.get_log_name(suite, test) |
| |
| test_xml = ET.SubElement(suite_xml, "testcase") |
| test_xml.set("name", test["name"]) |
| test_xml.set("classname", suite["name"]) |
| test_xml.set("status", "run") |
| |
| out = self.extract_hftest_lines(self.driver.run( |
| log_name, "run {} {}".format(suite["name"], test["name"]), |
| test["is_long_running"])) |
| |
| if self.is_passed_test(out): |
| print(" PASS") |
| return TestRunnerResult(tests_run=1, tests_failed=0) |
| else: |
| print("[x] FAIL --", self.driver.get_run_log(log_name)) |
| failure_xml = ET.SubElement(test_xml, "failure") |
| # TODO: set a meaningful message and put log in CDATA |
| failure_xml.set("message", "Test failed") |
| return TestRunnerResult(tests_run=1, tests_failed=1) |
| |
| def run_suite(self, suite, xml): |
| """Invoke the test platform and request to run all matching tests in |
| `suite`. Create new XML nodes with results under `xml`. |
| Suite skipped if it does not match the regex given to constructor.""" |
| if not self.suite_re.match(suite["name"]): |
| return TestRunnerResult(tests_run=0, tests_failed=0) |
| |
| print(" SUITE", suite["name"]) |
| suite_xml = ET.SubElement(xml, "testsuite") |
| suite_xml.set("name", suite["name"]) |
| |
| return self.collect_results( |
| lambda test: self.run_test(suite, test, suite_xml), |
| suite["tests"], |
| suite_xml) |
| |
| def run_tests(self): |
| """Run all suites and tests matching regexes given to constructor. |
| Write results to sponge log XML. Return the number of run and failed |
| tests.""" |
| |
| test_spec = self.get_test_json() |
| timestamp = datetime.datetime.now().replace(microsecond=0).isoformat() |
| |
| xml = ET.Element("testsuites") |
| xml.set("name", self.image_name) |
| xml.set("timestamp", timestamp) |
| |
| result = self.collect_results( |
| lambda suite: self.run_suite(suite, xml), |
| test_spec["suites"], |
| xml) |
| |
| # Write XML to file. |
| ET.ElementTree(xml).write(self.artifacts.sponge_xml_path, |
| encoding='utf-8', xml_declaration=True) |
| |
| if result.tests_failed > 0: |
| print("[x] FAIL:", result.tests_failed, "of", result.tests_run, |
| "tests failed") |
| elif result.tests_run > 0: |
| print(" PASS: all", result.tests_run, "tests passed") |
| |
| # Let the driver clean up. |
| self.driver.finish() |
| |
| return result |
| |
| |
| def Main(): |
| parser = argparse.ArgumentParser() |
| parser.add_argument("image") |
| parser.add_argument("--out", required=True) |
| parser.add_argument("--log", required=True) |
| parser.add_argument("--out_initrd") |
| parser.add_argument("--initrd") |
| parser.add_argument("--suite") |
| parser.add_argument("--test") |
| parser.add_argument("--vm_args") |
| parser.add_argument("--driver", default="qemu") |
| parser.add_argument("--serial-dev", default="/dev/ttyUSB0") |
| parser.add_argument("--serial-baudrate", type=int, default=115200) |
| parser.add_argument("--serial-no-init-wait", action="store_true") |
| parser.add_argument("--skip-long-running-tests", action="store_true") |
| parser.add_argument("--cpu", |
| help="Selects the CPU configuration for the run environment.") |
| parser.add_argument("--tfa", action="store_true") |
| args = parser.parse_args() |
| |
| # Resolve some paths. |
| image = os.path.join(args.out, args.image + ".bin") |
| initrd = None |
| image_name = args.image |
| if args.initrd: |
| initrd_dir = os.path.join(args.out_initrd, "obj", args.initrd) |
| initrd = os.path.join(initrd_dir, "initrd.img") |
| image_name += "_" + args.initrd |
| vm_args = args.vm_args or "" |
| |
| # Create class which will manage all test artifacts. |
| artifacts = ArtifactsManager(os.path.join(args.log, image_name)) |
| |
| # Create a driver for the platform we want to test on. |
| driver_args = DriverArgs(artifacts, image, initrd, vm_args, args.cpu) |
| |
| if args.driver == "qemu": |
| driver = QemuDriver(driver_args, args.out, args.tfa) |
| elif args.driver == "fvp": |
| driver = FvpDriver(driver_args) |
| elif args.driver == "serial": |
| driver = SerialDriver(driver_args, args.serial_dev, |
| args.serial_baudrate, not args.serial_no_init_wait) |
| else: |
| raise Exception("Unknown driver name: {}".format(args.driver)) |
| |
| # Create class which will drive test execution. |
| runner = TestRunner(artifacts, driver, image_name, args.suite, args.test, |
| args.skip_long_running_tests) |
| |
| # Run tests. |
| runner_result = runner.run_tests() |
| |
| # Print error message if no tests were run as this is probably unexpected. |
| # Return suitable error code. |
| if runner_result.tests_run == 0: |
| print("Error: no tests match") |
| return 10 |
| elif runner_result.tests_failed > 0: |
| return 1 |
| else: |
| return 0 |
| |
| if __name__ == "__main__": |
| sys.exit(Main()) |