Soft Patch Panel
 help / color / mirror / Atom feed
* [spp] [PATCH 1/4] spp: refactor spp.py to comply with pep8
@ 2017-12-08  8:31 ogawa.yasufumi
  2017-12-08  8:31 ` [spp] [PATCH 2/4] spp: add classes for thread management ogawa.yasufumi
  2017-12-08 18:23 ` [spp] [PATCH 1/4] spp: refactor spp.py to comply with pep8 Ferruh Yigit
  0 siblings, 2 replies; 3+ messages in thread
From: ogawa.yasufumi @ 2017-12-08  8:31 UTC (permalink / raw)
  To: spp; +Cc: ferruh.yigit, ogawa.yasufumi

From: Yasufumi Ogawa <ogawa.yasufumi@lab.ntt.co.jp>

This update is for revising coding style to pep8. Syntax is checked
with flake8[1].

[1] http://flake8.pycqa.org/en/latest/

Signed-off-by: Yasufumi Ogawa <ogawa.yasufumi@lab.ntt.co.jp>
---
 src/spp.py | 211 +++++++++++++++++++++++++++++++++----------------------------
 1 file changed, 116 insertions(+), 95 deletions(-)

diff --git a/src/spp.py b/src/spp.py
index ce8aff0..1ec2983 100755
--- a/src/spp.py
+++ b/src/spp.py
@@ -2,33 +2,36 @@
 """Soft Patch Panel"""
 
 from __future__ import print_function
-from Queue import Queue, Empty
-from thread import start_new_thread
-from threading import Thread
-import cmd
+
 import argparse
+import cmd
+import json
+from Queue import Empty
+from Queue import Queue
+import re
 import select
 import socket
-import sys
-import re
-#import pdb; pdb.set_trace()
-
 import SocketServer
-import readline
+import sys
 import threading
-import json
+from threading import Thread
 
+# Turn true if activate logger to debug remote command.
 logger = None
 
-# Comment out to activate debug logging
-#from logging import getLogger,StreamHandler,Formatter,DEBUG
-#logger = getLogger(__name__)
-#handler = StreamHandler()
-#handler.setLevel(DEBUG)
-#formatter = Formatter('%(asctime)s - [%(name)s] - [%(levelname)s] - %(message)s')
-#handler.setFormatter(formatter)
-#logger.setLevel(DEBUG)
-#logger.addHandler(handler)
+if logger is not None:
+    from logging import DEBUG
+    from logging import Formatter
+    from logging import getLogger
+    from logging import StreamHandler
+    logger = getLogger(__name__)
+    handler = StreamHandler()
+    handler.setLevel(DEBUG)
+    formatter = Formatter(
+        '%(asctime)s - [%(name)s] - [%(levelname)s] - %(message)s')
+    handler.setFormatter(formatter)
+    logger.setLevel(DEBUG)
+    logger.addHandler(handler)
 
 
 CMD_OK = "OK"
@@ -40,6 +43,7 @@ RCMD_EXECUTE_QUEUE = Queue()
 RCMD_RESULT_QUEUE = Queue()
 REMOTE_COMMAND = "RCMD"
 
+
 class CmdRequestHandler(SocketServer.BaseRequestHandler):
     """Request handler for getting message from remote entities"""
 
@@ -56,15 +60,15 @@ class CmdRequestHandler(SocketServer.BaseRequestHandler):
             CmdRequestHandler.CMD.onecmd(self.data)
             ret = RCMD_RESULT_QUEUE.get()
             if (ret is not None):
-                if logger != None:
+                if logger is not None:
                     logger.debug("ret:%s" % ret)
                 self.request.send(ret)
             else:
-                if logger != None:
+                if logger is not None:
                     logger.debug("ret is none")
                 self.request.send("")
         else:
-            if logger != None:
+            if logger is not None:
                 logger.debug("CMD is None")
             self.request.send("")
 
@@ -85,52 +89,54 @@ PRIMARY = ''
 SECONDARY_LIST = []
 SECONDARY_COUNT = 0
 
-#init primary comm channel
+# init primary comm channel
 MAIN2PRIMARY = Queue()
 PRIMARY2MAIN = Queue()
 
