* [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management
From: Marvin Liu @ 2018-01-08 2:49 UTC
To: dts

This patch series adds support for managing multiple virtual machines in
parallel. The default QEMU initialization process is enhanced so that VMs
can be started in parallel. With the new model, VM management becomes
significantly more efficient: in my environment, a test suite can start and
control 2 * 63 VMs in two minutes with the new module.

Marvin Liu (16):
  framework: add external thread pool library
  framework/multiple_vm: add multiple VM management module
  framework/utils: support locks function in parallel model
  framework: add DUT index support
  framework/logger: optimize output format for child threads
  framework/dts: support multiple VMs module
  framework/debugger: support multiple VMs module
  framework/ssh_pexpect: support multiple VMs module
  framework/ssh_connection: support DUT index argument
  framework/settings: add parallel related settings
  framework/virt_resource: support multiple VMs module
  framework/virt_base: add attach/quick start/quit function for VM management
  framework/virt_dut: support multiple VMs module
  framework/qemu_kvm: support multiple VMs module
  conf/virt_global: add vm management related configuration
  doc: add descriptions for multiple virtual machine module

 conf/virt_global.cfg           |   2 +
 doc/dts_gsg/index.rst          |   1 +
 doc/dts_gsg/multiple_vm.rst    |  87 ++++++
 doc/dts_gsg/virtualization.rst |  40 +--
 extra_libs/threadpool.py       | 426 +++++++++++++++++++++++++++
 framework/crb.py               |  37 +--
 framework/debugger.py          |  24 +-
 framework/dts.py               |  26 +-
 framework/dut.py               |  46 ++-
 framework/logger.py            |  37 +--
 framework/multiple_vm.py       | 304 +++++++++++++++++++
 framework/project_dpdk.py      |   6 +-
 framework/qemu_kvm.py          | 641 ++++++++++++++++++++++++++++++++---------
 framework/settings.py          |   3 +
 framework/ssh_connection.py    |   6 +-
 framework/ssh_pexpect.py       |  82 ++++--
 framework/utils.py             | 107 +++++--
 framework/virt_base.py         | 108 +++++--
 framework/virt_dut.py          |  76 +++--
 framework/virt_resource.py     |  86 +++++-
 20 files changed, 1797 insertions(+), 348 deletions(-)
 create mode 100644 doc/dts_gsg/multiple_vm.rst
 create mode 100644 extra_libs/threadpool.py
 create mode 100644 framework/multiple_vm.py

--
1.9.3

* [dts] [PATCH v1 01/16] framework: add external thread pool library
From: Marvin Liu @ 2018-01-08 2:49 UTC
To: dts; +Cc: Marvin Liu

Import the threadpool library as an external library. The multiple VM
module creates its parallel threads on top of this library.

Signed-off-by: Marvin Liu <yong.liu@intel.com>

diff --git a/extra_libs/threadpool.py b/extra_libs/threadpool.py
new file mode 100644
index 0000000..3839f26
--- /dev/null
+++ b/extra_libs/threadpool.py
@@ -0,0 +1,426 @@
+# -*- coding: UTF-8 -*-
+"""Easy to use object-oriented thread pool framework.
+
+A thread pool is an object that maintains a pool of worker threads to perform
+time consuming operations in parallel. It assigns jobs to the threads
+by putting them in a work request queue, where they are picked up by the
+next available thread. This then performs the requested operation in the
+background and puts the results in another queue.
+
+The thread pool object can then collect the results from all threads from
+this queue as soon as they become available or after all threads have
+finished their work. It's also possible to define callbacks to handle
+each result as it comes in.
+
+The basic concept and some code was taken from the book "Python in a Nutshell,
+2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section
+14.5 "Threaded Program Architecture". I wrapped the main program logic in the
+ThreadPool class, added the WorkRequest class and the callback system and
+tweaked the code here and there. Kudos also to Florent Aide for the exception
+handling mechanism.
+
+Basic usage::
+
+    >>> pool = ThreadPool(poolsize)
+    >>> requests = makeRequests(some_callable, list_of_args, callback)
+    >>> [pool.putRequest(req) for req in requests]
+    >>> pool.wait()
+
+See the end of the module code for a brief, annotated usage example.
+
+Website : http://chrisarndt.de/projects/threadpool/
+
+"""
+__docformat__ = "restructuredtext en"
+
+__all__ = [
+    'makeRequests',
+    'NoResultsPending',
+    'NoWorkersAvailable',
+    'ThreadPool',
+    'WorkRequest',
+    'WorkerThread'
+]
+
+__author__ = "Christopher Arndt"
+__version__ = '1.3.2'
+__license__ = "MIT license"
+
+
+# standard library modules
+import sys
+import threading
+import traceback
+
+try:
+    import Queue            # Python 2
+except ImportError:
+    import queue as Queue   # Python 3
+
+
+# exceptions
+class NoResultsPending(Exception):
+    """All work requests have been processed."""
+    pass
+
+class NoWorkersAvailable(Exception):
+    """No worker threads available to process remaining requests."""
+    pass
+
+
+# internal module helper functions
+def _handle_thread_exception(request, exc_info):
+    """Default exception handler callback function.
+
+    This just prints the exception info via ``traceback.print_exception``.
+
+    """
+    traceback.print_exception(*exc_info)
+
+
+# utility functions
+def makeRequests(callable_, args_list, callback=None,
+        exc_callback=_handle_thread_exception):
+    """Create several work requests for same callable with different arguments.
+
+    Convenience function for creating several work requests for the same
+    callable where each invocation of the callable receives different values
+    for its arguments.
+
+    ``args_list`` contains the parameters for each invocation of callable.
+    Each item in ``args_list`` should be either a 2-item tuple of the list of
+    positional arguments and a dictionary of keyword arguments or a single,
+    non-tuple argument.
+
+    See docstring for ``WorkRequest`` for info on ``callback`` and
+    ``exc_callback``.
+
+    """
+    requests = []
+    for item in args_list:
+        if isinstance(item, tuple):
+            requests.append(
+                WorkRequest(callable_, item[0], item[1], callback=callback,
+                    exc_callback=exc_callback)
+            )
+        else:
+            requests.append(
+                WorkRequest(callable_, [item], None, callback=callback,
+                    exc_callback=exc_callback)
+            )
+    return requests
+
+
+# classes
+class WorkerThread(threading.Thread):
+    """Background thread connected to the requests/results queues.
+
+    A worker thread sits in the background and picks up work requests from
+    one queue and puts the results in another until it is dismissed.
+
+    """
+
+    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
+        """Set up thread in daemonic mode and start it immediately.
+
+        ``requests_queue`` and ``results_queue`` are instances of
+        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a
+        new worker thread.
+
+        """
+        threading.Thread.__init__(self, **kwds)
+        self.setDaemon(1)
+        self._requests_queue = requests_queue
+        self._results_queue = results_queue
+        self._poll_timeout = poll_timeout
+        self._dismissed = threading.Event()
+        self.start()
+
+    def run(self):
+        """Repeatedly process the job queue until told to exit."""
+        while True:
+            if self._dismissed.isSet():
+                # we are dismissed, break out of loop
+                break
+            # get next work request. If we don't get a new request from the
+            # queue after self._poll_timeout seconds, we jump to the start of
+            # the while loop again, to give the thread a chance to exit.
+            try:
+                request = self._requests_queue.get(True, self._poll_timeout)
+            except Queue.Empty:
+                continue
+            else:
+                if self._dismissed.isSet():
+                    # we are dismissed, put back request in queue and exit loop
+                    self._requests_queue.put(request)
+                    break
+                try:
+                    result = request.callable(*request.args, **request.kwds)
+                    self._results_queue.put((request, result))
+                except:
+                    request.exception = True
+                    self._results_queue.put((request, sys.exc_info()))
+
+    def dismiss(self):
+        """Sets a flag to tell the thread to exit when done with current job.
+        """
+        self._dismissed.set()
+
+
+class WorkRequest:
+    """A request to execute a callable for putting in the request queue later.
+
+    See the module function ``makeRequests`` for the common case
+    where you want to build several ``WorkRequest`` objects for the same
+    callable but with different arguments for each call.
+
+    """
+
+    def __init__(self, callable_, args=None, kwds=None, requestID=None,
+            callback=None, exc_callback=_handle_thread_exception):
+        """Create a work request for a callable and attach callbacks.
+
+        A work request consists of a callable to be executed by a
+        worker thread, a list of positional arguments, a dictionary
+        of keyword arguments.
+
+        A ``callback`` function can be specified, that is called when the
+        results of the request are picked up from the result queue. It must
+        accept two anonymous arguments, the ``WorkRequest`` object and the
+        results of the callable, in that order. If you want to pass additional
+        information to the callback, just stick it on the request object.
+
+        You can also give custom callback for when an exception occurs with
+        the ``exc_callback`` keyword parameter. It should also accept two
+        anonymous arguments, the ``WorkRequest`` and a tuple with the exception
+        details as returned by ``sys.exc_info()``. The default implementation
+        of this callback just prints the exception info via
+        ``traceback.print_exception``. If you want no exception handler
+        callback, just pass in ``None``.
+
+        ``requestID``, if given, must be hashable since it is used by
+        ``ThreadPool`` object to store the results of that work request in a
+        dictionary. It defaults to the return value of ``id(self)``.
+
+        """
+        if requestID is None:
+            self.requestID = id(self)
+        else:
+            try:
+                self.requestID = hash(requestID)
+            except TypeError:
+                raise TypeError("requestID must be hashable.")
+        self.exception = False
+        self.callback = callback
+        self.exc_callback = exc_callback
+        self.callable = callable_
+        self.args = args or []
+        self.kwds = kwds or {}
+
+    def __str__(self):
+        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
+            (self.requestID, self.args, self.kwds, self.exception)
+
+class ThreadPool:
+    """A thread pool, distributing work requests and collecting results.
+
+    See the module docstring for more information.
+
+    """
+
+    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
+        """Set up the thread pool and start num_workers worker threads.
+
+        ``num_workers`` is the number of worker threads to start initially.
+
+        If ``q_size > 0`` the size of the work *request queue* is limited and
+        the thread pool blocks when the queue is full and it tries to put
+        more work requests in it (see ``putRequest`` method), unless you also
+        use a positive ``timeout`` value for ``putRequest``.
+
+        If ``resq_size > 0`` the size of the *results queue* is limited and the
+        worker threads will block when the queue is full and they try to put
+        new results in it.
+
+        .. warning:
+            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
+            the possibility of a deadlock, when the results queue is not pulled
+            regularly and too many jobs are put in the work requests queue.
+            To prevent this, always set ``timeout > 0`` when calling
+            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.
+
+        """
+        self._requests_queue = Queue.Queue(q_size)
+        self._results_queue = Queue.Queue(resq_size)
+        self.workers = []
+        self.dismissedWorkers = []
+        self.workRequests = {}
+        self.createWorkers(num_workers, poll_timeout)
+
+    def createWorkers(self, num_workers, poll_timeout=5):
+        """Add num_workers worker threads to the pool.
+
+        ``poll_timeout`` sets the interval in seconds (int or float) for how
+        often threads should check whether they are dismissed, while waiting for
+        requests.
+
+        """
+        for i in range(num_workers):
+            self.workers.append(WorkerThread(self._requests_queue,
+                self._results_queue, poll_timeout=poll_timeout))
+
+    def dismissWorkers(self, num_workers, do_join=False):
+        """Tell num_workers worker threads to quit after their current task."""
+        dismiss_list = []
+        for i in range(min(num_workers, len(self.workers))):
+            worker = self.workers.pop()
+            worker.dismiss()
+            dismiss_list.append(worker)
+
+        if do_join:
+            for worker in dismiss_list:
+                worker.join()
+        else:
+            self.dismissedWorkers.extend(dismiss_list)
+
+    def joinAllDismissedWorkers(self):
+        """Perform Thread.join() on all worker threads that have been dismissed.
+        """
+        for worker in self.dismissedWorkers:
+            worker.join()
+        self.dismissedWorkers = []
+
+    def putRequest(self, request, block=True, timeout=None):
+        """Put work request into work queue and save its id for later."""
+        assert isinstance(request, WorkRequest)
+        # don't reuse old work requests
+        assert not getattr(request, 'exception', None)
+        self._requests_queue.put(request, block, timeout)
+        self.workRequests[request.requestID] = request
+
+    def poll(self, block=False):
+        """Process any new results in the queue."""
+        while True:
+            # still results pending?
+            if not self.workRequests:
+                raise NoResultsPending
+            # are there still workers to process remaining requests?
+            elif block and not self.workers:
+                raise NoWorkersAvailable
+            try:
+                # get back next results
+                request, result = self._results_queue.get(block=block)
+                # has an exception occurred?
+                if request.exception and request.exc_callback:
+                    request.exc_callback(request, result)
+                # hand results to callback, if any
+                if request.callback and not \
+                       (request.exception and request.exc_callback):
+                    request.callback(request, result)
+                del self.workRequests[request.requestID]
+            except Queue.Empty:
+                break
+            except Exception as e:
+                traceback.print_exception(*sys.exc_info())
+                # unexpected thing happened, need further debugging
+                import pdb
+                pdb.set_trace()
+
+    def wait(self):
+        """Wait for results, blocking until all have arrived."""
+        while 1:
+            try:
+                self.poll(True)
+            except NoResultsPending:
+                break
+
+
+################
+# USAGE EXAMPLE
+################
+
+if __name__ == '__main__':
+    import random
+    import time
+
+    # the work the threads will have to do (rather trivial in our example)
+    def do_something(data):
+        time.sleep(random.randint(1,5))
+        result = round(random.random() * data, 5)
+        # just to show off, we throw an exception once in a while
+        if result > 5:
+            raise RuntimeError("Something extraordinary happened!")
+        return result
+
+    # this will be called each time a result is available
+    def print_result(request, result):
+        print("**** Result from request #%s: %r" % (request.requestID, result))
+
+    # this will be called when an exception occurs within a thread
+    # this example exception handler does little more than the default handler
+    def handle_exception(request, exc_info):
+        if not isinstance(exc_info, tuple):
+            # Something is seriously wrong...
+            print(request)
+            print(exc_info)
+            raise SystemExit
+        print("**** Exception occurred in request #%s: %s" % \
+          (request.requestID, exc_info))
+
+    # assemble the arguments for each job to a list...
+    data = [random.randint(1,10) for i in range(20)]
+    # ... and build a WorkRequest object for each item in data
+    requests = makeRequests(do_something, data, print_result, handle_exception)
+    # to use the default exception handler, uncomment next line and comment out
+    # the preceding one.
+    #requests = makeRequests(do_something, data, print_result)
+
+    # or the other form of args_lists accepted by makeRequests: ((,), {})
+    data = [((random.randint(1,10),), {}) for i in range(20)]
+    requests.extend(
+        makeRequests(do_something, data, print_result, handle_exception)
+        #makeRequests(do_something, data, print_result)
+        # to use the default exception handler, uncomment next line and comment
+        # out the preceding one.
+    )
+
+    # we create a pool of 3 worker threads
+    print("Creating thread pool with 3 worker threads.")
+    main = ThreadPool(3)
+
+    # then we put the work requests in the queue...
+    for req in requests:
+        main.putRequest(req)
+        print("Work request #%s added." % req.requestID)
+    # or shorter:
+    # [main.putRequest(req) for req in requests]
+
+    # ...and wait for the results to arrive in the result queue
+    # by using ThreadPool.wait(). This would block until results for
+    # all work requests have arrived:
+    # main.wait()
+
+    # instead we can poll for results while doing something else:
+    i = 0
+    while True:
+        try:
+            time.sleep(0.5)
+            main.poll()
+            print("Main thread working...")
+            print("(active worker threads: %i)" % (threading.activeCount()-1, ))
+            if i == 10:
+                print("**** Adding 3 more worker threads...")
+                main.createWorkers(3)
+            if i == 20:
+                print("**** Dismissing 2 worker threads...")
+                main.dismissWorkers(2)
+            i += 1
+        except KeyboardInterrupt:
+            print("**** Interrupted!")
+            break
+        except NoResultsPending:
+            print("**** No pending results.")
+            break
+    if main.dismissedWorkers:
+        print("Joining all dismissed worker threads...")
+        main.joinAllDismissedWorkers()
--
1.9.3

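For readers unfamiliar with the pattern this library implements, the
following is a minimal sketch of a request queue / results queue worker pool
built only on the Python 3 standard library. It is not the threadpool API
itself: `run_in_pool` and its parameters are hypothetical names chosen for
illustration, and the work items are assumed to be hashable.

```python
import queue
import threading


def run_in_pool(func, items, num_workers=3):
    """Run func(item) for every item on num_workers threads.

    Returns a dict mapping each item to its result, or to the exception
    that func raised for that item. (Hypothetical helper, not part of
    threadpool.py.)
    """
    requests = queue.Queue()
    results = queue.Queue()

    def worker():
        while True:
            item = requests.get()
            if item is None:
                # sentinel: no more work for this thread
                break
            try:
                results.put((item, func(item)))
            except Exception as exc:
                # hand the exception back instead of killing the thread
                results.put((item, exc))

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for thread in threads:
        thread.start()
    for item in items:
        requests.put(item)
    for _ in threads:
        requests.put(None)          # one sentinel per worker
    for thread in threads:
        thread.join()

    collected = {}
    while not results.empty():
        item, outcome = results.get()
        collected[item] = outcome
    return collected
```

The imported library differs mainly in that the main thread polls the results
queue while the workers are still running and dispatches each result to a
callback, rather than joining the workers first.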
* [dts] [PATCH v1 02/16] framework/multiple_vm: add multiple VM management module
From: Marvin Liu @ 2018-01-08 2:49 UTC
To: dts; +Cc: Marvin Liu

Support starting VMs and executing commands on them in parallel, in multiple
threads. The init function creates a thread pool instance. The
add_parallel_task function adds 'start'|'cmd'|'migration'|'stop' tasks to
the task pool. Calling do_parallel_task retrieves all tasks and runs them in
parallel threads. When that function returns, all threads have finished and
their results have been saved.

Signed-off-by: Marvin Liu <yong.liu@intel.com>

diff --git a/framework/multiple_vm.py b/framework/multiple_vm.py
new file mode 100644
index 0000000..04de6e4
--- /dev/null
+++ b/framework/multiple_vm.py
@@ -0,0 +1,304 @@
+#!/usr/bin/python
+import time
+import re
+import threadpool
+import traceback
+import threading
+from settings import DTS_ERR_TBL, save_global_setting, DTS_PARALLEL_SETTING
+from utils import RED
+from logger import getLogger
+
+
+class MultipleVM(object):
+    """
+    Module for handling VM related actions in parallel on multiple DUTs
+    Supported actions: [start|command|migration]
+    Param max_vm: maximum number of threads
+    Param duts: list of DUT objects
+    """
+    def __init__(self, max_vm, duts):
+        self.max_vm = max_vm
+        self.duts = duts
+        self.pool = threadpool.ThreadPool(max_vm)
+        self.pool_result = [dict() for _ in duts]
+        self._pool_requests = list()
+        self._pool_executors = dict()
+        self.logger = getLogger("multiple_vm")
+
+        self.logger.info("Created MultipleVM instance with %d DUTs and %d VMs" % (len(duts), max_vm))
+
+    def parallel_vm_start(self, args):
+        """
+        Start VMs in parallel.
+        Args format:
+        {
+            'name': 'VM0',
+            'dut_id': 1,
+            'autodetect_topo': False,
+            'virt_config': { 'suite_name': '',
+                             'vm_name': '',
+                           },
+            'virt_params' : {
+                'qemu': [{'path': '/usr/local/bin/qemu-system-x86_64'}],
+                'cpu': [{'model': 'host', 'number': '1'}],
+                'mem': [{'size': '1024', 'hugepage': 'yes'}],
+                'disk': [{'file': 'vm0.qcow2'}],
+                'login': [{'user': 'root', 'password': 'root'}],
+                'vnc' : [{'displayNum': '2'}],
+                'device': [{'driver': 'vfio-pci', 'opt_host': '0000:82:00.1'}]
+                          [{'driver': 'vhost-user', 'opt_path': '/tmp/vhost-user-0',
+                            'opt_mac': '',
+                            'opt_legacy': 'on' | 'off'}],
+                'migration': [{'enable': 'yes'}]
+            },
+
+            'command': ''
+        }
+
+        return format:
+        {
+            'name': 'VM0',
+            'dut_id' : 1,
+            'vm_obj': vm_obj
+        }
+        """
+
+        result = {}
+        vm_name = args['name']
+        dut_id = args['dut_id']
+
+        if 'autodetect_topo' in args:
+            autodetect_topo = args['autodetect_topo']
+        else:
+            autodetect_topo = True
+
+        self.logger.info("Parallel task start for DUT%d %s" % (dut_id, vm_name))
+        threading.current_thread().name = vm_name
+
+        from qemu_kvm import QEMUKvm
+        # VM configured by configuration file
+        if 'virt_config' in args:
+            suite_name = args['virt_config']['suite_name']
+            vm_name = args['virt_config']['vm_name']
+            vm_obj = QEMUKvm(self.duts[dut_id], vm_name, suite_name)
+            if 'virt_params' in args:
+                virt_params = args['virt_params']
+            else:
+                virt_params = dict()
+        else:
+            # VM configured by parameters
+            vm_obj = QEMUKvm(self.duts[dut_id], vm_name, 'multi_vm')
+            virt_params = args['virt_params']
+            # just save config, should be list
+            vm_obj.set_local_config([virt_params])
+
+        vm_dut = None
+
+        if vm_obj.check_alive():
+            self.logger.debug("Check VM[%s] is alive" % vm_name)
+            vm_obj.attach()
+            self.logger.debug("VM[%s] attach is done" % vm_name)
+            if 'migration' in virt_params:
+                self.logger.debug("Immigrated VM[%s] is ready" % vm_name)
+            else:
+                vm_dut = vm_obj.instantiate_vm_dut(autodetect_topo=autodetect_topo)
+                self.logger.debug("VM[%s] instantiate vm dut is done" % vm_name)
+        else:
+            vm_obj.quick_start()
+            self.duts[dut_id].logger.debug("VM[%s] quick start is done" % vm_name)
+            if 'migration' in virt_params:
+                self.logger.debug("Immigrated VM[%s] is ready" % vm_name)
+            else:
+                vm_obj._check_vm_status()
+                self.logger.debug("VM[%s] check status is done" % vm_name)
+                vm_dut = vm_obj.instantiate_vm_dut(autodetect_topo=autodetect_topo)
+                self.logger.debug("VM[%s] instantiate vm dut is done" % vm_name)
+
+        result['name'] = vm_name
+        result['dut_id'] = dut_id
+        result['vm_obj'] = vm_obj
+        result['vm_dut'] = vm_dut
+        self.logger.info("Parallel task DUT%d %s Done and returned" % (dut_id, vm_name))
+        return result
+
+    def parallel_vm_stop(self, args):
+        raise NotImplementedError("parallel VM stop is not supported yet")
+
+    def parallel_vm_command(self, args):
+        """
+        Run commands in parallel.
+        Args format:
+        {
+            'name': 'vm1',
+            'vm_dut': self.vm_dut,
+            'dut_id': 0,
+            'commands': ['cd dpdk', 'make install T=x86_64-native-linuxapp-gcc'],
+            'expects': ['#', "#"],
+            'timeouts': [5, 120],
+        }
+        """
+        result = {}
+        vm_name = args['name']
+        vm_dut = args['vm_dut']
+        dut_id = args['dut_id']
+        commands = args['commands']
+        expects = args['expects']
+        timeouts = args['timeouts']
+        outputs = []
+
+        if 'delay' in args:
+            time.sleep(args['delay'])
+
+        self.logger.debug("Parallel task start for DUT%d %s" % (dut_id, vm_name))
+
+        combinations = zip(commands, expects, timeouts)
+        for combine in combinations:
+            command, expect, timeout = combine
+            # timeout needs to be enlarged as the number of VMs increases
+            add_time = int(self.max_vm * 0.5)
+            timeout += add_time
+            if len(expect) == 0:
+                output = vm_dut.send_command(command, timeout)
+            else:
+                output = vm_dut.send_expect(command, expect, timeout)
+            outputs.append(output)
+
+        result['name'] = vm_name
+        result['dut_id'] = dut_id
+        result['outputs'] = outputs
+        self.logger.debug("Parallel task for DUT%d %s has been done and returned" % (dut_id, vm_name))
+
+        return result
+
+    def parallel_vm_migration(self, args):
+        """
+        Do VM migration action in parallel.
+        Args format:
+        {
+            'name': 'vm1',
+            'vm_obj': self.vm_obj,
+            'remote_ip': host2_ip,
+            'migrate_port': 6666,
+        }
+        """
+        result = {}
+        vm_name = args['name']
+        vm_obj = args['vm_obj']
+        dut_id = args['dut_id']
+        remote_ip = args['remote_ip']
+        migrate_port = args['migrate_port']
+
+        vm_obj.start_migration(remote_ip, migrate_port)
+        vm_obj.wait_migration_done()
+
+        result['name'] = vm_name
+        result['dut_id'] = dut_id
+
+        return result
+
+    def save_result(self, request, result):
+        """
+        Save result in local variable, will be used later
+        """
+        self.pool_result[result['dut_id']][result['name']] = result
+        self.pool_result[result['dut_id']][result['name']]['status'] = 0
+
+    def handle_vm_exception(self, request, exc_info):
+        """
+        Handle exception raised while running a parallel task;
+        should check vm status in this function
+        """
+        if not isinstance(exc_info, tuple):
+            # Something is seriously wrong...
+            print(request)
+            print(exc_info)
+            raise SystemExit
+
+        # print traceback info for exception
+        name = request.args[0]['name']
+        self.logger.error(("**** Exception occurred DUT%d:%s" % (request.args[0]['dut_id'], name)))
+        exc_type, exc_value, exc_traceback = exc_info
+        self.logger.error(repr(traceback.format_tb(exc_traceback)))
+
+        result = {'name': name, 'dut_id': request.args[0]['dut_id']}
+        self.pool_result[result['dut_id']][result['name']] = result
+        self.pool_result[result['dut_id']][result['name']]['status'] = DTS_ERR_TBL["PARALLEL_EXECUTE_ERR"]
+
+    def add_parallel_task(self, action, config):
+        """
+        Add task into parallel pool, will call corresponding function later
+        based on action type.
+        """
+        if action == "start":
+            task = self.parallel_vm_start
+            data = config
+        elif action == "stop":
+            task = self.parallel_vm_stop
+            data = config
+        elif action == "cmd":
+            # just string command by now
+            task = self.parallel_vm_command
+            data = config
+        elif action == "migration":
+            task = self.parallel_vm_migration
+            data = config
+
+        # makeRequests returns a list; here it holds a single request
+        request = threadpool.makeRequests(task, [data], self.save_result, self.handle_vm_exception)
+        self._pool_requests.extend(request)
+
+    def do_parallel_task(self):
+        """
+        Do configured tasks in parallel, will return if all tasks finished
+        """
+        # set parallel mode
+        save_global_setting(DTS_PARALLEL_SETTING, 'yes')
+
+        self.pool_result = [dict() for _ in self.duts]
+        for req in self._pool_requests:
+            self.pool.putRequest(req)
+
+        self.logger.info("All parallel tasks start at %s" % time.ctime())
+        # clean the request queue
+        self._pool_requests = list()
+
+        while True:
+            try:
+                time.sleep(0.5)
+                self.pool.poll()
+            except threadpool.NoResultsPending:
+                self.logger.info("All parallel tasks have been done at %s" % time.ctime())
+                break
+            except Exception as e:
+                self.logger.error("Met exception %s" % (str(e)))
+                break
+
+        # clear pool related queues, clean thread
+        self.pool._requests_queue.queue.clear()
+        self.pool._results_queue.queue.clear()
+
+        time.sleep(2)
+
+        # exit from parallel mode
+        save_global_setting(DTS_PARALLEL_SETTING, 'no')
+
+    def get_parallel_result(self):
+        """
+        Return result information for this parallel task
+        """
+        return self.pool_result
+
+    def list_threads(self):
+        main_thread = threading.currentThread()
+        for t in threading.enumerate():
+            if t is main_thread:
+                continue
+            self.logger.error("thread [%s] is still active" % t.getName())
+
+    def destroy_parallel(self):
+        """
+        Destroy created threads, otherwise new threads may fail to be created
+        """
+        self.pool.dismissWorkers(self.max_vm, do_join=True)
+        self.pool.wait()
+        self.list_threads()
--
1.9.3

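The add task / run / collect result flow described in this patch can be
sketched roughly as below. This is an illustration only, written against
concurrent.futures from the Python 3 standard library rather than the
threadpool module; `ParallelTasks`, `fake_start` and the
`PARALLEL_EXECUTE_ERR` value are hypothetical stand-ins, not DTS names or
values.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical stand-in for DTS_ERR_TBL["PARALLEL_EXECUTE_ERR"]
PARALLEL_EXECUTE_ERR = 1


class ParallelTasks(object):
    """Queue tasks first, then run them all and keep per-DUT results."""

    def __init__(self, max_workers, num_duts):
        self.max_workers = max_workers
        self.num_duts = num_duts
        self._pending = []
        self.results = [dict() for _ in range(num_duts)]

    def add_task(self, func, config):
        # nothing runs yet; tasks are only recorded
        self._pending.append((func, config))

    def run(self):
        # start from empty results and an empty pending list on every run
        self.results = [dict() for _ in range(self.num_duts)]
        pending, self._pending = self._pending, []
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            futures = [(cfg, pool.submit(func, cfg)) for func, cfg in pending]
            for cfg, future in futures:
                entry = {'name': cfg['name'], 'dut_id': cfg['dut_id']}
                try:
                    entry.update(future.result())
                    entry['status'] = 0
                except Exception:
                    # a failed task still gets a result entry, with an error status
                    entry['status'] = PARALLEL_EXECUTE_ERR
                self.results[cfg['dut_id']][cfg['name']] = entry
        return self.results


def fake_start(cfg):
    """Pretend to boot a VM; fails when cfg['fail'] is set."""
    if cfg.get('fail'):
        raise RuntimeError("boot failed")
    return {'vm_obj': 'vm-object-for-%s' % cfg['name']}
```

As in the patch, results are indexed first by DUT id and then by VM name, and
a task that raises is recorded with a non-zero status instead of aborting the
whole batch.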
* [dts] [PATCH v1 03/16] framework/utils: support locks function in parallel model
From: Marvin Liu @ 2018-01-08 2:49 UTC
To: dts; +Cc: Marvin Liu

1. Add parallel lock support to protect critical resources and actions.
Parallel locks are function-level and separate for each DUT.
2. Add support for a user-defined serializer function in the pprint function.
3. Removing the old RSA key is now done only once for all virtual machines.

Signed-off-by: Marvin Liu <yong.liu@intel.com>

diff --git a/framework/utils.py b/framework/utils.py
index 1ecef09..762c927 100644
--- a/framework/utils.py
+++ b/framework/utils.py
@@ -35,9 +35,95 @@ import os
 import inspect
 import socket
 import struct
+import threading
+import types
+from functools import wraps
 
 DTS_ENV_PAT = r"DTS_*"
 
+def create_parallel_locks(num_duts):
+    """
+    Create thread lock dictionary based on DUTs number
+    """
+    global locks_info
+    locks_info = []
+    for _ in range(num_duts):
+        lock_info = dict()
+        lock_info['update_lock'] = threading.RLock()
+        locks_info.append(lock_info)
+
+
+def parallel_lock(num=1):
+    """
+    Decorator that protects parallel threads and allows multiple threads to
+    share one lock. Locks are created based on function name. Thread locks are
+    separated between duts according to argument 'dut_id'.
+    Parameter:
+        num: Number of parallel threads for the lock
+    """
+    global locks_info
+
+    def decorate(func):
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            if 'dut_id' in kwargs:
+                dut_id = kwargs['dut_id']
+            else:
+                dut_id = 0
+
+            # in case the function arguments are not correct
+            if dut_id >= len(locks_info):
+                dut_id = 0
+
+            lock_info = locks_info[dut_id]
+            uplock = lock_info['update_lock']
+
+            name = func.__name__
+            uplock.acquire()
+
+            if name not in lock_info:
+                lock_info[name] = dict()
+                lock_info[name]['lock'] = threading.RLock()
+                lock_info[name]['current_thread'] = 1
+            else:
+                lock_info[name]['current_thread'] += 1
+
+            lock = lock_info[name]['lock']
+
+            # make sure when owned global lock, should also own update lock
+            if lock_info[name]['current_thread'] >= num:
+                if lock._is_owned():
+                    print(RED("DUT%d %s waiting for func lock %s" % (dut_id,
+                              threading.current_thread().name, func.__name__)))
+                lock.acquire()
+            else:
+                uplock.release()
+
+            try:
+                ret = func(*args, **kwargs)
+            except Exception as e:
+                if not uplock._is_owned():
+                    uplock.acquire()
+
+                if lock._is_owned():
+                    lock.release()
+                    lock_info[name]['current_thread'] = 0
+                uplock.release()
+                raise
+
+            if not uplock._is_owned():
+                uplock.acquire()
+
+            if lock._is_owned():
+                lock.release()
+                lock_info[name]['current_thread'] = 0
+
+            uplock.release()
+
+            return ret
+        return wrapper
+    return decorate
+
 
 def RED(text):
     return "\x1B[" + "31;1m" + str(text) + "\x1B[" + "0m"
@@ -51,11 +137,11 @@ def GREEN(text):
     return "\x1B[" + "32;1m" + str(text) + "\x1B[" + "0m"
 
 
-def pprint(some_dict):
+def pprint(some_dict, serialzer=None):
     """
     Print JSON format dictionary object.
     """
-    return json.dumps(some_dict, sort_keys=True, indent=4)
+    return json.dumps(some_dict, sort_keys=True, indent=4, default=serialzer)
 
 
 def regexp(s, to_match, allString=False):
@@ -83,26 +169,13 @@ def get_obj_funcs(obj, func_name_regex):
             yield func
 
 
+@parallel_lock()
 def remove_old_rsa_key(crb, ip):
     """
     Remove the old RSA key of specified IP on crb.
     """
-    if ':' not in ip:
-        ip = ip.strip()
-        port = ''
-    else:
-        addr = ip.split(':')
-        ip = addr[0].strip()
-        port = addr[1].strip()
-
     rsa_key_path = "~/.ssh/known_hosts"
-    if port:
-        remove_rsa_key_cmd = "sed -i '/^\[%s\]:%d/d' %s" % \
-            (ip.strip(), int(
-             port), rsa_key_path)
-    else:
-        remove_rsa_key_cmd = "sed -i '/^%s/d' %s" % \
-            (ip.strip(), rsa_key_path)
+    remove_rsa_key_cmd = "sed -i '/%s/d' %s" % (ip, rsa_key_path)
 
     crb.send_expect(remove_rsa_key_cmd, "# ")
 
--
1.9.3

* [dts] [PATCH v1 04/16] framework: add DUT index support
From: Marvin Liu @ 2018-01-08 2:49 UTC
To: dts; +Cc: Marvin Liu

1. Every CRB now has an index; the index value is assigned when the CRB is
instantiated.
2. The alternative session is optional for a virtual DUT, which saves a lot
of system resources when starting many virtual machines.
3. Virtual environment setup removes all related SSH RSA keys.

Signed-off-by: Marvin Liu <yong.liu@intel.com>

diff --git a/framework/crb.py b/framework/crb.py
index dd29a8b..7affce7 100644
--- a/framework/crb.py
+++ b/framework/crb.py
@@ -49,7 +49,8 @@ class Crb(object):
     CPU/PCI/NIC on the board and setup running environment for DPDK.
     """
 
-    def __init__(self, crb, serializer, name):
+    def __init__(self, crb, serializer, name, alt_session=True, dut_id=0):
+        self.dut_id = dut_id
         self.crb = crb
         self.read_cache = False
         self.skip_setup = False
@@ -62,14 +63,18 @@ class Crb(object):
         self.logger = getLogger(name)
         self.session = SSHConnection(self.get_ip_address(), name,
                                      self.get_username(),
-                                     self.get_password())
+                                     self.get_password(), dut_id)
         self.session.init_log(self.logger)
-        self.alt_session = SSHConnection(
-            self.get_ip_address(),
-            name + '_alt',
-            self.get_username(),
-            self.get_password())
-        self.alt_session.init_log(self.logger)
+        if alt_session:
+            self.alt_session = SSHConnection(
+                self.get_ip_address(),
+                name + '_alt',
+                self.get_username(),
+                self.get_password(),
+                dut_id)
+            self.alt_session.init_log(self.logger)
+        else:
+            self.alt_session = None
 
     def send_expect(self, cmds, expected, timeout=TIMEOUT,
                     alt_session=False, verify=False):
@@ -78,8 +83,8 @@ class Crb(object):
         there's no expected string found before timeout, TimeoutException will
         be raised.
         """
-
-        if alt_session:
+        # some CRBs (e.g. a VM DUT) have no alt_session
+        if alt_session and self.alt_session:
             return self.alt_session.session.send_expect(cmds, expected,
                                                         timeout, verify)
 
@@ -93,7 +98,8 @@ class Crb(object):
         session = SSHConnection(self.get_ip_address(),
                                 name,
                                 self.get_username(),
-                                self.get_password())
+                                self.get_password(),
+                                dut_id=self.dut_id)
         session.init_log(logger)
         self.sessions.append(session)
         return session
@@ -104,7 +110,7 @@ class Crb(object):
         """
         for save_session in self.sessions:
             if save_session == session:
-                save_session.close()
+                save_session.close(force=True)
                 logger = getLogger(save_session.name)
                 logger.logger_exit()
                 self.sessions.remove(save_session)
@@ -141,7 +147,7 @@ class Crb(object):
         Send commands to crb and return string before timeout.
         """
 
-        if alt_session:
+        if alt_session and self.alt_session:
             return self.alt_session.session.send_command(cmds, timeout)
 
         return self.session.send_command(cmds, timeout)
@@ -167,7 +173,7 @@ class Crb(object):
             "awk '/HugePages_Total/ { print $2 }' /proc/meminfo",
             "# ", alt_session=True)
         if huge_pages != "":
-            return int(huge_pages)
+            return int(huge_pages.split()[0])
         return 0
 
     def mount_huge_pages(self):
@@ -220,9 +226,6 @@ class Crb(object):
         """
         self.base_dir = base_dir
 
-    def set_virttype(self, virttype):
-        self.virttype = virttype
-
     def admin_ports(self, port, status):
         """
         Force set port's interface status.
diff --git a/framework/dut.py b/framework/dut.py
index 22ff0bb..258a6cc 100644
--- a/framework/dut.py
+++ b/framework/dut.py
@@ -39,7 +39,7 @@ from ssh_connection import SSHConnection
 from crb import Crb
 from net_device import GetNicObj
 from virt_resource import VirtResource
-from utils import RED
+from utils import RED, remove_old_rsa_key
 from uuid import uuid4
 
 
@@ -60,9 +60,9 @@ class Dut(Crb):
     CORE_LIST_CACHE_KEY = 'dut_core_list'
     PCI_DEV_CACHE_KEY = 'dut_pci_dev_info'
 
-    def __init__(self, crb, serializer):
+    def __init__(self, crb, serializer, dut_id):
         self.NAME = 'dut' + LOG_NAME_SEP + '%s' % crb['My IP']
-        super(Dut, self).__init__(crb, serializer, self.NAME)
+        super(Dut, self).__init__(crb, serializer, self.NAME, alt_session=True, dut_id=dut_id)
 
         self.host_init_flag = False
         self.number_of_cores = 0
@@ -76,29 +76,35 @@ class Dut(Crb):
         # hypervisor pid list, used for cleanup
         self.virt_pids = []
 
-    def init_host_session(self):
-        if self.host_init_flag:
-            pass
-        else:
-            self.host_session = SSHConnection(
-                self.get_ip_address(),
-                self.NAME + '_host',
-                self.get_username(),
-                self.get_password())
-            self.host_session.init_log(self.logger)
-            self.host_init_flag = True
+    def init_host_session(self, vm_name):
+        """
+        Create a session for each VM; the session is handled by the VM instance
+        """
+        self.host_session = SSHConnection(
+            self.get_ip_address(),
+            vm_name +
'_host', + self.get_username(), + self.get_password()) + self.host_session.init_log(self.logger) + self.logger.info("[%s] create new session for VM" % (threading.current_thread().name)) def new_session(self, suite=""): """ Create new session for dut instance. Session name will be unique. """ - session_name = self.NAME + '_' + str(uuid4()) + if len(suite): + session_name = self.NAME + '_' + suite + else: + session_name = self.NAME + '_' + str(uuid4()) session = self.create_session(name=session_name) if suite != "": session.logger.config_suite(suite, self.NAME) else: session.logger.config_execution(self.NAME) - session.send_expect("cd %s" % self.base_dir, "# ") + + if getattr(self, "base_dir", None): + session.send_expect("cd %s" % self.base_dir, "# ") + return session def close_session(self, session): @@ -788,6 +794,14 @@ class Dut(Crb): self.ports_info.append({'port': port, 'pci': pci_str, 'type': pci_id, 'intf': intf, 'mac': macaddr, 'ipv6': ipv6, 'numa': -1}) + def setup_virtenv(self, virttype): + """ + Setup current virtualization hypervisor type and remove elder VM ssh keys + """ + self.virttype = virttype + # remove VM ras keys from tester + remove_old_rsa_key(self.tester, self.crb['My IP']) + def generate_sriov_vfs_by_port(self, port_id, vf_num, driver='default'): """ Generate SRIOV VFs with default driver it is bound now or specifid driver. diff --git a/framework/project_dpdk.py b/framework/project_dpdk.py index 6bc47f2..a25e7a7 100644 --- a/framework/project_dpdk.py +++ b/framework/project_dpdk.py @@ -49,8 +49,8 @@ class DPDKdut(Dut): build, memory and kernel module. """ - def __init__(self, crb, serializer): - super(DPDKdut, self).__init__(crb, serializer) + def __init__(self, crb, serializer, dut_id): + super(DPDKdut, self).__init__(crb, serializer, dut_id) self.testpmd = None def set_target(self, target, bind_dev=True): @@ -439,7 +439,7 @@ class DPDKtester(Tester): interface and generate port map. 
""" - def __init__(self, crb, serializer): + def __init__(self, crb, serializer, dut_id): self.NAME = "tester" super(DPDKtester, self).__init__(crb, serializer) -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH v1 05/16] framework/logger: optimize output format for child threads 2018-01-08 2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu ` (3 preceding siblings ...) 2018-01-08 2:49 ` [dts] [PATCH v1 04/16] framework: add DUT index support Marvin Liu @ 2018-01-08 2:49 ` Marvin Liu 2018-01-08 2:49 ` [dts] [PATCH v1 06/16] framework/dts: support multiple VMs module Marvin Liu ` (11 subsequent siblings) 16 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-08 2:49 UTC (permalink / raw) To: dts; +Cc: Marvin Liu Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/logger.py b/framework/logger.py index 1018674..78e90d6 100644 --- a/framework/logger.py +++ b/framework/logger.py @@ -35,7 +35,7 @@ import sys import inspect import re -from settings import LOG_NAME_SEP, FOLDERS +from settings import LOG_NAME_SEP, FOLDERS, load_global_setting, DTS_PARALLEL_SETTING from utils import RED """ @@ -87,10 +87,9 @@ logging.addLevelName(logging.SUITE_TESTER_OUTPUT, 'SUITE_TESTER_OUTPUT') logging.addLevelName(logging.DTS_IXIA_CMD, 'DTS_IXIA_CMD') logging.addLevelName(logging.DTS_IXIA_OUTPUT, 'DTS_IXIA_OUTPUT') -message_fmt = '%(asctime)s %(levelname)20s: %(message)s' date_fmt = '%d/%m/%Y %H:%M:%S' RESET_COLOR = '\033[0m' -stream_fmt = '%(color)s%(levelname)20s: %(message)s' + RESET_COLOR +stream_fmt = '%(color)s%(name)30s: %(message)s' + RESET_COLOR log_dir = None @@ -99,13 +98,6 @@ def set_verbose(): verbose = True -def add_salt(salt, msg): - if not salt: - return msg - else: - return '[%s] ' % salt + str(msg) - - class BaseLoggerAdapter(logging.LoggerAdapter): """ Upper layer of original logging module. @@ -212,8 +204,6 @@ class DTSLOG(BaseLoggerAdapter): self.crb = crb super(DTSLOG, self).__init__(self.logger, dict(crb=self.crb)) - self.salt = '' - self.fh = None self.ch = None @@ -226,6 +216,11 @@ class DTSLOG(BaseLoggerAdapter): """ Config stream handler and file handler. 
""" + if load_global_setting(DTS_PARALLEL_SETTING) == 'yes': + message_fmt = '%(asctime)s %(name)30s %(threadName)s: %(message)s' + else: + message_fmt = '%(asctime)s %(name)30s: %(message)s' + fh.setFormatter(logging.Formatter(message_fmt, date_fmt)) ch.setFormatter(logging.Formatter(stream_fmt, date_fmt)) @@ -251,28 +246,24 @@ class DTSLOG(BaseLoggerAdapter): """ DTS warnning level log function. """ - message = add_salt(self.salt, message) self.logger.log(self.warn_lvl, message) def info(self, message): """ DTS information level log function. """ - message = add_salt(self.salt, message) self.logger.log(self.info_lvl, message) def error(self, message): """ DTS error level log function. """ - message = add_salt(self.salt, message) self.logger.log(self.error_lvl, message) def debug(self, message): """ DTS debug level log function. """ - message = add_salt(self.salt, message) self.logger.log(self.debug_lvl, message) def set_logfile_path(self, path): @@ -304,34 +295,20 @@ class DTSLOG(BaseLoggerAdapter): ch = ColorHandler() self.__log_handler(fh, ch) - def set_salt(crb, start_flag): - if LOG_NAME_SEP in crb: - old = '%s%s' % (start_flag, LOG_NAME_SEP) - if not self.salt: - self.salt = crb.replace(old, '', 1) - if crb.startswith('dut'): self.info_lvl = logging.DTS_DUT_CMD self.debug_lvl = logging.DTS_DUT_OUTPUT self.warn_lvl = logging.DTS_DUT_RESULT - - set_salt(crb, 'dut') elif crb.startswith('tester'): self.info_lvl = logging.DTS_TESTER_CMD self.debug_lvl = logging.DTS_TESTER_OUTPUT self.warn_lvl = logging.DTS_TESTER_RESULT - - set_salt(crb, 'tester') elif crb.startswith('ixia'): self.info_lvl = logging.DTS_IXIA_CMD self.debug_lvl = logging.DTS_IXIA_OUTPUT - - set_salt(crb, 'ixia') elif crb.startswith('virtdut'): self.info_lvl = logging.DTS_VIRTDUT_CMD self.debug_lvl = logging.DTS_VIRTDUT_OUTPUT - - set_salt(crb, 'virtdut') else: self.error_lvl = logging.ERROR self.warn_lvl = logging.WARNING -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH v1 06/16] framework/dts: support multiple VMs module 2018-01-08 2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu ` (4 preceding siblings ...) 2018-01-08 2:49 ` [dts] [PATCH v1 05/16] framework/logger: optimize output format for child threads Marvin Liu @ 2018-01-08 2:49 ` Marvin Liu 2018-01-08 2:49 ` [dts] [PATCH v1 07/16] framework/debugger: " Marvin Liu ` (10 subsequent siblings) 16 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-08 2:49 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Added required external library system path 2. Initialized global parallel lock structure 3. Init DUTs and tester with index argument Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/dts.py b/framework/dts.py index 581d282..0b2240c 100644 --- a/framework/dts.py +++ b/framework/dts.py @@ -58,7 +58,7 @@ import logger import debugger from config import CrbsConf from checkCase import CheckCase -from utils import get_subclasses, copy_instance_attr +from utils import get_subclasses, copy_instance_attr, create_parallel_locks import sys reload(sys) sys.setdefaultencoding('UTF8') @@ -211,7 +211,7 @@ def dts_run_commands(crb, dts_commands): raise VerifyFailure("Command execution failed") -def get_project_obj(project_name, super_class, crbInst, serializer): +def get_project_obj(project_name, super_class, crbInst, serializer, dut_id): """ Load project module and return crb instance. 
""" @@ -221,12 +221,12 @@ def get_project_obj(project_name, super_class, crbInst, serializer): project_module = __import__(PROJECT_MODULE_PREFIX + project_name) for project_subclassname, project_subclass in get_subclasses(project_module, super_class): - project_obj = project_subclass(crbInst, serializer) + project_obj = project_subclass(crbInst, serializer, dut_id) if project_obj is None: - project_obj = super_class(crbInst, serializer) + project_obj = super_class(crbInst, serializer, dut_id) except Exception as e: log_handler.info("LOAD PROJECT MODULE INFO: " + str(e)) - project_obj = super_class(crbInst, serializer) + project_obj = super_class(crbInst, serializer, dut_id) return project_obj @@ -280,13 +280,15 @@ def dts_crbs_init(crbInsts, skip_setup, read_cache, project, base_dir, serialize testInst = copy.copy(crbInsts[0]) testInst['My IP'] = crbInsts[0]['tester IP'] - tester = get_project_obj(project, Tester, testInst, serializer) + tester = get_project_obj(project, Tester, testInst, serializer, dut_id=0) + dut_id = 0 for crbInst in crbInsts: dutInst = copy.copy(crbInst) dutInst['My IP'] = crbInst['IP'] - dutobj = get_project_obj(project, Dut, dutInst, serializer) + dutobj = get_project_obj(project, Dut, dutInst, serializer, dut_id=dut_id) duts.append(dutobj) + dut_id += 1 dts_log_execution(duts, tester, log_handler) @@ -298,7 +300,7 @@ def dts_crbs_init(crbInsts, skip_setup, read_cache, project, base_dir, serialize nic = settings.load_global_setting(settings.HOST_NIC_SETTING) for dutobj in duts: dutobj.tester = tester - dutobj.set_virttype(virttype) + dutobj.setup_virtenv(virttype) dutobj.set_speedup_options(read_cache, skip_setup) dutobj.set_directory(base_dir) # save execution nic setting @@ -463,6 +465,11 @@ def run_all(config_file, pkgName, git, patch, skip_setup, if not os.path.exists(output_dir): os.mkdir(output_dir) + # add external library + exec_file = os.path.realpath(__file__) + extra_libs_path = exec_file.replace('framework/dts.py', '') + 
'extra_libs' + sys.path.insert(1, extra_libs_path) + # add python module search path sys.path.append(suite_dir) @@ -537,6 +544,9 @@ def run_all(config_file, pkgName, git, patch, skip_setup, result.dut = duts[0] + # init global lock + create_parallel_locks(len(duts)) + # init dut, tester crb duts, tester = dts_crbs_init(crbInsts, skip_setup, read_cache, project, base_dir, serializer, virttype) tester.set_re_run(re_run) -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
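The extra_libs lookup in run_all() derives the directory by string replacement on the path of dts.py. A minimal sketch of that computation next to a dirname-based variant is given below. The helper names are hypothetical and are not part of the series, POSIX-style paths are assumed, and the caller is assumed to pass os.path.realpath(__file__) as the patch does.

```python
import os


def extra_libs_by_replace(exec_file):
    # Mirrors the expression used in the patch: strip the trailing
    # 'framework/dts.py' and append 'extra_libs'.
    return exec_file.replace('framework/dts.py', '') + 'extra_libs'


def extra_libs_by_dirname(exec_file):
    # Hypothetical alternative: walk up from framework/ to the DTS root,
    # independent of the exact file name or extension.
    framework_dir = os.path.dirname(exec_file)
    return os.path.join(os.path.dirname(framework_dir), 'extra_libs')


if __name__ == '__main__':
    for path in ('/opt/dts/framework/dts.py', '/opt/dts/framework/dts.pyc'):
        print(path, extra_libs_by_replace(path), extra_libs_by_dirname(path))
```

Both helpers agree when the module is loaded from dts.py. They differ when the path ends in dts.pyc, where the replacement leaves a stray character in front of 'extra_libs'; whether that case can occur depends on how dts.py is launched.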
* [dts] [PATCH v1 07/16] framework/debugger: support multiple VMs module 2018-01-08 2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu ` (5 preceding siblings ...) 2018-01-08 2:49 ` [dts] [PATCH v1 06/16] framework/dts: support multiple VMs module Marvin Liu @ 2018-01-08 2:49 ` Marvin Liu 2018-01-08 2:49 ` [dts] [PATCH v1 08/16] framework/ssh_pexpect: " Marvin Liu ` (9 subsequent siblings) 16 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-08 2:49 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Disable signal handling when running in parallel mode, since signals are not supported in child threads. 2. Support connecting to a session by integer index. Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/debugger.py b/framework/debugger.py index 5c02414..8018f55 100644 --- a/framework/debugger.py +++ b/framework/debugger.py @@ -28,8 +28,8 @@ import signal import code import time import imp -from settings import load_global_setting, DEBUG_SETTING -from utils import get_subclasses, copy_instance_attr +from settings import load_global_setting, DEBUG_SETTING, DTS_PARALLEL_SETTING +from utils import get_subclasses, copy_instance_attr, GREEN from test_case import TestCase @@ -71,10 +71,16 @@ def connect_command(connect): Connect to ssh session and give control to user. """ from ssh_connection import CONNECTIONS - for connection in CONNECTIONS: - for name, session in connection.items(): - if name == connect: - session.session.interact() + if type(connect) == int: + name, session = CONNECTIONS[connect].items()[0] + print GREEN("Connecting to session[%s]" % name) + session.session.interact() + else: + for connection in CONNECTIONS: + for name, session in connection.items(): + if name == connect: + print GREEN("Connecting to session[%s]" % name) + session.session.interact() def rerun_command(): @@ -164,7 +170,8 @@ def ignore_keyintr(): """ Temporary disable interrupt handler. 
""" - if load_global_setting(DEBUG_SETTING) != 'yes': + # signal can't be used in thread + if load_global_setting(DEBUG_SETTING) != 'yes' or load_global_setting(DTS_PARALLEL_SETTING) == 'yes': return global debug_cmd @@ -180,7 +187,8 @@ def aware_keyintr(): """ Reenable interrupt handler. """ - if load_global_setting(DEBUG_SETTING) != 'yes': + # signal can't be used in thread + if load_global_setting(DEBUG_SETTING) != 'yes' or load_global_setting(DTS_PARALLEL_SETTING) == 'yes': return return signal.signal(signal.SIGINT, keyboard_handle) -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH v1 08/16] framework/ssh_pexpect: support multiple VMs module 2018-01-08 2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu ` (6 preceding siblings ...) 2018-01-08 2:49 ` [dts] [PATCH v1 07/16] framework/debugger: " Marvin Liu @ 2018-01-08 2:49 ` Marvin Liu 2018-01-08 2:49 ` [dts] [PATCH v1 09/16] framework/ssh_connection: support DUT index argument Marvin Liu ` (8 subsequent siblings) 16 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-08 2:49 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Protect ssh connection action by parallel lock 2. Enlarge terminal column for serial output 3. Enhance error handling when exception happened 4. Add DUT index argument which can separate lock by DUTs Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/ssh_pexpect.py b/framework/ssh_pexpect.py index 8d6c3dd..3c988b7 100644 --- a/framework/ssh_pexpect.py +++ b/framework/ssh_pexpect.py @@ -3,7 +3,7 @@ import pexpect from pexpect import pxssh from debugger import ignore_keyintr, aware_keyintr from exception import TimeoutException, SSHConnectionException, SSHSessionDeadException -from utils import RED, GREEN +from utils import RED, GREEN, parallel_lock """ Module handle ssh sessions between tester and DUT. @@ -14,31 +14,47 @@ Aslo support transfer files to tester or DUT. class SSHPexpect(object): - def __init__(self, host, username, password): + def __init__(self, host, username, password, dut_id): self.magic_prompt = "MAGIC PROMPT" + self.logger = None + + self.host = host + self.username = username + self.password = password + + self._connect_host(dut_id=dut_id) + + @parallel_lock(num=8) + def _connect_host(self, dut_id=0): + """ + Create connection to assigned crb, parameter dut_id will be used in + parallel_lock thus can assure isolated locks for each crb. + Parallel ssh connections are limited to MaxStartups option in SSHD + configuration file. 
By default concurrent number is 10, so default + threads number is limited to 8 which less than 10. Lock number can + be modified along with MaxStartups value. + """ try: self.session = pxssh.pxssh() - self.host = host - self.username = username - self.password = password - if ':' in host: - self.ip = host.split(':')[0] - self.port = int(host.split(':')[1]) + if ':' in self.host: + self.ip = self.host.split(':')[0] + self.port = int(self.host.split(':')[1]) self.session.login(self.ip, self.username, self.password, original_prompt='[$#>]', port=self.port, login_timeout=20) else: self.session.login(self.host, self.username, self.password, original_prompt='[$#>]') - self.send_expect('stty -echo', '# ', timeout=2) - except Exception, e: + self.send_expect('stty -echo', '#') + self.send_expect('stty columns 1000', "#") + except Exception as e: print RED(e) if getattr(self, 'port', None): suggestion = "\nSuggession: Check if the fireware on [ %s ] " % \ self.ip + "is stoped\n" print GREEN(suggestion) - raise SSHConnectionException(host) + raise SSHConnectionException(self.host) def init_log(self, logger, name): self.logger = logger @@ -56,24 +72,33 @@ class SSHPexpect(object): return before def send_expect(self, command, expected, timeout=15, verify=False): - ret = self.send_expect_base(command, expected, timeout) - if verify: - ret_status = self.send_expect_base("echo $?", expected, timeout) - if not int(ret_status): - return ret + + try: + ret = self.send_expect_base(command, expected, timeout) + if verify: + ret_status = self.send_expect_base("echo $?", expected, timeout) + if not int(ret_status): + return ret + else: + self.logger.error("Command: %s failure!" % command) + self.logger.error(ret) + return int(ret_status) else: - self.logger.error("Command: %s failure!" 
% command) - self.logger.error(ret) - return int(ret_status) - else: - return ret + return ret + except Exception as e: + print RED("Exception happened in [%s] and output is [%s]" % (command, self.get_output_before())) + raise(e) def send_command(self, command, timeout=1): - ignore_keyintr() - self.clean_session() - self.__sendline(command) - aware_keyintr() - return self.get_session_before(timeout) + try: + ignore_keyintr() + self.clean_session() + self.__sendline(command) + aware_keyintr() + except Exception as e: + raise(e) + + return self.get_session_before(timeout=timeout) def clean_session(self): self.get_session_before(timeout=0.01) @@ -90,8 +115,9 @@ class SSHPexpect(object): pass aware_keyintr() - before = self.get_output_before() + before = self.get_output_all() self.__flush() + return before def __flush(self): @@ -116,7 +142,6 @@ class SSHPexpect(object): def get_output_before(self): if not self.isalive(): raise SSHSessionDeadException(self.host) - self.session.flush() before = self.session.before.rsplit('\r\n', 1) if before[0] == "[PEXPECT]": before[0] = "" @@ -124,7 +149,6 @@ class SSHPexpect(object): return before[0] def get_output_all(self): - self.session.flush() output = self.session.before output.replace("[PEXPECT]", "") return output -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
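The _connect_host() docstring explains that concurrent logins are capped at 8 so they stay below the sshd MaxStartups limit. The sketch below shows one way such a cap can behave, using threading.BoundedSemaphore from the standard library. It is a stand-in under stated assumptions, not the parallel_lock implementation from patch 03 (which is not reproduced here); limit_concurrency and measure_peak are hypothetical names, and the sleep stands in for the SSH login.

```python
import threading
import time
from functools import wraps


def limit_concurrency(num):
    # Hypothetical decorator: at most `num` callers run the wrapped
    # function at the same time, the rest block until a slot frees up.
    slots = threading.BoundedSemaphore(num)

    def decorate(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with slots:
                return func(*args, **kwargs)
        return wrapper
    return decorate


def measure_peak(num, workers):
    # Start `workers` threads against a function limited to `num`
    # concurrent callers and report the highest concurrency observed.
    state = {'active': 0, 'peak': 0}
    guard = threading.Lock()

    @limit_concurrency(num)
    def connect():
        with guard:
            state['active'] += 1
            state['peak'] = max(state['peak'], state['active'])
        time.sleep(0.01)  # placeholder for the SSH login
        with guard:
            state['active'] -= 1

    threads = [threading.Thread(target=connect) for _ in range(workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return state['peak']


if __name__ == '__main__':
    print(measure_peak(8, 32))
```

If MaxStartups is raised on the DUT, the cap can be raised with it, as the docstring notes.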
* [dts] [PATCH v1 09/16] framework/ssh_connection: support DUT index argument 2018-01-08 2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu ` (7 preceding siblings ...) 2018-01-08 2:49 ` [dts] [PATCH v1 08/16] framework/ssh_pexpect: " Marvin Liu @ 2018-01-08 2:49 ` Marvin Liu 2018-01-08 2:49 ` [dts] [PATCH v1 10/16] framework/settings: add parallel related settings Marvin Liu ` (7 subsequent siblings) 16 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-08 2:49 UTC (permalink / raw) To: dts; +Cc: Marvin Liu Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/ssh_connection.py b/framework/ssh_connection.py index 675279b..ce12816 100644 --- a/framework/ssh_connection.py +++ b/framework/ssh_connection.py @@ -44,8 +44,8 @@ class SSHConnection(object): Implement send_expect/copy function upper SSHPexpet module. """ - def __init__(self, host, session_name, username, password=''): - self.session = SSHPexpect(host, username, password) + def __init__(self, host, session_name, username, password='', dut_id=0): + self.session = SSHPexpect(host, username, password, dut_id) self.name = session_name connection = {} connection[self.name] = self.session @@ -63,7 +63,7 @@ class SSHConnection(object): def send_command(self, cmds, timeout=1): self.logger.info(cmds) - out = self.session.send_command(cmds) + out = self.session.send_command(cmds, timeout) self.logger.debug(out) return out -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
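This patch makes two small changes: dut_id is forwarded to the lower SSH layer with a default of 0, and send_command() now passes the caller's timeout down instead of dropping it. A minimal sketch of both follows. FakePexpect and Connection are hypothetical stand-ins that only record what they receive; they are not the SSHPexpect and SSHConnection classes themselves.

```python
class FakePexpect(object):
    # Stand-in for the lower layer: records its dut_id and every
    # (command, timeout) pair it is asked to send.
    def __init__(self, host, username, password, dut_id):
        self.host = host
        self.dut_id = dut_id
        self.calls = []

    def send_command(self, command, timeout=1):
        self.calls.append((command, timeout))
        return "output of %s" % command


class Connection(object):
    # Stand-in for the wrapper: same argument order as in the patch.
    def __init__(self, host, session_name, username, password='', dut_id=0):
        self.session = FakePexpect(host, username, password, dut_id)
        self.name = session_name

    def send_command(self, cmds, timeout=1):
        # Forward the timeout; before the patch only `cmds` was passed.
        return self.session.send_command(cmds, timeout)


if __name__ == '__main__':
    conn = Connection('192.168.1.5', 'dut', 'root', dut_id=3)
    conn.send_command('ls', timeout=5)
    print(conn.session.dut_id, conn.session.calls)
```

Existing callers that omit dut_id keep working because of the default value.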
* [dts] [PATCH v1 10/16] framework/settings: add parallel related settings 2018-01-08 2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu ` (8 preceding siblings ...) 2018-01-08 2:49 ` [dts] [PATCH v1 09/16] framework/ssh_connection: support DUT index argument Marvin Liu @ 2018-01-08 2:49 ` Marvin Liu 2018-01-08 2:49 ` [dts] [PATCH v1 11/16] framework/virt_resource: support multiple VMs module Marvin Liu ` (6 subsequent siblings) 16 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-08 2:49 UTC (permalink / raw) To: dts; +Cc: Marvin Liu Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/settings.py b/framework/settings.py index e5b7746..b897299 100644 --- a/framework/settings.py +++ b/framework/settings.py @@ -211,6 +211,8 @@ DEBUG_CASE_SETTING = "DTS_DEBUGCASE_ENABLE" DPDK_RXMODE_SETTING = "DTS_DPDK_RXMODE" DTS_ERROR_ENV = "DTS_RUNNING_ERROR" DTS_CFG_FOLDER = "DTS_CFG_FOLDER" +DTS_PARALLEL_SETTING = "DTS_PARALLEL_ENABLE" +MKS_LM_ENABLING = "DTS_MKS_LM_ENABLE" """ @@ -223,6 +225,7 @@ DTS_ERR_TBL = { "TESTER_SETUP_ERR" : 4, "SUITE_SETUP_ERR": 5, "SUITE_EXECUTE_ERR": 6, + "PARALLEL_EXECUTE_ERR": 7 } def get_nic_name(type): -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
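The new DTS_PARALLEL_SETTING is what the debugger patch checks before touching signal handlers. The sketch below shows that gate and why it is needed: Python raises ValueError when signal.signal() is called outside the main thread. Several things here are assumptions: the environment-backed load_global_setting() is a simplified stand-in with an extra optional argument, the value "DTS_DEBUG_ENABLE" for DEBUG_SETTING is not shown in this section, and the helper names are hypothetical.

```python
import os
import signal
import threading

DEBUG_SETTING = "DTS_DEBUG_ENABLE"            # assumed value
DTS_PARALLEL_SETTING = "DTS_PARALLEL_ENABLE"  # value from this patch


def load_global_setting(key, environ=None):
    # Assumption: settings are read from environment variables and
    # default to an empty string when unset.
    env = os.environ if environ is None else environ
    return env.get(key, '')


def signal_handling_enabled(environ=None):
    # Same condition as in ignore_keyintr()/aware_keyintr(), inverted:
    # handlers are only installed in debug mode without parallel mode.
    return (load_global_setting(DEBUG_SETTING, environ) == 'yes'
            and load_global_setting(DTS_PARALLEL_SETTING, environ) != 'yes')


def signal_error_in_thread():
    # Try to install a handler from a child thread and report the
    # exception type that Python raises.
    caught = []

    def worker():
        try:
            signal.signal(signal.SIGINT, signal.SIG_IGN)
        except ValueError as err:
            caught.append(type(err).__name__)

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return caught


if __name__ == '__main__':
    print(signal_handling_enabled({DEBUG_SETTING: 'yes'}))
    print(signal_error_in_thread())
```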
* [dts] [PATCH v1 11/16] framework/virt_resource: support multiple VMs module 2018-01-08 2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu ` (9 preceding siblings ...) 2018-01-08 2:49 ` [dts] [PATCH v1 10/16] framework/settings: add parallel related settings Marvin Liu @ 2018-01-08 2:49 ` Marvin Liu 2018-01-08 2:49 ` [dts] [PATCH v1 12/16] framework/virt_base: add attach/quick start/quit function for VM management Marvin Liu ` (5 subsequent siblings) 16 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-08 2:49 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Added serial and migrate port allocation support. 2. Added locks around the virtualization resource allocation functions. 3. Port allocation now resumes scanning from the last allocated port instead of rescanning from the start. Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/virt_resource.py b/framework/virt_resource.py index b830f4e..2f1b680 100644 --- a/framework/virt_resource.py +++ b/framework/virt_resource.py @@ -31,10 +31,16 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. from random import randint -from utils import get_obj_funcs +from utils import get_obj_funcs, parallel_lock, RED -INIT_FREE_PORT = 6060 +#global INIT_FREE_PORT +#global INIT_SERIAL_PORT +#global INIT_MIGRATE_PORT +INIT_FREE_PORT = 6000 +INIT_SERIAL_PORT = 7000 +INIT_MIGRATE_PORT = 8000 +QuickScan = True class VirtResource(object): @@ -149,6 +155,7 @@ class VirtResource(object): for cpu in cpus: self.__core_used(cpu) + @parallel_lock() def alloc_cpu(self, vm='', number=-1, socket=-1, corelist=None): """ There're two options for request cpu resouce for vm. 
@@ -207,12 +214,14 @@ class VirtResource(object): return False return True + @parallel_lock() def free_cpu(self, vm): if self.__vm_has_resource(vm, 'cores'): for core in self.allocated_info[vm]['cores']: self.__core_unused(core) self.allocated_info[vm].pop('cores') + @parallel_lock() def alloc_pf(self, vm='', number=-1, socket=-1, pflist=[]): """ There're two options for request pf devices for vm. @@ -246,12 +255,14 @@ class VirtResource(object): self.allocated_info[vm]['ports'] = ports return ports + @parallel_lock() def free_pf(self, vm): if self.__vm_has_resource(vm, 'ports'): for pci in self.allocated_info[vm]['ports']: self.__port_unused(pci) self.allocated_info[vm].pop('ports') + @parallel_lock() def alloc_vf_from_pf(self, vm='', pf_pci='', number=-1, vflist=[]): """ There're two options for request vf devices of pf device. @@ -286,12 +297,14 @@ class VirtResource(object): self.allocated_info[vm]['vfs'] = vfs return vfs + @parallel_lock() def free_vf(self, vm): if self.__vm_has_resource(vm, 'vfs'): for pci in self.allocated_info[vm]['vfs']: self.__vf_unused(pci) self.allocated_info[vm].pop('vfs') + @parallel_lock() def add_vf_on_pf(self, pf_pci='', vflist=[]): """ Add vf devices generated by specified pf devices. @@ -307,6 +320,7 @@ class VirtResource(object): self.used_vfs += used_vfs self.vfs += vfs + @parallel_lock() def del_vf_on_pf(self, pf_pci='', vflist=[]): """ Remove vf devices generated by specified pf devices. 
@@ -327,38 +341,78 @@ class VirtResource(object): del self.used_vfs[index] del self.vfs[index] - def alloc_port(self, vm=''): + @parallel_lock() + def _check_port_allocated(self, port): + """ + Check whether port has been pre-allocated + """ + for vm_info in self.allocated_info.values(): + if vm_info.has_key('hostport') and port == vm_info['hostport']: + return True + if vm_info.has_key('serialport') and port == vm_info['serialport']: + return True + if vm_info.has_key('migrateport') and port == vm_info['migrateport']: + return True + return False + + @parallel_lock() + def alloc_port(self, vm='', port_type='connect'): """ Allocate unused host port for vm """ + global INIT_FREE_PORT + global INIT_SERIAL_PORT + global INIT_MIGRATE_PORT + if vm == '': print "Alloc host port request vitual machine name!!!" return None - port_start = INIT_FREE_PORT + randint(1, 100) - port_step = randint(1, 10) - port = None - count = 20 + if port_type == 'connect': + port = INIT_FREE_PORT + elif port_type == 'serial': + port = INIT_SERIAL_PORT + elif port_type == 'migrate': + port = INIT_MIGRATE_PORT + while True: - if self.dut.check_port_occupied(port_start) is False: - port = port_start - break - count -= 1 - if count < 0: - print 'No available port on the host!!!' 
+ if self.dut.check_port_occupied(port) is False and self._check_port_allocated(port) is False: break - port_start += port_step + else: + port += 1 + continue if vm not in self.allocated_info: self.allocated_info[vm] = {} - self.allocated_info[vm]['hostport'] = port + if port_type == 'connect': + self.allocated_info[vm]['hostport'] = port + elif port_type == 'serial': + self.allocated_info[vm]['serialport'] = port + elif port_type == 'migrate': + self.allocated_info[vm]['migrateport'] = port + + # do not scan port from the begining + if QuickScan: + if port_type == 'connect': + INIT_FREE_PORT = port + elif port_type == 'serial': + INIT_SERIAL_PORT = port + elif port_type == 'migrate': + INIT_MIGRATE_PORT = port + return port + @parallel_lock() def free_port(self, vm): if self.__vm_has_resource(vm, 'hostport'): self.allocated_info[vm].pop('hostport') + if self.__vm_has_resource(vm, 'serialport'): + self.allocated_info[vm].pop('serialport') + if self.__vm_has_resource(vm, 'migrateport'): + self.allocated_info[vm].pop('migrateport') + @parallel_lock() def alloc_vnc_num(self, vm=''): """ Allocate unused host VNC display number for VM. @@ -377,10 +431,12 @@ class VirtResource(object): return free_vnc_display_num + @parallel_lock() def free_vnc_num(self, vm): if self.__vm_has_resource(vm, 'vnc_display_num'): self.allocated_info[vm].pop('vnc_display_num') + @parallel_lock() def free_all_resource(self, vm): """ Free all resource VM has been allocated. -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
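The port allocation scheme in this patch (one base port per type, a linear scan, and a "quick scan" that resumes from the last allocated port) can be sketched in isolation as follows. PortAllocator is a hypothetical, simplified model: the `occupied` set stands in for dut.check_port_occupied(), a plain threading.Lock stands in for parallel_lock, and an unknown port type raises ValueError here, which is a choice made for the sketch.

```python
import threading


class PortAllocator(object):
    # Base ports taken from the patch.
    BASES = {'connect': 6000, 'serial': 7000, 'migrate': 8000}

    def __init__(self, occupied=(), quick_scan=True):
        self.occupied = set(occupied)      # ports already in use on the host
        self.next_port = dict(self.BASES)  # scan start per port type
        self.allocated = {}                # vm name -> {port type: port}
        self.quick_scan = quick_scan
        self.lock = threading.Lock()

    def _is_allocated(self, port):
        # True if any VM already holds this port, whatever its type.
        return any(port in info.values() for info in self.allocated.values())

    def alloc_port(self, vm, port_type='connect'):
        if port_type not in self.next_port:
            raise ValueError("unknown port type: %s" % port_type)
        with self.lock:
            port = self.next_port[port_type]
            while port in self.occupied or self._is_allocated(port):
                port += 1
            self.allocated.setdefault(vm, {})[port_type] = port
            if self.quick_scan:
                # Resume the next scan here instead of at the base port.
                self.next_port[port_type] = port
            return port

    def free_port(self, vm):
        with self.lock:
            self.allocated.pop(vm, None)


if __name__ == '__main__':
    alloc = PortAllocator(occupied={6000, 6001})
    print(alloc.alloc_port('vm0'), alloc.alloc_port('vm1'))
    print(alloc.alloc_port('vm0', 'serial'))
```

One consequence of the quick scan is that a freed port below the current scan position is not handed out again. With 1000 ports between the base values this is unlikely to matter for the 2 * 63 VMs mentioned in the cover letter.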
* [dts] [PATCH v1 12/16] framework/virt_base: add attach/quick start/quit function for VM management 2018-01-08 2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu ` (10 preceding siblings ...) 2018-01-08 2:49 ` [dts] [PATCH v1 11/16] framework/virt_resource: support multiple VMs module Marvin Liu @ 2018-01-08 2:49 ` Marvin Liu 2018-01-08 2:49 ` [dts] [PATCH v1 13/16] framework/virt_dut: support multiple VMs module Marvin Liu ` (4 subsequent siblings) 16 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-08 2:49 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Added an attach function: a running virtual machine can be auto-detected and connected to, so there is no need to stop and restart a virtual machine that has already been started. 2. Added a quit function, which only closes the SSH connection to the virtual machine. The virtual machine stays alive after quit. 3. Added a quick start function, which only starts the virtual machine and does not initialize it. 4. Added a function to set the local configuration, so the user can assign virtual machine parameters without any configuration file. 
Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/virt_base.py b/framework/virt_base.py index 1838ca1..e93cbab 100644 --- a/framework/virt_base.py +++ b/framework/virt_base.py @@ -42,7 +42,6 @@ from config import VIRTCONF from logger import getLogger from settings import CONFIG_ROOT_PATH from virt_dut import VirtDut -from utils import remove_old_rsa_key ST_NOTSTART = "NOTSTART" ST_PAUSE = "PAUSE" @@ -67,11 +66,9 @@ class VirtBase(object): self.vm_name = vm_name self.suite = suite_name - # init the host session and logger for VM - self.host_dut.init_host_session() + # create self used host session, need close it later + self.host_session = self.host_dut.new_session(self.vm_name) - # replace dut session - self.host_session = self.host_dut.host_session self.host_logger = self.host_dut.logger # base_dir existed for host dut has prepared it self.host_session.send_expect("cd %s" % self.host_dut.base_dir, "# ") @@ -87,6 +84,7 @@ class VirtBase(object): self.virt_type = self.get_virt_type() self.params = [] + self.local_conf = [] # default call back function is None self.callback = None @@ -124,22 +122,32 @@ class VirtBase(object): if self.find_option_index(key) is None: self.__save_local_config(key, param[key]) + def set_local_config(self, local_conf): + """ + Configure VM configuration from user input + """ + self.local_conf = local_conf + def load_local_config(self, suite_name): """ Load local configure in the path DTS_ROOT_PATH/conf. 
""" # load local configuration by suite and vm name - conf = VirtConf(CONFIG_ROOT_PATH + os.sep + suite_name + '.cfg') - conf.load_virt_config(self.vm_name) - local_conf = conf.get_virt_config() + try: + conf = VirtConf(CONFIG_ROOT_PATH + os.sep + suite_name + '.cfg') + conf.load_virt_config(self.vm_name) + self.local_conf = conf.get_virt_config() + except: + # when met exception in load VM config + # just leave local conf untouched + pass + # replace global configurations with local configurations - for param in local_conf: + for param in self.local_conf: if 'mem' in param.keys(): self.__save_local_config('mem', param['mem']) - continue if 'cpu' in param.keys(): self.__save_local_config('cpu', param['cpu']) - continue # save local configurations for key in param.keys(): self.__save_local_config(key, param[key]) @@ -165,13 +173,16 @@ class VirtBase(object): try: param_func = getattr(self, 'add_vm_' + key) if callable(param_func): - for option in value: - param_func(**option) + if type(value) is list: + for option in value: + param_func(**option) else: print utils.RED("Virt %s function not callable!!!" % key) except AttributeError: + self.host_logger.error(traceback.print_exception(*sys.exc_info())) print utils.RED("Virt %s function not implemented!!!" % key) except Exception: + self.host_logger.error(traceback.print_exception(*sys.exc_info())) raise exception.VirtConfigParamException(key) def find_option_index(self, option): @@ -232,6 +243,21 @@ class VirtBase(object): self.load_global_config() self.load_local_config(self.suite) + def attach(self): + # load configuration + self.load_config() + + # change login user/password + index = self.find_option_index("login") + if index: + value = self.params[index]["login"] + for option in value: + self.add_vm_login(**option) + + # attach real vm + self._attach_vm() + return None + def start(self, load_config=True, set_target=True, cpu_topo=''): """ Start VM and instantiate the VM with VirtDut. 
@@ -247,7 +273,7 @@ class VirtBase(object): if self.vm_status is ST_RUNNING: # connect vm dut and init running environment - vm_dut = self.instantiate_vm_dut(set_target, cpu_topo) + vm_dut = self.instantiate_vm_dut(set_target, cpu_topo, autodetect_topo=True) else: vm_dut = None @@ -263,6 +289,28 @@ class VirtBase(object): return None return vm_dut + def quick_start(self, load_config=True, set_target=True, cpu_topo=''): + """ + Only Start VM and not do anything else, will be helpful in multiple VMs + """ + try: + if load_config is True: + self.load_config() + # compose boot command for different hypervisors + self.compose_boot_param() + + # start virutal machine + self._quick_start_vm() + + except Exception as vm_except: + if self.handle_exception(vm_except): + print utils.RED("Handled expection " + str(type(vm_except))) + else: + print utils.RED("Unhandled expection " + str(type(vm_except))) + + if callable(self.callback): + self.callback() + def migrated_start(self, set_target=True, cpu_topo=''): """ Instantiate the VM after migration done @@ -271,7 +319,7 @@ class VirtBase(object): try: if self.vm_status is ST_PAUSE: # connect backup vm dut and it just inherited from host - vm_dut = self.instantiate_vm_dut(set_target, cpu_topo, bind_dev=False) + vm_dut = self.instantiate_vm_dut(set_target, cpu_topo, bind_dev=False, autodetect_topo=False) except Exception as vm_except: if self.handle_exception(vm_except): print utils.RED("Handled expection " + str(type(vm_except))) @@ -324,7 +372,7 @@ class VirtBase(object): """ NotImplemented - def instantiate_vm_dut(self, set_target=True, cpu_topo='', bind_dev=True): + def instantiate_vm_dut(self, set_target=True, cpu_topo='', bind_dev=True, autodetect_topo=True): """ Instantiate the Dut class for VM. 
""" @@ -337,9 +385,6 @@ class VirtBase(object): crb['user'] = username crb['pass'] = password - # remove default key - remove_old_rsa_key(self.host_dut.tester, crb['IP']) - serializer = self.host_dut.serializer try: @@ -350,8 +395,10 @@ class VirtBase(object): self.virt_type, self.vm_name, self.suite, - cpu_topo) - except: + cpu_topo, + dut_id=self.host_dut.dut_id) + except Exception as vm_except: + self.handle_exception(vm_except) raise exception.VirtDutConnectException return None @@ -374,7 +421,7 @@ class VirtBase(object): try: # setting up dpdk in vm, must call at last - vm_dut.prerequisites(self.host_dut.package, self.host_dut.patches) + vm_dut.prerequisites(self.host_dut.package, self.host_dut.patches, autodetect_topo) if set_target: target = self.host_dut.target vm_dut.set_target(target, bind_dev) @@ -389,6 +436,19 @@ class VirtBase(object): """ Stop the VM. """ + self._stop_vm() + self.quit() + + self.virt_pool.free_all_resource(self.vm_name) + + def quit(self): + """ + Just quit connection to the VM + """ + if getattr(self, 'host_session', None): + self.host_session.close() + self.host_session = None + # vm_dut may not init in migration case if getattr(self, 'vm_dut', None): if self.vm_status is ST_RUNNING: @@ -400,10 +460,6 @@ class VirtBase(object): self.vm_dut.logger.logger_exit() self.vm_dut = None - self._stop_vm() - - self.virt_pool.free_all_resource(self.vm_name) - def register_exit_callback(self, callback): """ Call register exit call back function -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH v1 13/16] framework/virt_dut: support multiple VMs module 2018-01-08 2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu ` (11 preceding siblings ...) 2018-01-08 2:49 ` [dts] [PATCH v1 12/16] framework/virt_base: add attach/quick start/quit function for VM management Marvin Liu @ 2018-01-08 2:49 ` Marvin Liu 2018-01-08 2:49 ` [dts] [PATCH v1 14/16] framework/qemu_kvm: " Marvin Liu ` (3 subsequent siblings) 16 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-08 2:49 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Added DUT index argument and protected interactive tester operations with a lock 2. Only create one ssh connection to the virtual machine 3. Support disabling the port auto-detect function, which must run under a lock; skipping it saves virtual machine instantiation time 4. Without an alternative session, the kill_all function is handled by the control session Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/virt_dut.py b/framework/virt_dut.py index 348279d..83f661d 100644 --- a/framework/virt_dut.py +++ b/framework/virt_dut.py @@ -32,8 +32,8 @@ import os import re import time -import utils import settings +from utils import RED, parallel_lock from config import PortConf from settings import NICS, LOG_NAME_SEP, get_netdev from project_dpdk import DPDKdut @@ -53,14 +53,16 @@ class VirtDut(DPDKdut): or CRBBareMetal. 
""" - def __init__(self, hyper, crb, serializer, virttype, vm_name, suite, cpu_topo): + def __init__(self, hyper, crb, serializer, virttype, vm_name, suite, cpu_topo, dut_id): self.vm_name = vm_name self.hyper = hyper self.cpu_topo = cpu_topo + self.dut_id = dut_id self.vm_ip = crb['IP'] self.NAME = 'virtdut' + LOG_NAME_SEP + '%s' % self.vm_ip - super(Dut, self).__init__(crb, serializer, self.NAME) + # do not create addition alt_session + super(Dut, self).__init__(crb, serializer, self.NAME, alt_session=False, dut_id=self.dut_id) # load port config from suite cfg self.suite = suite @@ -73,15 +75,13 @@ class VirtDut(DPDKdut): self.virttype = virttype def init_log(self): - self.logger.config_suite(self.host_dut.test_classname, 'virtdut') + if hasattr(self.host_dut, "test_classname"): + self.logger.config_suite(self.host_dut.test_classname, 'virtdut') def close(self, force=False): if self.session: self.session.close(force) self.session = None - if self.alt_session: - self.alt_session.close(force) - self.alt_session = None RemoveNicObj(self) def set_nic_type(self, nic_type): @@ -91,6 +91,7 @@ class VirtDut(DPDKdut): self.nic_type = nic_type # vm_dut config will load from vm configuration file + @parallel_lock() def load_portconf(self): """ Load port config for this virtual machine @@ -99,12 +100,24 @@ class VirtDut(DPDKdut): self.conf.load_ports_config(self.vm_name) self.ports_cfg = self.conf.get_ports_config() - return + @parallel_lock() + def detect_portmap(self, dut_id): + """ + Detect port mapping with ping6 message, should be locked for protect + tester operations. 
+ """ + # enable tester port ipv6 + self.host_dut.enable_tester_ipv6() + + self.map_available_ports() - def create_portmap(self): - # if not config ports in vm port config file, used ping6 get portmap - if not self.ports_cfg: - self.map_available_ports() + # disable tester port ipv6 + self.host_dut.disable_tester_ipv6() + + def load_portmap(self): + """ + Generate port mapping base on loaded port configuration + """ port_num = len(self.ports_info) self.ports_map = [-1] * port_num for key in self.ports_cfg.keys(): @@ -142,7 +155,7 @@ class VirtDut(DPDKdut): if bind_dev: self.bind_interfaces_linux('igb_uio') - def prerequisites(self, pkgName, patch): + def prerequisites(self, pkgName, patch, autodetect_topo): """ Prerequest function should be called before execute any test case. Will call function to scan all lcore's information which on DUT. @@ -152,7 +165,10 @@ class VirtDut(DPDKdut): if not self.skip_setup: self.prepare_package() - self.send_expect("cd %s" % self.base_dir, "# ") + out = self.send_expect("cd %s" % self.base_dir, "# ") + if 'No such file or directory' in out: + self.logger.error("Can't switch to dpdk folder!!!") + self.send_expect("alias ls='ls --color=none'", "#") if self.get_os_type() == 'freebsd': @@ -180,14 +196,14 @@ class VirtDut(DPDKdut): # load port infor from config file self.load_portconf() - # enable tester port ipv6 - self.host_dut.enable_tester_ipv6() self.mount_procfs() - self.create_portmap() - - # disable tester port ipv6 - self.host_dut.disable_tester_ipv6() + if self.ports_cfg: + self.load_portmap() + else: + # if no config ports in port config file, will auto-detect portmap + if autodetect_topo: + self.detect_portmap(dut_id=self.dut_id) # print latest ports_info for port_info in self.ports_info: @@ -196,7 +212,7 @@ class VirtDut(DPDKdut): def init_core_list(self): self.cores = [] cpuinfo = self.send_expect("grep --color=never \"processor\"" - " /proc/cpuinfo", "#", alt_session=False) + " /proc/cpuinfo", "#") cpuinfo = 
cpuinfo.split('\r\n') if self.cpu_topo != '': topo_reg = r"(\d)S/(\d)C/(\d)T" @@ -394,3 +410,21 @@ class VirtDut(DPDKdut): self.ports_map[vmPort] = remotePort hits[remotePort] = True continue + + def kill_all(self, alt_session=False): + """ + Kill all dpdk applications on VM + """ + control = getattr(self.hyper, 'control_session', None) + if callable(control): + out = control("lsof -Fp /var/run/.rte_config") + pids = [] + pid_reg = r'p(\d+)' + if len(out): + lines = out.split('\r\n') + for line in lines: + m = re.match(pid_reg, line) + if m: + pids.append(m.group(1)) + for pid in pids: + control('kill -9 %s' % pid) -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
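The pid extraction used by `kill_all` above (and by `check_alive`/`kill_alive` in the qemu_kvm patch) relies on the field output of `lsof -Fp`, where each process is reported on a line of the form `p<pid>`. A minimal standalone sketch of that parsing step follows; `parse_lsof_pids` and the sample string are illustrative assumptions, not code from the series.

```python
import re


def parse_lsof_pids(out):
    """Collect pids from `lsof -Fp` style output (process lines look like 'p1234')."""
    pids = []
    for line in out.splitlines():
        m = re.match(r"p(\d+)", line)
        if m:
            # group(1) is the numeric part, without the leading 'p'
            pids.append(int(m.group(1)))
    return pids


# hypothetical session output, with the \r\n line endings a pexpect session returns
sample = "p4021\r\nf5\r\np4388\r\n"
print(parse_lsof_pids(sample))
```

Using `splitlines()` handles both `\n` and `\r\n` endings, and taking `group(1)` avoids slicing the `p` prefix off by hand.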
* [dts] [PATCH v1 14/16] framework/qemu_kvm: support multiple VMs module 2018-01-08 2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu ` (12 preceding siblings ...) 2018-01-08 2:49 ` [dts] [PATCH v1 13/16] framework/virt_dut: support multiple VMs module Marvin Liu @ 2018-01-08 2:49 ` Marvin Liu 2018-01-08 2:49 ` [dts] [PATCH v1 15/16] conf/virt_global: add vm management related configuration Marvin Liu ` (2 subsequent siblings) 16 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-08 2:49 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Remove qga and use the serial port for the control session; two serial port types, 'telnet' and 'socket', are supported 2. Protect the qemu start action with a parallel lock 3. Support attach function 4. Control the virtual machine start process through the start timeout/login timeout/login prompt/password prompt variables. Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/qemu_kvm.py b/framework/qemu_kvm.py index 84f961b..82933de 100644 --- a/framework/qemu_kvm.py +++ b/framework/qemu_kvm.py @@ -37,7 +37,8 @@ import os from virt_base import VirtBase from virt_base import ST_NOTSTART, ST_PAUSE, ST_RUNNING, ST_UNKNOWN from exception import StartVMFailedException -from settings import get_host_ip +from settings import get_host_ip, load_global_setting, DTS_PARALLEL_SETTING +from utils import parallel_lock, RED # This name is derictly defined in the qemu guest serivce # So you can not change it except it is changed by the service @@ -65,6 +66,17 @@ class QEMUKvm(VirtBase): "fi" QEMU_IFUP_PATH = '/etc/qemu-ifup' + # Default login session timeout value + LOGIN_TIMEOUT = 60 + # By default will wait 120 seconds for VM start + # If VM not ready in this period, will try restart it once + START_TIMEOUT = 120 + # Default timeout value for operation when VM starting + OPERATION_TIMEOUT = 20 + # Default login prompt + LOGIN_PROMPT = "login:" + # Default password prompt + PASSWORD_PROMPT = 
"Password:" def __init__(self, dut, vm_name, suite_name): super(QEMUKvm, self).__init__(dut, vm_name, suite_name) @@ -79,9 +91,6 @@ class QEMUKvm(VirtBase): # initialize some resource used by guest. self.init_vm_request_resource() - QGA_CLI_PATH = '-r dep/QMP/' - self.host_session.copy_file_to(QGA_CLI_PATH) - # charater and network device default index self.char_idx = 0 self.netdev_idx = 0 @@ -105,15 +114,41 @@ class QEMUKvm(VirtBase): # if there is not the values of the specified options self.set_vm_default() + self.am_attached = False + + # allow restart VM when can't login + self.restarted = False + + def check_alive(self): + """ + Check whether VM is alive for has been start up + """ + pid_regx = r'p(\d+)' + out = self.host_session.send_expect("lsof -Fp /tmp/.%s.pid" % self.vm_name, "#", timeout=30) + for line in out.splitlines(): + m = re.match(pid_regx, line) + if m: + self.host_logger.info("Found VM %s already running..." % m.group(0)) + return True + return False + + def kill_alive(self): + pid_regx = r'p(\d+)' + out = self.host_session.send_expect("lsof -Fp /tmp/.%s.pid" % self.vm_name, "# ") + for line in out.splitlines(): + m = re.match(pid_regx, line) + if m: + self.host_session.send_expect("kill -9 %s" % m.group(0)[1:], "# ") + def set_vm_default(self): self.set_vm_name(self.vm_name) if self.arch == 'aarch64': self.set_vm_machine('virt') self.set_vm_enable_kvm() self.set_vm_pid_file() - self.set_vm_qga() self.set_vm_daemon() self.set_vm_monitor() + self.set_vm_serial() if not self.__default_nic: # add default control interface @@ -375,8 +410,7 @@ class QEMUKvm(VirtBase): options['opt_media']: disk_boot_line += separator + 'media=%s' % options['opt_media'] - if self.__string_has_multi_fields(disk_boot_line, separator): - self.__add_boot_line(disk_boot_line) + self.__add_boot_line(disk_boot_line) def add_vm_pflash(self, **options): """ @@ -386,6 +420,19 @@ class QEMUKvm(VirtBase): pflash_boot_line = '-pflash %s' % options['file'] 
self.__add_boot_line(pflash_boot_line) + def add_vm_start(self, **options): + """ + Update VM start and login related settings + """ + if 'wait_seconds' in options.keys(): + self.START_TIMEOUT = int(options['wait_seconds']) + if 'login_timeout' in options.keys(): + self.LOGIN_TIMEOUT = int(options['login_timeout']) + if 'login_prompt' in options.keys(): + self.LOGIN_PROMPT = options['login_prompt'] + if 'password_prompt' in options.keys(): + self.PASSWORD_PROMPT = options['password_prompt'] + def add_vm_login(self, **options): """ user: login username of virtual machine @@ -538,8 +585,11 @@ class QEMUKvm(VirtBase): # get the host port in the option host_port = field(opt_hostfwd, 2).split('-')[0] + + # if no host assigned, just allocate it if not host_port: - host_port = str(self.virt_pool.alloc_port(self.vm_name)) + host_port = str(self.virt_pool.alloc_port(self.vm_name, port_type='connect')) + self.redir_port = host_port # get the guest addr @@ -637,6 +687,9 @@ class QEMUKvm(VirtBase): else: self.params.append({'device': [opts]}) + # start up time may increase after add device + self.START_TIMEOUT += 8 + def add_vm_device(self, **options): """ driver: [pci-assign | virtio-net-pci | ...] 
@@ -653,8 +706,8 @@ class QEMUKvm(VirtBase): self.__add_vm_virtio_user_pci(**options) elif options['driver'] == 'vhost-cuse': self.__add_vm_virtio_cuse_pci(**options) - if options['driver'] == 'vfio-pci': - self.__add_vm_pci_vfio(**options) + elif options['driver'] == 'vfio-pci': + self.__add_vm_pci_vfio(**options) def __add_vm_pci_vfio(self, **options): """ @@ -708,7 +761,17 @@ class QEMUKvm(VirtBase): """ separator = ',' # chardev parameter - if 'opt_path' in options.keys() and options['opt_path']: + netdev_id = 'netdev%d' % self.netdev_idx + if 'opt_script' in options.keys() and options['opt_script']: + if 'opt_br' in options.keys() and \ + options['opt_br']: + bridge = options['opt_br'] + else: + bridge = self.DEFAULT_BRIDGE + self.__generate_net_config_script(str(bridge)) + dev_boot_line = '-netdev tap,id=%s,script=%s' % (netdev_id, options['opt_script']) + self.netdev_idx += 1 + elif 'opt_path' in options.keys() and options['opt_path']: dev_boot_line = '-chardev socket' char_id = 'char%d' % self.char_idx if 'opt_server' in options.keys() and options['opt_server']: @@ -734,12 +797,16 @@ class QEMUKvm(VirtBase): netdev_id, char_id) self.__add_boot_line(dev_boot_line) # device parameter - opts = {'opt_netdev': '%s' % netdev_id} - if 'opt_mac' in options.keys() and \ - options['opt_mac']: - opts['opt_mac'] = options['opt_mac'] - if 'opt_settings' in options.keys() and options['opt_settings']: - opts['opt_settings'] = options['opt_settings'] + opts = {'opt_netdev': '%s' % netdev_id} + if 'opt_mac' in options.keys() and \ + options['opt_mac']: + opts['opt_mac'] = options['opt_mac'] + if 'opt_settings' in options.keys() and options['opt_settings']: + opts['opt_settings'] = options['opt_settings'] + if 'opt_legacy' in options.keys() and options['opt_legacy']: + opts['opt_legacy'] = options['opt_legacy'] + if 'opt_settings' in options.keys() and options['opt_settings']: + opts['opt_settings'] = options['opt_settings'] self.__add_vm_virtio_net_pci(**opts) def 
__add_vm_virtio_cuse_pci(self, **options): @@ -795,6 +862,9 @@ class QEMUKvm(VirtBase): if 'opt_addr' in options.keys() and \ options['opt_addr']: dev_boot_line += separator + 'addr=%s' % options['opt_addr'] + if 'opt_legacy' in options.keys() and \ + options['opt_legacy']: + dev_boot_line += separator + 'disable-modern=%s' % options['opt_legacy'] if 'opt_settings' in options.keys() and \ options['opt_settings']: dev_boot_line += separator + '%s' % options['opt_settings'] @@ -841,41 +911,6 @@ class QEMUKvm(VirtBase): else: self.monitor_sock_path = None - def set_vm_qga(self, enable='yes'): - """ - Set VM qemu-guest-agent. - """ - index = self.find_option_index('qga') - if index: - self.params[index] = {'qga': [{'enable': '%s' % enable}]} - else: - self.params.append({'qga': [{'enable': '%s' % enable}]}) - QGA_SOCK_PATH = QGA_SOCK_PATH_TEMPLATE % {'vm_name': self.vm_name} - self.qga_sock_path = QGA_SOCK_PATH - - def add_vm_qga(self, **options): - """ - enable: 'yes' - Make sure qemu-guest-agent servie up in vm - """ - QGA_DEV_ID = '%(vm_name)s_qga0' % {'vm_name': self.vm_name} - QGA_SOCK_PATH = QGA_SOCK_PATH_TEMPLATE % {'vm_name': self.vm_name} - - separator = ' ' - - if 'enable' in options.keys(): - if options['enable'] == 'yes': - qga_boot_block = '-chardev socket,path=%(SOCK_PATH)s,server,nowait,id=%(ID)s' + \ - separator + '-device virtio-serial' + separator + \ - '-device virtserialport,chardev=%(ID)s,name=%(DEV_NAME)s' - qga_boot_line = qga_boot_block % {'SOCK_PATH': QGA_SOCK_PATH, - 'DEV_NAME': QGA_DEV_NAME, - 'ID': QGA_DEV_ID} - self.__add_boot_line(qga_boot_line) - self.qga_sock_path = QGA_SOCK_PATH - else: - self.qga_sock_path = '' - def add_vm_migration(self, **options): """ enable: yes @@ -889,49 +924,135 @@ class QEMUKvm(VirtBase): self.migrate_port = options['port'] else: self.migrate_port = str( - self.virt_pool.alloc_port(self.vm_name)) + self.virt_pool.alloc_port(self.vm_name), port_type="migrate") migrate_boot_line = migrate_cmd % { 'migrate_port': 
self.migrate_port} self.__add_boot_line(migrate_boot_line) - def add_vm_serial_port(self, **options): + + def set_vm_serial(self): """ - enable: 'yes' + Set serial device into options """ - if 'enable' in options.keys(): - if options['enable'] == 'yes': - self.serial_path = "/tmp/%s_serial.sock" % self.vm_name - serial_boot_line = '-serial unix:%s,server,nowait' % self.serial_path - self.__add_boot_line(serial_boot_line) + index = self.find_option_index('serial') + if index: + self.params[index] = {'serial': [{'type': 'telnet'}]} + else: + self.params.append({'serial': [{'type': 'telnet'}]}) + + def add_vm_serial(self, **options): + """ + type : 'telnet' | 'socket' + """ + self.serial_type = options['type'] + if self.serial_type == 'telnet': + if 'port' in options: + self.serial_port = int(options['port']) else: - pass + self.serial_port = self.virt_pool.alloc_port(self.vm_name, port_type="serial") + serial_boot_line = '-display none -serial telnet::%d,server,nowait' % self.serial_port + else: + self.serial_path = "/tmp/%s_serial.sock" % self.vm_name + serial_boot_line = '-display none -serial unix:%s,server,nowait' % self.serial_path + + self.__add_boot_line(serial_boot_line) - def connect_serial_port(self, name="", first=True): + def connect_serial_port(self, name=""): """ Connect to serial port and return connected session for usage if connected failed will return None """ - if getattr(self, 'serial_path', None): - self.serial_session = self.host_dut.new_session(suite=name) - self.serial_session.send_command("nc -U %s" % self.serial_path) - if first: - # login into Fedora os, not sure can work on all distributions - self.serial_session.send_expect("", "login:") - self.serial_session.send_expect( - "%s" % self.username, "Password:") - self.serial_session.send_expect("%s" % self.password, "# ") - return self.serial_session + shell_reg = r"(\s*)\[(.*)\]# " + try: + if getattr(self, 'serial_session', None) is None: + self.serial_session = self.host_session - return 
None + self.serial_session.send_command("nc -U %s" % self.serial_path) + + # login message not ouput if timeout too small + out = self.serial_session.send_command("", timeout=5).replace('\r', '').replace('\n', '') - def close_serial_port(self): + if len(out) == 0: + raise StartVMFailedException("Can't get output from [%s:%s]" % (self.host_dut.crb['My IP'], self.vm_name)) + + m = re.match(shell_reg, out) + if m: + # dmidecode output contain #, so use other matched string + out = self.serial_session.send_expect("dmidecode -t system", "Product Name", timeout=self.OPERATION_TIMEOUT) + # cleanup previous output + self.serial_session.get_session_before(timeout=0.1) + + # if still on host, need reconnect + if 'QEMU' not in out: + raise StartVMFailedException("Not real login [%s]" % self.vm_name) + else: + # has enter into VM shell + return True + + # login into Redhat os, not sure can work on all distributions + if self.LOGIN_PROMPT not in out: + raise StartVMFailedException("Can't login [%s] now!!!" 
% self.vm_name) + else: + self.serial_session.send_expect("%s" % self.username, self.PASSWORD_PROMPT, timeout=self.LOGIN_TIMEOUT) + # system maybe busy here, enlarge timeout equal to login timeout + self.serial_session.send_expect("%s" % self.password, "#", timeout=self.LOGIN_TIMEOUT) + return self.serial_session + except Exception as e: + # when exception happened, force close serial connection and reconnect + print RED("[%s:%s] exception [%s] happened" % (self.host_dut.crb['My IP'], self.vm_name, str(e))) + self.close_serial_session(dut_id=self.host_dut.dut_id) + return False + + def connect_telnet_port(self, name=""): """ - Close serial session if it existed + Connect to serial port and return connected session for usage + if connected failed will return None """ - if getattr(self, 'serial_session', None): - # exit from nc first - self.serial_session.send_expect("^C", "# ") - self.host_dut.close_session(self.serial_session) + shell_reg = r"(\s*)\[(.*)\]# " + scan_cmd = "lsof -i:%d | grep telnet | awk '{print $2}'" % self.serial_port + + try: + # assume serial is not connect + if getattr(self, 'serial_session', None) is None: + self.serial_session = self.host_session + + self.serial_session.send_expect("telnet localhost %d" % self.serial_port, "Connected to localhost", timeout=self.OPERATION_TIMEOUT) + + # output will be empty if timeout too small + out = self.serial_session.send_command("", timeout=5).replace('\r', '').replace('\n', '') + + # if no output from serial port, either connection close or system hang + if len(out) == 0: + raise StartVMFailedException("Can't get output from [%s]" % self.vm_name) + + # if enter into shell + m = re.match(shell_reg, out) + if m: + # dmidecode output contain #, so use other matched string + out = self.serial_session.send_expect("dmidecode -t system", "Product Name", timeout=self.OPERATION_TIMEOUT) + # cleanup previous output + self.serial_session.get_session_before(timeout=0.1) + + # if still on host, need reconnect + if 
'QEMU' not in out: + raise StartVMFailedException("Not real login [%s]" % self.vm_name) + else: + # has enter into VM shell + return True + + # login into Redhat os, not sure can work on all distributions + if "x86_64 on an x86_64" not in out: + print RED("[%s:%s] not ready for login" % (self.host_dut.crb['My IP'], self.vm_name)) + return False + else: + self.serial_session.send_expect("%s" % self.username, "Password:", timeout=self.LOGIN_TIMEOUT) + self.serial_session.send_expect("%s" % self.password, "#", timeout=self.LOGIN_TIMEOUT) + return True + except Exception as e: + # when exception happened, force close serial connection and reconnect + print RED("[%s:%s] exception [%s] happened" % (self.host_dut.crb['My IP'], self.vm_name, str(e))) + self.close_serial_session(dut_id=self.host_dut.dut_id) + return False def add_vm_vnc(self, **options): """ @@ -979,6 +1100,79 @@ class QEMUKvm(VirtBase): cmd = options['cmd'] self.__add_boot_line(cmd) + def _check_vm_status(self): + """ + Check and restart QGA if not ready, wait for network ready + """ + self.__wait_vm_ready() + + self.__wait_vmnet_ready() + + def _attach_vm(self): + """ + Attach VM + Collected information : serial/monitor/qga sock file + : hostfwd address + """ + self.am_attached = True + + if not self._query_pid(): + raise StartVMFailedException("Can't strip process pid!!!") + + cmdline = self.host_session.send_expect('cat /proc/%d/cmdline' % self.pid, '# ') + qemu_boot_line = cmdline.replace('\x00', ' ') + self.qemu_boot_line = qemu_boot_line.split(' ', 1)[1] + self.qemu_emulator = qemu_boot_line.split(' ', 1)[0] + + serial_reg = ".*serial\x00unix:(.*?)," + telnet_reg = ".*serial\x00telnet::(\d+)," + monitor_reg = ".*monitor\x00unix:(.*?)," + hostfwd_reg = ".*hostfwd=tcp:(.*):(\d+)-:" + migrate_reg = ".*incoming\x00tcp::(\d+)" + + # support both telnet and unix domain socket serial device + m = re.match(serial_reg, cmdline) + if not m: + m1 = re.match(telnet_reg, cmdline) + if not m1: + raise 
StartVMFailedException("No serial sock available!!!") + else: + self.serial_port = int(m1.group(1)) + self.serial_type = "telnet" + else: + self.serial_path = m.group(1) + self.serial_type = "socket" + + m = re.match(monitor_reg, cmdline) + if not m: + raise StartVMFailedException("No monitor sock available!!!") + self.monitor_sock_path = m.group(1) + + m = re.match(hostfwd_reg, cmdline) + if not m: + raise StartVMFailedException("No host fwd config available!!!") + + self.net_type = 'hostfwd' + self.host_port = m.group(2) + self.hostfwd_addr = m.group(1) + ':' + self.host_port + + # record start time, need call before check_vm_status + self.start_time = time.time() + + try: + self.update_status() + except: + self.host_logger.error("Can't query vm status!!!") + + if self.vm_status is not ST_PAUSE: + self._check_vm_status() + else: + m = re.match(migrate_reg, cmdline) + if not m: + raise StartVMFailedException("No migrate port available!!!") + + self.migrate_port = int(m.group(1)) + def _start_vm(self): """ Start VM. 
@@ -987,25 +1181,86 @@ class QEMUKvm(VirtBase): qemu_boot_line = self.generate_qemu_boot_line() - # Start VM using the qemu command - ret = self.host_session.send_expect(qemu_boot_line, '# ', verify=True) - if type(ret) is int and ret != 0: - raise StartVMFailedException('Start VM failed!!!') + self.__send_qemu_cmd(qemu_boot_line, dut_id=self.host_dut.dut_id) self.__get_pci_mapping() # query status self.update_status() + # sleep few seconds for bios/grub + time.sleep(10) + # when vm is waiting for migration, can't ping if self.vm_status is not ST_PAUSE: - # if VM waiting for migration, can't return ping - out = self.__control_session('ping', '120') - if "Not responded" in out: - raise StartVMFailedException('Not response in 120 seconds!!!') + self.__wait_vm_ready() self.__wait_vmnet_ready() + # Start VM using the qemu command + # lock critical action like start qemu + @parallel_lock(num=4) + def __send_qemu_cmd(self, qemu_boot_line, dut_id): + # add more time for qemu start will be slow when system is busy + ret = self.host_session.send_expect(qemu_boot_line, '# ', verify=True, timeout=30) + + # record start time + self.start_time = time.time() + + # wait for qemu process ready + time.sleep(2) + if type(ret) is int and ret != 0: + raise StartVMFailedException('Start VM failed!!!') + + def _quick_start_vm(self): + self.__alloc_assigned_pcis() + + qemu_boot_line = self.generate_qemu_boot_line() + + self.__send_qemu_cmd(qemu_boot_line, dut_id=self.host_dut.dut_id) + + self.__get_pci_mapping() + + # query status + self.update_status() + + # sleep few seconds for bios and grub + time.sleep(10) + + def __ping_vm(self): + logged_in = False + cur_time = time.time() + time_diff = cur_time - self.start_time + try_times = 0 + while (time_diff < self.START_TIMEOUT): + if self.control_session('ping') == "Success": + logged_in = True + break + + # update time consume + cur_time = time.time() + time_diff = cur_time - self.start_time + + self.host_logger.warning("Can't login [%s] 
on [%s], retry %d times!!!" % (self.vm_name, self.host_dut.crb['My IP'], try_times + 1)) + time.sleep(self.OPERATION_TIMEOUT) + try_times += 1 + continue + + return logged_in + + def __wait_vm_ready(self): + logged_in = self.__ping_vm() + if not logged_in: + if not self.restarted: + # make sure serial session has been quit + self.close_serial_session(dut_id=self.host_dut.dut_id) + self.vm_status = ST_NOTSTART + self._stop_vm() + self.restarted = True + self._start_vm() + else: + raise StartVMFailedException('Not response in %d seconds!!!' % self.START_TIMEOUT) + def start_migration(self, remote_ip, remote_port): """ Send migration command to host and check whether start migration @@ -1047,18 +1302,14 @@ class QEMUKvm(VirtBase): """ Generate the whole QEMU boot line. """ - qemu_emulator = self.qemu_emulator - - if self.vcpus_pinned_to_vm.strip(): - vcpus = self.__alloc_vcpus() - - if vcpus.strip(): - qemu_boot_line = 'taskset -c %s ' % vcpus + \ - qemu_emulator + ' ' + \ - self.qemu_boot_line + if self.vcpus_pinned_to_vm: + vcpus = self.vcpus_pinned_to_vm.replace(' ', ',') + qemu_boot_line = 'taskset -c %s ' % vcpus + \ + self.qemu_emulator + ' ' + \ + self.qemu_boot_line else: - qemu_boot_line = qemu_emulator + ' ' + \ - self.qemu_boot_line + qemu_boot_line = self.qemu_emulator + ' ' + \ + self.qemu_boot_line return qemu_boot_line @@ -1067,16 +1318,12 @@ class QEMUKvm(VirtBase): wait for 120 seconds for vm net ready 10.0.2.* is the default ip address allocated by qemu """ - count = 40 - while count: - out = self.__control_session('ifconfig') - if "10.0.2" in out: - return True - time.sleep(6) - count -= 1 - - raise StartVMFailedException( - 'Virtual machine control net not ready in 120 seconds!!!') + ret = self.control_session("network") + # network has been ready, just return + if ret == "Success": + return True + else: + raise StartVMFailedException('Virtual machine control net not ready!!!') def __alloc_vcpus(self): """ @@ -1201,10 +1448,10 @@ class 
QEMUKvm(VirtBase): """ Get IP which VM is connected by bridge. """ - out = self.__control_session('ping', '60') + out = self.control_session('ping', '60') if not out: time.sleep(10) - out = self.__control_session('ifconfig') + out = self.control_session('ifconfig') ips = re.findall(r'inet (\d+\.\d+\.\d+\.\d+)', out) if '127.0.0.1' in ips: @@ -1267,7 +1514,7 @@ class QEMUKvm(VirtBase): Query and update VM status """ out = self.__monitor_session('info', 'status') - self.host_logger.info("Virtual machine status: %s" % out) + self.host_logger.warning("Virtual machine status: %s" % out) if 'paused' in out: self.vm_status = ST_PAUSE @@ -1278,12 +1525,23 @@ class QEMUKvm(VirtBase): info = self.host_session.send_expect('cat %s' % self.__pid_file, "# ") try: - pid = int(info) + pid = int(info.split()[0]) # save pid into dut structure self.host_dut.virt_pids.append(pid) except: self.host_logger.info("Failed to capture pid!!!") + def _query_pid(self): + info = self.host_session.send_expect('cat %s' % self.__pid_file, "# ") + try: + # sometimes saw to lines in pid file + pid = int(info.splitlines()[0]) + # save pid into dut structure + self.pid = pid + return True + except: + return False + def __strip_guest_pci(self): """ Strip all pci-passthrough device information, based on qemu monitor @@ -1315,43 +1573,154 @@ class QEMUKvm(VirtBase): return pcis - def __control_session(self, command, *args): + def __strip_guest_core(self): + """ + Strip all lcore-thread binding information + Return array will be [thread0, thread1, ...] 
+ """ + cores = [] + # CPU #0: pc=0xffffffff8104c416 (halted) thread_id=40677 + core_reg = r'^.*CPU #(\d+): (.*) thread_id=(\d+)' + out = self.__monitor_session('info', 'cpus') + + if out is None: + return cores + + lines = out.split("\r\n") + for line in lines: + m = re.match(core_reg, line) + if m: + cores.append(int(m.group(3))) + + return cores + + def handle_serial_session(func): + """ + Wrapper function to handle serial port, must return serial to host session + """ + def _handle_serial_session(self, command): + # just raise error if connect failed, for func can't all any more + try: + if self.serial_type == 'socket': + assert (self.connect_serial_port(name=self.vm_name)), "Can't connect to serial socket" + elif self.serial_type == 'telnet': + assert (self.connect_telnet_port(name=self.vm_name)), "Can't connect to serial port" + except: + return 'Failed' + + try: + out = func(self, command) + self.quit_serial_session() + return out + except Exception as e: + print RED("Exception happend on [%s] serial with cmd [%s]" % (self.vm_name, command)) + print RED(e) + self.close_serial_session(dut_id=self.host_dut.dut_id) + return 'Failed' + + return _handle_serial_session + + def quit_serial_session(self): + """ + Quit from serial session gracefully """ - Use the qemu guest agent service to control VM. 
+ if self.serial_type == 'socket': + self.serial_session.send_expect("^C", "# ") + elif self.serial_type == 'telnet': + self.serial_session.send_command("^]") + self.serial_session.send_command("quit") + self.serial_session = None + + @parallel_lock() + def close_serial_session(self, dut_id): + """ + Force kill serial connection from DUT when exception happened + """ + # return serial_session to host_session + if self.serial_type == 'socket': + scan_cmd = "ps -e -o pid,cmd |grep 'nc -U %s' |grep -v grep" % self.serial_path + out = self.host_dut.send_expect(scan_cmd, "#") + proc_info = out.strip().split() + try: + pid = int(proc_info[0]) + self.host_dut.send_expect('kill %d' % pid, "#") + except: + pass + self.host_dut.send_expect("", "# ") + elif self.serial_type == 'telnet': + scan_cmd = "lsof -i:%d | grep telnet | awk '{print $2}'" % self.serial_port + proc_info = self.host_dut.send_expect(scan_cmd, "#") + try: + pid = int(proc_info) + self.host_dut.send_expect('kill %d' % pid, "#") + except: + pass + + self.serial_session = None + return + + @handle_serial_session + def control_session(self, command): + """ + Use the serial port to control VM. Note: :command: there are these commands as below: - cat, fsfreeze, fstrim, halt, ifconfig, info,\ - ping, powerdown, reboot, shutdown, suspend + ping, network, powerdown :args: give different args by the different commands. 
""" - if not self.qga_sock_path: - self.host_logger.info( - "No QGA service between host [ %s ] and guest [ %s ]" % - (self.host_dut.NAME, self.vm_name)) - return None - - cmd_head = '~/QMP/' + \ - "qemu-ga-client " + \ - "--address=%s %s" % \ - (self.qga_sock_path, command) - - cmd = cmd_head - for arg in args: - cmd = cmd_head + ' ' + str(arg) - if command is "ping": - out = self.host_session.send_expect(cmd, '# ', int(args[0])) + if command == "ping": + # disable stty input characters for send_expect function + self.serial_session.send_expect("stty -echo", "#", timeout=self.OPERATION_TIMEOUT) + return "Success" + elif command == "network": + intf = self.serial_session.send_expect("ls -1 /sys/bus/pci/devices/0000:00:1f.0/net", "#", timeout=self.OPERATION_TIMEOUT) + out = self.serial_session.send_expect("ifconfig %s" % intf, "#", timeout=self.OPERATION_TIMEOUT) + if "10.0.2" not in out: + self.serial_session.send_expect("dhclient %s -timeout 10" % intf, "#", timeout=30) + else: + return "Success" + + out = self.serial_session.send_expect("ifconfig", "#", timeout=self.OPERATION_TIMEOUT) + if "10.0.2" not in out: + return "Failed" + + return "Success" + elif command == "powerdown": + self.serial_session.send_command("init 0") + # conflict with handle_serial_session + if self.serial_type == "socket": + self.serial_session.send_expect("^C", "# ") + elif self.serial_type == "telnet": + self.serial_session.send_command("^]") + self.serial_session.send_command("quit") + time.sleep(10) + self.kill_alive() + return "Success" else: - out = self.host_session.send_expect(cmd, '# ') - - return out + out = self.serial_session.send_command(command) + return out def _stop_vm(self): """ Stop VM. 
""" if self.vm_status is ST_RUNNING: - self.__control_session('powerdown') + self.control_session('powerdown') else: self.__monitor_session('quit') time.sleep(5) + # remove temporary file + self.host_session.send_expect("rm -f %s" % self.__pid_file, "#") + + def pin_threads(self, lcores): + """ + Pin thread to assigned cores + """ + thread_reg = r'CPU #(\d+): .* thread_id=(\d+)' + output = self.__monitor_session('info', 'cpus') + thread_cores = re.findall(thread_reg, output) + cores_map = zip(thread_cores, lcores) + for thread_info, core_id in cores_map: + cpu_id, thread_id = thread_info + self.host_session.send_expect("taskset -pc %d %s" % (core_id, thread_id), "#") -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH v1 15/16] conf/virt_global: add vm management related configuration 2018-01-08 2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu ` (13 preceding siblings ...) 2018-01-08 2:49 ` [dts] [PATCH v1 14/16] framework/qemu_kvm: " Marvin Liu @ 2018-01-08 2:49 ` Marvin Liu 2018-01-08 2:49 ` [dts] [PATCH v1 16/16] doc: add descriptions for multiple virtual machine module Marvin Liu 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu 16 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-08 2:49 UTC (permalink / raw) To: dts; +Cc: Marvin Liu Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/conf/virt_global.cfg b/conf/virt_global.cfg index 176650a..d4c03e2 100644 --- a/conf/virt_global.cfg +++ b/conf/virt_global.cfg @@ -12,6 +12,8 @@ cpu = model=host,number=4,cpupin=3 4 5 6; mem = size=2048; +start = + wait_seconds=120,login_timeout=60,login_prompt=login:,password_prompt=Password:; [XEN] cpu = number=4,cpupin=3 4 5 6; -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
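As a rough illustration of what the new `start` option carries, the sketch below splits such a value into a dictionary. The helper name parse_option and the splitting rules (a trailing ';', ',' between pairs, the first '=' between key and value) are assumptions made for this example; this is not the DTS configuration parser itself.

```python
def parse_option(value):
    """Split 'k1=v1,k2=v2;' into a dict of strings (hypothetical helper)."""
    params = {}
    for pair in value.strip().rstrip(';').split(','):
        if not pair:
            continue
        # split on the first '=' only, so values may contain ':' or '='
        key, _, val = pair.partition('=')
        params[key.strip()] = val.strip()
    return params


# The value added to conf/virt_global.cfg by this patch
START = "wait_seconds=120,login_timeout=60,login_prompt=login:,password_prompt=Password:;"

if __name__ == "__main__":
    opts = parse_option(START)
    print(int(opts['wait_seconds']), opts['login_prompt'])
```

All values come back as strings, so numeric settings such as wait_seconds need an explicit int() at the point of use.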
* [dts] [PATCH v1 16/16] doc: add descriptions for multiple virtual machine module 2018-01-08 2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu ` (14 preceding siblings ...) 2018-01-08 2:49 ` [dts] [PATCH v1 15/16] conf/virt_global: add vm management related configuration Marvin Liu @ 2018-01-08 2:49 ` Marvin Liu 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu 16 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-08 2:49 UTC (permalink / raw) To: dts; +Cc: Marvin Liu Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/doc/dts_gsg/index.rst b/doc/dts_gsg/index.rst index 05587af..fbd16dc 100644 --- a/doc/dts_gsg/index.rst +++ b/doc/dts_gsg/index.rst @@ -41,3 +41,4 @@ Getting Started Guide review virtualization scenario + multiple_vm diff --git a/doc/dts_gsg/multiple_vm.rst b/doc/dts_gsg/multiple_vm.rst new file mode 100644 index 0000000..85eca26 --- /dev/null +++ b/doc/dts_gsg/multiple_vm.rst @@ -0,0 +1,87 @@ +Multiple Virtual Machines Management +==================================== + +When managing multiple virtual machines, waiting around 2 minutes for each VM to start is exhausting, so DTS introduces a parallel threads model for managing multiple VMs. + +.. note:: + Critical resources and actions which can't be handled in parallel are protected by function-level locks. + +Command arguments +----------------- + +The multiple VMs module supports starting VMs or sending commands to VMs in parallel, using the argument formats below. + +Arguments for "start" command: + +..
table:: + + +-----------------+----------------------------------+----------------+-------------+ + | name | Description | Default value | Must have | + | | | | | + +-----------------+----------------------------------+----------------+-------------+ + | name | virtual machine name | N/A | Yes | + +-----------------+----------------------------------+----------------+-------------+ + | dut_id | index of DUT | 0 | No | + +-----------------+----------------------------------+----------------+-------------+ + | autodetect_topo | whether detect network topology | False | No | + | | automatically | | | + +-----------------+----------------------------------+----------------+-------------+ + | virt_config | virtual machine config location | N/A | Alternative | + +-----------------+----------------------------------+----------------+-------------+ + | virt_params | local parameters of virtual | N/A | Alternative | + | | machine | | | + +-----------------+----------------------------------+----------------+-------------+ + +Arguments for "cmd" command: + +..
table:: + + +-----------------+----------------------------------+----------------+-------------+ + | name | Description | Default value | Must have | + | | | | | + +-----------------+----------------------------------+----------------+-------------+ + | name | virtual machine name | N/A | Yes | + +-----------------+----------------------------------+----------------+-------------+ + | dut_id | index of DUT | 0 | No | + +-----------------+----------------------------------+----------------+-------------+ + | commands | list of commands which will be | N/A | Yes | + | | sent to the virtual machine | | | + +-----------------+----------------------------------+----------------+-------------+ + | expects | list of expected output of the | N/A | Yes | + | | commands | | | + +-----------------+----------------------------------+----------------+-------------+ + | timeouts | list of timeout values of the | N/A | Yes | + | | commands | | | + +-----------------+----------------------------------+----------------+-------------+ + +.. note:: + If nothing is expected for a command, its expected string must still be defined as blank + +The multiple VMs module categorizes and saves the results after all tasks are done. Users can then retrieve them with the function get_parallel_result. + +Sample Code +----------- + +..
code-block:: console + + vm_task = MultipleVM(max_vm=self.VM_NUM, duts=self.duts) + + for dut_id in range(len(self.duts)): + for vm_idx in range(self.VM_NUM): + vm_name = "vm%d" % vm_idx + args = {'name': vm_name, + 'dut_id': dut_id, + 'autodetect_topo': False, + 'virt_params': { + 'qemu': [{'path': '/usr/local/bin/qemu-system-x86_64'}], + 'cpu': [{'model': 'host', 'number': '1', 'cpupin': ''}], + 'mem': [{'size': '1024', 'hugepage': 'yes'}], + 'disk': [{'file': '/storage/vm-image/%s.qcow2' % vm_name}], + 'login': [{'user': 'root', 'password': 'root'}], + 'device': None} + } + + vm_task.add_parallel_task(action="start", config=args) + + vm_task.do_parallel_task() + print vm_task.get_parallel_result() diff --git a/doc/dts_gsg/virtualization.rst b/doc/dts_gsg/virtualization.rst index 1848563..2328603 100644 --- a/doc/dts_gsg/virtualization.rst +++ b/doc/dts_gsg/virtualization.rst @@ -84,6 +84,15 @@ For network access, should disable guest firewall service. systemctl disable firewalld.service +Console connection +"""""""""""""""""" + +Enable the virtual machine serial console on the kernel command line; DTS manages the virtual machine through the serial port. + +.. code-block:: console + + console=ttyS0,115200 + Suite Programing ---------------- @@ -310,34 +319,27 @@ Enable KVM DTS enable KVM full virtualization support as default. This option will significant improve the speed of virtual machine. -Qemu Guest Agent +Qemu Serial Port """""""""""""""" -Qemu monitor supply one method to interact with qemu process. DTS can monitor guest status by command supplied by qemu guest agent. Qemu guest agent is based on virtio-serial devices. +The qemu serial port is the default method to interact with the guest OS. DTS can monitor guest status and manage the guest network through the serial port. ..
code-block:: console - -device virtio-serial -device virtserialport,chardev=vm_qga0,name=org.qemu.guest_agent.0 - -daemonize -monitor unix:/tmp/vm_monitor.sock,server,nowait + -serial telnet::7000,server,nowait -Check whether guest os has been started up. +DTS checks the output from the serial port to determine whether the guest OS has started up. The prompt strings for the guest login session can be configured with the parameter "start". .. code-block:: console - qemu-ga-client address=/tmp/{vm_name}_qga0.sock ping 120 + start = + wait_seconds=120,login_timeout=60,login_prompt=login:,password_prompt=Password:; .. note:: - We only wait two minutes for guest os start up. For guest os only has few hardware and we has disabled most services, so 2 minutes is enough. - This command will be return when guest os is ready, so DTS will not wait 2 minutes for each time. + The default timeout for guest OS start-up is 2 minutes. Because the guest OS has little hardware and most services are disabled, 2 minutes is enough. If the guest OS can't start up in 2 minutes, DTS will try to restart it once. -Check whether guest os default interface has been up. - -.. code-block:: console - - qemu-ga-client address=/tmp/{vm_name}_qga0.sock ifconfig - -DTS will wait for guest os default interface upped and get auto dhcp address. After that DTS can connect to guest by ssh connections. +DTS checks that the default interface is up and uses DHCP to obtain an address. After that, DTS can connect to the guest over SSH. .. code-block:: console @@ -349,15 +351,15 @@ DTS will wait for guest os default interface upped and get auto dhcp address. Af inet6 fe80::200:ff:feb9:fed7 prefixlen 64 ether 00:00:00:b9:fe:d7 -Power down guest os. +Power down the guest OS through the serial port. .. code-block:: console - qemu-ga-client address=/tmp/{vm_name}_qga0.sock powerdown + init 0 .. note:: - For more information about qemu guest agent, please reference to http://wiki.qemu.org/Features/QAPI/GuestAgent.
+ For more information about the qemu serial port, please refer to https://qemu.weilnetz.de/doc/qemu-doc.html. Qemu Monitor """""""""""" @@ -396,7 +398,7 @@ Connection to monitor socket on DUT. For More detail information about qemu monitor. https://en.wikibooks.org/wiki/QEMU/Monitor#info Qemu Machine -"""""""""" +"""""""""""" DTS set default qemu machine type as virt for Aarch64. This option is mandatory for qemu-system-aarch64. -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
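The start-up policy described in the documentation patch above (poll until the timeout, restart once, then give up) can be sketched as follows. This is a simplified stand-in, not the QEMUKvm code: wait_until_ready, start_with_one_restart, StartVMFailed and the injected callables are hypothetical names, and the short timeouts in the demo are chosen only so that it finishes quickly.

```python
import time


class StartVMFailed(Exception):
    """Raised when the guest never answers (stand-in for StartVMFailedException)."""


def wait_until_ready(probe, start_time, timeout, interval):
    """Call probe() until it returns True or `timeout` seconds have passed since start_time."""
    while time.time() - start_time < timeout:
        if probe():
            return True
        time.sleep(interval)
    return False


def start_with_one_restart(start, stop, probe, timeout=120, interval=5):
    """Start a guest, wait for it, and restart it once before giving up.

    Returns the number of restarts that were needed (0 or 1).
    """
    for attempt in range(2):  # first start plus one restart
        start_time = start()  # start() returns the time the guest was launched
        if wait_until_ready(probe, start_time, timeout, interval):
            return attempt
        stop()
    raise StartVMFailed('No response in %s seconds' % timeout)


if __name__ == "__main__":
    state = {'starts': 0}

    def fake_start():
        state['starts'] += 1
        return time.time()

    def fake_probe():
        # pretend the guest only comes up after the restart
        return state['starts'] >= 2

    restarts = start_with_one_restart(fake_start, lambda: None, fake_probe,
                                      timeout=0.2, interval=0.05)
    print("ready after %d restart(s)" % restarts)
```

Measuring the timeout from the launch time, rather than from the first probe, mirrors the way the patch records start_time right after sending the qemu command, so time spent in BIOS and grub counts against the same budget.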
* [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management
2018-01-08 2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu
` (15 preceding siblings ...)
2018-01-08 2:49 ` [dts] [PATCH v1 16/16] doc: add descriptions for multiple virtual machine module Marvin Liu
@ 2018-01-10 0:10 ` Marvin Liu
2018-01-10 0:10 ` [dts] [PATCH v2 01/16] framework: add external thread pool library Marvin Liu
` (15 more replies)
16 siblings, 16 replies; 34+ messages in thread
From: Marvin Liu @ 2018-01-10 0:10 UTC (permalink / raw)
To: dts

This series of patches adds support for managing multiple virtual machines in parallel. The default Qemu initialization process is enhanced so that VMs can be started in parallel. With the new model, the efficiency of VM management improves significantly: in my environment, a test suite can start and control 2 * 63 VMs in two minutes with this new module.

v2:
  add QGA control session back
  close logger handler when quit for virtual machine
  optimize allocate display port function for parallel mode
  add some descriptions in virt_global config file

Tested-by: malihong<lihongx.ma@intel.com>
Tested-by: zhao,meijuan <meijuanx.zhao@intel.com>

Marvin Liu (16):
  framework: add external thread pool library
  framework/multiple_vm: add multiple VM management module
  framework/utils: support locks for parallel model
  framework: add DUT index support
  framework/logger: optimize output format for threads
  framework/dts: support multiple VMs module
  framework/debugger: support multiple VMs module
  framework/ssh_pexpect: support multiple VMs module
  framework/ssh_connection: support multiple VMs module
  framework/settings: add parallel related settings
  framework/virt_resource: support multiple VMs module
  framework/virt_base: add attach/quick start/quit function for VM management
  framework/virt_dut: support multiple VMs module
  framework/qemu_kvm: support multiple VMs module
  conf/virt_global: add vm management related configuration
  doc: add descriptions for multiple virtual machines module

 conf/virt_global.cfg | 24 +-
 doc/dts_gsg/index.rst | 1 +
 doc/dts_gsg/multiple_vm.rst | 87 +++++
 doc/dts_gsg/virtualization.rst | 98 ++++--
 extra_libs/threadpool.py | 426 ++++++++++++++++++++++++
 framework/crb.py | 37 ++-
 framework/debugger.py | 24 +-
 framework/dts.py | 26 +-
 framework/dut.py | 50 ++-
 framework/logger.py | 37 +--
 framework/multiple_vm.py | 304 +++++++++++++++++
 framework/project_dpdk.py | 6 +-
 framework/qemu_kvm.py | 737 +++++++++++++++++++++++++++++++++--------
 framework/settings.py | 3 +
 framework/ssh_connection.py | 9 +-
 framework/ssh_pexpect.py | 82 +++--
 framework/utils.py | 107 +++++-
 framework/virt_base.py | 108 ++++--
 framework/virt_dut.py | 76 +++--
 framework/virt_resource.py | 119 ++++---
 20 files changed, 1978 insertions(+), 383 deletions(-)
 create mode 100644 doc/dts_gsg/multiple_vm.rst
 create mode 100644 extra_libs/threadpool.py
 create mode 100644 framework/multiple_vm.py

--
1.9.3

^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH v2 01/16] framework: add external thread pool library 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu @ 2018-01-10 0:10 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 02/16] framework/multiple_vm: add multiple VM management module Marvin Liu ` (14 subsequent siblings) 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:10 UTC (permalink / raw) To: dts; +Cc: Marvin Liu Imported thread pool library as external library. Multiple VM module will create parallel threads based on this library. Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/extra_libs/threadpool.py b/extra_libs/threadpool.py new file mode 100644 index 0000000..3839f26 --- /dev/null +++ b/extra_libs/threadpool.py @@ -0,0 +1,426 @@ +# -*- coding: UTF-8 -*- +"""Easy to use object-oriented thread pool framework. + +A thread pool is an object that maintains a pool of worker threads to perform +time consuming operations in parallel. It assigns jobs to the threads +by putting them in a work request queue, where they are picked up by the +next available thread. This then performs the requested operation in the +background and puts the results in another queue. + +The thread pool object can then collect the results from all threads from +this queue as soon as they become available or after all threads have +finished their work. It's also possible, to define callbacks to handle +each result as it comes in. + +The basic concept and some code was taken from the book "Python in a Nutshell, +2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section +14.5 "Threaded Program Architecture". I wrapped the main program logic in the +ThreadPool class, added the WorkRequest class and the callback system and +tweaked the code here and there. Kudos also to Florent Aide for the exception +handling mechanism. 
+ +Basic usage:: + + >>> pool = ThreadPool(poolsize) + >>> requests = makeRequests(some_callable, list_of_args, callback) + >>> [pool.putRequest(req) for req in requests] + >>> pool.wait() + +See the end of the module code for a brief, annotated usage example. + +Website : http://chrisarndt.de/projects/threadpool/ + +""" +__docformat__ = "restructuredtext en" + +__all__ = [ + 'makeRequests', + 'NoResultsPending', + 'NoWorkersAvailable', + 'ThreadPool', + 'WorkRequest', + 'WorkerThread' +] + +__author__ = "Christopher Arndt" +__version__ = '1.3.2' +__license__ = "MIT license" + + +# standard library modules +import sys +import threading +import traceback + +try: + import Queue # Python 2 +except ImportError: + import queue as Queue # Python 3 + + +# exceptions +class NoResultsPending(Exception): + """All work requests have been processed.""" + pass + +class NoWorkersAvailable(Exception): + """No worker threads available to process remaining requests.""" + pass + + +# internal module helper functions +def _handle_thread_exception(request, exc_info): + """Default exception handler callback function. + + This just prints the exception info via ``traceback.print_exception``. + + """ + traceback.print_exception(*exc_info) + + +# utility functions +def makeRequests(callable_, args_list, callback=None, + exc_callback=_handle_thread_exception): + """Create several work requests for same callable with different arguments. + + Convenience function for creating several work requests for the same + callable where each invocation of the callable receives different values + for its arguments. + + ``args_list`` contains the parameters for each invocation of callable. + Each item in ``args_list`` should be either a 2-item tuple of the list of + positional arguments and a dictionary of keyword arguments or a single, + non-tuple argument. + + See docstring for ``WorkRequest`` for info on ``callback`` and + ``exc_callback``. 
+ + """ + requests = [] + for item in args_list: + if isinstance(item, tuple): + requests.append( + WorkRequest(callable_, item[0], item[1], callback=callback, + exc_callback=exc_callback) + ) + else: + requests.append( + WorkRequest(callable_, [item], None, callback=callback, + exc_callback=exc_callback) + ) + return requests + + +# classes +class WorkerThread(threading.Thread): + """Background thread connected to the requests/results queues. + + A worker thread sits in the background and picks up work requests from + one queue and puts the results in another until it is dismissed. + + """ + + def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds): + """Set up thread in daemonic mode and start it immediatedly. + + ``requests_queue`` and ``results_queue`` are instances of + ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a + new worker thread. + + """ + threading.Thread.__init__(self, **kwds) + self.setDaemon(1) + self._requests_queue = requests_queue + self._results_queue = results_queue + self._poll_timeout = poll_timeout + self._dismissed = threading.Event() + self.start() + + def run(self): + """Repeatedly process the job queue until told to exit.""" + while True: + if self._dismissed.isSet(): + # we are dismissed, break out of loop + break + # get next work request. If we don't get a new request from the + # queue after self._poll_timout seconds, we jump to the start of + # the while loop again, to give the thread a chance to exit. 
+ try: + request = self._requests_queue.get(True, self._poll_timeout) + except Queue.Empty: + continue + else: + if self._dismissed.isSet(): + # we are dismissed, put back request in queue and exit loop + self._requests_queue.put(request) + break + try: + result = request.callable(*request.args, **request.kwds) + self._results_queue.put((request, result)) + except: + request.exception = True + self._results_queue.put((request, sys.exc_info())) + + def dismiss(self): + """Sets a flag to tell the thread to exit when done with current job. + """ + self._dismissed.set() + + +class WorkRequest: + """A request to execute a callable for putting in the request queue later. + + See the module function ``makeRequests`` for the common case + where you want to build several ``WorkRequest`` objects for the same + callable but with different arguments for each call. + + """ + + def __init__(self, callable_, args=None, kwds=None, requestID=None, + callback=None, exc_callback=_handle_thread_exception): + """Create a work request for a callable and attach callbacks. + + A work request consists of the a callable to be executed by a + worker thread, a list of positional arguments, a dictionary + of keyword arguments. + + A ``callback`` function can be specified, that is called when the + results of the request are picked up from the result queue. It must + accept two anonymous arguments, the ``WorkRequest`` object and the + results of the callable, in that order. If you want to pass additional + information to the callback, just stick it on the request object. + + You can also give custom callback for when an exception occurs with + the ``exc_callback`` keyword parameter. It should also accept two + anonymous arguments, the ``WorkRequest`` and a tuple with the exception + details as returned by ``sys.exc_info()``. The default implementation + of this callback just prints the exception info via + ``traceback.print_exception``. 
If you want no exception handler + callback, just pass in ``None``. + + ``requestID``, if given, must be hashable since it is used by + ``ThreadPool`` object to store the results of that work request in a + dictionary. It defaults to the return value of ``id(self)``. + + """ + if requestID is None: + self.requestID = id(self) + else: + try: + self.requestID = hash(requestID) + except TypeError: + raise TypeError("requestID must be hashable.") + self.exception = False + self.callback = callback + self.exc_callback = exc_callback + self.callable = callable_ + self.args = args or [] + self.kwds = kwds or {} + + def __str__(self): + return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \ + (self.requestID, self.args, self.kwds, self.exception) + +class ThreadPool: + """A thread pool, distributing work requests and collecting results. + + See the module docstring for more information. + + """ + + def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): + """Set up the thread pool and start num_workers worker threads. + + ``num_workers`` is the number of worker threads to start initially. + + If ``q_size > 0`` the size of the work *request queue* is limited and + the thread pool blocks when the queue is full and it tries to put + more work requests in it (see ``putRequest`` method), unless you also + use a positive ``timeout`` value for ``putRequest``. + + If ``resq_size > 0`` the size of the *results queue* is limited and the + worker threads will block when the queue is full and they try to put + new results in it. + + .. warning: + If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is + the possibilty of a deadlock, when the results queue is not pulled + regularly and too many jobs are put in the work requests queue. + To prevent this, always set ``timeout > 0`` when calling + ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions. 
+ + """ + self._requests_queue = Queue.Queue(q_size) + self._results_queue = Queue.Queue(resq_size) + self.workers = [] + self.dismissedWorkers = [] + self.workRequests = {} + self.createWorkers(num_workers, poll_timeout) + + def createWorkers(self, num_workers, poll_timeout=5): + """Add num_workers worker threads to the pool. + + ``poll_timout`` sets the interval in seconds (int or float) for how + ofte threads should check whether they are dismissed, while waiting for + requests. + + """ + for i in range(num_workers): + self.workers.append(WorkerThread(self._requests_queue, + self._results_queue, poll_timeout=poll_timeout)) + + def dismissWorkers(self, num_workers, do_join=False): + """Tell num_workers worker threads to quit after their current task.""" + dismiss_list = [] + for i in range(min(num_workers, len(self.workers))): + worker = self.workers.pop() + worker.dismiss() + dismiss_list.append(worker) + + if do_join: + for worker in dismiss_list: + worker.join() + else: + self.dismissedWorkers.extend(dismiss_list) + + def joinAllDismissedWorkers(self): + """Perform Thread.join() on all worker threads that have been dismissed. + """ + for worker in self.dismissedWorkers: + worker.join() + self.dismissedWorkers = [] + + def putRequest(self, request, block=True, timeout=None): + """Put work request into work queue and save its id for later.""" + assert isinstance(request, WorkRequest) + # don't reuse old work requests + assert not getattr(request, 'exception', None) + self._requests_queue.put(request, block, timeout) + self.workRequests[request.requestID] = request + + def poll(self, block=False): + """Process any new results in the queue.""" + while True: + # still results pending? + if not self.workRequests: + raise NoResultsPending + # are there still workers to process remaining requests? 
+ elif block and not self.workers: + raise NoWorkersAvailable + try: + # get back next results + request, result = self._results_queue.get(block=block) + # has an exception occured? + if request.exception and request.exc_callback: + request.exc_callback(request, result) + # hand results to callback, if any + if request.callback and not \ + (request.exception and request.exc_callback): + request.callback(request, result) + del self.workRequests[request.requestID] + except Queue.Empty: + break + except Exception as e: + traceback.print_exception(*sys.exc_info()) + # unexpected thing happened, need further dedbugging + import pdb + pdb.set_trace() + + def wait(self): + """Wait for results, blocking until all have arrived.""" + while 1: + try: + self.poll(True) + except NoResultsPending: + break + + +################ +# USAGE EXAMPLE +################ + +if __name__ == '__main__': + import random + import time + + # the work the threads will have to do (rather trivial in our example) + def do_something(data): + time.sleep(random.randint(1,5)) + result = round(random.random() * data, 5) + # just to show off, we throw an exception once in a while + if result > 5: + raise RuntimeError("Something extraordinary happened!") + return result + + # this will be called each time a result is available + def print_result(request, result): + print("**** Result from request #%s: %r" % (request.requestID, result)) + + # this will be called when an exception occurs within a thread + # this example exception handler does little more than the default handler + def handle_exception(request, exc_info): + if not isinstance(exc_info, tuple): + # Something is seriously wrong... + print(request) + print(exc_info) + raise SystemExit + print("**** Exception occured in request #%s: %s" % \ + (request.requestID, exc_info)) + + # assemble the arguments for each job to a list... + data = [random.randint(1,10) for i in range(20)] + # ... 
and build a WorkRequest object for each item in data + requests = makeRequests(do_something, data, print_result, handle_exception) + # to use the default exception handler, uncomment next line and comment out + # the preceding one. + #requests = makeRequests(do_something, data, print_result) + + # or the other form of args_lists accepted by makeRequests: ((,), {}) + data = [((random.randint(1,10),), {}) for i in range(20)] + requests.extend( + makeRequests(do_something, data, print_result, handle_exception) + #makeRequests(do_something, data, print_result) + # to use the default exception handler, uncomment next line and comment + # out the preceding one. + ) + + # we create a pool of 3 worker threads + print("Creating thread pool with 3 worker threads.") + main = ThreadPool(3) + + # then we put the work requests in the queue... + for req in requests: + main.putRequest(req) + print("Work request #%s added." % req.requestID) + # or shorter: + # [main.putRequest(req) for req in requests] + + # ...and wait for the results to arrive in the result queue + # by using ThreadPool.wait(). This would block until results for + # all work requests have arrived: + # main.wait() + + # instead we can poll for results while doing something else: + i = 0 + while True: + try: + time.sleep(0.5) + main.poll() + print("Main thread working...") + print("(active worker threads: %i)" % (threading.activeCount()-1, )) + if i == 10: + print("**** Adding 3 more worker threads...") + main.createWorkers(3) + if i == 20: + print("**** Dismissing 2 worker threads...") + main.dismissWorkers(2) + i += 1 + except KeyboardInterrupt: + print("**** Interrupted!") + break + except NoResultsPending: + print("**** No pending results.") + break + if main.dismissedWorkers: + print("Joining all dismissed worker threads...") + main.joinAllDismissedWorkers() -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH v2 02/16] framework/multiple_vm: add multiple VM management module 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu 2018-01-10 0:10 ` [dts] [PATCH v2 01/16] framework: add external thread pool library Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 03/16] framework/utils: support locks for parallel model Marvin Liu ` (13 subsequent siblings) 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu Support starting VMs and executing commands on them in parallel, using multiple threads. The init function creates a thread pool instance. Calling the add_parallel_task function with the action 'start', 'cmd', 'migration' or 'stop' adds the related task to the task pool. Calling do_parallel_task then retrieves all queued tasks and runs them in parallel threads. When that function returns, all tasks have finished and their results have been saved.
Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/multiple_vm.py b/framework/multiple_vm.py new file mode 100644 index 0000000..04de6e4 --- /dev/null +++ b/framework/multiple_vm.py @@ -0,0 +1,304 @@ +#!/usr/bin/python +import time +import re +import threadpool +import traceback +import threading +from settings import DTS_ERR_TBL, save_global_setting, DTS_PARALLEL_SETTING +from utils import RED +from logger import getLogger + + +class MultipleVM(object): + """ + Module for handle VM related actions in parallel on mulitple DUTs + Supported actions: [start|command|migration] + Param max_vm: maximum number of threads + Param duts: list of DUT objects + """ + def __init__(self, max_vm, duts): + self.max_vm = max_vm + self.duts = duts + self.pool = threadpool.ThreadPool(max_vm) + self.pool_result = [dict() for _ in duts] + self._pool_requests = list() + self._pool_executors = dict() + self.logger = getLogger("multiple_vm") + + self.logger.info("Created MultipleVM instance with %d DUTs and %d VMs" % (len(duts), max_vm)) + + def parallel_vm_start(self, args): + """ + Start VMs in parallel. 
+ Args format: + { + 'name': 'VM0', + 'dut_id': 1, + 'autodetect_topo': False, + 'virt_config': { 'suite_name': '', + 'vm_name': '', + } + 'virt_params' : { + 'qemu': [{'path': '/usr/local/bin/qemu-system-x86_64'}, + 'cpu': [{'model': 'host', 'number': '1'}, + 'mem': [{'size': '1024', 'hugepage': 'yes'}, + 'disk': [{'file': 'vm0.qcow2'}, + 'login': [{'user': 'root', 'password': 'root'}, + 'vnc' : [{'displayNum': '2'}, + 'device': [{'driver': 'vfio-pci', 'opt_host': '0000:82:00.1'}] + [{'driver': 'vhost-user', 'opt_path': '/tmp/vhost-user-0', + 'opt_mac': '', + 'opt_legacy': 'on' | 'off'}], + 'migration': [{'enable': 'yes'}] + } + + 'command': '' + } + + return format: + { + 'name': 'VM0', + 'dut_id' : 1, + 'vm_obj': vm_obj + } + """ + + result = {} + vm_name = args['name'] + dut_id = args['dut_id'] + + if 'autodetect_topo' in args: + autodetect_topo = args['autodetect_topo'] + else: + autodetect_topo = True + + self.logger.info("Parallel task start for DUT%d %s" % (dut_id, vm_name)) + threading.current_thread().name = vm_name + + from qemu_kvm import QEMUKvm + # VM configured by configuration file + if 'virt_config' in args: + suite_name = args['virt_config']['suite_name'] + vm_name = args['virt_config']['vm_name'] + vm_obj = QEMUKvm(self.duts[dut_id], vm_name, suite_name) + if 'virt_params' in args: + virt_params = args['virt_params'] + else: + virt_params = dict() + else: + # VM configured by parameters + vm_obj = QEMUKvm(self.duts[dut_id], vm_name, 'multi_vm') + virt_params = args['virt_params'] + # just save config, should be list + vm_obj.set_local_config([virt_params]) + + vm_dut = None + + if vm_obj.check_alive(): + self.logger.debug("Check VM[%s] is alive" % vm_name) + vm_obj.attach() + self.logger.debug("VM[%s] attach is done" % vm_name) + if 'migration' in virt_params: + self.logger.debug("Immigrated VM[%s] is ready" % vm_name) + else: + vm_dut = vm_obj.instantiate_vm_dut(autodetect_topo=autodetect_topo) + self.logger.debug("VM[%s] instantiate vm dut is 
done" % vm_name) + else: + vm_obj.quick_start() + self.duts[dut_id].logger.debug("VM[%s] quick start is done" % vm_name) + if 'migration' in virt_params: + self.logger.debug("Immigrated VM[%s] is ready" % vm_name) + else: + vm_obj._check_vm_status() + self.logger.debug("VM[%s] check status is done" % vm_name) + vm_dut = vm_obj.instantiate_vm_dut(autodetect_topo=autodetect_topo) + self.logger.debug("VM[%s] instantiate vm dut is done" % vm_name) + + result['name'] = vm_name + result['dut_id'] = dut_id + result['vm_obj'] = vm_obj + result['vm_dut'] = vm_dut + self.logger.info("Parallel task DUT%d %s Done and returned" % (dut_id, vm_name)) + return result + + def parallel_vm_stop(self, args): + NotImplemented + + def parallel_vm_command(self, args): + """ + Run commands in parallel. + Args format: + { + 'name': 'vm1', + 'vm_dut': self.vm_dut, + 'dut_id': 0, + 'commands': ['cd dpdk', 'make install T=x86_64-native-linuxapp-gcc'], + 'expects': ['#', "#"], + 'timeouts': [5, 120], + } + """ + result = {} + vm_name = args['name'] + vm_dut = args['vm_dut'] + dut_id = args['dut_id'] + commands = args['commands'] + expects = args['expects'] + timeouts = args['timeouts'] + outputs = [] + + if 'delay' in args: + time.sleep(args['delay']) + + self.logger.debug("Parallel task start for DUT%d %s" % (dut_id, vm_name)) + + combinations = zip(commands, expects, timeouts) + for combine in combinations: + command, expect, timeout = combine + # timeout value need enlarge if vm number increased + add_time = int(self.max_vm * 0.5) + timeout += add_time + if len(expect) == 0: + output = vm_dut.send_command(command, timeout) + else: + output = vm_dut.send_expect(command, expect, timeout) + outputs.append(output) + + result['name'] = vm_name + result['dut_id'] = dut_id + result['outputs'] = outputs + self.logger.debug("Parallel task for DUT%d %s has been done and returned" % (dut_id, vm_name)) + + return result + + def parallel_vm_migration(self, args): + """ + Do vm migartion action in 
parallel. + Args format: + { + 'name': 'vm1', + 'vm_obj': self.vm_obj, + 'remote_ip': host2_ip, + 'migrage_port': 6666, + } + """ + result = {} + vm_name = args['name'] + vm_obj = args['vm_obj'] + dut_id = args['dut_id'] + remote_ip = args['remote_ip'] + migrate_port = args['migrate_port'] + + vm_obj.start_migration(remote_ip, migrate_port) + vm_obj.wait_migration_done() + + result['name'] = vm_name + result['dut_id'] = dut_id + + return result + + def save_result(self, request, result): + """ + Save result in local variable, will be used later + """ + self.pool_result[result['dut_id']][result['name']] = result + self.pool_result[result['dut_id']][result['name']]['status'] = 0 + + def handle_vm_exception(self, request, exc_info): + """ + Handle exception when do parallel task + should check vm status in this function + """ + if not isinstance(exc_info, tuple): + # Something is seriously wrong... + print(request) + print(exc_info) + raise SystemExit + + # print traceback info for exception + name = request.args[0]['name'] + self.logger.error(("**** Exception occured DUT%d:%s" % (request.args[0]['dut_id'], name))) + exc_type, exc_value, exc_traceback = exc_info + self.logger.error(repr(traceback.format_tb(exc_traceback))) + + result = {'name': name, 'dut_id': request.args[0]['dut_id']} + self.pool_result[result['dut_id']][result['name']] = result + self.pool_result[result['dut_id']][result['name']]['status'] = DTS_ERR_TBL["PARALLEL_EXECUTE_ERR"] + + def add_parallel_task(self, action, config): + """ + Add task into parallel pool, will call corresponding function later + based on action type. 
+ """ + if action == "start": + task = self.parallel_vm_start + data = config + elif action == "stop": + task = self.parallel_vm_stop + data = config['name'] + elif action == "cmd": + # just string command by now + task = self.parallel_vm_command + data = config + elif action == "migration": + task = self.parallel_vm_migration + data = config + + # due to threadpool request, one item + request = threadpool.makeRequests(task, [data], self.save_result, self.handle_vm_exception) + self._pool_requests.extend(request) + + def do_parallel_task(self): + """ + Do configured tasks in parallel, will return if all tasks finished + """ + # set parallel mode + save_global_setting(DTS_PARALLEL_SETTING, 'yes') + + self.pool_result = [dict() for _ in self.duts] + for req in self._pool_requests: + self.pool.putRequest(req) + + self.logger.info("All parallel tasks start at %s" % time.ctime()) + # clean the request queue + self._pool_requests = list() + + while True: + try: + time.sleep(0.5) + self.pool.poll() + except threadpool.NoResultsPending: + self.logger.info("All parallel tasks have been done at %s" % time.ctime()) + break + except Exception as e: + self.logger.error("Met exception %s" % (str(e))) + break + + # clear pool related queues, clean thread + self.pool._requests_queue.queue.clear() + self.pool._results_queue.queue.clear() + + time.sleep(2) + + # exit from parallel mode + save_global_setting(DTS_PARALLEL_SETTING, 'no') + + def get_parallel_result(self): + """ + Return result information for this parallel task + """ + return self.pool_result + + def list_threads(self): + main_thread = threading.currentThread() + for t in threading.enumerate(): + if t is main_thread: + continue + self.logger.error("thread [%s] is still activing" % t.getName()) + + def destroy_parallel(self): + """ + Destroy created threads otherwise threads may can't created + """ + self.pool.dismissWorkers(self.max_vm, do_join=True) + self.pool.wait() + self.list_threads() -- 1.9.3 ^ permalink raw 
reply [flat|nested] 34+ messages in thread
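The flow described in the commit message of patch 02/16 (queue tasks with add_parallel_task, run them with do_parallel_task, then read get_parallel_result) can be sketched without the DTS framework. This is only an illustration under stated assumptions: MiniParallelRunner and fake_vm_command are hypothetical names, the standard library's ThreadPoolExecutor stands in for the bundled threadpool module, the -1 status is a placeholder for DTS_ERR_TBL["PARALLEL_EXECUTE_ERR"], and Python 3 is assumed although the patch itself targets Python 2.

```python
# Illustrative sketch only: MiniParallelRunner is a hypothetical stand-in,
# not the MultipleVM class from the patch.
from concurrent.futures import ThreadPoolExecutor, as_completed


class MiniParallelRunner(object):
    def __init__(self, max_workers, num_duts):
        self.max_workers = max_workers
        self.num_duts = num_duts
        self._tasks = []                                   # queued (func, config) pairs
        self.results = [dict() for _ in range(num_duts)]   # one dict per DUT

    def add_parallel_task(self, func, config):
        """Queue a task; nothing runs until do_parallel_task() is called."""
        self._tasks.append((func, config))

    def do_parallel_task(self):
        """Run all queued tasks in worker threads and block until all finish."""
        self.results = [dict() for _ in range(self.num_duts)]
        tasks, self._tasks = self._tasks, []
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            futures = {pool.submit(func, config): config
                       for func, config in tasks}
            for future in as_completed(futures):
                config = futures[future]
                entry = {'name': config['name'], 'dut_id': config['dut_id']}
                try:
                    entry.update(future.result())
                    entry['status'] = 0       # success, mirroring save_result()
                except Exception as exc:
                    entry['status'] = -1      # placeholder error code (assumption)
                    entry['error'] = repr(exc)
                # results are indexed by DUT first, then by VM name
                self.results[entry['dut_id']][entry['name']] = entry

    def get_parallel_result(self):
        return self.results


def fake_vm_command(config):
    """Hypothetical task: pretend to run a list of commands on a VM."""
    if config.get('fail'):
        raise RuntimeError("command failed on %s" % config['name'])
    return {'outputs': ["ran: %s" % cmd for cmd in config['commands']]}


runner = MiniParallelRunner(max_workers=4, num_duts=2)
runner.add_parallel_task(fake_vm_command,
                         {'name': 'vm0', 'dut_id': 0, 'commands': ['ls']})
runner.add_parallel_task(fake_vm_command,
                         {'name': 'vm1', 'dut_id': 1, 'commands': ['pwd'],
                          'fail': True})
runner.do_parallel_task()
results = runner.get_parallel_result()
```

As in the patch, a failing task does not abort the others: its entry is still recorded under its DUT and VM name, only with a non-zero status.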
* [dts] [PATCH v2 03/16] framework/utils: support locks for parallel model 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu 2018-01-10 0:10 ` [dts] [PATCH v2 01/16] framework: add external thread pool library Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 02/16] framework/multiple_vm: add multiple VM management module Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 04/16] framework: add DUT index support Marvin Liu ` (12 subsequent siblings) 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Add parallel lock support to protect critical resources and actions. Parallel locks are per function and separate for each DUT. 2. Add support for a user-defined serializer function in the pprint function. 3. The RSA key removal is now done only once for all virtual machines. Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/utils.py b/framework/utils.py index 1ecef09..762c927 100644 --- a/framework/utils.py +++ b/framework/utils.py @@ -35,9 +35,95 @@ import os import inspect import socket import struct +import threading +import types +from functools import wraps DTS_ENV_PAT = r"DTS_*" +def create_parallel_locks(num_duts): + """ + Create thread lock dictionary based on the number of DUTs + """ + global locks_info + locks_info = [] + for _ in range(num_duts): + lock_info = dict() + lock_info['update_lock'] = threading.RLock() + locks_info.append(lock_info) + + +def parallel_lock(num=1): + """ + Wrapper function to protect parallel threads; allows multiple threads + to share one lock. Locks are created based on function name. Thread locks are + separated between duts according to argument 'dut_id'.
+ Parameter: + num: Number of parallel threads for the lock + """ + global locks_info + + def decorate(func): + @wraps(func) + def wrapper(*args, **kwargs): + if 'dut_id' in kwargs: + dut_id = kwargs['dut_id'] + else: + dut_id = 0 + + # in case function arguments is not correct + if dut_id >= len(locks_info): + dut_id = 0 + + lock_info = locks_info[dut_id] + uplock = lock_info['update_lock'] + + name = func.__name__ + uplock.acquire() + + if name not in lock_info: + lock_info[name] = dict() + lock_info[name]['lock'] = threading.RLock() + lock_info[name]['current_thread'] = 1 + else: + lock_info[name]['current_thread'] += 1 + + lock = lock_info[name]['lock'] + + # make sure when owned global lock, should also own update lock + if lock_info[name]['current_thread'] >= num: + if lock._is_owned(): + print RED("DUT%d %s waiting for func lock %s" % (dut_id, + threading.current_thread().name, func.__name__)) + lock.acquire() + else: + uplock.release() + + try: + ret = func(*args, **kwargs) + except Exception as e: + if not uplock._is_owned(): + uplock.acquire() + + if lock._is_owned(): + lock.release() + lock_info[name]['current_thread'] = 0 + uplock.release() + raise e + + if not uplock._is_owned(): + uplock.acquire() + + if lock._is_owned(): + lock.release() + lock_info[name]['current_thread'] = 0 + + uplock.release() + + return ret + return wrapper + return decorate + def RED(text): return "\x1B[" + "31;1m" + str(text) + "\x1B[" + "0m" @@ -51,11 +137,11 @@ def GREEN(text): return "\x1B[" + "32;1m" + str(text) + "\x1B[" + "0m" -def pprint(some_dict): +def pprint(some_dict, serialzer=None): """ Print JSON format dictionary object. 
""" - return json.dumps(some_dict, sort_keys=True, indent=4) + return json.dumps(some_dict, sort_keys=True, indent=4, default=serialzer) def regexp(s, to_match, allString=False): @@ -83,26 +169,13 @@ def get_obj_funcs(obj, func_name_regex): yield func +@parallel_lock() def remove_old_rsa_key(crb, ip): """ Remove the old RSA key of specified IP on crb. """ - if ':' not in ip: - ip = ip.strip() - port = '' - else: - addr = ip.split(':') - ip = addr[0].strip() - port = addr[1].strip() - rsa_key_path = "~/.ssh/known_hosts" - if port: - remove_rsa_key_cmd = "sed -i '/^\[%s\]:%d/d' %s" % \ - (ip.strip(), int( - port), rsa_key_path) - else: - remove_rsa_key_cmd = "sed -i '/^%s/d' %s" % \ - (ip.strip(), rsa_key_path) + remove_rsa_key_cmd = "sed -i '/%s/d' %s" % (ip, rsa_key_path) crb.send_expect(remove_rsa_key_cmd, "# ") -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH v2 04/16] framework: add DUT index support 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu ` (2 preceding siblings ...) 2018-01-10 0:11 ` [dts] [PATCH v2 03/16] framework/utils: support locks for parallel model Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 05/16] framework/logger: optimize output format for threads Marvin Liu ` (11 subsequent siblings) 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Every CRB now has an index; the index value is assigned when the CRB is instantiated. 2. The alternative session is not mandatory for a virtual DUT, which saves a lot of system resources when starting many virtual machines. 3. Virtual environment setup will remove all related ssh rsa keys. Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/crb.py b/framework/crb.py index dd29a8b..7affce7 100644 --- a/framework/crb.py +++ b/framework/crb.py @@ -49,7 +49,8 @@ class Crb(object): CPU/PCI/NIC on the board and setup running environment for DPDK.
""" - def __init__(self, crb, serializer, name): + def __init__(self, crb, serializer, name, alt_session=True, dut_id=0): + self.dut_id = dut_id self.crb = crb self.read_cache = False self.skip_setup = False @@ -62,14 +63,18 @@ class Crb(object): self.logger = getLogger(name) self.session = SSHConnection(self.get_ip_address(), name, self.get_username(), - self.get_password()) + self.get_password(), dut_id) self.session.init_log(self.logger) - self.alt_session = SSHConnection( - self.get_ip_address(), - name + '_alt', - self.get_username(), - self.get_password()) - self.alt_session.init_log(self.logger) + if alt_session: + self.alt_session = SSHConnection( + self.get_ip_address(), + name + '_alt', + self.get_username(), + self.get_password(), + dut_id) + self.alt_session.init_log(self.logger) + else: + self.alt_session = None def send_expect(self, cmds, expected, timeout=TIMEOUT, alt_session=False, verify=False): @@ -78,8 +83,8 @@ class Crb(object): there's no expected string found before timeout, TimeoutException will be raised. """ - - if alt_session: + # sometimes there will be no alt_session like VM dut + if alt_session and self.alt_session: return self.alt_session.session.send_expect(cmds, expected, timeout, verify) @@ -93,7 +98,8 @@ class Crb(object): session = SSHConnection(self.get_ip_address(), name, self.get_username(), - self.get_password()) + self.get_password(), + dut_id = self.dut_id) session.init_log(logger) self.sessions.append(session) return session @@ -104,7 +110,7 @@ class Crb(object): """ for save_session in self.sessions: if save_session == session: - save_session.close() + save_session.close(force=True) logger = getLogger(save_session.name) logger.logger_exit() self.sessions.remove(save_session) @@ -141,7 +147,7 @@ class Crb(object): Send commands to crb and return string before timeout. 
""" - if alt_session: + if alt_session and self.alt_session: return self.alt_session.session.send_command(cmds, timeout) return self.session.send_command(cmds, timeout) @@ -167,7 +173,7 @@ class Crb(object): "awk '/HugePages_Total/ { print $2 }' /proc/meminfo", "# ", alt_session=True) if huge_pages != "": - return int(huge_pages) + return int(huge_pages.split()[0]) return 0 def mount_huge_pages(self): @@ -220,9 +226,6 @@ class Crb(object): """ self.base_dir = base_dir - def set_virttype(self, virttype): - self.virttype = virttype - def admin_ports(self, port, status): """ Force set port's interface status. diff --git a/framework/dut.py b/framework/dut.py index 22ff0bb..9c2a5a8 100644 --- a/framework/dut.py +++ b/framework/dut.py @@ -39,7 +39,7 @@ from ssh_connection import SSHConnection from crb import Crb from net_device import GetNicObj from virt_resource import VirtResource -from utils import RED +from utils import RED, remove_old_rsa_key from uuid import uuid4 @@ -60,9 +60,9 @@ class Dut(Crb): CORE_LIST_CACHE_KEY = 'dut_core_list' PCI_DEV_CACHE_KEY = 'dut_pci_dev_info' - def __init__(self, crb, serializer): + def __init__(self, crb, serializer, dut_id): self.NAME = 'dut' + LOG_NAME_SEP + '%s' % crb['My IP'] - super(Dut, self).__init__(crb, serializer, self.NAME) + super(Dut, self).__init__(crb, serializer, self.NAME, alt_session=True, dut_id=dut_id) self.host_init_flag = False self.number_of_cores = 0 @@ -76,29 +76,35 @@ class Dut(Crb): # hypervisor pid list, used for cleanup self.virt_pids = [] - def init_host_session(self): - if self.host_init_flag: - pass - else: - self.host_session = SSHConnection( - self.get_ip_address(), - self.NAME + '_host', - self.get_username(), - self.get_password()) - self.host_session.init_log(self.logger) - self.host_init_flag = True + def init_host_session(self, vm_name): + """ + Create session for each VM, session will be handled by VM instance + """ + self.host_session = SSHConnection( + self.get_ip_address(), + vm_name + 
'_host', + self.get_username(), + self.get_password()) + self.host_session.init_log(self.logger) + self.logger.info("[%s] create new session for VM" % (threading.current_thread().name)) def new_session(self, suite=""): """ Create new session for dut instance. Session name will be unique. """ - session_name = self.NAME + '_' + str(uuid4()) + if len(suite): + session_name = self.NAME + '_' + suite + else: + session_name = self.NAME + '_' + str(uuid4()) session = self.create_session(name=session_name) if suite != "": session.logger.config_suite(suite, self.NAME) else: session.logger.config_execution(self.NAME) - session.send_expect("cd %s" % self.base_dir, "# ") + + if getattr(self, "base_dir", None): + session.send_expect("cd %s" % self.base_dir, "# ") + return session def close_session(self, session): @@ -366,11 +372,11 @@ class Dut(Crb): return False def get_dpdk_bind_script(self): - op = self.send_command("ls") + op = self.send_expect("ls", "#") if "usertools" in op: res = 'usertools/dpdk-devbind.py' else: - op = self.send_command("ls tools") + op = self.send_expect("ls tools", "#") if "dpdk_nic_bind.py" in op: res = 'tools/dpdk_nic_bind.py' else: @@ -788,6 +794,14 @@ class Dut(Crb): self.ports_info.append({'port': port, 'pci': pci_str, 'type': pci_id, 'intf': intf, 'mac': macaddr, 'ipv6': ipv6, 'numa': -1}) + def setup_virtenv(self, virttype): + """ + Setup current virtualization hypervisor type and remove elder VM ssh keys + """ + self.virttype = virttype + # remove VM ras keys from tester + remove_old_rsa_key(self.tester, self.crb['My IP']) + def generate_sriov_vfs_by_port(self, port_id, vf_num, driver='default'): """ Generate SRIOV VFs with default driver it is bound now or specifid driver. diff --git a/framework/project_dpdk.py b/framework/project_dpdk.py index 1f673c9..40c862a 100644 --- a/framework/project_dpdk.py +++ b/framework/project_dpdk.py @@ -49,8 +49,8 @@ class DPDKdut(Dut): build, memory and kernel module. 
""" - def __init__(self, crb, serializer): - super(DPDKdut, self).__init__(crb, serializer) + def __init__(self, crb, serializer, dut_id): + super(DPDKdut, self).__init__(crb, serializer, dut_id) self.testpmd = None def set_target(self, target, bind_dev=True): @@ -447,7 +447,7 @@ class DPDKtester(Tester): interface and generate port map. """ - def __init__(self, crb, serializer): + def __init__(self, crb, serializer, dut_id): self.NAME = "tester" super(DPDKtester, self).__init__(crb, serializer) -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH v2 05/16] framework/logger: optimize output format for threads 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu ` (3 preceding siblings ...) 2018-01-10 0:11 ` [dts] [PATCH v2 04/16] framework: add DUT index support Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 06/16] framework/dts: support multiple VMs module Marvin Liu ` (10 subsequent siblings) 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/logger.py b/framework/logger.py index 1018674..78e90d6 100644 --- a/framework/logger.py +++ b/framework/logger.py @@ -35,7 +35,7 @@ import sys import inspect import re -from settings import LOG_NAME_SEP, FOLDERS +from settings import LOG_NAME_SEP, FOLDERS, load_global_setting, DTS_PARALLEL_SETTING from utils import RED """ @@ -87,10 +87,9 @@ logging.addLevelName(logging.SUITE_TESTER_OUTPUT, 'SUITE_TESTER_OUTPUT') logging.addLevelName(logging.DTS_IXIA_CMD, 'DTS_IXIA_CMD') logging.addLevelName(logging.DTS_IXIA_OUTPUT, 'DTS_IXIA_OUTPUT') -message_fmt = '%(asctime)s %(levelname)20s: %(message)s' date_fmt = '%d/%m/%Y %H:%M:%S' RESET_COLOR = '\033[0m' -stream_fmt = '%(color)s%(levelname)20s: %(message)s' + RESET_COLOR +stream_fmt = '%(color)s%(name)30s: %(message)s' + RESET_COLOR log_dir = None @@ -99,13 +98,6 @@ def set_verbose(): verbose = True -def add_salt(salt, msg): - if not salt: - return msg - else: - return '[%s] ' % salt + str(msg) - - class BaseLoggerAdapter(logging.LoggerAdapter): """ Upper layer of original logging module. @@ -212,8 +204,6 @@ class DTSLOG(BaseLoggerAdapter): self.crb = crb super(DTSLOG, self).__init__(self.logger, dict(crb=self.crb)) - self.salt = '' - self.fh = None self.ch = None @@ -226,6 +216,11 @@ class DTSLOG(BaseLoggerAdapter): """ Config stream handler and file handler. 
""" + if load_global_setting(DTS_PARALLEL_SETTING) == 'yes': + message_fmt = '%(asctime)s %(name)30s %(threadName)s: %(message)s' + else: + message_fmt = '%(asctime)s %(name)30s: %(message)s' + fh.setFormatter(logging.Formatter(message_fmt, date_fmt)) ch.setFormatter(logging.Formatter(stream_fmt, date_fmt)) @@ -251,28 +246,24 @@ class DTSLOG(BaseLoggerAdapter): """ DTS warnning level log function. """ - message = add_salt(self.salt, message) self.logger.log(self.warn_lvl, message) def info(self, message): """ DTS information level log function. """ - message = add_salt(self.salt, message) self.logger.log(self.info_lvl, message) def error(self, message): """ DTS error level log function. """ - message = add_salt(self.salt, message) self.logger.log(self.error_lvl, message) def debug(self, message): """ DTS debug level log function. """ - message = add_salt(self.salt, message) self.logger.log(self.debug_lvl, message) def set_logfile_path(self, path): @@ -304,34 +295,20 @@ class DTSLOG(BaseLoggerAdapter): ch = ColorHandler() self.__log_handler(fh, ch) - def set_salt(crb, start_flag): - if LOG_NAME_SEP in crb: - old = '%s%s' % (start_flag, LOG_NAME_SEP) - if not self.salt: - self.salt = crb.replace(old, '', 1) - if crb.startswith('dut'): self.info_lvl = logging.DTS_DUT_CMD self.debug_lvl = logging.DTS_DUT_OUTPUT self.warn_lvl = logging.DTS_DUT_RESULT - - set_salt(crb, 'dut') elif crb.startswith('tester'): self.info_lvl = logging.DTS_TESTER_CMD self.debug_lvl = logging.DTS_TESTER_OUTPUT self.warn_lvl = logging.DTS_TESTER_RESULT - - set_salt(crb, 'tester') elif crb.startswith('ixia'): self.info_lvl = logging.DTS_IXIA_CMD self.debug_lvl = logging.DTS_IXIA_OUTPUT - - set_salt(crb, 'ixia') elif crb.startswith('virtdut'): self.info_lvl = logging.DTS_VIRTDUT_CMD self.debug_lvl = logging.DTS_VIRTDUT_OUTPUT - - set_salt(crb, 'virtdut') else: self.error_lvl = logging.ERROR self.warn_lvl = logging.WARNING -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH v2 06/16] framework/dts: support multiple VMs module 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu ` (4 preceding siblings ...) 2018-01-10 0:11 ` [dts] [PATCH v2 05/16] framework/logger: optimize output format for threads Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 07/16] framework/debugger: " Marvin Liu ` (9 subsequent siblings) 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Add external library folder into default system path. 2. Initialize global parallel lock structure before execution. 3. Init DUTs and tester with index argument. Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/dts.py b/framework/dts.py index 581d282..0b2240c 100644 --- a/framework/dts.py +++ b/framework/dts.py @@ -58,7 +58,7 @@ import logger import debugger from config import CrbsConf from checkCase import CheckCase -from utils import get_subclasses, copy_instance_attr +from utils import get_subclasses, copy_instance_attr, create_parallel_locks import sys reload(sys) sys.setdefaultencoding('UTF8') @@ -211,7 +211,7 @@ def dts_run_commands(crb, dts_commands): raise VerifyFailure("Command execution failed") -def get_project_obj(project_name, super_class, crbInst, serializer): +def get_project_obj(project_name, super_class, crbInst, serializer, dut_id): """ Load project module and return crb instance. 
""" @@ -221,12 +221,12 @@ def get_project_obj(project_name, super_class, crbInst, serializer): project_module = __import__(PROJECT_MODULE_PREFIX + project_name) for project_subclassname, project_subclass in get_subclasses(project_module, super_class): - project_obj = project_subclass(crbInst, serializer) + project_obj = project_subclass(crbInst, serializer, dut_id) if project_obj is None: - project_obj = super_class(crbInst, serializer) + project_obj = super_class(crbInst, serializer, dut_id) except Exception as e: log_handler.info("LOAD PROJECT MODULE INFO: " + str(e)) - project_obj = super_class(crbInst, serializer) + project_obj = super_class(crbInst, serializer, dut_id) return project_obj @@ -280,13 +280,15 @@ def dts_crbs_init(crbInsts, skip_setup, read_cache, project, base_dir, serialize testInst = copy.copy(crbInsts[0]) testInst['My IP'] = crbInsts[0]['tester IP'] - tester = get_project_obj(project, Tester, testInst, serializer) + tester = get_project_obj(project, Tester, testInst, serializer, dut_id=0) + dut_id = 0 for crbInst in crbInsts: dutInst = copy.copy(crbInst) dutInst['My IP'] = crbInst['IP'] - dutobj = get_project_obj(project, Dut, dutInst, serializer) + dutobj = get_project_obj(project, Dut, dutInst, serializer, dut_id=dut_id) duts.append(dutobj) + dut_id += 1 dts_log_execution(duts, tester, log_handler) @@ -298,7 +300,7 @@ def dts_crbs_init(crbInsts, skip_setup, read_cache, project, base_dir, serialize nic = settings.load_global_setting(settings.HOST_NIC_SETTING) for dutobj in duts: dutobj.tester = tester - dutobj.set_virttype(virttype) + dutobj.setup_virtenv(virttype) dutobj.set_speedup_options(read_cache, skip_setup) dutobj.set_directory(base_dir) # save execution nic setting @@ -463,6 +465,11 @@ def run_all(config_file, pkgName, git, patch, skip_setup, if not os.path.exists(output_dir): os.mkdir(output_dir) + # add external library + exec_file = os.path.realpath(__file__) + extra_libs_path = exec_file.replace('framework/dts.py', '') + 
'extra_libs' + sys.path.insert(1, extra_libs_path) + # add python module search path sys.path.append(suite_dir) @@ -537,6 +544,9 @@ def run_all(config_file, pkgName, git, patch, skip_setup, result.dut = duts[0] + # init global lock + create_parallel_locks(len(duts)) + # init dut, tester crb duts, tester = dts_crbs_init(crbInsts, skip_setup, read_cache, project, base_dir, serializer, virttype) tester.set_re_run(re_run) -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH v2 07/16] framework/debugger: support multiple VMs module 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu ` (5 preceding siblings ...) 2018-01-10 0:11 ` [dts] [PATCH v2 06/16] framework/dts: support multiple VMs module Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH 08/16] framework/ssh_pexpect: " Marvin Liu ` (8 subsequent siblings) 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Disable signal handling when running in parallel mode, since signals are not supported in child threads. 2. Support connecting to a session by an integer index argument. Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/debugger.py b/framework/debugger.py index 5c02414..8018f55 100644 --- a/framework/debugger.py +++ b/framework/debugger.py @@ -28,8 +28,8 @@ import signal import code import time import imp -from settings import load_global_setting, DEBUG_SETTING -from utils import get_subclasses, copy_instance_attr +from settings import load_global_setting, DEBUG_SETTING, DTS_PARALLEL_SETTING +from utils import get_subclasses, copy_instance_attr, GREEN from test_case import TestCase @@ -71,10 +71,16 @@ def connect_command(connect): Connect to ssh session and give control to user. """ from ssh_connection import CONNECTIONS - for connection in CONNECTIONS: - for name, session in connection.items(): - if name == connect: - session.session.interact() + if type(connect) == int: + name, session = CONNECTIONS[connect].items()[0] + print GREEN("Connecting to session[%s]" % name) + session.session.interact() + else: + for connection in CONNECTIONS: + for name, session in connection.items(): + if name == connect: + print GREEN("Connecting to session[%s]" % name) + session.session.interact() def rerun_command(): @@ -164,7 +170,8 @@ def ignore_keyintr(): """ Temporary disable interrupt handler.
""" - if load_global_setting(DEBUG_SETTING) != 'yes': + # signal can't be used in thread + if load_global_setting(DEBUG_SETTING) != 'yes' or load_global_setting(DTS_PARALLEL_SETTING) == 'yes': return global debug_cmd @@ -180,7 +187,8 @@ def aware_keyintr(): """ Reenable interrupt handler. """ - if load_global_setting(DEBUG_SETTING) != 'yes': + # signal can't be used in thread + if load_global_setting(DEBUG_SETTING) != 'yes' or load_global_setting(DTS_PARALLEL_SETTING) == 'yes': return return signal.signal(signal.SIGINT, keyboard_handle) -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH 08/16] framework/ssh_pexpect: support multiple VMs module 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu ` (6 preceding siblings ...) 2018-01-10 0:11 ` [dts] [PATCH v2 07/16] framework/debugger: " Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 09/16] framework/ssh_connection: " Marvin Liu ` (7 subsequent siblings) 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Protect the ssh connection action with a parallel lock; the number of concurrent connections is limited by the DUT SSH service. 2. Enlarge terminal columns for serial output. 3. Enhance error handling when an exception happens. 4. Add a DUT index argument when connecting to a CRB, so that each CRB gets separate locks. Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/ssh_pexpect.py b/framework/ssh_pexpect.py index 8d6c3dd..3c988b7 100644 --- a/framework/ssh_pexpect.py +++ b/framework/ssh_pexpect.py @@ -3,7 +3,7 @@ import pexpect from pexpect import pxssh from debugger import ignore_keyintr, aware_keyintr from exception import TimeoutException, SSHConnectionException, SSHSessionDeadException -from utils import RED, GREEN +from utils import RED, GREEN, parallel_lock """ Module handle ssh sessions between tester and DUT. @@ -14,31 +14,47 @@ Aslo support transfer files to tester or DUT. class SSHPexpect(object): - def __init__(self, host, username, password): + def __init__(self, host, username, password, dut_id): self.magic_prompt = "MAGIC PROMPT" + self.logger = None + + self.host = host + self.username = username + self.password = password + + self._connect_host(dut_id=dut_id) + + @parallel_lock(num=8) + def _connect_host(self, dut_id=0): + """ + Create connection to assigned crb, parameter dut_id will be used in + parallel_lock thus can assure isolated locks for each crb. + Parallel ssh connections are limited to MaxStartups option in SSHD + configuration file.
By default concurrent number is 10, so default + threads number is limited to 8 which less than 10. Lock number can + be modified along with MaxStartups value. + """ try: self.session = pxssh.pxssh() - self.host = host - self.username = username - self.password = password - if ':' in host: - self.ip = host.split(':')[0] - self.port = int(host.split(':')[1]) + if ':' in self.host: + self.ip = self.host.split(':')[0] + self.port = int(self.host.split(':')[1]) self.session.login(self.ip, self.username, self.password, original_prompt='[$#>]', port=self.port, login_timeout=20) else: self.session.login(self.host, self.username, self.password, original_prompt='[$#>]') - self.send_expect('stty -echo', '# ', timeout=2) - except Exception, e: + self.send_expect('stty -echo', '#') + self.send_expect('stty columns 1000', "#") + except Exception as e: print RED(e) if getattr(self, 'port', None): suggestion = "\nSuggession: Check if the fireware on [ %s ] " % \ self.ip + "is stoped\n" print GREEN(suggestion) - raise SSHConnectionException(host) + raise SSHConnectionException(self.host) def init_log(self, logger, name): self.logger = logger @@ -56,24 +72,33 @@ class SSHPexpect(object): return before def send_expect(self, command, expected, timeout=15, verify=False): - ret = self.send_expect_base(command, expected, timeout) - if verify: - ret_status = self.send_expect_base("echo $?", expected, timeout) - if not int(ret_status): - return ret + + try: + ret = self.send_expect_base(command, expected, timeout) + if verify: + ret_status = self.send_expect_base("echo $?", expected, timeout) + if not int(ret_status): + return ret + else: + self.logger.error("Command: %s failure!" % command) + self.logger.error(ret) + return int(ret_status) else: - self.logger.error("Command: %s failure!" 
% command) - self.logger.error(ret) - return int(ret_status) - else: - return ret + return ret + except Exception as e: + print RED("Exception happened in [%s] and output is [%s]" % (command, self.get_output_before())) + raise(e) def send_command(self, command, timeout=1): - ignore_keyintr() - self.clean_session() - self.__sendline(command) - aware_keyintr() - return self.get_session_before(timeout) + try: + ignore_keyintr() + self.clean_session() + self.__sendline(command) + aware_keyintr() + except Exception as e: + raise(e) + + return self.get_session_before(timeout=timeout) def clean_session(self): self.get_session_before(timeout=0.01) @@ -90,8 +115,9 @@ class SSHPexpect(object): pass aware_keyintr() - before = self.get_output_before() + before = self.get_output_all() self.__flush() + return before def __flush(self): @@ -116,7 +142,6 @@ class SSHPexpect(object): def get_output_before(self): if not self.isalive(): raise SSHSessionDeadException(self.host) - self.session.flush() before = self.session.before.rsplit('\r\n', 1) if before[0] == "[PEXPECT]": before[0] = "" @@ -124,7 +149,6 @@ class SSHPexpect(object): return before[0] def get_output_all(self): - self.session.flush() output = self.session.before output.replace("[PEXPECT]", "") return output -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
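Note on patch 08/16: the @parallel_lock(num=8) decorator used on _connect_host is defined in framework/utils.py (patch 03/16), which is not quoted here. As a rough illustration of the idea only, a decorator that caps the number of concurrent calls per DUT index could look like the sketch below. It is written for Python 3, and the names bounded_parallel and demo are hypothetical; this is not the DTS implementation.

```python
import functools
import threading
import time


def bounded_parallel(num=1):
    """Allow at most `num` concurrent calls of the wrapped function per dut_id."""
    def decorator(func):
        semaphores = {}              # dut_id -> semaphore for this function
        guard = threading.Lock()     # protects the semaphores dict

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            dut_id = kwargs.get('dut_id', 0)
            with guard:
                if dut_id not in semaphores:
                    semaphores[dut_id] = threading.BoundedSemaphore(num)
                sem = semaphores[dut_id]
            with sem:
                return func(*args, **kwargs)
        return wrapper
    return decorator


def demo(threads=20, limit=3):
    """Start `threads` workers and return the peak number running at once."""
    state = {'active': 0, 'peak': 0}
    state_lock = threading.Lock()

    @bounded_parallel(num=limit)
    def connect(dut_id=0):
        with state_lock:
            state['active'] += 1
            state['peak'] = max(state['peak'], state['active'])
        time.sleep(0.01)             # stands in for the ssh login
        with state_lock:
            state['active'] -= 1

    workers = [threading.Thread(target=connect, kwargs={'dut_id': 0})
               for _ in range(threads)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return state['peak']
```

Keeping the limit below the sshd MaxStartups value, as the docstring in the patch describes, keeps tester threads from running into the sshd limit on concurrent logins to one DUT; separate dut_id values get separate semaphores, so each DUT is limited independently.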
* [dts] [PATCH v2 09/16] framework/ssh_connection: support multiple VMs module 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu ` (7 preceding siblings ...) 2018-01-10 0:11 ` [dts] [PATCH 08/16] framework/ssh_pexpect: " Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 10/16] framework/settings: add parallel related settings Marvin Liu ` (6 subsequent siblings) 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Support DUT index argument. 2. Remove the logger handler when closing the connection. Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/ssh_connection.py b/framework/ssh_connection.py index 675279b..915d081 100644 --- a/framework/ssh_connection.py +++ b/framework/ssh_connection.py @@ -44,8 +44,8 @@ class SSHConnection(object): Implement send_expect/copy function upper SSHPexpet module. """ - def __init__(self, host, session_name, username, password=''): - self.session = SSHPexpect(host, username, password) + def __init__(self, host, session_name, username, password='', dut_id=0): + self.session = SSHPexpect(host, username, password, dut_id) self.name = session_name connection = {} connection[self.name] = self.session @@ -63,7 +63,7 @@ class SSHConnection(object): def send_command(self, cmds, timeout=1): self.logger.info(cmds) - out = self.session.send_command(cmds) + out = self.session.send_command(cmds, timeout) self.logger.debug(out) return out @@ -73,6 +73,9 @@ class SSHConnection(object): return out def close(self, force=False): + if getattr(self, "logger", None): + self.logger.logger_exit() + self.session.close(force) connection = {} connection[self.name] = self.session -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH v2 10/16] framework/settings: add parallel related settings 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu ` (8 preceding siblings ...) 2018-01-10 0:11 ` [dts] [PATCH v2 09/16] framework/ssh_connection: " Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 11/16] framework/virt_resource: support multiple VMs module Marvin Liu ` (5 subsequent siblings) 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/settings.py b/framework/settings.py index df3ac88..680f32b 100644 --- a/framework/settings.py +++ b/framework/settings.py @@ -217,6 +217,8 @@ DEBUG_CASE_SETTING = "DTS_DEBUGCASE_ENABLE" DPDK_RXMODE_SETTING = "DTS_DPDK_RXMODE" DTS_ERROR_ENV = "DTS_RUNNING_ERROR" DTS_CFG_FOLDER = "DTS_CFG_FOLDER" +DTS_PARALLEL_SETTING = "DTS_PARALLEL_ENABLE" +MKS_LM_ENABLING = "DTS_MKS_LM_ENABLE" """ @@ -229,6 +231,7 @@ DTS_ERR_TBL = { "TESTER_SETUP_ERR" : 4, "SUITE_SETUP_ERR": 5, "SUITE_EXECUTE_ERR": 6, + "PARALLEL_EXECUTE_ERR": 7 } def get_nic_name(type): -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
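Note on patch 11/16 (framework/virt_resource): the reworked alloc_port() scans upward from a per-type base port, skips ports that are occupied on the host or already handed to another VM, and with QuickScan enabled remembers where the last scan stopped. The sketch below restates that logic in a self-contained form for Python 3. The class name PortAllocator and the is_occupied callback are hypothetical stand-ins (the callback plays the role of dut.check_port_occupied), and unlike the patch it stores display ports as TCP ports internally and rejects unknown port types explicitly.

```python
VNC_BASE_PORT = 5900  # VNC display N listens on TCP port 5900 + N


class PortAllocator(object):
    def __init__(self, is_occupied, quick_scan=True):
        # is_occupied: callable(port) -> bool, an assumed stand-in for
        # the host-side check used in the patch.
        self.is_occupied = is_occupied
        self.quick_scan = quick_scan
        self.next_port = {
            'connect': 6000,
            'serial': 7000,
            'migrate': 8000,
            'display': VNC_BASE_PORT,
        }
        self.allocated = {}  # vm name -> {port_type: tcp port}

    def _already_allocated(self, port):
        return any(port in ports.values()
                   for ports in self.allocated.values())

    def alloc_port(self, vm, port_type='connect'):
        if port_type not in self.next_port:
            raise ValueError("unknown port type: %s" % port_type)

        port = self.next_port[port_type]
        while self.is_occupied(port) or self._already_allocated(port):
            port += 1

        self.allocated.setdefault(vm, {})[port_type] = port
        if self.quick_scan:
            # Do not rescan from the beginning on the next request.
            self.next_port[port_type] = port

        # Callers of a display port expect the display number.
        if port_type == 'display':
            return port - VNC_BASE_PORT
        return port

    def free_port(self, vm):
        self.allocated.pop(vm, None)
```

For example, with port 6000 occupied on the host, two VMs asking for a 'connect' port receive 6001 and 6002, and their 'display' requests receive display numbers 0 and 1.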
* [dts] [PATCH v2 11/16] framework/virt_resource: support multiple VMs module 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu ` (9 preceding siblings ...) 2018-01-10 0:11 ` [dts] [PATCH v2 10/16] framework/settings: add parallel related settings Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 12/16] framework/virt_base: add attach/quick start/quit function for VM management Marvin Liu ` (4 subsequent siblings) 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Add serial/migrate/display port allocation support. 2. Add parallel lock for virtualization resource allocation functions. 3. Quickly scan for a free port in the port allocation function. Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/virt_resource.py b/framework/virt_resource.py index b830f4e..1b37d4c 100644 --- a/framework/virt_resource.py +++ b/framework/virt_resource.py @@ -31,10 +31,14 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. from random import randint -from utils import get_obj_funcs +from utils import get_obj_funcs, parallel_lock, RED -INIT_FREE_PORT = 6060 +INIT_FREE_PORT = 6000 +INIT_SERIAL_PORT = 7000 +INIT_MIGRATE_PORT = 8000 +INIT_DISPLAY_PORT = 0 +QuickScan = True class VirtResource(object): @@ -149,6 +153,7 @@ class VirtResource(object): for cpu in cpus: self.__core_used(cpu) + @parallel_lock() def alloc_cpu(self, vm='', number=-1, socket=-1, corelist=None): """ There're two options for request cpu resouce for vm.
@@ -207,12 +212,14 @@ class VirtResource(object): return False return True + @parallel_lock() def free_cpu(self, vm): if self.__vm_has_resource(vm, 'cores'): for core in self.allocated_info[vm]['cores']: self.__core_unused(core) self.allocated_info[vm].pop('cores') + @parallel_lock() def alloc_pf(self, vm='', number=-1, socket=-1, pflist=[]): """ There're two options for request pf devices for vm. @@ -246,12 +253,14 @@ class VirtResource(object): self.allocated_info[vm]['ports'] = ports return ports + @parallel_lock() def free_pf(self, vm): if self.__vm_has_resource(vm, 'ports'): for pci in self.allocated_info[vm]['ports']: self.__port_unused(pci) self.allocated_info[vm].pop('ports') + @parallel_lock() def alloc_vf_from_pf(self, vm='', pf_pci='', number=-1, vflist=[]): """ There're two options for request vf devices of pf device. @@ -286,12 +295,14 @@ class VirtResource(object): self.allocated_info[vm]['vfs'] = vfs return vfs + @parallel_lock() def free_vf(self, vm): if self.__vm_has_resource(vm, 'vfs'): for pci in self.allocated_info[vm]['vfs']: self.__vf_unused(pci) self.allocated_info[vm].pop('vfs') + @parallel_lock() def add_vf_on_pf(self, pf_pci='', vflist=[]): """ Add vf devices generated by specified pf devices. @@ -307,6 +318,7 @@ class VirtResource(object): self.used_vfs += used_vfs self.vfs += vfs + @parallel_lock() def del_vf_on_pf(self, pf_pci='', vflist=[]): """ Remove vf devices generated by specified pf devices. 
@@ -327,66 +339,95 @@ class VirtResource(object): del self.used_vfs[index] del self.vfs[index] - def alloc_port(self, vm=''): + @parallel_lock() + def _check_port_allocated(self, port): + """ + Check whether port has been pre-allocated + """ + for vm_info in self.allocated_info.values(): + if vm_info.has_key('hostport') and port == vm_info['hostport']: + return True + if vm_info.has_key('serialport') and port == vm_info['serialport']: + return True + if vm_info.has_key('migrateport') and port == vm_info['migrateport']: + return True + if vm_info.has_key('displayport') and port == (vm_info['displayport'] + 5900): + return True + return False + + @parallel_lock() + def alloc_port(self, vm='', port_type='connect'): """ Allocate unused host port for vm """ + global INIT_FREE_PORT + global INIT_SERIAL_PORT + global INIT_MIGRATE_PORT + global INIT_DISPLAY_PORT + if vm == '': print "Alloc host port request vitual machine name!!!" return None - port_start = INIT_FREE_PORT + randint(1, 100) - port_step = randint(1, 10) - port = None - count = 20 + if port_type == 'connect': + port = INIT_FREE_PORT + elif port_type == 'serial': + port = INIT_SERIAL_PORT + elif port_type == 'migrate': + port = INIT_MIGRATE_PORT + elif port_type == 'display': + port = INIT_DISPLAY_PORT + 5900 + while True: - if self.dut.check_port_occupied(port_start) is False: - port = port_start + if self.dut.check_port_occupied(port) is False and self._check_port_allocated(port) is False: break - count -= 1 - if count < 0: - print 'No available port on the host!!!' 
- break - port_start += port_step + else: + port += 1 + continue if vm not in self.allocated_info: self.allocated_info[vm] = {} - self.allocated_info[vm]['hostport'] = port + if port_type == 'connect': + self.allocated_info[vm]['hostport'] = port + elif port_type == 'serial': + self.allocated_info[vm]['serialport'] = port + elif port_type == 'migrate': + self.allocated_info[vm]['migrateport'] = port + elif port_type == 'display': + port -= 5900 + self.allocated_info[vm]['displayport'] = port + + # do not scan port from the begining + if QuickScan: + if port_type == 'connect': + INIT_FREE_PORT = port + elif port_type == 'serial': + INIT_SERIAL_PORT = port + elif port_type == 'migrate': + INIT_MIGRATE_PORT = port + elif port_type == 'display': + INIT_DISPLAY_PORT = port + return port + @parallel_lock() def free_port(self, vm): if self.__vm_has_resource(vm, 'hostport'): self.allocated_info[vm].pop('hostport') - - def alloc_vnc_num(self, vm=''): - """ - Allocate unused host VNC display number for VM. - """ - if vm == '': - print "Alloc vnc display number request vitual machine name!!!" - return None - - max_vnc_display_num = self.dut.get_maximal_vnc_num() - free_vnc_display_num = max_vnc_display_num + 1 - - if vm not in self.allocated_info: - self.allocated_info[vm] = {} - - self.allocated_info[vm]['vnc_display_num'] = free_vnc_display_num - - return free_vnc_display_num - - def free_vnc_num(self, vm): - if self.__vm_has_resource(vm, 'vnc_display_num'): - self.allocated_info[vm].pop('vnc_display_num') - + if self.__vm_has_resource(vm, 'serialport'): + self.allocated_info[vm].pop('serialport') + if self.__vm_has_resource(vm, 'migrateport'): + self.allocated_info[vm].pop('migrateport') + if self.__vm_has_resource(vm, 'displayport'): + self.allocated_info[vm].pop('displayport') + + @parallel_lock() def free_all_resource(self, vm): """ Free all resource VM has been allocated. 
""" self.free_port(vm) - self.free_vnc_num(vm) self.free_vf(vm) self.free_pf(vm) self.free_cpu(vm) -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH v2 12/16] framework/virt_base: add attach/quick start/quit function for VM management 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu ` (10 preceding siblings ...) 2018-01-10 0:11 ` [dts] [PATCH v2 11/16] framework/virt_resource: support multiple VMs module Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 13/16] framework/virt_dut: support multiple VMs module Marvin Liu ` (3 subsequent siblings) 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Add attach function: a running virtual machine can be auto-detected and connected to, so there is no need to stop and restart it if it has already been started. 2. Add quit function, which only closes the ssh connection to the virtual machine. The virtual machine stays alive after quit. 3. Add quick start function, which starts the virtual machine without initializing it. 4. Add local configuration set function, so the user can assign virtual machine parameters without any configuration file.
Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/virt_base.py b/framework/virt_base.py index 1838ca1..e93cbab 100644 --- a/framework/virt_base.py +++ b/framework/virt_base.py @@ -42,7 +42,6 @@ from config import VIRTCONF from logger import getLogger from settings import CONFIG_ROOT_PATH from virt_dut import VirtDut -from utils import remove_old_rsa_key ST_NOTSTART = "NOTSTART" ST_PAUSE = "PAUSE" @@ -67,11 +66,9 @@ class VirtBase(object): self.vm_name = vm_name self.suite = suite_name - # init the host session and logger for VM - self.host_dut.init_host_session() + # create self used host session, need close it later + self.host_session = self.host_dut.new_session(self.vm_name) - # replace dut session - self.host_session = self.host_dut.host_session self.host_logger = self.host_dut.logger # base_dir existed for host dut has prepared it self.host_session.send_expect("cd %s" % self.host_dut.base_dir, "# ") @@ -87,6 +84,7 @@ class VirtBase(object): self.virt_type = self.get_virt_type() self.params = [] + self.local_conf = [] # default call back function is None self.callback = None @@ -124,22 +122,32 @@ class VirtBase(object): if self.find_option_index(key) is None: self.__save_local_config(key, param[key]) + def set_local_config(self, local_conf): + """ + Configure VM configuration from user input + """ + self.local_conf = local_conf + def load_local_config(self, suite_name): """ Load local configure in the path DTS_ROOT_PATH/conf. 
""" # load local configuration by suite and vm name - conf = VirtConf(CONFIG_ROOT_PATH + os.sep + suite_name + '.cfg') - conf.load_virt_config(self.vm_name) - local_conf = conf.get_virt_config() + try: + conf = VirtConf(CONFIG_ROOT_PATH + os.sep + suite_name + '.cfg') + conf.load_virt_config(self.vm_name) + self.local_conf = conf.get_virt_config() + except: + # when met exception in load VM config + # just leave local conf untouched + pass + # replace global configurations with local configurations - for param in local_conf: + for param in self.local_conf: if 'mem' in param.keys(): self.__save_local_config('mem', param['mem']) - continue if 'cpu' in param.keys(): self.__save_local_config('cpu', param['cpu']) - continue # save local configurations for key in param.keys(): self.__save_local_config(key, param[key]) @@ -165,13 +173,16 @@ class VirtBase(object): try: param_func = getattr(self, 'add_vm_' + key) if callable(param_func): - for option in value: - param_func(**option) + if type(value) is list: + for option in value: + param_func(**option) else: print utils.RED("Virt %s function not callable!!!" % key) except AttributeError: + self.host_logger.error(traceback.print_exception(*sys.exc_info())) print utils.RED("Virt %s function not implemented!!!" % key) except Exception: + self.host_logger.error(traceback.print_exception(*sys.exc_info())) raise exception.VirtConfigParamException(key) def find_option_index(self, option): @@ -232,6 +243,21 @@ class VirtBase(object): self.load_global_config() self.load_local_config(self.suite) + def attach(self): + # load configuration + self.load_config() + + # change login user/password + index = self.find_option_index("login") + if index: + value = self.params[index]["login"] + for option in value: + self.add_vm_login(**option) + + # attach real vm + self._attach_vm() + return None + def start(self, load_config=True, set_target=True, cpu_topo=''): """ Start VM and instantiate the VM with VirtDut. 
@@ -247,7 +273,7 @@ class VirtBase(object): if self.vm_status is ST_RUNNING: # connect vm dut and init running environment - vm_dut = self.instantiate_vm_dut(set_target, cpu_topo) + vm_dut = self.instantiate_vm_dut(set_target, cpu_topo, autodetect_topo=True) else: vm_dut = None @@ -263,6 +289,28 @@ class VirtBase(object): return None return vm_dut + def quick_start(self, load_config=True, set_target=True, cpu_topo=''): + """ + Only Start VM and not do anything else, will be helpful in multiple VMs + """ + try: + if load_config is True: + self.load_config() + # compose boot command for different hypervisors + self.compose_boot_param() + + # start virutal machine + self._quick_start_vm() + + except Exception as vm_except: + if self.handle_exception(vm_except): + print utils.RED("Handled expection " + str(type(vm_except))) + else: + print utils.RED("Unhandled expection " + str(type(vm_except))) + + if callable(self.callback): + self.callback() + def migrated_start(self, set_target=True, cpu_topo=''): """ Instantiate the VM after migration done @@ -271,7 +319,7 @@ class VirtBase(object): try: if self.vm_status is ST_PAUSE: # connect backup vm dut and it just inherited from host - vm_dut = self.instantiate_vm_dut(set_target, cpu_topo, bind_dev=False) + vm_dut = self.instantiate_vm_dut(set_target, cpu_topo, bind_dev=False, autodetect_topo=False) except Exception as vm_except: if self.handle_exception(vm_except): print utils.RED("Handled expection " + str(type(vm_except))) @@ -324,7 +372,7 @@ class VirtBase(object): """ NotImplemented - def instantiate_vm_dut(self, set_target=True, cpu_topo='', bind_dev=True): + def instantiate_vm_dut(self, set_target=True, cpu_topo='', bind_dev=True, autodetect_topo=True): """ Instantiate the Dut class for VM. 
""" @@ -337,9 +385,6 @@ class VirtBase(object): crb['user'] = username crb['pass'] = password - # remove default key - remove_old_rsa_key(self.host_dut.tester, crb['IP']) - serializer = self.host_dut.serializer try: @@ -350,8 +395,10 @@ class VirtBase(object): self.virt_type, self.vm_name, self.suite, - cpu_topo) - except: + cpu_topo, + dut_id=self.host_dut.dut_id) + except Exception as vm_except: + self.handle_exception(vm_except) raise exception.VirtDutConnectException return None @@ -374,7 +421,7 @@ class VirtBase(object): try: # setting up dpdk in vm, must call at last - vm_dut.prerequisites(self.host_dut.package, self.host_dut.patches) + vm_dut.prerequisites(self.host_dut.package, self.host_dut.patches, autodetect_topo) if set_target: target = self.host_dut.target vm_dut.set_target(target, bind_dev) @@ -389,6 +436,19 @@ class VirtBase(object): """ Stop the VM. """ + self._stop_vm() + self.quit() + + self.virt_pool.free_all_resource(self.vm_name) + + def quit(self): + """ + Just quit connection to the VM + """ + if getattr(self, 'host_session', None): + self.host_session.close() + self.host_session = None + # vm_dut may not init in migration case if getattr(self, 'vm_dut', None): if self.vm_status is ST_RUNNING: @@ -400,10 +460,6 @@ class VirtBase(object): self.vm_dut.logger.logger_exit() self.vm_dut = None - self._stop_vm() - - self.virt_pool.free_all_resource(self.vm_name) - def register_exit_callback(self, callback): """ Call register exit call back function -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
* [dts] [PATCH v2 13/16] framework/virt_dut: support multiple VMs module 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu ` (11 preceding siblings ...) 2018-01-10 0:11 ` [dts] [PATCH v2 12/16] framework/virt_base: add attach/quick start/quit function for VM management Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 14/16] framework/qemu_kvm: " Marvin Liu ` (2 subsequent siblings) 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Add DUT index argument and protect the tester interactive process with a lock. 2. Only create one ssh connection to the virtual machine. 3. Support disabling the port auto-detect function, which must run under a lock; skipping it saves virtual machine instantiation time. 4. Without an alternative session, the kill_all function is handled by the control session. Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/virt_dut.py b/framework/virt_dut.py index 348279d..83f661d 100644 --- a/framework/virt_dut.py +++ b/framework/virt_dut.py @@ -32,8 +32,8 @@ import os import re import time -import utils import settings +from utils import RED, parallel_lock from config import PortConf from settings import NICS, LOG_NAME_SEP, get_netdev from project_dpdk import DPDKdut @@ -53,14 +53,16 @@ class VirtDut(DPDKdut): or CRBBareMetal.
""" - def __init__(self, hyper, crb, serializer, virttype, vm_name, suite, cpu_topo): + def __init__(self, hyper, crb, serializer, virttype, vm_name, suite, cpu_topo, dut_id): self.vm_name = vm_name self.hyper = hyper self.cpu_topo = cpu_topo + self.dut_id = dut_id self.vm_ip = crb['IP'] self.NAME = 'virtdut' + LOG_NAME_SEP + '%s' % self.vm_ip - super(Dut, self).__init__(crb, serializer, self.NAME) + # do not create addition alt_session + super(Dut, self).__init__(crb, serializer, self.NAME, alt_session=False, dut_id=self.dut_id) # load port config from suite cfg self.suite = suite @@ -73,15 +75,13 @@ class VirtDut(DPDKdut): self.virttype = virttype def init_log(self): - self.logger.config_suite(self.host_dut.test_classname, 'virtdut') + if hasattr(self.host_dut, "test_classname"): + self.logger.config_suite(self.host_dut.test_classname, 'virtdut') def close(self, force=False): if self.session: self.session.close(force) self.session = None - if self.alt_session: - self.alt_session.close(force) - self.alt_session = None RemoveNicObj(self) def set_nic_type(self, nic_type): @@ -91,6 +91,7 @@ class VirtDut(DPDKdut): self.nic_type = nic_type # vm_dut config will load from vm configuration file + @parallel_lock() def load_portconf(self): """ Load port config for this virtual machine @@ -99,12 +100,24 @@ class VirtDut(DPDKdut): self.conf.load_ports_config(self.vm_name) self.ports_cfg = self.conf.get_ports_config() - return + @parallel_lock() + def detect_portmap(self, dut_id): + """ + Detect port mapping with ping6 message, should be locked for protect + tester operations. 
+ """ + # enable tester port ipv6 + self.host_dut.enable_tester_ipv6() + + self.map_available_ports() - def create_portmap(self): - # if not config ports in vm port config file, used ping6 get portmap - if not self.ports_cfg: - self.map_available_ports() + # disable tester port ipv6 + self.host_dut.disable_tester_ipv6() + + def load_portmap(self): + """ + Generate port mapping base on loaded port configuration + """ port_num = len(self.ports_info) self.ports_map = [-1] * port_num for key in self.ports_cfg.keys(): @@ -142,7 +155,7 @@ class VirtDut(DPDKdut): if bind_dev: self.bind_interfaces_linux('igb_uio') - def prerequisites(self, pkgName, patch): + def prerequisites(self, pkgName, patch, autodetect_topo): """ Prerequest function should be called before execute any test case. Will call function to scan all lcore's information which on DUT. @@ -152,7 +165,10 @@ class VirtDut(DPDKdut): if not self.skip_setup: self.prepare_package() - self.send_expect("cd %s" % self.base_dir, "# ") + out = self.send_expect("cd %s" % self.base_dir, "# ") + if 'No such file or directory' in out: + self.logger.error("Can't switch to dpdk folder!!!") + self.send_expect("alias ls='ls --color=none'", "#") if self.get_os_type() == 'freebsd': @@ -180,14 +196,14 @@ class VirtDut(DPDKdut): # load port infor from config file self.load_portconf() - # enable tester port ipv6 - self.host_dut.enable_tester_ipv6() self.mount_procfs() - self.create_portmap() - - # disable tester port ipv6 - self.host_dut.disable_tester_ipv6() + if self.ports_cfg: + self.load_portmap() + else: + # if no config ports in port config file, will auto-detect portmap + if autodetect_topo: + self.detect_portmap(dut_id=self.dut_id) # print latest ports_info for port_info in self.ports_info: @@ -196,7 +212,7 @@ class VirtDut(DPDKdut): def init_core_list(self): self.cores = [] cpuinfo = self.send_expect("grep --color=never \"processor\"" - " /proc/cpuinfo", "#", alt_session=False) + " /proc/cpuinfo", "#") cpuinfo = 
cpuinfo.split('\r\n') if self.cpu_topo != '': topo_reg = r"(\d)S/(\d)C/(\d)T" @@ -394,3 +410,21 @@ class VirtDut(DPDKdut): self.ports_map[vmPort] = remotePort hits[remotePort] = True continue + + def kill_all(self, alt_session=False): + """ + Kill all dpdk applications on VM + """ + control = getattr(self.hyper, 'control_session', None) + if callable(control): + out = control("lsof -Fp /var/run/.rte_config") + pids = [] + pid_reg = r'p(\d+)' + if len(out): + lines = out.split('\r\n') + for line in lines: + m = re.match(pid_reg, line) + if m: + pids.append(m.group(1)) + for pid in pids: + control('kill -9 %s' % pid) -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
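Note on patch 13/16: kill_all() relies on the field output of "lsof -Fp", where each process is reported on its own line as the letter p followed by the PID. The sketch below isolates that parsing step so it can be checked without a VM. It is written for Python 3, and parse_lsof_pids is a hypothetical helper name, not a function in DTS.

```python
import re

# In lsof field output (-F), a process line is 'p' followed by the PID.
PID_LINE = re.compile(r'p(\d+)')


def parse_lsof_pids(output):
    """Return the PIDs (as strings) found in `lsof -Fp` output."""
    pids = []
    for line in output.splitlines():
        match = PID_LINE.match(line.strip())
        if match:
            # group(1) is the PID without the leading 'p'
            pids.append(match.group(1))
    return pids
```

Using splitlines() handles both '\n' and the '\r\n' line endings that come back over a pexpect session, and lines that do not start with p, such as lsof error messages, are ignored.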
* [dts] [PATCH v2 14/16] framework/qemu_kvm: support multiple VMs module 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu ` (12 preceding siblings ...) 2018-01-10 0:11 ` [dts] [PATCH v2 13/16] framework/virt_dut: support multiple VMs module Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 15/16] conf/virt_global: add vm management related configuration Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 16/16] doc: add descriptions for multiple virtual machines module Marvin Liu 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu 1. Also utilize the serial port for the control session; three types, 'qga'|'telnet'|'socket', are now supported for the virtual machine control session. 2. Protect the qemu start action with a parallel lock. 3. Support attach function. 4. With the serial port, the virtual machine startup process can be controlled by the start timeout/login timeout/login prompt/password prompt variables.
Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/framework/qemu_kvm.py b/framework/qemu_kvm.py index 5671de8..a75bf78 100644 --- a/framework/qemu_kvm.py +++ b/framework/qemu_kvm.py @@ -37,14 +37,12 @@ import os from virt_base import VirtBase from virt_base import ST_NOTSTART, ST_PAUSE, ST_RUNNING, ST_UNKNOWN from exception import StartVMFailedException -from settings import get_host_ip +from settings import get_host_ip, load_global_setting, DTS_PARALLEL_SETTING +from utils import parallel_lock, RED # This name is derictly defined in the qemu guest serivce # So you can not change it except it is changed by the service QGA_DEV_NAME = 'org.qemu.guest_agent.0' -# This path defines an socket path on the host connected with -# a specified VM -QGA_SOCK_PATH_TEMPLATE = '/tmp/%(vm_name)s_qga0.sock' class QEMUKvm(VirtBase): @@ -65,6 +63,17 @@ class QEMUKvm(VirtBase): "fi" QEMU_IFUP_PATH = '/etc/qemu-ifup' + # Default login session timeout value + LOGIN_TIMEOUT = 60 + # By default will wait 120 seconds for VM start + # If VM not ready in this period, will try restart it once + START_TIMEOUT = 120 + # Default timeout value for operation when VM starting + OPERATION_TIMEOUT = 20 + # Default login prompt + LOGIN_PROMPT = "login:" + # Default password prompt + PASSWORD_PROMPT = "Password:" def __init__(self, dut, vm_name, suite_name): super(QEMUKvm, self).__init__(dut, vm_name, suite_name) @@ -79,9 +88,6 @@ class QEMUKvm(VirtBase): # initialize some resource used by guest. 
self.init_vm_request_resource() - QGA_CLI_PATH = '-r dep/QMP/' - self.host_session.copy_file_to(QGA_CLI_PATH) - # charater and network device default index self.char_idx = 0 self.netdev_idx = 0 @@ -105,13 +111,38 @@ class QEMUKvm(VirtBase): # if there is not the values of the specified options self.set_vm_default() + self.am_attached = False + + # allow restart VM when can't login + self.restarted = False + + def check_alive(self): + """ + Check whether VM is alive for has been start up + """ + pid_regx = r'p(\d+)' + out = self.host_session.send_expect("lsof -Fp /tmp/.%s.pid" % self.vm_name, "#", timeout=30) + for line in out.splitlines(): + m = re.match(pid_regx, line) + if m: + self.host_logger.info("Found VM %s already running..." % m.group(0)) + return True + return False + + def kill_alive(self): + pid_regx = r'p(\d+)' + out = self.host_session.send_expect("lsof -Fp /tmp/.%s.pid" % self.vm_name, "# ") + for line in out.splitlines(): + m = re.match(pid_regx, line) + if m: + self.host_session.send_expect("kill -9 %s" % m.group(0)[1:], "# ") + def set_vm_default(self): self.set_vm_name(self.vm_name) if self.arch == 'aarch64': self.set_vm_machine('virt') self.set_vm_enable_kvm() self.set_vm_pid_file() - self.set_vm_qga() self.set_vm_daemon() self.set_vm_monitor() @@ -388,6 +419,19 @@ class QEMUKvm(VirtBase): pflash_boot_line = '-pflash %s' % options['file'] self.__add_boot_line(pflash_boot_line) + def add_vm_start(self, **options): + """ + Update VM start and login related settings + """ + if 'wait_seconds' in options.keys(): + self.START_TIMEOUT = int(options['wait_seconds']) + if 'login_timeout' in options.keys(): + self.LOGIN_TIMEOUT = int(options['login_timeout']) + if 'login_prompt' in options.keys(): + self.LOGIN_PROMPT = options['login_prompt'] + if 'password_prompt' in options.keys(): + self.PASSWORD_PROMPT = options['password_prompt'] + def add_vm_login(self, **options): """ user: login username of virtual machine @@ -540,8 +584,11 @@ class 
QEMUKvm(VirtBase): # get the host port in the option host_port = field(opt_hostfwd, 2).split('-')[0] + + # if no host assigned, just allocate it if not host_port: - host_port = str(self.virt_pool.alloc_port(self.vm_name)) + host_port = str(self.virt_pool.alloc_port(self.vm_name, port_type='connect')) + self.redir_port = host_port # get the guest addr @@ -639,6 +686,9 @@ class QEMUKvm(VirtBase): else: self.params.append({'device': [opts]}) + # start up time may increase after add device + self.START_TIMEOUT += 8 + def add_vm_device(self, **options): """ driver: [pci-assign | virtio-net-pci | ...] @@ -655,8 +705,8 @@ class QEMUKvm(VirtBase): self.__add_vm_virtio_user_pci(**options) elif options['driver'] == 'vhost-cuse': self.__add_vm_virtio_cuse_pci(**options) - if options['driver'] == 'vfio-pci': - self.__add_vm_pci_vfio(**options) + elif options['driver'] == 'vfio-pci': + self.__add_vm_pci_vfio(**options) def __add_vm_pci_vfio(self, **options): """ @@ -710,7 +760,17 @@ class QEMUKvm(VirtBase): """ separator = ',' # chardev parameter - if 'opt_path' in options.keys() and options['opt_path']: + netdev_id = 'netdev%d' % self.netdev_idx + if 'opt_script' in options.keys() and options['opt_script']: + if 'opt_br' in options.keys() and \ + options['opt_br']: + bridge = options['opt_br'] + else: + bridge = self.DEFAULT_BRIDGE + self.__generate_net_config_script(str(bridge)) + dev_boot_line = '-netdev tap,id=%s,script=%s' % (netdev_id, options['opt_script']) + self.netdev_idx += 1 + elif 'opt_path' in options.keys() and options['opt_path']: dev_boot_line = '-chardev socket' char_id = 'char%d' % self.char_idx if 'opt_server' in options.keys() and options['opt_server']: @@ -736,12 +796,16 @@ class QEMUKvm(VirtBase): netdev_id, char_id) self.__add_boot_line(dev_boot_line) # device parameter - opts = {'opt_netdev': '%s' % netdev_id} - if 'opt_mac' in options.keys() and \ - options['opt_mac']: - opts['opt_mac'] = options['opt_mac'] - if 'opt_settings' in options.keys() and 
options['opt_settings']: - opts['opt_settings'] = options['opt_settings'] + opts = {'opt_netdev': '%s' % netdev_id} + if 'opt_mac' in options.keys() and \ + options['opt_mac']: + opts['opt_mac'] = options['opt_mac'] + if 'opt_settings' in options.keys() and options['opt_settings']: + opts['opt_settings'] = options['opt_settings'] + if 'opt_legacy' in options.keys() and options['opt_legacy']: + opts['opt_legacy'] = options['opt_legacy'] + if 'opt_settings' in options.keys() and options['opt_settings']: + opts['opt_settings'] = options['opt_settings'] self.__add_vm_virtio_net_pci(**opts) def __add_vm_virtio_cuse_pci(self, **options): @@ -797,6 +861,9 @@ class QEMUKvm(VirtBase): if 'opt_addr' in options.keys() and \ options['opt_addr']: dev_boot_line += separator + 'addr=%s' % options['opt_addr'] + if 'opt_legacy' in options.keys() and \ + options['opt_legacy']: + dev_boot_line += separator + 'disable-modern=%s' % options['opt_legacy'] if 'opt_settings' in options.keys() and \ options['opt_settings']: dev_boot_line += separator + '%s' % options['opt_settings'] @@ -843,41 +910,6 @@ class QEMUKvm(VirtBase): else: self.monitor_sock_path = None - def set_vm_qga(self, enable='yes'): - """ - Set VM qemu-guest-agent. 
- """ - index = self.find_option_index('qga') - if index: - self.params[index] = {'qga': [{'enable': '%s' % enable}]} - else: - self.params.append({'qga': [{'enable': '%s' % enable}]}) - QGA_SOCK_PATH = QGA_SOCK_PATH_TEMPLATE % {'vm_name': self.vm_name} - self.qga_sock_path = QGA_SOCK_PATH - - def add_vm_qga(self, **options): - """ - enable: 'yes' - Make sure qemu-guest-agent servie up in vm - """ - QGA_DEV_ID = '%(vm_name)s_qga0' % {'vm_name': self.vm_name} - QGA_SOCK_PATH = QGA_SOCK_PATH_TEMPLATE % {'vm_name': self.vm_name} - - separator = ' ' - - if 'enable' in options.keys(): - if options['enable'] == 'yes': - qga_boot_block = '-chardev socket,path=%(SOCK_PATH)s,server,nowait,id=%(ID)s' + \ - separator + '-device virtio-serial' + separator + \ - '-device virtserialport,chardev=%(ID)s,name=%(DEV_NAME)s' - qga_boot_line = qga_boot_block % {'SOCK_PATH': QGA_SOCK_PATH, - 'DEV_NAME': QGA_DEV_NAME, - 'ID': QGA_DEV_ID} - self.__add_boot_line(qga_boot_line) - self.qga_sock_path = QGA_SOCK_PATH - else: - self.qga_sock_path = '' - def add_vm_migration(self, **options): """ enable: yes @@ -891,63 +923,209 @@ class QEMUKvm(VirtBase): self.migrate_port = options['port'] else: self.migrate_port = str( - self.virt_pool.alloc_port(self.vm_name)) + self.virt_pool.alloc_port(self.vm_name, port_type="migrate")) migrate_boot_line = migrate_cmd % { 'migrate_port': self.migrate_port} self.__add_boot_line(migrate_boot_line) - def add_vm_serial_port(self, **options): + + def set_vm_control(self, **options): """ - enable: 'yes' + Set control session options """ - if 'enable' in options.keys(): - if options['enable'] == 'yes': - self.serial_path = "/tmp/%s_serial.sock" % self.vm_name - serial_boot_line = '-serial unix:%s,server,nowait' % self.serial_path - self.__add_boot_line(serial_boot_line) + if 'type' in options.keys(): + self.control_type = options['type'] + else: + self.control_type = 'telnet' + + index = self.find_option_index('control') + if index: + self.params[index] = 
{'control': [{'type': self.control_type}]} + else: + self.params.append({'control': [{'type': self.control_type}]}) + + def add_vm_control(self, **options): + """ + Add control method for VM management + type : 'telnet' | 'socket' | 'qga' + """ + separator = ' ' + + self.control_type = options['type'] + if self.control_type == 'telnet': + if 'port' in options: + self.serial_port = int(options['port']) else: - pass + self.serial_port = self.virt_pool.alloc_port(self.vm_name, port_type="serial") + control_boot_line = '-serial telnet::%d,server,nowait' % self.serial_port + elif self.control_type == 'socket': + self.serial_path = "/tmp/%s_serial.sock" % self.vm_name + control_boot_line = '-serial unix:%s,server,nowait' % self.serial_path + elif self.control_type == 'qga': + qga_dev_id = '%(vm_name)s_qga0' % {'vm_name': self.vm_name} + self.qga_socket_path = '/tmp/%(vm_name)s_qga0.sock' % {'vm_name': self.vm_name} + self.qga_cmd_head = '~/QMP/qemu-ga-client --address=%s ' % self.qga_socket_path + qga_boot_block = '-chardev socket,path=%(SOCK_PATH)s,server,nowait,id=%(ID)s' + \ + separator + '-device virtio-serial' + separator + \ + '-device virtserialport,chardev=%(ID)s,name=%(DEV_NAME)s' + control_boot_line = qga_boot_block % {'SOCK_PATH': self.qga_socket_path, + 'DEV_NAME': QGA_DEV_NAME, + 'ID': qga_dev_id} + + self.__add_boot_line(control_boot_line) + + def connect_serial_port(self, name=""): + """ + Connect to serial port and return connected session for usage + if connected failed will return None + """ + shell_reg = r"(\s*)\[(.*)\]# " + try: + if getattr(self, 'control_session', None) is None: + self.control_session = self.host_session + + self.control_session.send_command("nc -U %s" % self.serial_path) + + # login message not ouput if timeout too small + out = self.control_session.send_command("", timeout=5).replace('\r', '').replace('\n', '') - def connect_serial_port(self, name="", first=True): + if len(out) == 0: + raise StartVMFailedException("Can't get 
output from [%s:%s]" % (self.host_dut.crb['My IP'], self.vm_name)) + + m = re.match(shell_reg, out) + if m: + # dmidecode output contain #, so use other matched string + out = self.control_session.send_expect("dmidecode -t system", "Product Name", timeout=self.OPERATION_TIMEOUT) + # cleanup previous output + self.control_session.get_session_before(timeout=0.1) + + # if still on host, need reconnect + if 'QEMU' not in out: + raise StartVMFailedException("Not real login [%s]" % self.vm_name) + else: + # has enter into VM shell + return True + + # login into Redhat os, not sure can work on all distributions + if self.LOGIN_PROMPT not in out: + raise StartVMFailedException("Can't login [%s] now!!!" % self.vm_name) + else: + self.control_session.send_expect("%s" % self.username, self.PASSWORD_PROMPT, timeout=self.LOGIN_TIMEOUT) + # system maybe busy here, enlarge timeout equal to login timeout + self.control_session.send_expect("%s" % self.password, "#", timeout=self.LOGIN_TIMEOUT) + return self.control_session + except Exception as e: + # when exception happened, force close serial connection and reconnect + print RED("[%s:%s] exception [%s] happened" % (self.host_dut.crb['My IP'], self.vm_name, str(e))) + self.close_control_session(dut_id=self.host_dut.dut_id) + return False + + def connect_telnet_port(self, name=""): """ Connect to serial port and return connected session for usage if connected failed will return None """ - if getattr(self, 'serial_path', None): - self.serial_session = self.host_dut.new_session(suite=name) - self.serial_session.send_command("nc -U %s" % self.serial_path) - if first: - # login into Fedora os, not sure can work on all distributions - self.serial_session.send_expect("", "login:") - self.serial_session.send_expect( - "%s" % self.username, "Password:") - self.serial_session.send_expect("%s" % self.password, "# ") - return self.serial_session + shell_reg = r"(\s*)\[(.*)\]# " + scan_cmd = "lsof -i:%d | grep telnet | awk '{print $2}'" % 
self.serial_port - return None + try: + # assume serial is not connect + if getattr(self, 'control_session', None) is None: + self.control_session = self.host_session + + self.control_session.send_expect("telnet localhost %d" % self.serial_port, "Connected to localhost", timeout=self.OPERATION_TIMEOUT) + + # output will be empty if timeout too small + out = self.control_session.send_command("", timeout=5).replace('\r', '').replace('\n', '') - def close_serial_port(self): + # if no output from serial port, either connection close or system hang + if len(out) == 0: + raise StartVMFailedException("Can't get output from [%s]" % self.vm_name) + + # if enter into shell + m = re.match(shell_reg, out) + if m: + # dmidecode output contain #, so use other matched string + out = self.control_session.send_expect("dmidecode -t system", "Product Name", timeout=self.OPERATION_TIMEOUT) + # cleanup previous output + self.control_session.get_session_before(timeout=0.1) + + # if still on host, need reconnect + if 'QEMU' not in out: + raise StartVMFailedException("Not real login [%s]" % self.vm_name) + else: + # has enter into VM shell + return True + + # login into Redhat os, not sure can work on all distributions + if "x86_64 on an x86_64" not in out: + print RED("[%s:%s] not ready for login" % (self.host_dut.crb['My IP'], self.vm_name)) + return False + else: + self.control_session.send_expect("%s" % self.username, "Password:", timeout=self.LOGIN_TIMEOUT) + self.control_session.send_expect("%s" % self.password, "#", timeout=self.LOGIN_TIMEOUT) + return True + except Exception as e: + # when exception happened, force close serial connection and reconnect + print RED("[%s:%s] exception [%s] happened" % (self.host_dut.crb['My IP'], self.vm_name, str(e))) + self.close_control_session(dut_id=self.host_dut.dut_id) + return False + + def connect_qga_port(self, name=""): """ - Close serial session if it existed + QGA control session just share with host session """ - if getattr(self, 
'serial_session', None): - # exit from nc first - self.serial_session.send_expect("^C", "# ") - self.host_dut.close_session(self.serial_session) + try: + # assume serial is not connect + if getattr(self, 'control_session', None) is None: + self.control_session = self.host_session + + self.control_session.send_expect("%s ping %d" %(self.qga_cmd_head, self.START_TIMEOUT), "#", timeout=self.START_TIMEOUT) + + # here VM has been start and qga also ready + return True + except Exception as e: + # when exception happened, force close qga process and reconnect + print RED("[%s:%s] QGA not ready" % (self.host_dut.crb['My IP'], self.vm_name)) + self.close_control_session(dut_id=self.host_dut.dut_id) + return False def add_vm_vnc(self, **options): """ - displayNum: 1 + Add VM display option """ - if 'displayNum' in options.keys() and \ - options['displayNum']: - display_num = options['displayNum'] + if 'disable' in options.keys() and options['disable'] == 'True': + vnc_boot_line = '-display none' else: - display_num = self.virt_pool.alloc_vnc_num(self.vm_name) + if 'displayNum' in options.keys() and \ + options['displayNum']: + display_num = options['displayNum'] + else: + display_num = self.virt_pool.alloc_port(self.vm_name, port_type="display") + + vnc_boot_line = '-vnc :%d' % int(display_num) - vnc_boot_line = '-vnc :%d' % int(display_num) self.__add_boot_line(vnc_boot_line) + def set_vm_vnc(self, **options): + """ + Set VM display options + """ + if 'disable' in options.keys(): + vnc_option = [{'disable': 'True'}] + else: + if 'displayNum' in options.keys(): + vnc_option = [{'displayNum': options['displayNum']}] + else: + # will allocate vnc display later + vnc_option = [{'disable': 'False'}] + + index = self.find_option_index('vnc') + if index: + self.params[index] = {'vnc': vnc_option} + else: + self.params.append({'vnc': vnc_option}) + def set_vm_daemon(self, enable='yes'): """ Set VM daemon option. 
@@ -981,6 +1159,79 @@ class QEMUKvm(VirtBase): cmd = options['cmd'] self.__add_boot_line(cmd) + def _check_vm_status(self): + """ + Check and restart QGA if not ready, wait for network ready + """ + self.__wait_vm_ready() + + self.__wait_vmnet_ready() + + def _attach_vm(self): + """ + Attach VM + Collected information : serial/monitor/qga sock file + : hostfwd address + """ + self.am_attached = True + + if not self._query_pid(): + raise StartVMFailedException("Can't strip process pid!!!") + + cmdline = self.host_session.send_expect('cat /proc/%d/cmdline' % self.pid, '# ') + qemu_boot_line = cmdline.replace('\x00', ' ') + self.qemu_boot_line = qemu_boot_line.split(' ', 1)[1] + self.qemu_emulator = qemu_boot_line.split(' ', 1)[0] + + serial_reg = ".*serial\x00unix:(.*?)," + telnet_reg = ".*serial\x00telnet::(\d+)," + monitor_reg = ".*monitor\x00unix:(.*?)," + hostfwd_reg = ".*hostfwd=tcp:(.*):(\d+)-:" + migrate_reg = ".*incoming\x00tcp::(\d+)" + + # support both telnet and unix domain socket serial device + m = re.match(serial_reg, cmdline) + if not m: + m1 = re.match(telnet_reg, cmdline) + if not m1: + raise StartVMFailedException("No serial sock available!!!") + else: + self.serial_port = int(m1.group(1)) + self.control_type = "telnet" + else: + self.serial_path = m.group(1) + self.control_type = "socket" + + m = re.match(monitor_reg, cmdline) + if not m: + raise StartVMFailedException("No monitor sock available!!!") + self.monitor_sock_path = m.group(1) + + m = re.match(hostfwd_reg, cmdline) + if not m: + raise StartVMFailedException("No host fwd config available!!!") + + self.net_type = 'hostfwd' + self.host_port = m.group(2) + self.hostfwd_addr = m.group(1) + ':' + self.host_port + + # record start time, need call before check_vm_status + self.start_time = time.time() + + try: + self.update_status() + except: + self.host_logger.error("Can't query vm status!!!") + + if self.vm_status is not ST_PAUSE: + self._check_vm_status() + else: + m = re.match(migrate_reg, 
cmdline) + if not m: + raise StartVMFailedException("No migrate port available!!!") + + self.migrate_port = int(m.group(1)) + def _start_vm(self): """ Start VM. @@ -989,25 +1240,86 @@ class QEMUKvm(VirtBase): qemu_boot_line = self.generate_qemu_boot_line() - # Start VM using the qemu command - ret = self.host_session.send_expect(qemu_boot_line, '# ', verify=True) - if type(ret) is int and ret != 0: - raise StartVMFailedException('Start VM failed!!!') + self.__send_qemu_cmd(qemu_boot_line, dut_id=self.host_dut.dut_id) self.__get_pci_mapping() # query status self.update_status() + # sleep few seconds for bios/grub + time.sleep(10) + # when vm is waiting for migration, can't ping if self.vm_status is not ST_PAUSE: - # if VM waiting for migration, can't return ping - out = self.__control_session('ping', '120') - if "Not responded" in out: - raise StartVMFailedException('Not response in 120 seconds!!!') + self.__wait_vm_ready() self.__wait_vmnet_ready() + # Start VM using the qemu command + # lock critical action like start qemu + @parallel_lock(num=4) + def __send_qemu_cmd(self, qemu_boot_line, dut_id): + # add more time for qemu start will be slow when system is busy + ret = self.host_session.send_expect(qemu_boot_line, '# ', verify=True, timeout=30) + + # record start time + self.start_time = time.time() + + # wait for qemu process ready + time.sleep(2) + if type(ret) is int and ret != 0: + raise StartVMFailedException('Start VM failed!!!') + + def _quick_start_vm(self): + self.__alloc_assigned_pcis() + + qemu_boot_line = self.generate_qemu_boot_line() + + self.__send_qemu_cmd(qemu_boot_line, dut_id=self.host_dut.dut_id) + + self.__get_pci_mapping() + + # query status + self.update_status() + + # sleep few seconds for bios and grub + time.sleep(10) + + def __ping_vm(self): + logged_in = False + cur_time = time.time() + time_diff = cur_time - self.start_time + try_times = 0 + while (time_diff < self.START_TIMEOUT): + if self.control_command('ping') == "Success": + 
logged_in = True + break + + # update time consume + cur_time = time.time() + time_diff = cur_time - self.start_time + + self.host_logger.warning("Can't login [%s] on [%s], retry %d times!!!" % (self.vm_name, self.host_dut.crb['My IP'], try_times + 1)) + time.sleep(self.OPERATION_TIMEOUT) + try_times += 1 + continue + + return logged_in + + def __wait_vm_ready(self): + logged_in = self.__ping_vm() + if not logged_in: + if not self.restarted: + # make sure serial session has been quit + self.close_control_session(dut_id=self.host_dut.dut_id) + self.vm_status = ST_NOTSTART + self._stop_vm() + self.restarted = True + self._start_vm() + else: + raise StartVMFailedException('Not response in %d seconds!!!' % self.START_TIMEOUT) + def start_migration(self, remote_ip, remote_port): """ Send migration command to host and check whether start migration @@ -1049,18 +1361,14 @@ class QEMUKvm(VirtBase): """ Generate the whole QEMU boot line. """ - qemu_emulator = self.qemu_emulator - - if self.vcpus_pinned_to_vm.strip(): - vcpus = self.__alloc_vcpus() - - if vcpus.strip(): - qemu_boot_line = 'taskset -c %s ' % vcpus + \ - qemu_emulator + ' ' + \ - self.qemu_boot_line + if self.vcpus_pinned_to_vm: + vcpus = self.vcpus_pinned_to_vm.replace(' ', ',') + qemu_boot_line = 'taskset -c %s ' % vcpus + \ + self.qemu_emulator + ' ' + \ + self.qemu_boot_line else: - qemu_boot_line = qemu_emulator + ' ' + \ - self.qemu_boot_line + qemu_boot_line = self.qemu_emulator + ' ' + \ + self.qemu_boot_line return qemu_boot_line @@ -1069,16 +1377,12 @@ class QEMUKvm(VirtBase): wait for 120 seconds for vm net ready 10.0.2.* is the default ip address allocated by qemu """ - count = 40 - while count: - out = self.__control_session('ifconfig') - if "10.0.2" in out: - return True - time.sleep(6) - count -= 1 - - raise StartVMFailedException( - 'Virtual machine control net not ready in 120 seconds!!!') + ret = self.control_command("network") + # network has been ready, just return + if ret == "Success": + 
return True + else: + raise StartVMFailedException('Virtual machine control net not ready!!!') def __alloc_vcpus(self): """ @@ -1203,10 +1507,10 @@ class QEMUKvm(VirtBase): """ Get IP which VM is connected by bridge. """ - out = self.__control_session('ping', '60') + out = self.control_command('ping', '60') if not out: time.sleep(10) - out = self.__control_session('ifconfig') + out = self.control_command('ifconfig') ips = re.findall(r'inet (\d+\.\d+\.\d+\.\d+)', out) if '127.0.0.1' in ips: @@ -1269,7 +1573,7 @@ class QEMUKvm(VirtBase): Query and update VM status """ out = self.__monitor_session('info', 'status') - self.host_logger.info("Virtual machine status: %s" % out) + self.host_logger.warning("Virtual machine status: %s" % out) if 'paused' in out: self.vm_status = ST_PAUSE @@ -1280,12 +1584,23 @@ class QEMUKvm(VirtBase): info = self.host_session.send_expect('cat %s' % self.__pid_file, "# ") try: - pid = int(info) + pid = int(info.split()[0]) # save pid into dut structure self.host_dut.virt_pids.append(pid) except: self.host_logger.info("Failed to capture pid!!!") + def _query_pid(self): + info = self.host_session.send_expect('cat %s' % self.__pid_file, "# ") + try: + # sometimes saw to lines in pid file + pid = int(info.splitlines()[0]) + # save pid into dut structure + self.pid = pid + return True + except: + return False + def __strip_guest_pci(self): """ Strip all pci-passthrough device information, based on qemu monitor @@ -1317,43 +1632,183 @@ class QEMUKvm(VirtBase): return pcis - def __control_session(self, command, *args): + def __strip_guest_core(self): """ - Use the qemu guest agent service to control VM. + Strip all lcore-thread binding information + Return array will be [thread0, thread1, ...] 
+ """ + cores = [] + # CPU #0: pc=0xffffffff8104c416 (halted) thread_id=40677 + core_reg = r'^.*CPU #(\d+): (.*) thread_id=(\d+)' + out = self.__monitor_session('info', 'cpus') + + if out is None: + return cores + + lines = out.split("\r\n") + for line in lines: + m = re.match(core_reg, line) + if m: + cores.append(int(m.group(3))) + + return cores + + def handle_control_session(func): + """ + Wrapper function to handle serial port, must return serial to host session + """ + def _handle_control_session(self, command): + # just raise error if connect failed, for func can't all any more + try: + if self.control_type == 'socket': + assert (self.connect_serial_port(name=self.vm_name)), "Can't connect to serial socket" + elif self.control_type == 'telnet': + assert (self.connect_telnet_port(name=self.vm_name)), "Can't connect to serial port" + else: + assert (self.connect_qga_port(name=self.vm_name)), "Can't connect to qga port" + except: + return 'Failed' + + try: + out = func(self, command) + self.quit_control_session() + return out + except Exception as e: + print RED("Exception happend on [%s] serial with cmd [%s]" % (self.vm_name, command)) + print RED(e) + self.close_control_session(dut_id=self.host_dut.dut_id) + return 'Failed' + + return _handle_control_session + + def quit_control_session(self): + """ + Quit from serial session gracefully + """ + if self.control_type == 'socket': + self.control_session.send_expect("^C", "# ") + elif self.control_type == 'telnet': + self.control_session.send_command("^]") + self.control_session.send_command("quit") + # nothing need to do for qga session + self.control_session = None + + @parallel_lock() + def close_control_session(self, dut_id): + """ + Force kill serial connection from DUT when exception happened + """ + # return control_session to host_session + if self.control_type == 'socket': + scan_cmd = "ps -e -o pid,cmd |grep 'nc -U %s' |grep -v grep" % self.serial_path + out = self.host_dut.send_expect(scan_cmd, "#") + 
proc_info = out.strip().split() + try: + pid = int(proc_info[0]) + self.host_dut.send_expect('kill %d' % pid, "#") + except: + pass + self.host_dut.send_expect("", "# ") + elif self.control_type == 'telnet': + scan_cmd = "lsof -i:%d | grep telnet | awk '{print $2}'" % self.serial_port + proc_info = self.host_dut.send_expect(scan_cmd, "#") + try: + pid = int(proc_info) + self.host_dut.send_expect('kill %d' % pid, "#") + except: + pass + elif self.control_type == 'qga': + scan_cmd = "ps -e -o pid,cmd |grep 'address=%s' |grep -v grep" % self.qga_socket_path + out = self.host_dut.send_expect(scan_cmd, "#") + proc_info = out.strip().split() + try: + pid = int(proc_info[0]) + self.host_dut.send_expect('kill %d' % pid, "#") + except: + pass + + self.control_session = None + return + + @handle_control_session + def control_command(self, command): + """ + Use the serial port to control VM. Note: :command: there are these commands as below: - cat, fsfreeze, fstrim, halt, ifconfig, info,\ - ping, powerdown, reboot, shutdown, suspend + ping, network, powerdown :args: give different args by the different commands. 
""" - if not self.qga_sock_path: - self.host_logger.info( - "No QGA service between host [ %s ] and guest [ %s ]" % - (self.host_dut.NAME, self.vm_name)) - return None - cmd_head = '~/QMP/' + \ - "qemu-ga-client " + \ - "--address=%s %s" % \ - (self.qga_sock_path, command) + if command == "ping": + if self.control_type == "qga": + return "Success" + else: + # disable stty input characters for send_expect function + self.control_session.send_expect("stty -echo", "#", timeout=self.OPERATION_TIMEOUT) + return "Success" + elif command == "network": + if self.control_type == "qga": + # wait few seconds for network ready + time.sleep(5) + out = self.control_session.send_expect(self.qga_cmd_head + "ifconfig" , "#", timeout=self.OPERATION_TIMEOUT) + else: + intf = self.control_session.send_expect("ls -1 /sys/bus/pci/devices/0000:00:1f.0/net", "#", timeout=self.OPERATION_TIMEOUT) + out = self.control_session.send_expect("ifconfig %s" % intf, "#", timeout=self.OPERATION_TIMEOUT) + if "10.0.2" not in out: + self.control_session.send_expect("dhclient %s -timeout 10" % intf, "#", timeout=30) + else: + return "Success" + + out = self.control_session.send_expect("ifconfig", "#", timeout=self.OPERATION_TIMEOUT) - cmd = cmd_head - for arg in args: - cmd = cmd_head + ' ' + str(arg) + if "10.0.2" not in out: + return "Failed" + else: + return "Success" + elif command == "powerdown": + if self.control_type == "qga": + self.control_session.send_expect(self.qga_cmd_head + "powerdown", "#", timeout=self.OPERATION_TIMEOUT) + else: + self.control_session.send_command("init 0") - if command is "ping": - out = self.host_session.send_expect(cmd, '# ', int(args[0])) - else: - out = self.host_session.send_expect(cmd, '# ') + if self.control_type == "socket": + self.control_session.send_expect("^C", "# ") + elif self.control_type == "telnet": + self.control_session.send_command("^]") + self.control_session.send_command("quit") - return out + time.sleep(10) + self.kill_alive() + return "Success" 
+ else: + if self.control_type == "qga": + self.host_logger.warning("QGA not support [%s] command" % command) + out = "Failed" + else: + out = self.control_session.send_command(command) + return out def _stop_vm(self): """ Stop VM. """ if self.vm_status is ST_RUNNING: - self.__control_session('powerdown') + self.control_command('powerdown') else: self.__monitor_session('quit') time.sleep(5) + # remove temporary file + self.host_session.send_expect("rm -f %s" % self.__pid_file, "#") + + def pin_threads(self, lcores): + """ + Pin thread to assigned cores + """ + thread_reg = r'CPU #(\d+): .* thread_id=(\d+)' + output = self.__monitor_session('info', 'cpus') + thread_cores = re.findall(thread_reg, output) + cores_map = zip(thread_cores, lcores) + for thread_info, core_id in cores_map: + cpu_id, thread_id = thread_info + self.host_session.send_expect("taskset -pc %d %s" % (core_id, thread_id), "#") -- 1.9.3 ^ permalink raw reply [flat|nested] 34+ messages in thread
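Editor's note on the _attach_vm hunk above: the attach path recovers the serial, monitor and hostfwd settings of an already running VM from /proc/<pid>/cmdline, where arguments are separated by NUL bytes. The following is only a minimal standalone sketch of that idea, not the code from the patch. The helper name parse_qemu_cmdline and the returned dict layout are hypothetical, and it uses re.search where the patch uses re.match with a leading ".*".

```python
import re

# /proc/<pid>/cmdline joins arguments with NUL bytes, so an option and its
# value are separated by "\x00" rather than by a space.
SERIAL_UNIX = re.compile(r"-serial\x00unix:(.*?),")
SERIAL_TELNET = re.compile(r"-serial\x00telnet::(\d+),")
MONITOR_UNIX = re.compile(r"-monitor\x00unix:(.*?),")
HOSTFWD = re.compile(r"hostfwd=tcp:(.*?):(\d+)-:")


def parse_qemu_cmdline(cmdline):
    """Hypothetical helper: collect control-related settings from a cmdline."""
    info = {}

    # Prefer the unix socket serial device, fall back to the telnet one.
    m = SERIAL_UNIX.search(cmdline)
    if m:
        info["control_type"] = "socket"
        info["serial_path"] = m.group(1)
    else:
        m = SERIAL_TELNET.search(cmdline)
        if m:
            info["control_type"] = "telnet"
            info["serial_port"] = int(m.group(1))

    m = MONITOR_UNIX.search(cmdline)
    if m:
        info["monitor_sock_path"] = m.group(1)

    m = HOSTFWD.search(cmdline)
    if m:
        info["hostfwd_addr"] = "%s:%s" % (m.group(1), m.group(2))

    return info
```

A caller would read the cmdline file, pass the raw string in, and treat a missing "control_type" or "monitor_sock_path" key as the failure cases that the patch reports with StartVMFailedException.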
* [dts] [PATCH v2 15/16] conf/virt_global: add vm management related configuration 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu ` (13 preceding siblings ...) 2018-01-10 0:11 ` [dts] [PATCH v2 14/16] framework/qemu_kvm: " Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 2018-01-10 0:11 ` [dts] [PATCH v2 16/16] doc: add descriptions for multiple virtual machines module Marvin Liu 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/conf/virt_global.cfg b/conf/virt_global.cfg index 176650a..58ca061 100644 --- a/conf/virt_global.cfg +++ b/conf/virt_global.cfg @@ -1,6 +1,23 @@ # virtualization global configurations #[LIBVIRT] #[KVM] +# start: settings for VM startup check +# wait_seconds: [seconds to wait for VM startup] +# Will retry once if the VM can't start within this time +# login_timeout: [seconds for login session timeout] +# used in login action command, login session will reset when timeout happens +# login_prompt: [prompt string] +# password_prompt: [prompt string] +# control: type used to connect to and manage the virtual machine +# type : [qga|telnet|socket] +# qga: control VM by qemu guest agent +# telnet: control VM by serial port, serial port will redirect to telnet port +# socket : control VM by serial port, serial port will redirect to unix domain socket +# vnc: display setting for the virtual machine +# disable: [True|False] +# True: Disable display +# False: Enable display, will automatically allocate display port if no displayNum option +# displayNum: [display number] #[XEN] [LIBVIRT] cpu = @@ -12,6 +29,12 @@ cpu = model=host,number=4,cpupin=3 4 5 6; mem = size=2048; +start = + wait_seconds=120,login_timeout=60,login_prompt=login:,password_prompt=Password:; +control = + type=qga; +vnc = + disable=False; [XEN] cpu = number=4,cpupin=3 4 5 6; @@ -19,4 +42,3 @@ mem = size=2048; vif = 
mac=random,bridge=br0 - -- 1.9.3
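Editor's note on the new "start" option above: its value is a comma separated list of key=value pairs ending in a semicolon, and add_vm_start in patch 14/16 converts the two timeout values to integers while keeping the prompts as strings. The snippet below is a rough sketch of that flow under stated assumptions. parse_start_option and start_settings are hypothetical helpers written for illustration and are not the DTS config parser; the default values are the ones given in the patch.

```python
# Defaults taken from the class attributes added to QEMUKvm in patch 14/16.
DEFAULTS = {
    "wait_seconds": 120,
    "login_timeout": 60,
    "login_prompt": "login:",
    "password_prompt": "Password:",
}


def parse_start_option(value):
    """Hypothetical helper: split 'k=v,k=v;' into a dict of strings."""
    options = {}
    for item in value.strip().rstrip(";").split(","):
        if not item:
            continue
        # Split on the first '=' only, so values such as 'login:' stay intact.
        key, _, val = item.partition("=")
        options[key.strip()] = val.strip()
    return options


def start_settings(options):
    """Overlay parsed options on the defaults, converting timeouts to int."""
    settings = dict(DEFAULTS)
    for key in ("wait_seconds", "login_timeout"):
        if key in options:
            settings[key] = int(options[key])
    for key in ("login_prompt", "password_prompt"):
        if key in options:
            settings[key] = options[key]
    return settings
```

Options that are absent keep their defaults, which matches the "if key in options" checks in add_vm_start.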
* [dts] [PATCH v2 16/16] doc: add descriptions for multiple virtual machines module 2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu ` (14 preceding siblings ...) 2018-01-10 0:11 ` [dts] [PATCH v2 15/16] conf/virt_global: add vm management related configuration Marvin Liu @ 2018-01-10 0:11 ` Marvin Liu 15 siblings, 0 replies; 34+ messages in thread From: Marvin Liu @ 2018-01-10 0:11 UTC (permalink / raw) To: dts; +Cc: Marvin Liu Signed-off-by: Marvin Liu <yong.liu@intel.com> diff --git a/doc/dts_gsg/index.rst b/doc/dts_gsg/index.rst index 05587af..fbd16dc 100644 --- a/doc/dts_gsg/index.rst +++ b/doc/dts_gsg/index.rst @@ -41,3 +41,4 @@ Getting Started Guide review virtualization scenario + multiple_vm diff --git a/doc/dts_gsg/multiple_vm.rst b/doc/dts_gsg/multiple_vm.rst new file mode 100644 index 0000000..85eca26 --- /dev/null +++ b/doc/dts_gsg/multiple_vm.rst @@ -0,0 +1,87 @@ +Multiple Virtual Machines Management +==================================== + +When managing multiple virtual machines, waiting around 2 minutes for each VM to start is exhausting, so DTS introduces a parallel thread model for multiple VM management. + +.. note:: + Critical resources and actions which can't be handled in parallel are protected by function-level locks. + +Command arguments +----------------- + +The multiple VMs module supports starting VMs or sending commands to VMs in parallel, using the argument formats below. + +Arguments for "start" command: + +.. 
table:: + + +-----------------+----------------------------------+----------------+-------------+ + | name | Description | Default value | Must have | + | | | | | + +-----------------+----------------------------------+----------------+-------------+ + | name | virtual machine name | N/A | Yes | + +-----------------+----------------------------------+----------------+-------------+ + | dut_id | index of DUT | 0 | No | + +-----------------+----------------------------------+----------------+-------------+ + | autodetect_topo | whether detect network topology | False | No | + | | automatically | | | + +-----------------+----------------------------------+----------------+-------------+ + | virt_config | virtual machine config location | N/A | Alternative | + +-----------------+----------------------------------+----------------+-------------+ + | virt_params | local parameters of virtual | N/A | Alternative | + | | machine | | | + +-----------------+----------------------------------+----------------+-------------+ + +Arguments for "cmd" command: + +.. 
table:: + + +-----------------+----------------------------------+----------------+-------------+ + | name | Description | Default value | Must have | + | | | | | + +-----------------+----------------------------------+----------------+-------------+ + | name | virtual machine name | N/A | Yes | + +-----------------+----------------------------------+----------------+-------------+ + | dut_id | index of DUT | 0 | No | + +-----------------+----------------------------------+----------------+-------------+ + | commands | list of commands which will be | N/A | Yes | + | | sent into the virtual machine | | | + +-----------------+----------------------------------+----------------+-------------+ + | expects | list of expected output of the | N/A | Yes | + | | commands | | | + +-----------------+----------------------------------+----------------+-------------+ + | timeouts | list of timeout value of the | N/A | Yes | + | | commands | | | + +-----------------+----------------------------------+----------------+-------------+ + +.. note:: + If nothing is expected from a command, the expected string must still be defined as blank. + +The multiple VMs module categorizes and saves the result values after all tasks are done. Users can then retrieve the results with the function get_parallel_result. + +Sample Code +----------- + +.. 
code-block:: console + + vm_task = MultipleVM(max_vm=self.VM_NUM, duts=self.duts) + + for dut_id in range(len(self.duts)): + for vm_idx in range(VM_NUM): + vm_name = "vm%d" % vm_idx + args = {'name': vm_name, + 'dut_id': dut_id, + 'autodetect_topo': False, + 'virt_params': { + 'qemu': [{'path': '/usr/local/bin/qemu-system-x86_64'}], + 'cpu': [{'model': 'host', 'number': '1', 'cpupin': ''}], + 'mem': [{'size': '1024', 'hugepage': 'yes'}], + 'disk': [{'file': '/storage/vm-image/%s.qcow2' % vm_name}], + 'login': [{'user': 'root', 'password': 'root'}], + 'device': None} + } + + vm_task.add_parallel_task(action="start", config=args) + + vm_task.do_parallel_task() + print vm_task.get_parallel_result() diff --git a/doc/dts_gsg/virtualization.rst b/doc/dts_gsg/virtualization.rst index 1848563..f1f658a 100644 --- a/doc/dts_gsg/virtualization.rst +++ b/doc/dts_gsg/virtualization.rst @@ -72,17 +72,29 @@ Enable dhcp on default host_connect interface. chkconfig --level 2345 network on -Install qemu guest agent for DTS monitor guest os. +For network access, should disable guest firewall service. + +.. code-block:: console + + systemctl disable firewalld.service + +QGA connection +"""""""""""""" + +Install qemu guest agent, DTS will manage virtual machine through QGA if control type is 'qga'. .. code-block:: console yum install qemu-guest-agent.x86_64 -For network access, should disable guest firewall service. +Console connection +"""""""""""""""""" + +Enable virtual machine serial console in kernel command line, DTS will manage virtual machine through serial port if control type is 'telnet' or 'socket'. .. code-block:: console - systemctl disable firewalld.service + console=ttyS0,115200 Suite Programing ---------------- @@ -125,7 +137,7 @@ Below is the brief view of the qemu parameters of vxlan sample virtual machine. 
 
     [{'name': [{'name': 'vm0'}]},
      {'enable_kvm': [{'enable': 'yes'}]},
-     {'qga': [{'enable': 'yes'}]},
+     {'control': [{'type': 'telnet'}]},
      {'daemon': [{'enable': 'yes'}]},
      {'monitor': [{'path': '/tmp/vm0_monitor.sock'}]},
      {'net': [{'opt_addr': '1f', 'type': 'nic', 'opt_vlan': '0'}, {'type': 'user', 'opt_vlan': '0'}]},
@@ -310,34 +322,76 @@ Enable KVM
 
 DTS enable KVM full virtualization support as default. This option will significant improve the speed of virtual machine.
 
-Qemu Guest Agent
-""""""""""""""""
+Control (Qemu Guest Agent)
+"""""""""""""""""""""""""""
 
-Qemu monitor supply one method to interact with qemu process. DTS can monitor guest status by command supplied by qemu guest agent. Qemu guest agent is based on virtio-serial devices.
+Qemu monitor supply one method to interact with qemu process. DTS can monitor guest status by command supplied by qemu guest agent. Qemu guest agent is based on virtio-serial devices.
 
 .. code-block:: console
 
-    -device virtio-serial -device virtserialport,chardev=vm_qga0,name=org.qemu.guest_agent.0
-    -daemonize -monitor unix:/tmp/vm_monitor.sock,server,nowait
+    -device virtio-serial -device virtserialport,chardev=vm_qga0,name=org.qemu.guest_agent.0
+    -daemonize -monitor unix:/tmp/vm_monitor.sock,server,nowait
 
 Check whether guest os has been started up.
 
 .. code-block:: console
 
-    qemu-ga-client address=/tmp/{vm_name}_qga0.sock ping 120
+    qemu-ga-client address=/tmp/{vm_name}_qga0.sock ping 120
 
-.. note::
+.. note::
 
-    We only wait two minutes for guest os start up. For guest os only has few hardware and we has disabled most services, so 2 minutes is enough.
-    This command will be return when guest os is ready, so DTS will not wait 2 minutes for each time.
+    We only wait two minutes for guest os start up. For guest os only has few hardware and we has disabled most services, so 2 minutes is enough.
+    This command will be return when guest os is ready, so DTS will not wait 2 minutes for each time.
 
 Check whether guest os default interface has been up.
 
 .. code-block:: console
 
-    qemu-ga-client address=/tmp/{vm_name}_qga0.sock ifconfig
-
-DTS will wait for guest os default interface upped and get auto dhcp address. After that DTS can connect to guest by ssh connections.
+    qemu-ga-client address=/tmp/{vm_name}_qga0.sock ifconfig
+
+DTS will wait for guest os default interface upped and get auto dhcp address. After that DTS can connect to guest by ssh connections.
+
+.. code-block:: console
+
+    lo:
+        inet 127.0.0.1 netmask 255.0.0.0
+        inet6 ::1 prefixlen 128
+    host_connect:
+        inet 10.0.2.15 netmask 255.255.255.0
+        inet6 fe80::200:ff:feb9:fed7 prefixlen 64
+        ether 00:00:00:b9:fe:d7
+
+Power down guest os.
+
+.. code-block:: console
+
+    qemu-ga-client address=/tmp/{vm_name}_qga0.sock powerdown
+
+.. note::
+
+    For more information about qemu guest agent, please reference to http://wiki.qemu.org/Features/QAPI/GuestAgent.
+
+Control (Qemu Serial Port)
+""""""""""""""""""""""""""
+
+Qemu serial port is the default method to interact with guest OS. DTS can monitor guest status/manage guest network by serial port.
+
+.. code-block:: console
+
+    -serial telnet::7000,server,nowait
+
+DTS will check the output from serial port and determine whether guest os has been started up. The prompt string for guest login session can be configured by parameter "start".
+
+.. code-block:: console
+
+    start =
+        wait_seconds=120,login_timeout=60,login_prompt=login:,password_prompt=Password:;
+
+.. note::
+
+    Default timeout for guest OS start up is 2 minutes. For guest os only has few hardware and we has disabled most services, so 2 minutes is enough. If guest OS can't start up in 2 minutes, DTS will try to restart it once.
+
+DTS will check default interface upped and utilize dhcp to retrieve address. After that DTS can connect to guest by ssh connections.
 
 .. code-block:: console
 
@@ -349,15 +403,15 @@ DTS will wait for guest os default interface upped and get auto dhcp address. Af
         inet6 fe80::200:ff:feb9:fed7 prefixlen 64
         ether 00:00:00:b9:fe:d7
 
-Power down guest os.
+Power down guest os by serial port.
 
 .. code-block:: console
 
-    qemu-ga-client address=/tmp/{vm_name}_qga0.sock powerdown
+    init 0
 
 .. note::
 
-    For more information about qemu guest agent, please reference to http://wiki.qemu.org/Features/QAPI/GuestAgent.
+    For more information about qemu serial port, please reference to https://qemu.weilnetz.de/doc/qemu-doc.html.
 
 Qemu Monitor
 """"""""""""
@@ -395,10 +449,10 @@ Connection to monitor socket on DUT.
 For More detail information about qemu monitor. https://en.wikibooks.org/wiki/QEMU/Monitor#info
 
-Qemu Machine
-""""""""""
+Qemu Machine (Aarch64)
+""""""""""""""""""""""
 
-DTS set default qemu machine type as virt for Aarch64. This option is mandatory for qemu-system-aarch64.
+DTS need set default qemu machine type as virt for Aarch64. This option is mandatory for qemu-system-aarch64.
 
 Configured Parameters
 ~~~~~~~~~~~~~~~~~~~~~
-- 
1.9.3
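
The "cmd" arguments documented in this patch can be illustrated with a small helper. This is only a sketch and not part of the patch: `build_cmd_task` and its `default_timeout` parameter are hypothetical, and it assumes the task config is a plain dict using the key names from the "cmd" table (name, dut_id, commands, expects, timeouts). It pads missing expected strings with blanks, as the note in the documentation asks, and rejects lists longer than the command list.

```python
def build_cmd_task(name, commands, expects=None, timeouts=None,
                   dut_id=0, default_timeout=5):
    """Build a config dict for a "cmd" task (hypothetical helper).

    The key names follow the "cmd" argument table in the patch; the
    default timeout of 5 seconds is an assumption made for this sketch.
    """
    commands = list(commands)
    if not commands:
        raise ValueError("at least one command is required")

    expects = list(expects) if expects is not None else []
    timeouts = list(timeouts) if timeouts is not None else []
    if len(expects) > len(commands) or len(timeouts) > len(commands):
        raise ValueError("more expects/timeouts than commands")

    # A command with nothing expected still needs a blank expected string,
    # so pad both lists to the same length as the command list.
    expects += [""] * (len(commands) - len(expects))
    timeouts += [default_timeout] * (len(commands) - len(timeouts))

    return {"name": name,
            "dut_id": dut_id,
            "commands": commands,
            "expects": expects,
            "timeouts": timeouts}


if __name__ == "__main__":
    args = build_cmd_task("vm0", ["ifconfig", "lscpu"], expects=["inet"])
    print(args)
```

Following the pattern of the sample code in the patch, the resulting dict would presumably be passed as `config` to `add_parallel_task` with the action set to "cmd"; the patch text itself only shows the "start" action in use.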
end of thread, other threads:[~2018-01-10 7:18 UTC | newest]

Thread overview: 34+ messages
2018-01-08 2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 01/16] framework: add external thread pool library Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 02/16] framework/multiple_vm: add multiple VM management module Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 03/16] framework/utils: support locks function in parallel model Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 04/16] framework: add DUT index support Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 05/16] framework/logger: optimize output format for child threads Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 06/16] framework/dts: support multiple VMs module Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 07/16] framework/debugger: " Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 08/16] framework/ssh_pexpect: " Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 09/16] framework/ssh_connection: support DUT index argument Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 10/16] framework/settings: add parallel related settings Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 11/16] framework/virt_resource: support multiple VMs module Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 12/16] framework/virt_base: add attach/quick start/quit function for VM management Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 13/16] framework/virt_dut: support multiple VMs module Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 14/16] framework/qemu_kvm: " Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 15/16] conf/virt_global: add vm management related configuration Marvin Liu
2018-01-08 2:49 ` [dts] [PATCH v1 16/16] doc: add descriptions for multiple virtual machine module Marvin Liu
2018-01-10 0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu
2018-01-10 0:10 ` [dts] [PATCH v2 01/16] framework: add external thread pool library Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH v2 02/16] framework/multiple_vm: add multiple VM management module Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH v2 03/16] framework/utils: support locks for parallel model Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH v2 04/16] framework: add DUT index support Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH v2 05/16] framework/logger: optimize output format for threads Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH v2 06/16] framework/dts: support multiple VMs module Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH v2 07/16] framework/debugger: " Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH 08/16] framework/ssh_pexpect: " Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH v2 09/16] framework/ssh_connection: " Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH v2 10/16] framework/settings: add parallel related settings Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH v2 11/16] framework/virt_resource: support multiple VMs module Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH v2 12/16] framework/virt_base: add attach/quick start/quit function for VM management Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH v2 13/16] framework/virt_dut: support multiple VMs module Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH v2 14/16] framework/qemu_kvm: " Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH v2 15/16] conf/virt_global: add vm management related configuration Marvin Liu
2018-01-10 0:11 ` [dts] [PATCH v2 16/16] doc: add descriptions for multiple virtual machines module Marvin Liu