Soft Patch Panel
From: ogawa.yasufumi@lab.ntt.co.jp
To: ferruh.yigit@intel.com, spp@dpdk.org
Cc: Yasufumi Ogawa <ogawa.yasufumi@lab.ntt.co.jp>
Subject: [spp] [PATCH 02/13] controller: move connection threads
Date: Tue,  6 Mar 2018 19:50:44 +0900
Message-ID: <20180306105055.65210-3-ogawa.yasufumi@lab.ntt.co.jp>
In-Reply-To: <20180306105055.65210-1-ogawa.yasufumi@lab.ntt.co.jp>

From: Yasufumi Ogawa <ogawa.yasufumi@lab.ntt.co.jp>

'spp.py' contains three classes that manage connections as threads:
ConnectionThread, AcceptThread and PrimaryThread. Move them out of the
main file into a new module, 'conn_thread.py', to improve
maintainability. No functional change is intended.

Signed-off-by: Yasufumi Ogawa <ogawa.yasufumi@lab.ntt.co.jp>
---
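Note for reviewers (below the '---' marker, so not part of the commit
message): the moved classes share one pattern. A daemon thread blocks on a
command queue, sends the command over its socket, then puts the reply on a
response queue. The sketch below illustrates that pattern only. It assumes
Python 3 and uses socket.socketpair() in place of a real secondary process;
EchoWorker, to_worker, from_worker and demo are hypothetical names and do
not exist in spp_common or conn_thread.

```python
import queue
import socket
import threading


class EchoWorker(threading.Thread):
    """Hypothetical stand-in for ConnectionThread: one command, one reply."""

    def __init__(self, conn, to_worker, from_worker):
        super().__init__()
        self.daemon = True
        self.conn = conn
        self.to_worker = to_worker      # commands from the controller
        self.from_worker = from_worker  # replies back to the controller

    def run(self):
        while True:
            cmd = self.to_worker.get(True)  # block until a command arrives
            if cmd is None:                 # sentinel: stop the thread
                break
            self.conn.sendall(cmd.encode())
            data = self.conn.recv(1024)
            if not data:                    # peer closed the connection
                self.from_worker.put("closing")
                break
            self.from_worker.put("recv:{%s}" % data.decode())
        self.conn.close()


def demo():
    """Send one command through the worker and return the queued reply."""
    ctl_sock, peer_sock = socket.socketpair()
    to_worker, from_worker = queue.Queue(), queue.Queue()
    worker = EchoWorker(ctl_sock, to_worker, from_worker)
    worker.start()

    to_worker.put("status")
    # Play the role of the secondary process: read the command, answer it.
    received = peer_sock.recv(1024).decode()
    peer_sock.sendall(("ok:" + received).encode())

    reply = from_worker.get(timeout=5)
    to_worker.put(None)  # ask the worker to exit
    worker.join(timeout=5)
    peer_sock.close()
    return reply
```

Calling demo() pushes "status" through the worker and returns the string
the worker queued for the controller. The None sentinel is an addition for
the sketch; the classes in this patch have no equivalent and exit only on
an error or a closed connection.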
 src/controller/conn_thread.py | 259 ++++++++++++++++++++++++++++++++++++++++++
 src/controller/spp.py         | 243 +--------------------------------------
 2 files changed, 261 insertions(+), 241 deletions(-)
 create mode 100644 src/controller/conn_thread.py

diff --git a/src/controller/conn_thread.py b/src/controller/conn_thread.py
new file mode 100644
index 0000000..7ba3b00
--- /dev/null
+++ b/src/controller/conn_thread.py
@@ -0,0 +1,259 @@
+from Queue import Queue
+import select
+import socket
+import spp_common
+import threading
+import traceback
+
+# Set to True to activate the logger for debugging remote commands.
+logger = None
+
+if logger is True:
+    import logging
+    logger = logging.getLogger(__name__)
+    handler = logging.StreamHandler()
+    handler.setLevel(logging.DEBUG)
+    formatter = logging.Formatter(
+        '%(asctime)s,[%(filename)s][%(name)s][%(levelname)s]%(message)s')
+    handler.setFormatter(formatter)
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(handler)
+
+
+class ConnectionThread(threading.Thread):
+    """Manage connection between controller and secondary"""
+
+    def __init__(self, client_id, conn):
+        super(ConnectionThread, self).__init__()
+        self.daemon = True
+
+        self.client_id = client_id
+        self.conn = conn
+        self.stop_event = threading.Event()
+        self.conn_opened = False
+
+    def stop(self):
+        self.stop_event.set()
+
+    def run(self):
+        cmd_str = 'hello'
+
+        # Loop until an error or a closed connection breaks out of it; the
+        # thread ends when run() returns.
+        while True:
+            try:
+                _, _, _ = select.select(
+                    [self.conn, ], [self.conn, ], [], 5)
+            except select.error:
+                break
+
+            # Sending message to connected secondary
+            try:
+                cmd_str = spp_common.MAIN2SEC[self.client_id].get(True)
+                self.conn.send(cmd_str)  # send only takes string
+            except KeyError:
+                break
+            except Exception as excep:
+                print(excep, ",Error while sending msg in connectionthread()!")
+                break
+
+            # Receiving from secondary
+            try:
+                # 1024 stands for bytes of data to be received
+                data = self.conn.recv(1024)
+                if data:
+                    spp_common.SEC2MAIN[self.client_id].put(
+                        "recv:%s:{%s}" % (str(self.conn.fileno()), data))
+                else:
+                    spp_common.SEC2MAIN[self.client_id].put(
+                        "closing:" + str(self.conn))
+                    break
+            except Exception as excep:
+                print(
+                    excep, ",Error while receiving msg in connectionthread()!")
+                break
+
+        spp_common.SECONDARY_LIST.remove(self.client_id)
+        self.conn.close()
+
+
+class AcceptThread(threading.Thread):
+    """Manage connection"""
+
+    def __init__(self, host, port):
+        super(AcceptThread, self).__init__()
+        self.daemon = True
+
+        # Creating secondary socket object
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+        # Bind the secondary socket to an address. bind() takes a tuple of
+        # host and port.
+        self.sock.bind((host, port))
+
+        # Listen for secondary connections on that address
+        self.sock.listen(spp_common.MAX_SECONDARY)
+
+        self.stop_event = threading.Event()
+        self.sock_opened = False
+
+    def getclientid(self, conn):
+        """Get client_id from client"""
+
+        try:
+            conn.send("_get_client_id")
+        except KeyError:
+            return -1
+
+        data = conn.recv(1024)
+        if not data:  # recv() returns an empty string on close, never None
+            return -1
+
+        if logger is not None:
+            logger.debug("data: %s" % data)
+        client_id = int(data.strip('\0'))
+
+        if client_id < 0 or client_id > spp_common.MAX_SECONDARY:
+            logger.debug("Failed to get client_id: %d" % client_id)
+            return -1
+
+        found = 0
+        for i in spp_common.SECONDARY_LIST:
+            if client_id == i:
+                found = 1
+                break
+
+        if found == 0:
+            return client_id
+
+        # client_id in use, find a free one
+        free_client_id = -1
+        for i in range(spp_common.MAX_SECONDARY):
+            found = -1
+            for j in spp_common.SECONDARY_LIST:
+                if i == j:
+                    found = i
+                    break
+            if found == -1:
+                free_client_id = i
+                break
+
+        if logger is not None:
+            logger.debug("Found free_client_id: %d" % free_client_id)
+
+        if free_client_id < 0:
+            return -1
+
+        conn.send("_set_client_id %u" % free_client_id)
+        data = conn.recv(1024)
+
+        return free_client_id
+
+    def stop(self):
+        if self.sock_opened is True:
+            try:
+                self.sock.shutdown(socket.SHUT_RDWR)
+            except socket.error as excep:
+                print(excep, ", Error while closing sock in AcceptThread!")
+                traceback.print_exc()
+        self.sock.close()
+        self.stop_event.set()
+
+    def run(self):
+        try:
+            while True:
+                # Accepting incoming connections
+                conn, _ = self.sock.accept()
+
+                client_id = self.getclientid(conn)
+                if client_id < 0:
+                    break
+
+                # Creating new thread.
+                # Start a ConnectionThread for this secondary and pass it
+                # client_id and conn.
+                spp_common.SECONDARY_LIST.append(client_id)
+                spp_common.MAIN2SEC[client_id] = Queue()
+                spp_common.SEC2MAIN[client_id] = Queue()
+                connection_thread = ConnectionThread(client_id, conn)
+                connection_thread.daemon = True
+                connection_thread.start()
+
+                spp_common.SECONDARY_COUNT += 1
+        except Exception as excep:
+            print(excep, ", Error in AcceptThread!")
+            traceback.print_exc()
+            self.sock_opened = False
+            self.sock.close()
+
+
+class PrimaryThread(threading.Thread):
+
+    def __init__(self, host, port):
+        super(PrimaryThread, self).__init__()
+        self.daemon = True
+
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        # Bind the primary socket to an address. bind() takes a tuple of
+        # host and port.
+        self.sock.bind((host, port))
+
+        # Listen for the primary connection on that address
+        self.sock.listen(1)  # backlog: at most 1 pending connection queued
+
+        self.stop_event = threading.Event()
+        self.sock_opened = False
+
+    def stop(self):
+        if self.sock_opened is True:
+            self.sock.shutdown(socket.SHUT_RDWR)
+        self.sock.close()
+        self.stop_event.set()
+
+    def run(self):
+        cmd_str = ''
+
+        while True:
+            # waiting for connection
+            spp_common.PRIMARY = False
+            conn, addr = self.sock.accept()
+            spp_common.PRIMARY = True
+
+            while conn:
+                try:
+                    _, _, _ = select.select([conn, ], [conn, ], [], 5)
+                except select.error:
+                    break
+
+                self.sock_opened = True
+                # Sending message to connected primary
+                try:
+                    cmd_str = spp_common.MAIN2PRIMARY.get(True)
+                    conn.send(cmd_str)  # send only takes string
+                except KeyError:
+                    break
+                except Exception as excep:
+                    print(
+                        excep,
+                        ", Error while sending msg in primarythread()!")
+                    break
+
+                # Receiving from primary
+                try:
+                    # 1024 stands for bytes of data to be received
+                    data = conn.recv(1024)
+                    if data:
+                        spp_common.PRIMARY2MAIN.put(
+                            "recv:%s:{%s}" % (str(addr), data))
+                    else:
+                        spp_common.PRIMARY2MAIN.put("closing:" + str(addr))
+                        conn.close()
+                        self.sock_opened = False
+                        break
+                except Exception as excep:
+                    print(
+                        excep,
+                        ", Error while receiving msg in primarythread()!")
+                    break
diff --git a/src/controller/spp.py b/src/controller/spp.py
index 0515193..d0d7bc9 100644
--- a/src/controller/spp.py
+++ b/src/controller/spp.py
@@ -4,8 +4,8 @@
 from __future__ import print_function
 
 import argparse
-from Queue import Queue
-import select
+from conn_thread import AcceptThread
+from conn_thread import PrimaryThread
 from shell import Shell
 import socket
 import SocketServer
@@ -58,245 +58,6 @@ class CmdRequestHandler(SocketServer.BaseRequestHandler):
             self.request.send("")
 
 
-class ConnectionThread(threading.Thread):
-    """Manage connection between controller and secondary"""
-
-    def __init__(self, client_id, conn):
-        super(ConnectionThread, self).__init__()
-        self.daemon = True
-
-        self.client_id = client_id
-        self.conn = conn
-        self.stop_event = threading.Event()
-        self.conn_opened = False
-
-    def stop(self):
-        self.stop_event.set()
-
-    def run(self):
-        cmd_str = 'hello'
-
-        # infinite loop so that function do not terminate and thread do not
-        # end.
-        while True:
-            try:
-                _, _, _ = select.select(
-                    [self.conn, ], [self.conn, ], [], 5)
-            except select.error:
-                break
-
-            # Sending message to connected secondary
-            try:
-                cmd_str = spp_common.MAIN2SEC[self.client_id].get(True)
-                self.conn.send(cmd_str)  # send only takes string
-            except KeyError:
-                break
-            except Exception as excep:
-                print(excep, ",Error while sending msg in connectionthread()!")
-                break
-
-            # Receiving from secondary
-            try:
-                # 1024 stands for bytes of data to be received
-                data = self.conn.recv(1024)
-                if data:
-                    spp_common.SEC2MAIN[self.client_id].put(
-                        "recv:%s:{%s}" % (str(self.conn.fileno()), data))
-                else:
-                    spp_common.SEC2MAIN[self.client_id].put(
-                        "closing:" + str(self.conn))
-                    break
-            except Exception as excep:
-                print(
-                    excep, ",Error while receiving msg in connectionthread()!")
-                break
-
-        spp_common.SECONDARY_LIST.remove(self.client_id)
-        self.conn.close()
-
-
-class AcceptThread(threading.Thread):
-    """Manage connection"""
-
-    def __init__(self, host, port):
-        super(AcceptThread, self).__init__()
-        self.daemon = True
-
-        # Creating secondary socket object
-        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
-        # Binding secondary socket to a address. bind() takes tuple of host
-        # and port.
-        self.sock.bind((host, port))
-
-        # Listening secondary at the address
-        self.sock.listen(spp_common.MAX_SECONDARY)
-
-        self.stop_event = threading.Event()
-        self.sock_opened = False
-
-    def getclientid(self, conn):
-        """Get client_id from client"""
-
-        try:
-            conn.send("_get_client_id")
-        except KeyError:
-            return -1
-
-        data = conn.recv(1024)
-        if data is None:
-            return -1
-
-        if logger is not None:
-            logger.debug("data: %s" % data)
-        client_id = int(data.strip('\0'))
-
-        if client_id < 0 or client_id > spp_common.MAX_SECONDARY:
-            logger.debug("Failed to get client_id: %d" % client_id)
-            return -1
-
-        found = 0
-        for i in spp_common.SECONDARY_LIST:
-            if client_id == i:
-                found = 1
-                break
-
-        if found == 0:
-            return client_id
-
-        # client_id in use, find a free one
-        free_client_id = -1
-        for i in range(spp_common.MAX_SECONDARY):
-            found = -1
-            for j in spp_common.SECONDARY_LIST:
-                if i == j:
-                    found = i
-                    break
-            if found == -1:
-                free_client_id = i
-                break
-
-        if logger is not None:
-            logger.debug("Found free_client_id: %d" % free_client_id)
-
-        if free_client_id < 0:
-            return -1
-
-        conn.send("_set_client_id %u" % free_client_id)
-        data = conn.recv(1024)
-
-        return free_client_id
-
-    def stop(self):
-        if self.sock_opened is True:
-            try:
-                self.sock.shutdown(socket.SHUT_RDWR)
-            except socket.error as excep:
-                print(excep, ", Error while closing sock in AcceptThread!")
-                traceback.print_exc()
-        self.sock.close()
-        self.stop_event.set()
-
-    def run(self):
-        try:
-            while True:
-                # Accepting incoming connections
-                conn, _ = self.sock.accept()
-
-                client_id = self.getclientid(conn)
-                if client_id < 0:
-                    break
-
-                # Creating new thread.
-                # Calling secondarythread function for this function and
-                # passing conn as argument.
-                spp_common.SECONDARY_LIST.append(client_id)
-                spp_common.MAIN2SEC[client_id] = Queue()
-                spp_common.SEC2MAIN[client_id] = Queue()
-                connection_thread = ConnectionThread(client_id, conn)
-                connection_thread.daemon = True
-                connection_thread.start()
-
-                spp_common.SECONDARY_COUNT += 1
-        except Exception as excep:
-            print(excep, ", Error in AcceptThread!")
-            traceback.print_exc()
-            self.sock_opened = False
-            self.sock.close()
-
-
-class PrimaryThread(threading.Thread):
-
-    def __init__(self, host, port):
-        super(PrimaryThread, self).__init__()
-        self.daemon = True
-
-        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        # Binding primary socket to a address. bind() takes tuple of host
-        # and port.
-        self.sock.bind((host, port))
-
-        # Listening primary at the address
-        self.sock.listen(1)  # 5 denotes the number of clients can queue
-
-        self.stop_event = threading.Event()
-        self.sock_opened = False
-
-    def stop(self):
-        if self.sock_opened is True:
-            self.sock.shutdown(socket.SHUT_RDWR)
-        self.sock.close()
-        self.stop_event.set()
-
-    def run(self):
-        cmd_str = ''
-
-        while True:
-            # waiting for connection
-            spp_common.PRIMARY = False
-            conn, addr = self.sock.accept()
-            spp_common.PRIMARY = True
-
-            while conn:
-                try:
-                    _, _, _ = select.select([conn, ], [conn, ], [], 5)
-                except select.error:
-                    break
-
-                self.sock_opened = True
-                # Sending message to connected primary
-                try:
-                    cmd_str = spp_common.MAIN2PRIMARY.get(True)
-                    conn.send(cmd_str)  # send only takes string
-                except KeyError:
-                    break
-                except Exception as excep:
-                    print(
-                        excep,
-                        ", Error while sending msg in primarythread()!")
-                    break
-
-                # Receiving from primary
-                try:
-                    # 1024 stands for bytes of data to be received
-                    data = conn.recv(1024)
-                    if data:
-                        spp_common.PRIMARY2MAIN.put(
-                            "recv:%s:{%s}" % (str(addr), data))
-                    else:
-                        spp_common.PRIMARY2MAIN.put("closing:" + str(addr))
-                        conn.close()
-                        self.sock_opened = False
-                        break
-                except Exception as excep:
-                    print(
-                        excep,
-                        ", Error while receiving msg in primarythread()!")
-                    break
-
-
 def main(argv):
     """main"""
 
-- 
2.13.1