-#init secondary comm channel list
+# init secondary comm channel list
 MAIN2SEC = GrowingList()
 SEC2MAIN = GrowingList()
 
+
 def connectionthread(name, client_id, conn, m2s, s2m):
     """Manage secondary process connections"""
 
     cmd_str = 'hello'
 
-    #infinite loop so that function do not terminate and thread do not end.
+    # infinite loop so that function do not terminate and thread do not end.
     while True:
         try:
-            _, _, _ = select.select([conn,], [conn,], [], 5)
+            _, _, _ = select.select([conn, ], [conn, ], [], 5)
         except select.error:
             break
 
-        #Sending message to connected secondary
+        # Sending message to connected secondary
         try:
             cmd_str = m2s.get(True)
-            conn.send(cmd_str) #send only takes string
+            conn.send(cmd_str)  # send only takes string
         except KeyError:
             break
-        except Exception, excep:
+        except Exception as excep:
             print(excep, ",Error while sending msg in connectionthread()!")
             break
 
-        #Receiving from secondary
+        # Receiving from secondary
         try:
-            data = conn.recv(1024) # 1024 stands for bytes of data to be received
+            # 1024 stands for bytes of data to be received
+            data = conn.recv(1024)
             if data:
-                #s2m.put("recv:" + str(conn.fileno()) + ":" + "{" + data + "}")
                 s2m.put("recv:%s:{%s}" % (str(conn.fileno()), data))
             else:
                 s2m.put("closing:" + str(conn))
                 break
-        except Exception, excep:
+        except Exception as excep:
             print(excep, ",Error while receiving msg in connectionthread()!")
             break
 
     SECONDARY_LIST.remove(client_id)
     conn.close()
 
+
 def getclientid(conn):
     """Get client_id from client"""
 
@@ -140,7 +146,7 @@ def getclientid(conn):
         return -1
 
     data = conn.recv(1024)
-    if data == None:
+    if data is None:
         return -1
 
     client_id = int(data.strip('\0'))
@@ -177,6 +183,7 @@ def getclientid(conn):
 
     return free_client_id
 
+
 def acceptthread(sock, main2sec, sec2main):
     """Listen for secondary processes"""
 
@@ -184,29 +191,32 @@ def acceptthread(sock, main2sec, sec2main):
 
     try:
         while True:
-            #Accepting incoming connections
+            # Accepting incoming connections
             conn, _ = sock.accept()
 
             client_id = getclientid(conn)
             if client_id < 0:
                 break
 
-            #Creating new thread.
-            #Calling secondarythread function for this function and passing
-            #conn as argument.
-
+            # Creating new thread.
+            # Calling secondarythread function for this function and passing
+            # conn as argument.
             SECONDARY_LIST.append(client_id)
             main2sec[client_id] = Queue()
             sec2main[client_id] = Queue()
-            start_new_thread(connectionthread,
-                             ('secondary', client_id, conn,
-                              main2sec[client_id],
-                              sec2main[client_id], ))
+            connection_thread = Thread(target=connectionthread,
+                                       args=('secondary', client_id, conn,
+                                             main2sec[client_id],
+                                             sec2main[client_id]))
+            connection_thread.daemon = True
+            connection_thread.start()
+
             SECONDARY_COUNT += 1
-    except Exception, excep:
+    except Exception as excep:
         print(excep, ", Error in acceptthread()!")
         sock.close()
 
+
 def command_primary(command):
     """Send command to primary process"""
 
@@ -220,6 +230,7 @@ def command_primary(command):
         print (recv)
         return CMD_NOTREADY, recv
 
+
 def command_secondary(sec_id, command):
     """Send command to secondary process with sec_id"""
 
@@ -233,6 +244,7 @@ def command_secondary(sec_id, command):
         print(message)
         return CMD_NOTREADY, message
 
+
 def get_status():
     secondary = []
     for i in SECONDARY_LIST:
@@ -243,6 +255,7 @@ def get_status():
         }
     return stat
 
+
 def print_status():
     """Display information about connected clients"""
 
