Soft Patch Panel
 help / color / mirror / Atom feed
From: ogawa.yasufumi@lab.ntt.co.jp
To: spp@dpdk.org
Cc: ferruh.yigit@intel.com, ogawa.yasufumi@lab.ntt.co.jp
Subject: [spp] [PATCH 2/4] spp: add classes for thread management
Date: Fri,  8 Dec 2017 17:31:32 +0900	[thread overview]
Message-ID: <20171208083134.22735-2-ogawa.yasufumi@lab.ntt.co.jp> (raw)
In-Reply-To: <20171208083134.22735-1-ogawa.yasufumi@lab.ntt.co.jp>

From: Yasufumi Ogawa <ogawa.yasufumi@lab.ntt.co.jp>

In spp.py, threads are created with global methods, primarythread,
connectionthread and acceptthread. It makes the code hard to understand.

This update defines classes for these threads as sub-classes of
'threading.Thread' and global methods are moved into run() of each of
the classes. This update also adds stop() in the classes to terminate
them clearly and fix bug for 'Socket is not connected' error occured
when called 'sock.shutdown(socket.SHUT_RDWR)'.

Signed-off-by: Yasufumi Ogawa <ogawa.yasufumi@lab.ntt.co.jp>
---
 src/spp.py | 325 ++++++++++++++++++++++++++++++++++++-------------------------
 1 file changed, 190 insertions(+), 135 deletions(-)

diff --git a/src/spp.py b/src/spp.py
index 1ec2983..212d7ea 100755
--- a/src/spp.py
+++ b/src/spp.py
@@ -14,12 +14,12 @@ import socket
 import SocketServer
 import sys
 import threading
-from threading import Thread
+import traceback
 
 # Turn true if activate logger to debug remote command.
-logger = None
+logger = True
 
-if logger is not None:
+if logger is True:
     from logging import DEBUG
     from logging import Formatter
     from logging import getLogger
@@ -28,7 +28,7 @@ if logger is not None:
     handler = StreamHandler()
     handler.setLevel(DEBUG)
     formatter = Formatter(
-        '%(asctime)s - [%(name)s] - [%(levelname)s] - %(message)s')
+        '%(asctime)s,[%(filename)s][%(name)s][%(levelname)s]%(message)s')
     handler.setFormatter(formatter)
     logger.setLevel(DEBUG)
     logger.addHandler(handler)
@@ -98,43 +98,61 @@ MAIN2SEC = GrowingList()
 SEC2MAIN = GrowingList()
 
 
-def connectionthread(name, client_id, conn, m2s, s2m):
-    """Manage secondary process connections"""
+class ConnectionThread(threading.Thread):
 
-    cmd_str = 'hello'
+    def __init__(self, client_id, conn, m2s, s2m):
+        super(ConnectionThread, self).__init__()
+        self.daemon = True
 
-    # infinite loop so that function do not terminate and thread do not end.
-    while True:
-        try:
-            _, _, _ = select.select([conn, ], [conn, ], [], 5)
-        except select.error:
-            break
+        self.client_id = client_id
+        self.conn = conn
+        self.m2s = m2s
+        self.s2m = s2m
+        self.stop_event = threading.Event()
+        self.conn_opened = False
 
-        # Sending message to connected secondary
-        try:
-            cmd_str = m2s.get(True)
-            conn.send(cmd_str)  # send only takes string
-        except KeyError:
-            break
-        except Exception as excep:
-            print(excep, ",Error while sending msg in connectionthread()!")
-            break
+    def stop(self):
+        self.stop_event.set()
 
-        # Receiving from secondary
-        try:
-            # 1024 stands for bytes of data to be received
-            data = conn.recv(1024)
-            if data:
-                s2m.put("recv:%s:{%s}" % (str(conn.fileno()), data))
-            else:
-                s2m.put("closing:" + str(conn))
+    def run(self):
+        cmd_str = 'hello'
+
+        # infinite loop so that function do not terminate and thread do not
+        # end.
+        while True:
+            try:
+                _, _, _ = select.select(
+                    [self.conn, ], [self.conn, ], [], 5)
+            except select.error:
                 break
-        except Exception as excep:
-            print(excep, ",Error while receiving msg in connectionthread()!")
-            break
 
-    SECONDARY_LIST.remove(client_id)
-    conn.close()
+            # Sending message to connected secondary
+            try:
+                cmd_str = self.m2s.get(True)
+                self.conn.send(cmd_str)  # send only takes string
+            except KeyError:
+                break
+            except Exception as excep:
+                print(excep, ",Error while sending msg in connectionthread()!")
+                break
+
+            # Receiving from secondary
+            try:
+                # 1024 stands for bytes of data to be received
+                data = self.conn.recv(1024)
+                if data:
+                    self.s2m.put(
+                        "recv:%s:{%s}" % (str(self.conn.fileno()), data))
+                else:
+                    self.s2m.put("closing:" + str(self.conn))
+                    break
+            except Exception as excep:
+                print(
+                    excep, ",Error while receiving msg in connectionthread()!")
+                break
+
+        SECONDARY_LIST.remove(self.client_id)
+        self.conn.close()
 
 
 def getclientid(conn):
