test suite reviews and discussions
 help / color / mirror / Atom feed
From: Marvin Liu <yong.liu@intel.com>
To: dts@dpdk.org
Cc: Marvin Liu <yong.liu@intel.com>
Subject: [dts] [PATCH v1 02/16] framework/multiple_vm: add multiple VM management module
Date: Sun,  7 Jan 2018 21:49:15 -0500	[thread overview]
Message-ID: <1515379769-11553-3-git-send-email-yong.liu@intel.com> (raw)
In-Reply-To: <1515379769-11553-1-git-send-email-yong.liu@intel.com>

Support parallel VMs start and executions in multiple threads. In the
init function, will create thread pool instance. With add parallel task
function, command 'start'|'cmd'|'migration'|'stop' related tasks will be
added into task pool. If call function do_parallel_task, all tasks will
be retrieved and do in parallel threads. If that function return, mean that
all threads have been done and results saved.

Signed-off-by: Marvin Liu <yong.liu@intel.com>

diff --git a/framework/multiple_vm.py b/framework/multiple_vm.py
new file mode 100644
index 0000000..04de6e4
--- /dev/null
+++ b/framework/multiple_vm.py
@@ -0,0 +1,304 @@
+#!/usr/bin/python
+import time
+import re
+import threadpool
+import traceback
+import threading
+from settings import DTS_ERR_TBL, save_global_setting, DTS_PARALLEL_SETTING
+from utils import RED
+from logger import getLogger
+
+
+class MultipleVM(object):
+    """
+    Module for handle VM related actions in parallel on mulitple DUTs
+    Supported actions: [start|command|migration]
+    Param max_vm: maximum number of threads
+    Param duts: list of DUT objects
+    """
+    def __init__(self, max_vm, duts):
+        self.max_vm = max_vm
+        self.duts = duts
+        self.pool = threadpool.ThreadPool(max_vm)
+        self.pool_result = [dict() for _ in duts]
+        self._pool_requests = list()
+        self._pool_executors = dict()
+        self.logger = getLogger("multiple_vm")
+
+        self.logger.info("Created MultipleVM instance with %d DUTs and %d VMs" % (len(duts), max_vm))
+
+    def parallel_vm_start(self, args):
+        """
+        Start VMs in parallel.
+        Args format:
+        {
+            'name': 'VM0',
+            'dut_id': 1,
+            'autodetect_topo': False,
+            'virt_config': { 'suite_name': '',
+                             'vm_name': '',
+                           }
+            'virt_params' : {
+                    'qemu': [{'path': '/usr/local/bin/qemu-system-x86_64'},
+                    'cpu': [{'model': 'host', 'number': '1'},
+                    'mem': [{'size': '1024', 'hugepage': 'yes'},
+                    'disk': [{'file': 'vm0.qcow2'},
+                    'login': [{'user': 'root', 'password': 'root'},
+                    'vnc' : [{'displayNum': '2'},
+                    'device': [{'driver': 'vfio-pci', 'opt_host': '0000:82:00.1'}]
+                              [{'driver': 'vhost-user', 'opt_path': '/tmp/vhost-user-0',
+                               'opt_mac': '',
+                               'opt_legacy': 'on' | 'off'}],
+                    'migration': [{'enable': 'yes'}]
+            }
+
+            'command': ''
+        }
+
+        return format:
+        {
+            'name': 'VM0',
+            'dut_id' : 1,
+            'vm_obj': vm_obj
+        }
+        """
+
+        result = {}
+        vm_name = args['name']
+        dut_id = args['dut_id']
+
+        if 'autodetect_topo' in args:
+            autodetect_topo = args['autodetect_topo']
+        else:
+            autodetect_topo = True
+
+        self.logger.info("Parallel task start for DUT%d %s" % (dut_id, vm_name))
+        threading.current_thread().name = vm_name
+
+        from qemu_kvm import QEMUKvm
+        # VM configured by configuration file 
+        if 'virt_config' in args:
+            suite_name = args['virt_config']['suite_name']
+            vm_name = args['virt_config']['vm_name']
+            vm_obj = QEMUKvm(self.duts[dut_id], vm_name, suite_name)
+            if 'virt_params' in args:
+                virt_params = args['virt_params']
+            else:
+                virt_params = dict()
+        else:
+            # VM configured by parameters
+            vm_obj = QEMUKvm(self.duts[dut_id], vm_name, 'multi_vm')
+            virt_params = args['virt_params']
+            # just save config, should be list
+            vm_obj.set_local_config([virt_params])
+
+        vm_dut = None
+
+        if vm_obj.check_alive():
+            self.logger.debug("Check VM[%s] is alive" % vm_name)
+            vm_obj.attach()
+            self.logger.debug("VM[%s] attach is done" % vm_name)
+            if 'migration' in virt_params:
+                self.logger.debug("Immigrated VM[%s] is ready" % vm_name)
+            else:
+                vm_dut = vm_obj.instantiate_vm_dut(autodetect_topo=autodetect_topo)
+                self.logger.debug("VM[%s] instantiate vm dut is done" % vm_name)
+        else:
+            vm_obj.quick_start()
+            self.duts[dut_id].logger.debug("VM[%s] quick start is done" % vm_name)
+            if 'migration' in virt_params:
+                self.logger.debug("Immigrated VM[%s] is ready" % vm_name)
+            else:
+                vm_obj._check_vm_status()
+                self.logger.debug("VM[%s] check status is done" % vm_name)
+                vm_dut = vm_obj.instantiate_vm_dut(autodetect_topo=autodetect_topo)
+                self.logger.debug("VM[%s] instantiate vm dut is done" % vm_name)
+
+        result['name'] = vm_name
+        result['dut_id'] = dut_id
+        result['vm_obj'] = vm_obj
+        result['vm_dut'] = vm_dut
+        self.logger.info("Parallel task DUT%d %s Done and returned" % (dut_id, vm_name))
+        return result
+
+    def parallel_vm_stop(self, args):
+        NotImplemented
+
+    def parallel_vm_command(self, args):
+        """
+        Run commands in parallel.
+        Args format:
+        {
+            'name': 'vm1',
+            'vm_dut': self.vm_dut,
+            'dut_id': 0,
+            'commands': ['cd dpdk', 'make install T=x86_64-native-linuxapp-gcc'],
+            'expects': ['#', "#"],
+            'timeouts': [5, 120],
+        }
+        """
+        result = {}
+        vm_name = args['name']
+        vm_dut = args['vm_dut']
+        dut_id = args['dut_id']
+        commands = args['commands']
+        expects = args['expects']
+        timeouts = args['timeouts']
+        outputs = []
+
+        if 'delay' in args:
+            time.sleep(args['delay'])
+
+        self.logger.debug("Parallel task start for DUT%d %s" % (dut_id, vm_name))
+
+        combinations = zip(commands, expects, timeouts)
+        for combine in combinations:
+            command, expect, timeout = combine
+            # timeout value need enlarge if vm number increased
+            add_time = int(self.max_vm * 0.5)
+            timeout += add_time
+            if len(expect) == 0:
+                output = vm_dut.send_command(command, timeout)
+            else:
+                output = vm_dut.send_expect(command, expect, timeout)
+            outputs.append(output)
+
+        result['name'] = vm_name
+        result['dut_id'] = dut_id
+        result['outputs'] = outputs
+        self.logger.debug("Parallel task for DUT%d %s has been done and returned" % (dut_id, vm_name))
+
+        return result
+
+    def parallel_vm_migration(self, args):
+        """
+        Do vm migartion action in parallel.
+        Args format:
+        {
+            'name': 'vm1',
+            'vm_obj': self.vm_obj,
+            'remote_ip': host2_ip,
+            'migrage_port': 6666,
+        }
+        """
+        result = {}
+        vm_name = args['name']
+        vm_obj = args['vm_obj']
+        dut_id = args['dut_id']
+        remote_ip = args['remote_ip']
+        migrate_port = args['migrate_port']
+
+        vm_obj.start_migration(remote_ip, migrate_port)
+        vm_obj.wait_migration_done()
+
+        result['name'] = vm_name
+        result['dut_id'] = dut_id
+
+        return result
+
+    def save_result(self, request, result):
+        """
+        Save result in local variable, will be used later
+        """
+        self.pool_result[result['dut_id']][result['name']] = result
+        self.pool_result[result['dut_id']][result['name']]['status'] = 0
+
+    def handle_vm_exception(self, request, exc_info):
+        """
+        Handle exception when do parallel task
+        should check vm status in this function
+        """
+        if not isinstance(exc_info, tuple):
+            # Something is seriously wrong...
+            print(request)
+            print(exc_info)
+            raise SystemExit
+
+        # print traceback info for exception
+        name = request.args[0]['name']
+        self.logger.error(("**** Exception occured DUT%d:%s" % (request.args[0]['dut_id'], name)))
+        exc_type, exc_value, exc_traceback = exc_info
+        self.logger.error(repr(traceback.format_tb(exc_traceback)))
+
+        result = {'name': name, 'dut_id': request.args[0]['dut_id']}
+        self.pool_result[result['dut_id']][result['name']] = result
+        self.pool_result[result['dut_id']][result['name']]['status'] = DTS_ERR_TBL["PARALLEL_EXECUTE_ERR"]
+
+    def add_parallel_task(self, action, config):
+        """
+        Add task into parallel pool, will call corresponding function later
+        based on action type.
+        """
+        if action == "start":
+            task = self.parallel_vm_start
+            data = config
+        elif action == "stop":
+            task = self.parallel_vm_stop
+            data = config['name']
+        elif action == "cmd":
+            # just string command by now
+            task = self.parallel_vm_command
+            data = config
+        elif action == "migration":
+            task = self.parallel_vm_migration
+            data = config
+
+        # due to threadpool request, one item
+        request = threadpool.makeRequests(task, [data], self.save_result, self.handle_vm_exception)
+        self._pool_requests.extend(request)
+
+    def do_parallel_task(self):
+        """
+        Do configured tasks in parallel, will return if all tasks finished
+        """
+        # set parallel mode
+        save_global_setting(DTS_PARALLEL_SETTING, 'yes')
+
+        self.pool_result = [dict() for _ in self.duts]
+        for req in self._pool_requests:
+            self.pool.putRequest(req)
+
+        self.logger.info("All parallel tasks start at %s" % time.ctime())
+        # clean the request queue
+        self._pool_requests = list()
+
+        while True:
+            try:
+                time.sleep(0.5)
+                self.pool.poll()
+            except threadpool.NoResultsPending:
+                self.logger.info("All parallel tasks have been done at %s" % time.ctime())
+                break
+            except Exception as e:
+                self.logger.error("Met exception %s" % (str(e)))
+                break
+
+        # clear pool related queues, clean thread
+        self.pool._requests_queue.queue.clear()
+        self.pool._results_queue.queue.clear()
+
+        time.sleep(2)
+
+        # exit from parallel mode
+        save_global_setting(DTS_PARALLEL_SETTING, 'no')
+
+    def get_parallel_result(self):
+        """
+        Return result information for this parallel task
+        """
+        return self.pool_result
+
+    def list_threads(self):
+        main_thread = threading.currentThread()
+        for t in threading.enumerate():
+            if t is main_thread:
+                continue
+            self.logger.error("thread [%s] is still activing" % t.getName())
+
+    def destroy_parallel(self):
+        """
+        Destroy created threads otherwise threads may can't created
+        """
+        self.pool.dismissWorkers(self.max_vm, do_join=True)
+        self.pool.wait()
+        self.list_threads()
-- 
1.9.3

  parent reply	other threads:[~2018-01-08  9:56 UTC|newest]

