#! /usr/bin/env python
"""Compile and run every ``*.cpp`` test found under the current directory.

Each source file is compiled into a ``.test`` executable next to it and then
executed with a time limit by a pool of worker threads.  Compiler and test
output are redirected to a per-test ``.log`` file.  The exit code is 0 only
when every test compiled, finished in time and returned 0.

Options:
  -c COMPILER  compiler command (default: g++)
  -p PATH      directory handed to the compiler with -I (default: ../)
  -t SECONDS   time limit per test (default: 300)
  -w NUM       number of worker threads (default: 4)
"""

import getopt
import os
import re
import signal
import subprocess
import sys
import time
import threading

try:
    import Queue
except ImportError:  # Python 3 renamed the module to ``queue``.
    import queue as Queue

try:
    from shlex import quote as _ShellQuote
except ImportError:  # Python 2 ships the same helper as ``pipes.quote``.
    from pipes import quote as _ShellQuote

DEFAULT_COMPILER = 'g++'
DEFAULT_PROJ_PATH = '../'
DEFAULT_TIME_LIMIT = 300  # Seconds allowed per test run.
DEFAULT_WORKER_NUM = 4
LOG_FILES_MAX = 1000  # Not referenced by the code below.
TIME_SLICE = 0.1  # Seconds slept between two polls in ProcWaitTimeout.
WAIT_TIMEOUT = 0.1  # Seconds a worker blocks on the queue per attempt.
TERMINATE_TIMEOUT = 5  # Seconds granted after SIGTERM before SIGKILL.


class TEST_STATE:
    """Result codes returned by Testing.Compile and Testing.Test."""
    COMPILED = 0
    COMPILE_ERROR = 1
    PASS = 2
    FAILED = 3
    TIMEOUT = 4


def ProcWaitTimeout(proc, timeout):
    """Wait for `proc` to exit, giving up after roughly `timeout` seconds.

    The process is polled every TIME_SLICE seconds; only the time spent
    sleeping is counted against `timeout`.

    Args:
        proc: object with a ``poll()`` method (e.g. ``subprocess.Popen``).
        timeout: maximum accumulated sleep time, in seconds.

    Returns:
        The value of ``proc.poll()``: the exit status, or None when the
        process is still running.
    """
    time_sum = 0
    while proc.poll() is None and time_sum < timeout:
        time.sleep(TIME_SLICE)
        time_sum += TIME_SLICE
    return proc.poll()


class Testing(object):
    """One test case: a source file plus its executable and log paths."""

    def __init__(self, path, filename):
        """Derive the test name and the file paths used by this test.

        The log goes to ``<name>.log``; when that file already exists the
        first free ``<name>.logN`` (N starting at 2) is used instead, so
        earlier logs are not overwritten.

        Args:
            path: directory containing the source file.
            filename: name of the source file inside `path`.
        """
        test_name = os.path.splitext(filename)[0]
        self._test_name = test_name
        self._source_pathname = os.path.join(path, filename)
        self._exec_pathname = os.path.join(path, test_name + '.test')
        log_pathname = os.path.join(path, test_name + '.log')
        if os.path.isfile(log_pathname):
            num = 2
            while os.path.isfile(log_pathname + str(num)):
                num += 1
            log_pathname += str(num)
        self._log_pathname = log_pathname

    def Compile(self, compiler):
        """Compile the source file, writing compiler output to the log.

        Args:
            compiler: compiler command line prefix; it is passed to the
                shell as-is, so it may already contain arguments.

        Returns:
            TEST_STATE.COMPILED when the compiler exits with 0, otherwise
            TEST_STATE.COMPILE_ERROR.
        """
        # Paths are quoted so that spaces or shell metacharacters in
        # directory or file names cannot break the command line.
        command = '%s -o %s %s >%s 2>&1' % (
            compiler,
            _ShellQuote(self._exec_pathname),
            _ShellQuote(self._source_pathname),
            _ShellQuote(self._log_pathname))
        try:
            retcode = subprocess.call(command, shell=True)
        except OSError:
            # The shell itself could not be started.
            return TEST_STATE.COMPILE_ERROR
        if retcode == 0:
            return TEST_STATE.COMPILED
        return TEST_STATE.COMPILE_ERROR

    def Test(self, timeout):
        """Run the compiled test, appending its output to the log.

        Args:
            timeout: seconds the test may run before it is terminated.

        Returns:
            TEST_STATE.PASS when the test exits with 0, TEST_STATE.FAILED
            for any other exit status, TEST_STATE.TIMEOUT when it had to
            be terminated.
        """
        command = '%s >>%s 2>&1' % (_ShellQuote(self._exec_pathname),
                                    _ShellQuote(self._log_pathname))
        # os.setsid makes the shell a process group leader, so killpg below
        # signals the shell and the processes it spawned in that group.
        proc = subprocess.Popen(command, shell=True, preexec_fn=os.setsid)
        if ProcWaitTimeout(proc, timeout) is None:
            os.killpg(proc.pid, signal.SIGTERM)
            # Escalate to SIGKILL when SIGTERM was not enough.
            if ProcWaitTimeout(proc, TERMINATE_TIMEOUT) is None:
                os.killpg(proc.pid, signal.SIGKILL)
            return TEST_STATE.TIMEOUT
        if proc.returncode == 0:
            return TEST_STATE.PASS
        return TEST_STATE.FAILED

    @property
    def test_name(self):
        """Source file name without its extension."""
        return self._test_name

    @property
    def log_filename(self):
        """Path of the log file receiving compiler and test output."""
        return self._log_pathname