@@ -252,6 +265,7 @@ def print_status():
     for i in SECONDARY_LIST:
         print ("Connected secondary id: %d" % i)
 
+
 def primarythread(sock, main2primary, primary2main):
     """Manage primary process connection"""
 
@@ -259,43 +273,44 @@ def primarythread(sock, main2primary, primary2main):
     cmd_str = ''
 
     while True:
-        #waiting for connection
+        # waiting for connection
         PRIMARY = False
         conn, addr = sock.accept()
         PRIMARY = True
 
         while conn:
             try:
-                _, _, _ = select.select([conn,], [conn,], [], 5)
+                _, _, _ = select.select([conn, ], [conn, ], [], 5)
             except select.error:
                 break
 
-            #Sending message to connected primary
+            # Sending message to connected primary
             try:
                 cmd_str = main2primary.get(True)
-                conn.send(cmd_str) #send only takes string
+                conn.send(cmd_str)  # send only takes string
             except KeyError:
                 break
-            except Exception, excep:
+            except Exception as excep:
                 print(excep, ", Error while sending msg in primarythread()!")
                 break
 
-            #Receiving from primary
+            # Receiving from primary
             try:
-                data = conn.recv(1024) # 1024 stands for bytes of data to be received
+                # 1024 stands for bytes of data to be received
+                data = conn.recv(1024)
                 if data:
-                    #primary2main.put("recv:" + str(addr) + ":" + "{" + data + "}")
                     primary2main.put("recv:%s:{%s}" % (str(addr), data))
                 else:
                     primary2main.put("closing:" + str(addr))
                     conn.close()
                     break
-            except Exception, excep:
+            except Exception as excep:
                 print(excep, ", Error while receiving msg in primarythread()!")
                 break
 
     print ("primary communication thread end")
 
+
 def close_all_secondary():
     """Exit all secondary processes"""
 
@@ -308,6 +323,7 @@ def close_all_secondary():
         command_secondary(i, 'exit')
     SECONDARY_COUNT = 0
 
+
 def check_sec_cmds(cmds):
     """Validate secondary commands before sending"""
 
@@ -338,15 +354,15 @@ def check_sec_cmds(cmds):
 
     return valid
 
+
 def clean_sec_cmd(cmdstr):
     """remove unwanted spaces to avoid invalid command error"""
-     
+
     tmparg = re.sub(r'\s+', " ", cmdstr)
     res = re.sub(r'\s?;\s?', ";", tmparg)
     return res
 
 
-
 class Shell(cmd.Cmd):
     """SPP command prompt"""
 
@@ -368,7 +384,7 @@ class Shell(cmd.Cmd):
             completions = [p
                            for p in self.PRI_CMDS
                            if p.startswith(text)
-                          ]
+                           ]
         return completions
 
     def complete_sec(self, text, line, begidx, endidx):
@@ -382,18 +398,18 @@ class Shell(cmd.Cmd):
                 if not (";" in cleaned_line):
                     tmplist = [str(i) for i in SECONDARY_LIST]
                     completions = [p+";"
-                            for p in tmplist
-                            if p.startswith(text)
-                            ]
+                                   for p in tmplist
+                                   if p.startswith(text)
+                                   ]
                 elif cleaned_line[-1] == ";":
                     completions = self.SEC_CMDS[:]
                 else:
                     seccmd = cleaned_line.split(";")[1]
                     if cleaned_line[-1] != " ":
                         completions = [p
-                                for p in self.SEC_CMDS
-                                if p.startswith(seccmd)
-                                ]
+                                       for p in self.SEC_CMDS
+                                       if p.startswith(seccmd)
+                                       ]
                     elif ("add" in seccmd) or ("del" in seccmd):
                         completions = self.SEC_SUBCMDS[:]
                     else:
@@ -407,13 +423,13 @@ class Shell(cmd.Cmd):
                         completions = []
                     else:
                         completions = [p
-                                for p in self.SEC_SUBCMDS
-                                if p.startswith(subcmd)
-                                ]
+                                       for p in self.SEC_SUBCMDS
+                                       if p.startswith(subcmd)
+                                       ]
             else:
                 completions = []
             return completions
