test suite reviews and discussions
 help / color / mirror / Atom feed
From: LihongX Ma <lihongx.ma@intel.com>
To: dts@dpdk.org
Cc: LihongX Ma <lihongx.ma@intel.com>
Subject: [dts] [PATCH V1 1/2] framework: use the default python lib threadpool to maintain thread
Date: Fri, 11 Sep 2020 02:58:54 +0800	[thread overview]
Message-ID: <1599764335-1164-2-git-send-email-lihongx.ma@intel.com> (raw)
In-Reply-To: <1599764335-1164-1-git-send-email-lihongx.ma@intel.com>

the function in extra_libs/threadpool.py is similar to default python lib,
so use the default lib instead of it.

Signed-off-by: LihongX Ma <lihongx.ma@intel.com>
---
 extra_libs/threadpool.py | 426 -----------------------------------------------
 framework/dts.py         |   5 -
 2 files changed, 431 deletions(-)
 delete mode 100644 extra_libs/threadpool.py

diff --git a/extra_libs/threadpool.py b/extra_libs/threadpool.py
deleted file mode 100644
index 3839f26..0000000
--- a/extra_libs/threadpool.py
+++ /dev/null
@@ -1,426 +0,0 @@
-# -*- coding: UTF-8 -*-
-"""Easy to use object-oriented thread pool framework.
-
-A thread pool is an object that maintains a pool of worker threads to perform
-time consuming operations in parallel. It assigns jobs to the threads
-by putting them in a work request queue, where they are picked up by the
-next available thread. This then performs the requested operation in the
-background and puts the results in another queue.
-
-The thread pool object can then collect the results from all threads from
-this queue as soon as they become available or after all threads have
-finished their work. It's also possible, to define callbacks to handle
-each result as it comes in.
-
-The basic concept and some code was taken from the book "Python in a Nutshell,
-2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section
-14.5 "Threaded Program Architecture". I wrapped the main program logic in the
-ThreadPool class, added the WorkRequest class and the callback system and
-tweaked the code here and there. Kudos also to Florent Aide for the exception
-handling mechanism.
-
-Basic usage::
-
-    >>> pool = ThreadPool(poolsize)
-    >>> requests = makeRequests(some_callable, list_of_args, callback)
-    >>> [pool.putRequest(req) for req in requests]
-    >>> pool.wait()
-
-See the end of the module code for a brief, annotated usage example.
-
-Website : http://chrisarndt.de/projects/threadpool/
-
-"""
-__docformat__ = "restructuredtext en"
-
-__all__ = [
-    'makeRequests',
-    'NoResultsPending',
-    'NoWorkersAvailable',
-    'ThreadPool',
-    'WorkRequest',
-    'WorkerThread'
-]
-
-__author__ = "Christopher Arndt"
-__version__ = '1.3.2'
-__license__ = "MIT license"
-
-
-# standard library modules
-import sys
-import threading
-import traceback
-
-try:
-    import Queue            # Python 2
-except ImportError:
-    import queue as Queue   # Python 3
-
-
-# exceptions
-class NoResultsPending(Exception):
-    """All work requests have been processed."""
-    pass
-
-class NoWorkersAvailable(Exception):
-    """No worker threads available to process remaining requests."""
-    pass
-
-
-# internal module helper functions
-def _handle_thread_exception(request, exc_info):
-    """Default exception handler callback function.
-
-    This just prints the exception info via ``traceback.print_exception``.
-
-    """
-    traceback.print_exception(*exc_info)
-
-
-# utility functions
-def makeRequests(callable_, args_list, callback=None,
-        exc_callback=_handle_thread_exception):
-    """Create several work requests for same callable with different arguments.
-
-    Convenience function for creating several work requests for the same
-    callable where each invocation of the callable receives different values
-    for its arguments.
-
-    ``args_list`` contains the parameters for each invocation of callable.
-    Each item in ``args_list`` should be either a 2-item tuple of the list of
-    positional arguments and a dictionary of keyword arguments or a single,
-    non-tuple argument.
-
-    See docstring for ``WorkRequest`` for info on ``callback`` and
-    ``exc_callback``.
-
-    """
-    requests = []
-    for item in args_list:
-        if isinstance(item, tuple):
-            requests.append(
-                WorkRequest(callable_, item[0], item[1], callback=callback,
-                    exc_callback=exc_callback)
-            )
-        else:
-            requests.append(
-                WorkRequest(callable_, [item], None, callback=callback,
-                    exc_callback=exc_callback)
-            )
-    return requests
-
-
-# classes
-class WorkerThread(threading.Thread):
-    """Background thread connected to the requests/results queues.
-
-    A worker thread sits in the background and picks up work requests from
-    one queue and puts the results in another until it is dismissed.
-
-    """
-
-    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
-        """Set up thread in daemonic mode and start it immediatedly.
-
-        ``requests_queue`` and ``results_queue`` are instances of
-        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a
-        new worker thread.
-
-        """
-        threading.Thread.__init__(self, **kwds)
-        self.setDaemon(1)
-        self._requests_queue = requests_queue
-        self._results_queue = results_queue
-        self._poll_timeout = poll_timeout
-        self._dismissed = threading.Event()
-        self.start()
-
-    def run(self):
-        """Repeatedly process the job queue until told to exit."""
-        while True:
-            if self._dismissed.isSet():
-                # we are dismissed, break out of loop
-                break
-            # get next work request. If we don't get a new request from the
-            # queue after self._poll_timout seconds, we jump to the start of
-            # the while loop again, to give the thread a chance to exit.
-            try:
-                request = self._requests_queue.get(True, self._poll_timeout)
-            except Queue.Empty:
-                continue
-            else:
-                if self._dismissed.isSet():
-                    # we are dismissed, put back request in queue and exit loop
-                    self._requests_queue.put(request)
-                    break
-                try:
-                    result = request.callable(*request.args, **request.kwds)
-                    self._results_queue.put((request, result))
-                except:
-                    request.exception = True
-                    self._results_queue.put((request, sys.exc_info()))
-
-    def dismiss(self):
-        """Sets a flag to tell the thread to exit when done with current job.
-        """
-        self._dismissed.set()
-
-
-class WorkRequest:
-    """A request to execute a callable for putting in the request queue later.
-
-    See the module function ``makeRequests`` for the common case
-    where you want to build several ``WorkRequest`` objects for the same
-    callable but with different arguments for each call.
-
-    """
-
-    def __init__(self, callable_, args=None, kwds=None, requestID=None,
-            callback=None, exc_callback=_handle_thread_exception):
-        """Create a work request for a callable and attach callbacks.
-
-        A work request consists of the a callable to be executed by a
-        worker thread, a list of positional arguments, a dictionary
-        of keyword arguments.
-
-        A ``callback`` function can be specified, that is called when the
-        results of the request are picked up from the result queue. It must
-        accept two anonymous arguments, the ``WorkRequest`` object and the
-        results of the callable, in that order. If you want to pass additional
-        information to the callback, just stick it on the request object.
-
-        You can also give custom callback for when an exception occurs with
-        the ``exc_callback`` keyword parameter. It should also accept two
-        anonymous arguments, the ``WorkRequest`` and a tuple with the exception
-        details as returned by ``sys.exc_info()``. The default implementation
-        of this callback just prints the exception info via
-        ``traceback.print_exception``. If you want no exception handler
-        callback, just pass in ``None``.
-
-        ``requestID``, if given, must be hashable since it is used by
-        ``ThreadPool`` object to store the results of that work request in a
-        dictionary. It defaults to the return value of ``id(self)``.
-
-        """
-        if requestID is None:
-            self.requestID = id(self)
-        else:
-            try:
-                self.requestID = hash(requestID)
-            except TypeError:
-                raise TypeError("requestID must be hashable.")
-        self.exception = False
-        self.callback = callback
-        self.exc_callback = exc_callback
-        self.callable = callable_
-        self.args = args or []
-        self.kwds = kwds or {}
-
-    def __str__(self):
-        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
-            (self.requestID, self.args, self.kwds, self.exception)
-
-class ThreadPool:
-    """A thread pool, distributing work requests and collecting results.
-
-    See the module docstring for more information.
-
-    """
-
-    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
-        """Set up the thread pool and start num_workers worker threads.
-
-        ``num_workers`` is the number of worker threads to start initially.
-
-        If ``q_size > 0`` the size of the work *request queue* is limited and
-        the thread pool blocks when the queue is full and it tries to put
-        more work requests in it (see ``putRequest`` method), unless you also
-        use a positive ``timeout`` value for ``putRequest``.
-
-        If ``resq_size > 0`` the size of the *results queue* is limited and the
-        worker threads will block when the queue is full and they try to put
-        new results in it.
-
-        .. warning:
-            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
-            the possibilty of a deadlock, when the results queue is not pulled
-            regularly and too many jobs are put in the work requests queue.
-            To prevent this, always set ``timeout > 0`` when calling
-            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.
-
-        """
-        self._requests_queue = Queue.Queue(q_size)
-        self._results_queue = Queue.Queue(resq_size)
-        self.workers = []
-        self.dismissedWorkers = []
-        self.workRequests = {}
-        self.createWorkers(num_workers, poll_timeout)
-
-    def createWorkers(self, num_workers, poll_timeout=5):
-        """Add num_workers worker threads to the pool.
-
-        ``poll_timout`` sets the interval in seconds (int or float) for how
-        ofte threads should check whether they are dismissed, while waiting for
-        requests.
-
-        """
-        for i in range(num_workers):
-            self.workers.append(WorkerThread(self._requests_queue,
-                self._results_queue, poll_timeout=poll_timeout))
-
-    def dismissWorkers(self, num_workers, do_join=False):
-        """Tell num_workers worker threads to quit after their current task."""
-        dismiss_list = []
-        for i in range(min(num_workers, len(self.workers))):
-            worker = self.workers.pop()
-            worker.dismiss()
-            dismiss_list.append(worker)
-
-        if do_join:
-            for worker in dismiss_list:
-                worker.join()
-        else:
-            self.dismissedWorkers.extend(dismiss_list)
-
-    def joinAllDismissedWorkers(self):
-        """Perform Thread.join() on all worker threads that have been dismissed.
-        """
-        for worker in self.dismissedWorkers:
-            worker.join()
-        self.dismissedWorkers = []
-
-    def putRequest(self, request, block=True, timeout=None):
-        """Put work request into work queue and save its id for later."""
-        assert isinstance(request, WorkRequest)
-        # don't reuse old work requests
-        assert not getattr(request, 'exception', None)
-        self._requests_queue.put(request, block, timeout)
-        self.workRequests[request.requestID] = request
-
-    def poll(self, block=False):
-        """Process any new results in the queue."""
-        while True:
-            # still results pending?
-            if not self.workRequests:
-                raise NoResultsPending
-            # are there still workers to process remaining requests?
-            elif block and not self.workers:
-                raise NoWorkersAvailable
-            try:
-                # get back next results
-                request, result = self._results_queue.get(block=block)
-                # has an exception occured?
-                if request.exception and request.exc_callback:
-                    request.exc_callback(request, result)
-                # hand results to callback, if any
-                if request.callback and not \
-                       (request.exception and request.exc_callback):
-                    request.callback(request, result)
-                del self.workRequests[request.requestID]
-            except Queue.Empty:
-                break
-            except Exception as e:
-                traceback.print_exception(*sys.exc_info())
-                # unexpected thing happened, need further dedbugging
-                import pdb
-                pdb.set_trace()
-
-    def wait(self):
-        """Wait for results, blocking until all have arrived."""
-        while 1:
-            try:
-                self.poll(True)
-            except NoResultsPending:
-                break
-
-
-################
-# USAGE EXAMPLE
-################
-
-if __name__ == '__main__':
-    import random
-    import time
-
-    # the work the threads will have to do (rather trivial in our example)
-    def do_something(data):
-        time.sleep(random.randint(1,5))
-        result = round(random.random() * data, 5)
-        # just to show off, we throw an exception once in a while
-        if result > 5:
-            raise RuntimeError("Something extraordinary happened!")
-        return result
-
-    # this will be called each time a result is available
-    def print_result(request, result):
-        print("**** Result from request #%s: %r" % (request.requestID, result))
-
-    # this will be called when an exception occurs within a thread
-    # this example exception handler does little more than the default handler
-    def handle_exception(request, exc_info):
-        if not isinstance(exc_info, tuple):
-            # Something is seriously wrong...
-            print(request)
-            print(exc_info)
-            raise SystemExit
-        print("**** Exception occured in request #%s: %s" % \
-          (request.requestID, exc_info))
-
-    # assemble the arguments for each job to a list...
-    data = [random.randint(1,10) for i in range(20)]
-    # ... and build a WorkRequest object for each item in data
-    requests = makeRequests(do_something, data, print_result, handle_exception)
-    # to use the default exception handler, uncomment next line and comment out
-    # the preceding one.
-    #requests = makeRequests(do_something, data, print_result)
-
-    # or the other form of args_lists accepted by makeRequests: ((,), {})
-    data = [((random.randint(1,10),), {}) for i in range(20)]
-    requests.extend(
-        makeRequests(do_something, data, print_result, handle_exception)
-        #makeRequests(do_something, data, print_result)
-        # to use the default exception handler, uncomment next line and comment
-        # out the preceding one.
-    )
-
-    # we create a pool of 3 worker threads
-    print("Creating thread pool with 3 worker threads.")
-    main = ThreadPool(3)
-
-    # then we put the work requests in the queue...
-    for req in requests:
-        main.putRequest(req)
-        print("Work request #%s added." % req.requestID)
-    # or shorter:
-    # [main.putRequest(req) for req in requests]
-
-    # ...and wait for the results to arrive in the result queue
-    # by using ThreadPool.wait(). This would block until results for
-    # all work requests have arrived:
-    # main.wait()
-
-    # instead we can poll for results while doing something else:
-    i = 0
-    while True:
-        try:
-            time.sleep(0.5)
-            main.poll()
-            print("Main thread working...")
-            print("(active worker threads: %i)" % (threading.activeCount()-1, ))
-            if i == 10:
-                print("**** Adding 3 more worker threads...")
-                main.createWorkers(3)
-            if i == 20:
-                print("**** Dismissing 2 worker threads...")
-                main.dismissWorkers(2)
-            i += 1
-        except KeyboardInterrupt:
-            print("**** Interrupted!")
-            break
-        except NoResultsPending:
-            print("**** No pending results.")
-            break
-    if main.dismissedWorkers:
-        print("Joining all dismissed worker threads...")
-        main.joinAllDismissedWorkers()
diff --git a/framework/dts.py b/framework/dts.py
index 2d86bc3..1a08d6d 100644
--- a/framework/dts.py
+++ b/framework/dts.py
@@ -514,11 +514,6 @@ def run_all(config_file, pkgName, git, patch, skip_setup,
     if not os.path.exists(output_dir):
         os.mkdir(output_dir)
 
-    # add external library
-    exec_file = os.path.realpath(__file__)
-    extra_libs_path = exec_file.replace('framework/dts.py', '') + 'extra_libs'
-    sys.path.insert(1, extra_libs_path)
-
     # add python module search path
     sys.path.append(suite_dir)
 
-- 
2.7.4


  reply	other threads:[~2020-09-11  2:32 UTC|newest]

Thread overview: 4+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-09-10 18:58 [dts] [PATCH V1 0/2] use the default python lib threadpool to maintain the pool of worker threads LihongX Ma
2020-09-10 18:58 ` LihongX Ma [this message]
2020-09-10 18:58 ` [dts] [PATCH V1 2/2] doc: add the required python lib - threadpool LihongX Ma
2020-09-15  8:36 ` [dts] [PATCH V1 0/2] use the default python lib threadpool to maintain the pool of worker threads Tu, Lijuan

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1599764335-1164-2-git-send-email-lihongx.ma@intel.com \
    --to=lihongx.ma@intel.com \
    --cc=dts@dpdk.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).