class StopTask(object):
    """Sentinel queued once per worker to make it leave its loop."""
    pass


def Worker(queue, compiler, time_limit):
    """Thread body: compile and run queued tests until a StopTask arrives.

    Args:
        queue: queue yielding Testing instances and StopTask sentinels.
        compiler: compiler command forwarded to Testing.Compile.
        time_limit: per-test time limit forwarded to Testing.Test.
    """
    while True:
        try:
            work = queue.get(True, WAIT_TIMEOUT)
        except Queue.Empty:
            # Nothing queued yet; keep polling.
            continue
        if isinstance(work, StopTask):
            break
        if work.Compile(compiler) == TEST_STATE.COMPILE_ERROR:
            Fail(work.test_name, work.log_filename, 'Compile error.')
            continue
        ret = work.Test(time_limit)
        if ret == TEST_STATE.FAILED:
            Fail(work.test_name, work.log_filename, 'Test failure.')
        elif ret == TEST_STATE.TIMEOUT:
            Fail(work.test_name, work.log_filename,
                 'Timeout(%d).' % time_limit)
        else:
            Pass(work.test_name, work.log_filename)


# True until any test fails; decides the exit code returned by main().
all_pass = True
# Serializes result reporting between worker threads.
print_lock = threading.Lock()


def Pass(test_name, log_filename):
    """Print a green "Passed" line for `test_name`."""
    with print_lock:
        print('\033[32m%r >>> Passed (%s)\033[39m' % (test_name, log_filename))


def Fail(test_name, log_filename, reason):
    """Record a failure and print a red "Failed" line for `test_name`."""
    # Without this declaration the assignment would only bind a local and
    # the script would exit with 0 even though tests failed.
    global all_pass
    with print_lock:
        all_pass = False
        print('\033[31m%r >>> Failed for %r (%r)\033[39m' %
              (test_name, reason, log_filename))


def main():
    """Parse the command line, run all tests and return the exit code.

    Returns:
        0 when every test passed, 1 otherwise (also for -h and for an
        invalid worker count).
    """
    compiler = DEFAULT_COMPILER
    proj_path = DEFAULT_PROJ_PATH
    time_limit = DEFAULT_TIME_LIMIT
    worker_num = DEFAULT_WORKER_NUM
    optlist, args = getopt.getopt(sys.argv[1:], 'c:hw:p:t:')
    for (opt, arg) in optlist:
        if opt == '-c':
            compiler = arg
        elif opt == '-h':
            print('Not implement yet.')
            return 1
        elif opt == '-p':
            proj_path = arg
        elif opt == '-t':
            time_limit = int(arg)
        elif opt == '-w':
            worker_num = int(arg)
        else:
            sys.stderr.write('Warning: Unknown argument %r, ignored' % opt)
    if worker_num < 1:
        # With no workers nothing would be tested and the run would be
        # reported as a success.
        sys.stderr.write('Error: Number of workers must be at least 1\n')
        return 1
    compiler += " -I '%s'" % proj_path
    print('Compiler & its arguments: %s' % compiler)
    print('Time limit per test: %r' % time_limit)
    print('Number of workers: %r' % worker_num)
    testing_queue = Queue.Queue()
    workers = []
    for i in range(worker_num):
        worker = threading.Thread(target=Worker,
                                  args=(testing_queue, compiler, time_limit))
        worker.daemon = True
        worker.start()
        workers.append(worker)
    for (path, unused_dirnames, filenames) in os.walk('.'):
        for filename in filenames:
            if filename.endswith('.cpp'):
                testing_queue.put(Testing(path, filename))
    # One sentinel per worker, queued behind all tests.
    for i in range(worker_num):
        testing_queue.put(StopTask())
    for worker in workers:
        worker.join()
    return (0 if all_pass else 1)


if __name__ == '__main__':
    sys.exit(main())