-        except Exception, e:
+        except Exception as e:
             print(len(cleaned_line.split()))
             print(e)
 
@@ -426,7 +442,7 @@ class Shell(cmd.Cmd):
             completions = [p
                            for p in self.BYE_CMDS
                            if p.startswith(text)
-                          ]
+                           ]
         return completions
 
     def response(self, result, message):
@@ -440,7 +456,7 @@ class Shell(cmd.Cmd):
             param = result + '\n' + message
             RCMD_RESULT_QUEUE.put(param)
         else:
-            if logger != None:
+            if logger is not None:
                 logger.debug("unknown remote command = %s" % rcmd)
 
     def do_status(self, _):
@@ -496,7 +512,7 @@ class Shell(cmd.Cmd):
 
     def do_playback(self, fname):
         """Playback commands from a file:  PLAYBACK filename.cmd"""
-        
+
         if fname == '':
             print("Record file is required!")
         else:
@@ -549,23 +565,23 @@ def main(argv):
     """main"""
 
     parser = argparse.ArgumentParser(description="SPP Controller")
-    
+
     parser.add_argument(
-            "-p", "--pri-port",
-            type=int, default=5555,
-            help="primary port number")
+        "-p", "--pri-port",
+        type=int, default=5555,
+        help="primary port number")
     parser.add_argument(
-            "-s", "--sec-port",
-            type=int, default=6666,
-            help="secondary port number")
+        "-s", "--sec-port",
+        type=int, default=6666,
+        help="secondary port number")
     parser.add_argument(
-            "-m", "--mng-port",
-            type=int, default=7777,
-            help="management port number")
+        "-m", "--mng-port",
+        type=int, default=7777,
+        help="management port number")
     parser.add_argument(
-            "-ip", "--ipaddr",
-            type=str, default='', #'localhost' or '127.0.0.1' or '' are all same
-            help="IP address")
+        "-ip", "--ipaddr",
+        type=str, default='',  # 'localhost' or '127.0.0.1' or '' are all same
+        help="IP address")
     args = parser.parse_args()
 
     host = args.ipaddr
@@ -577,41 +593,46 @@ def main(argv):
     print('secondary port : %d' % secondary_port)
     print('management port : %d' % management_port)
 
-    #Creating primary socket object
+    # Creating primary socket object
     primary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     primary_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
-    #Binding primary socket to a address. bind() takes tuple of host and port.
+    # Binding primary socket to a address. bind() takes tuple of host and port.
     primary_sock.bind((host, primary_port))
 
-    #Listening primary at the address
-    primary_sock.listen(1) #5 denotes the number of clients can queue
+    # Listening primary at the address
+    primary_sock.listen(1)  # 5 denotes the number of clients can queue
 
     primary_thread = Thread(target=primarythread,
-                            args=(primary_sock, MAIN2PRIMARY, PRIMARY2MAIN,))
+                            args=(primary_sock, MAIN2PRIMARY, PRIMARY2MAIN))
     primary_thread.daemon = True
     primary_thread.start()
 
-    #Creating secondary socket object
+    # Creating secondary socket object
     secondary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     secondary_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
-    #Binding secondary socket to a address. bind() takes tuple of host and port.
+    # Binding secondary socket to a address. bind() takes tuple of host
+    # and port.
     secondary_sock.bind((host, secondary_port))
 
-    #Listening secondary at the address
+    # Listening secondary at the address
     secondary_sock.listen(MAX_SECONDARY)
 
     # secondary process handling thread
-    start_new_thread(acceptthread, (secondary_sock, MAIN2SEC, SEC2MAIN))
+    accept_thread = Thread(target=acceptthread,
+                           args=(secondary_sock, MAIN2SEC, SEC2MAIN))
+    accept_thread.daemon = True
+    accept_thread.start()
 
     shell = Shell()
 
     # Run request handler as a TCP server thread
     SocketServer.ThreadingTCPServer.allow_reuse_address = True
     CmdRequestHandler.CMD = shell