@@ -149,9 +167,12 @@ def getclientid(conn):
     if data is None:
         return -1
 
+    if logger is not None:
+        logger.debug("data: %s" % data)
     client_id = int(data.strip('\0'))
 
     if client_id < 0 or client_id > MAX_SECONDARY:
+        logger.debug("Failed to get client_id: %d" % client_id)
         return -1
 
     found = 0
@@ -175,6 +196,9 @@ def getclientid(conn):
             free_client_id = i
             break
 
+    if logger is not None:
+        logger.debug("Found free_client_id: %d" % free_client_id)
+
     if free_client_id < 0:
         return -1
 
@@ -184,37 +208,69 @@ def getclientid(conn):
     return free_client_id
 
 
-def acceptthread(sock, main2sec, sec2main):
-    """Listen for secondary processes"""
+class AcceptThread(threading.Thread):
 
-    global SECONDARY_COUNT
+    def __init__(self, host, port, main2sec, sec2main):
+        super(AcceptThread, self).__init__()
+        self.daemon = True
 
-    try:
-        while True:
-            # Accepting incoming connections
-            conn, _ = sock.accept()
+        # Creating secondary socket object
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
-            client_id = getclientid(conn)
-            if client_id < 0:
-                break
+        # Binding secondary socket to a address. bind() takes tuple of host
+        # and port.
+        self.sock.bind((host, port))
+
+        # Listening secondary at the address
+        self.sock.listen(MAX_SECONDARY)
+
+        self.main2sec = main2sec
+        self.sec2main = sec2main
+        self.stop_event = threading.Event()
+        self.sock_opened = False
+
+    def stop(self):
+        if self.sock_opened is True:
+            try:
+                self.sock.shutdown(socket.SHUT_RDWR)
+            except socket.error as excep:
+                print(excep, ", Error while closing sock in AcceptThread!")
+                traceback.print_exc()
+        self.sock.close()
+        self.stop_event.set()
 
-            # Creating new thread.
-            # Calling secondarythread function for this function and passing
-            # conn as argument.
-            SECONDARY_LIST.append(client_id)
-            main2sec[client_id] = Queue()
-            sec2main[client_id] = Queue()
-            connection_thread = Thread(target=connectionthread,
-                                       args=('secondary', client_id, conn,
-                                             main2sec[client_id],
-                                             sec2main[client_id]))
-            connection_thread.daemon = True
-            connection_thread.start()
-
-            SECONDARY_COUNT += 1
-    except Exception as excep:
-        print(excep, ", Error in acceptthread()!")
-        sock.close()
+    def run(self):
+        global SECONDARY_COUNT
+
+        try:
+            while True:
+                # Accepting incoming connections
+                conn, _ = self.sock.accept()
+
+                client_id = getclientid(conn)
+                if client_id < 0:
+                    break
+
+                # Creating new thread.
+                # Calling secondarythread function for this function and
+                # passing conn as argument.
+                SECONDARY_LIST.append(client_id)
+                self.main2sec[client_id] = Queue()
+                self.sec2main[client_id] = Queue()
+                connection_thread = ConnectionThread(
+                    client_id, conn,
+                    self.main2sec[client_id],
+                    self.sec2main[client_id])
+                connection_thread.daemon = True
+                connection_thread.start()
+
+                SECONDARY_COUNT += 1
+        except Exception as excep:
+            print(excep, ", Error in AcceptThread!")
+            traceback.print_exc()
+            self.sock_opened = False
+            self.sock.close()
 
 
 def command_primary(command):
@@ -266,49 +322,78 @@ def print_status():
         print ("Connected secondary id: %d" % i)
 
 
-def primarythread(sock, main2primary, primary2main):
-    """Manage primary process connection"""
+class PrimaryThread(threading.Thread):
 
-    global PRIMARY
-    cmd_str = ''
+    def __init__(self, host, port, main2primary, primary2main):
+        super(PrimaryThread, self).__init__()
+        self.daemon = True
 
-    while True:
-        # waiting for connection
-        PRIMARY = False
-        conn, addr = sock.accept()
-        PRIMARY = True
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        # Binding primary socket to a address. bind() takes tuple of host
+        # and port.
+        self.sock.bind((host, port))
 
-        while conn:
-            try:
-                _, _, _ = select.select([conn, ], [conn, ], [], 5)
-            except select.error:
-                break
+        # Listening primary at the address
+        self.sock.listen(1)  # 5 denotes the number of clients can queue
 