Thread overview: 34+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-01-08  2:49 [dts] [PATCH v1 00/16] Support parallel multiple virtual machine management Marvin Liu
2018-01-08  2:49 ` [dts] [PATCH v1 01/16] framework: add external thread pool library Marvin Liu
2018-01-08  2:49 ` Marvin Liu [this message]
2018-01-08  2:49 ` [dts] [PATCH v1 03/16] framework/utils: support locks function in parallel model Marvin Liu
2018-01-08  2:49 ` [dts] [PATCH v1 04/16] framework: add DUT index support Marvin Liu
2018-01-08  2:49 ` [dts] [PATCH v1 05/16] framework/logger: optimize output format for child threads Marvin Liu
2018-01-08  2:49 ` [dts] [PATCH v1 06/16] framework/dts: support multiple VMs module Marvin Liu
2018-01-08  2:49 ` [dts] [PATCH v1 07/16] framework/debugger: " Marvin Liu
2018-01-08  2:49 ` [dts] [PATCH v1 08/16] framework/ssh_pexpect: " Marvin Liu
2018-01-08  2:49 ` [dts] [PATCH v1 09/16] framework/ssh_connection: support DUT index argument Marvin Liu
2018-01-08  2:49 ` [dts] [PATCH v1 10/16] framework/settings: add parallel related settings Marvin Liu
2018-01-08  2:49 ` [dts] [PATCH v1 11/16] framework/virt_resource: support multiple VMs module Marvin Liu
2018-01-08  2:49 ` [dts] [PATCH v1 12/16] framework/virt_base: add attach/quick start/quit function for VM management Marvin Liu
2018-01-08  2:49 ` [dts] [PATCH v1 13/16] framework/virt_dut: support multiple VMs module Marvin Liu
2018-01-08  2:49 ` [dts] [PATCH v1 14/16] framework/qemu_kvm: " Marvin Liu
2018-01-08  2:49 ` [dts] [PATCH v1 15/16] conf/virt_global: add vm management related configuration Marvin Liu
2018-01-08  2:49 ` [dts] [PATCH v1 16/16] doc: add descriptions for multiple virtual machine module Marvin Liu
2018-01-10  0:10 ` [dts] [PATCH v2 00/16] Support parallel multiple virtual machines management Marvin Liu
2018-01-10  0:10   ` [dts] [PATCH v2 01/16] framework: add external thread pool library Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH v2 02/16] framework/multiple_vm: add multiple VM management module Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH v2 03/16] framework/utils: support locks for parallel model Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH v2 04/16] framework: add DUT index support Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH v2 05/16] framework/logger: optimize output format for threads Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH v2 06/16] framework/dts: support multiple VMs module Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH v2 07/16] framework/debugger: " Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH 08/16] framework/ssh_pexpect: " Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH v2 09/16] framework/ssh_connection: " Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH v2 10/16] framework/settings: add parallel related settings Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH v2 11/16] framework/virt_resource: support multiple VMs module Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH v2 12/16] framework/virt_base: add attach/quick start/quit function for VM management Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH v2 13/16] framework/virt_dut: support multiple VMs module Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH v2 14/16] framework/qemu_kvm: " Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH v2 15/16] conf/virt_global: add vm management related configuration Marvin Liu
2018-01-10  0:11   ` [dts] [PATCH v2 16/16] doc: add descriptions for multiple virtual machines module Marvin Liu

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1515379769-11553-3-git-send-email-yong.liu@intel.com \
    --to=yong.liu@intel.com \
    --cc=dts@dpdk.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).