-    command_server = SocketServer.ThreadingTCPServer((host, management_port),CmdRequestHandler)
-            
+    command_server = SocketServer.ThreadingTCPServer(
+        (host, management_port), CmdRequestHandler)
+
     t = threading.Thread(target=command_server.serve_forever)
     t.setDaemon(True)
     t.start()
@@ -622,13 +643,13 @@ def main(argv):
     try:
         primary_sock.shutdown(socket.SHUT_RDWR)
         primary_sock.close()
-    except socket.error, excep:
+    except socket.error as excep:
         print(excep, ", Error while closing primary_sock in main()!")
 
     try:
         secondary_sock.shutdown(socket.SHUT_RDWR)
         secondary_sock.close()
-    except socket.error, excep:
+    except socket.error as excep:
         print(excep, ", Error while closing secondary_sock in main()!")
 
 
-- 
2.13.1

^ permalink raw reply	[flat|nested] 3+ messages in thread

* [spp] [PATCH 2/4] spp: add classes for thread management
  2017-12-08  8:31 [spp] [PATCH 1/4] spp: refactor spp.py to comply with pep8 ogawa.yasufumi
@ 2017-12-08  8:31 ` ogawa.yasufumi
  2017-12-08 18:23 ` [spp] [PATCH 1/4] spp: refactor spp.py to comply with pep8 Ferruh Yigit
  1 sibling, 0 replies; 3+ messages in thread
From: ogawa.yasufumi @ 2017-12-08  8:31 UTC (permalink / raw)
  To: spp; +Cc: ferruh.yigit, ogawa.yasufumi

From: Yasufumi Ogawa <ogawa.yasufumi@lab.ntt.co.jp>

In spp.py, threads are created with global methods, primarythread,
connectionthread and acceptthread. It makes the code hard to understand.

This update defines classes for these threads as sub-classes of
'threading.Thread' and global methods are moved into run() of each of
the classes. This update also adds stop() in the classes to terminate
them clearly and fix bug for 'Socket is not connected' error occured
when called 'sock.shutdown(socket.SHUT_RDWR)'.

Signed-off-by: Yasufumi Ogawa <ogawa.yasufumi@lab.ntt.co.jp>
---
 src/spp.py | 325 ++++++++++++++++++++++++++++++++++++-------------------------
 1 file changed, 190 insertions(+), 135 deletions(-)

diff --git a/src/spp.py b/src/spp.py
index 1ec2983..212d7ea 100755
--- a/src/spp.py
+++ b/src/spp.py
@@ -14,12 +14,12 @@ import socket
 import SocketServer
 import sys
 import threading
-from threading import Thread
+import traceback
 
 # Turn true if activate logger to debug remote command.
-logger = None
+logger = True
 
-if logger is not None:
+if logger is True:
     from logging import DEBUG
     from logging import Formatter
     from logging import getLogger
@@ -28,7 +28,7 @@ if logger is not None:
     handler = StreamHandler()
     handler.setLevel(DEBUG)
     formatter = Formatter(
-        '%(asctime)s - [%(name)s] - [%(levelname)s] - %(message)s')
+        '%(asctime)s,[%(filename)s][%(name)s][%(levelname)s]%(message)s')
     handler.setFormatter(formatter)
     logger.setLevel(DEBUG)
     logger.addHandler(handler)
@@ -98,43 +98,61 @@ MAIN2SEC = GrowingList()
 SEC2MAIN = GrowingList()
 
 
-def connectionthread(name, client_id, conn, m2s, s2m):
-    """Manage secondary process connections"""
+class ConnectionThread(threading.Thread):
 
-    cmd_str = 'hello'
+    def __init__(self, client_id, conn, m2s, s2m):
+        super(ConnectionThread, self).__init__()
+        self.daemon = True
 
-    # infinite loop so that function do not terminate and thread do not end.
-    while True:
-        try:
-            _, _, _ = select.select([conn, ], [conn, ], [], 5)
-        except select.error:
-            break
+        self.client_id = client_id
+        self.conn = conn
+        self.m2s = m2s
+        self.s2m = s2m
+        self.stop_event = threading.Event()
+        self.conn_opened = False
 