-            # Sending message to connected primary
-            try:
-                cmd_str = main2primary.get(True)
-                conn.send(cmd_str)  # send only takes string
-            except KeyError:
-                break
-            except Exception as excep:
-                print(excep, ", Error while sending msg in primarythread()!")
-                break
+        self.main2primary = main2primary
+        self.primary2main = primary2main
+        self.stop_event = threading.Event()
+        self.sock_opened = False
 
-            # Receiving from primary
-            try:
-                # 1024 stands for bytes of data to be received
-                data = conn.recv(1024)
-                if data:
-                    primary2main.put("recv:%s:{%s}" % (str(addr), data))
-                else:
-                    primary2main.put("closing:" + str(addr))
-                    conn.close()
+    def stop(self):
+        if self.sock_opened is True:
+            self.sock.shutdown(socket.SHUT_RDWR)
+        self.sock.close()
+        self.stop_event.set()
+
+    def run(self):
+        global PRIMARY
+        cmd_str = ''
+
+        while True:
+            # waiting for connection
+            PRIMARY = False
+            conn, addr = self.sock.accept()
+            PRIMARY = True
+
+            while conn:
+                try:
+                    _, _, _ = select.select([conn, ], [conn, ], [], 5)
+                except select.error:
                     break
-            except Exception as excep:
-                print(excep, ", Error while receiving msg in primarythread()!")
-                break
 
-    print ("primary communication thread end")
+                self.sock_opened = True
+                # Sending message to connected primary
+                try:
+                    cmd_str = self.main2primary.get(True)
+                    conn.send(cmd_str)  # send only takes string
+                except KeyError:
+                    break
+                except Exception as excep:
+                    print(
+                        excep,
+                        ", Error while sending msg in primarythread()!")
+                    break
+
+                # Receiving from primary
+                try:
+                    # 1024 stands for bytes of data to be received
+                    data = conn.recv(1024)
+                    if data:
+                        self.primary2main.put(
+                            "recv:%s:{%s}" % (str(addr), data))
+                    else:
+                        self.primary2main.put("closing:" + str(addr))
+                        conn.close()
+                        self.sock_opened = False
+                        break
+                except Exception as excep:
+                    print(
+                        excep,
+                        ", Error while receiving msg in primarythread()!")
+                    break
 
 
 def close_all_secondary():
@@ -593,36 +678,11 @@ def main(argv):
     print('secondary port : %d' % secondary_port)
     print('management port : %d' % management_port)
 
-    # Creating primary socket object
-    primary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    primary_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
-    # Binding primary socket to a address. bind() takes tuple of host and port.
-    primary_sock.bind((host, primary_port))
-
-    # Listening primary at the address
-    primary_sock.listen(1)  # 5 denotes the number of clients can queue
-
-    primary_thread = Thread(target=primarythread,
-                            args=(primary_sock, MAIN2PRIMARY, PRIMARY2MAIN))
-    primary_thread.daemon = True
+    primary_thread = PrimaryThread(
+        host, primary_port, MAIN2PRIMARY, PRIMARY2MAIN)
     primary_thread.start()
 
-    # Creating secondary socket object
-    secondary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    secondary_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
-    # Binding secondary socket to a address. bind() takes tuple of host
-    # and port.
-    secondary_sock.bind((host, secondary_port))
-
-    # Listening secondary at the address
-    secondary_sock.listen(MAX_SECONDARY)
-
-    # secondary process handling thread
-    accept_thread = Thread(target=acceptthread,
-                           args=(secondary_sock, MAIN2SEC, SEC2MAIN))
-    accept_thread.daemon = True
+    accept_thread = AcceptThread(host, secondary_port, MAIN2SEC, SEC2MAIN)
     accept_thread.start()
 
     shell = Shell()
@@ -641,16 +701,11 @@ def main(argv):
     shell = None
 
     try:
-        primary_sock.shutdown(socket.SHUT_RDWR)
-        primary_sock.close()
-    except socket.error as excep:
-        print(excep, ", Error while closing primary_sock in main()!")
-
-    try:
-        secondary_sock.shutdown(socket.SHUT_RDWR)
-        secondary_sock.close()
+        primary_thread.stop()
+        accept_thread.stop()
     except socket.error as excep:
-        print(excep, ", Error while closing secondary_sock in main()!")
+        print(excep, ", Error while terminating threads in main()!")
+        traceback.print_exc()
 
 
 if __name__ == "__main__":
-- 
2.13.1

  reply	other threads:[~2017-12-08  8:31 UTC|newest]

Thread overview: 3+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2017-12-08  8:31 [spp] [PATCH 1/4] spp: refactor spp.py to comply with pep8 ogawa.yasufumi
2017-12-08  8:31 ` ogawa.yasufumi [this message]
2017-12-08 18:23 ` Ferruh Yigit

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20171208083134.22735-2-ogawa.yasufumi@lab.ntt.co.jp \
    --to=ogawa.yasufumi@lab.ntt.co.jp \
    --cc=ferruh.yigit@intel.com \
    --cc=spp@dpdk.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).