-        # Sending message to connected secondary
-        try:
-            cmd_str = m2s.get(True)
-            conn.send(cmd_str)  # send only takes string
-        except KeyError:
-            break
-        except Exception as excep:
-            print(excep, ",Error while sending msg in connectionthread()!")
-            break
+    def stop(self):
+        self.stop_event.set()
 
-        # Receiving from secondary
-        try:
-            # 1024 stands for bytes of data to be received
-            data = conn.recv(1024)
-            if data:
-                s2m.put("recv:%s:{%s}" % (str(conn.fileno()), data))
-            else:
-                s2m.put("closing:" + str(conn))
+    def run(self):
+        cmd_str = 'hello'
+
+        # infinite loop so that function do not terminate and thread do not
+        # end.
+        while True:
+            try:
+                _, _, _ = select.select(
+                    [self.conn, ], [self.conn, ], [], 5)
+            except select.error:
                 break
-        except Exception as excep:
-            print(excep, ",Error while receiving msg in connectionthread()!")
-            break
 
-    SECONDARY_LIST.remove(client_id)
-    conn.close()
+            # Sending message to connected secondary
+            try:
+                cmd_str = self.m2s.get(True)
+                self.conn.send(cmd_str)  # send only takes string
+            except KeyError:
+                break
+            except Exception as excep:
+                print(excep, ",Error while sending msg in connectionthread()!")
+                break
+
+            # Receiving from secondary
+            try:
+                # 1024 stands for bytes of data to be received
+                data = self.conn.recv(1024)
+                if data:
+                    self.s2m.put(
+                        "recv:%s:{%s}" % (str(self.conn.fileno()), data))
+                else:
+                    self.s2m.put("closing:" + str(self.conn))
+                    break
+            except Exception as excep:
+                print(
+                    excep, ",Error while receiving msg in connectionthread()!")
+                break
+
+        SECONDARY_LIST.remove(self.client_id)
+        self.conn.close()
 
 
 def getclientid(conn):
@@ -149,9 +167,12 @@ def getclientid(conn):
     if data is None:
         return -1
 
+    if logger is not None:
+        logger.debug("data: %s" % data)
     client_id = int(data.strip('\0'))
 
     if client_id < 0 or client_id > MAX_SECONDARY:
+        logger.debug("Failed to get client_id: %d" % client_id)
         return -1
 
     found = 0
@@ -175,6 +196,9 @@ def getclientid(conn):
             free_client_id = i
             break
 
+    if logger is not None:
+        logger.debug("Found free_client_id: %d" % free_client_id)
+
     if free_client_id < 0:
         return -1
 
@@ -184,37 +208,69 @@ def getclientid(conn):
     return free_client_id
 
 
-def acceptthread(sock, main2sec, sec2main):
-    """Listen for secondary processes"""
+class AcceptThread(threading.Thread):
 
-    global SECONDARY_COUNT
+    def __init__(self, host, port, main2sec, sec2main):
+        super(AcceptThread, self).__init__()
+        self.daemon = True
 
-    try:
-        while True:
-            # Accepting incoming connections
-            conn, _ = sock.accept()
+        # Creating secondary socket object
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
-            client_id = getclientid(conn)
-            if client_id < 0:
-                break
+        # Binding secondary socket to a address. bind() takes tuple of host
+        # and port.
+        self.sock.bind((host, port))
+
+        # Listening secondary at the address
+        self.sock.listen(MAX_SECONDARY)
+
+        self.main2sec = main2sec
+        self.sec2main = sec2main
+        self.stop_event = threading.Event()
+        self.sock_opened = False
+
+    def stop(self):
+        if self.sock_opened is True:
+            try:
+                self.sock.shutdown(socket.SHUT_RDWR)
+            except socket.error as excep:
+                print(excep, ", Error while closing sock in AcceptThread!")
+                traceback.print_exc()
+        self.sock.close()
+        self.stop_event.set()
 
-            # Creating new thread.
-            # Calling secondarythread function for this function and passing
-            # conn as argument.
-            SECONDARY_LIST.append(client_id)
-            main2sec[client_id] = Queue()
-            sec2main[client_id] = Queue()
-            connection_thread = Thread(target=connectionthread,
-                                       args=('secondary', client_id, conn,
-                                             main2sec[client_id],
-                                             sec2main[client_id]))
-            connection_thread.daemon = True
-            connection_thread.start()
-
-            SECONDARY_COUNT += 1
-    except Exception as excep:
-        print(excep, ", Error in acceptthread()!")
-        sock.close()
+    def run(self):
+        global SECONDARY_COUNT
+
+        try:
+            while True:
+                # Accepting incoming connections
+                conn, _ = self.sock.accept()
+
+                client_id = getclientid(conn)
+                if client_id < 0:
+                    break
+
+                # Creating new thread.
+                # Calling secondarythread function for this function and
+                # passing conn as argument.
+                SECONDARY_LIST.append(client_id)
+                self.main2sec[client_id] = Queue()
+                self.sec2main[client_id] = Queue()
+                connection_thread = ConnectionThread(
+                    client_id, conn,
+                    self.main2sec[client_id],
+                    self.sec2main[client_id])
+                connection_thread.daemon = True
+                connection_thread.start()
+
+                SECONDARY_COUNT += 1
+        except Exception as excep:
+            print(excep, ", Error in AcceptThread!")
+            traceback.print_exc()
+            self.sock_opened = False
+            self.sock.close()
 
 
 def command_primary(command):
@@ -266,49 +322,78 @@ def print_status():
         print ("Connected secondary id: %d" % i)
 
 
-def primarythread(sock, main2primary, primary2main):
-    """Manage primary process connection"""
+class PrimaryThread(threading.Thread):
 
-    global PRIMARY
-    cmd_str = ''
+    def __init__(self, host, port, main2primary, primary2main):
+        super(PrimaryThread, self).__init__()
+        self.daemon = True
 
-    while True:
-        # waiting for connection
-        PRIMARY = False
-        conn, addr = sock.accept()
-        PRIMARY = True
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        # Binding primary socket to a address. bind() takes tuple of host
+        # and port.
+        self.sock.bind((host, port))
 
-        while conn:
-            try:
-                _, _, _ = select.select([conn, ], [conn, ], [], 5)
-            except select.error:
-                break
+        # Listening primary at the address
+        self.sock.listen(1)  # 5 denotes the number of clients can queue
 
-            # Sending message to connected primary
-            try:
-                cmd_str = main2primary.get(True)
-                conn.send(cmd_str)  # send only takes string
-            except KeyError:
-                break
-            except Exception as excep:
-                print(excep, ", Error while sending msg in primarythread()!")
-                break
+        self.main2primary = main2primary
+        self.primary2main = primary2main
+        self.stop_event = threading.Event()
+        self.sock_opened = False
 
-            # Receiving from primary
-            try:
-                # 1024 stands for bytes of data to be received
-                data = conn.recv(1024)
-                if data:
-                    primary2main.put("recv:%s:{%s}" % (str(addr), data))
-                else:
-                    primary2main.put("closing:" + str(addr))
-                    conn.close()
+    def stop(self):
+        if self.sock_opened is True:
+            self.sock.shutdown(socket.SHUT_RDWR)
+        self.sock.close()
+        self.stop_event.set()
+
+    def run(self):
+        global PRIMARY
+        cmd_str = ''
+
+        while True:
+            # waiting for connection
+            PRIMARY = False
+            conn, addr = self.sock.accept()
+            PRIMARY = True
+
+            while conn:
+                try:
+                    _, _, _ = select.select([conn, ], [conn, ], [], 5)
+                except select.error:
                     break
-            except Exception as excep:
-                print(excep, ", Error while receiving msg in primarythread()!")
-                break
 
-    print ("primary communication thread end")
+                self.sock_opened = True
+                # Sending message to connected primary
+                try:
+                    cmd_str = self.main2primary.get(True)
+                    conn.send(cmd_str)  # send only takes string
+                except KeyError:
+                    break
+                except Exception as excep:
+                    print(
+                        excep,
+                        ", Error while sending msg in primarythread()!")
+                    break
+
+                # Receiving from primary
+                try:
+                    # 1024 stands for bytes of data to be received
+                    data = conn.recv(1024)
+                    if data:
+                        self.primary2main.put(
+                            "recv:%s:{%s}" % (str(addr), data))
+                    else:
+                        self.primary2main.put("closing:" + str(addr))
+                        conn.close()
+                        self.sock_opened = False
+                        break
+                except Exception as excep:
+                    print(
+                        excep,
+                        ", Error while receiving msg in primarythread()!")
+                    break
 
 
 def close_all_secondary():
@@ -593,36 +678,11 @@ def main(argv):
     print('secondary port : %d' % secondary_port)
     print('management port : %d' % management_port)
 
-    # Creating primary socket object
-    primary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    primary_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
-    # Binding primary socket to a address. bind() takes tuple of host and port.
-    primary_sock.bind((host, primary_port))
-
-    # Listening primary at the address
-    primary_sock.listen(1)  # 5 denotes the number of clients can queue
-
-    primary_thread = Thread(target=primarythread,
-                            args=(primary_sock, MAIN2PRIMARY, PRIMARY2MAIN))
-    primary_thread.daemon = True
+    primary_thread = PrimaryThread(
+        host, primary_port, MAIN2PRIMARY, PRIMARY2MAIN)
     primary_thread.start()
 
-    # Creating secondary socket object
-    secondary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    secondary_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
-    # Binding secondary socket to a address. bind() takes tuple of host
-    # and port.
-    secondary_sock.bind((host, secondary_port))
-
-    # Listening secondary at the address
-    secondary_sock.listen(MAX_SECONDARY)
-
-    # secondary process handling thread
-    accept_thread = Thread(target=acceptthread,
-                           args=(secondary_sock, MAIN2SEC, SEC2MAIN))
-    accept_thread.daemon = True
+    accept_thread = AcceptThread(host, secondary_port, MAIN2SEC, SEC2MAIN)
     accept_thread.start()
 
     shell = Shell()
@@ -641,16 +701,11 @@ def main(argv):
     shell = None
 
     try:
-        primary_sock.shutdown(socket.SHUT_RDWR)
-        primary_sock.close()
-    except socket.error as excep:
-        print(excep, ", Error while closing primary_sock in main()!")
-
-    try:
-        secondary_sock.shutdown(socket.SHUT_RDWR)
-        secondary_sock.close()
+        primary_thread.stop()
+        accept_thread.stop()
     except socket.error as excep:
-        print(excep, ", Error while closing secondary_sock in main()!")
+        print(excep, ", Error while terminating threads in main()!")
+        traceback.print_exc()
 
 
 if __name__ == "__main__":
-- 
2.13.1

^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [spp] [PATCH 1/4] spp: refactor spp.py to comply with pep8
  2017-12-08  8:31 [spp] [PATCH 1/4] spp: refactor spp.py to comply with pep8 ogawa.yasufumi
  2017-12-08  8:31 ` [spp] [PATCH 2/4] spp: add classes for thread management ogawa.yasufumi
@ 2017-12-08 18:23 ` Ferruh Yigit
  1 sibling, 0 replies; 3+ messages in thread
From: Ferruh Yigit @ 2017-12-08 18:23 UTC (permalink / raw)
  To: ogawa.yasufumi, spp

On 12/8/2017 12:31 AM, ogawa.yasufumi@lab.ntt.co.jp wrote:
> From: Yasufumi Ogawa <ogawa.yasufumi@lab.ntt.co.jp>
> 
> This update is for revising coding style to pep8. Syntax is checked
> with flake8[1].
> 
> [1] http://flake8.pycqa.org/en/latest/
> 
> Signed-off-by: Yasufumi Ogawa <ogawa.yasufumi@lab.ntt.co.jp>

Series applied, thanks.

^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2017-12-08 18:23 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2017-12-08  8:31 [spp] [PATCH 1/4] spp: refactor spp.py to comply with pep8 ogawa.yasufumi
2017-12-08  8:31 ` [spp] [PATCH 2/4] spp: add classes for thread management ogawa.yasufumi
2017-12-08 18:23 ` [spp] [PATCH 1/4] spp: refactor spp.py to comply with pep8 Ferruh Yigit

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).