From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mogw0934.ocn.ad.jp (mogw0934.ocn.ad.jp [153.149.227.40]) by dpdk.org (Postfix) with ESMTP id 23AF84F9A for ; Tue, 6 Mar 2018 11:51:09 +0100 (CET) Received: from mf-smf-ucb026c3 (mf-smf-ucb026c3.ocn.ad.jp [153.153.66.168]) by mogw0934.ocn.ad.jp (Postfix) with ESMTP id 925BF68023E; Tue, 6 Mar 2018 19:51:07 +0900 (JST) Received: from ntt.pod01.mv-mta-ucb029 ([153.149.230.163]) by mf-smf-ucb026c3 with ESMTP id tABUe5PWor9Q8tABXef0G2; Tue, 06 Mar 2018 19:51:07 +0900 Received: from smtp.ocn.ne.jp ([153.149.227.165]) by ntt.pod01.mv-mta-ucb029 with id Jar71x0073akymp01ar7L5; Tue, 06 Mar 2018 10:51:07 +0000 Received: from localhost.localdomain (sp1-66-103-93.msc.spmode.ne.jp [1.66.103.93]) by smtp.ocn.ne.jp (Postfix) with ESMTPA; Tue, 6 Mar 2018 19:51:07 +0900 (JST) From: ogawa.yasufumi@lab.ntt.co.jp To: ferruh.yigit@intel.com, spp@dpdk.org Cc: Yasufumi Ogawa Date: Tue, 6 Mar 2018 19:50:44 +0900 Message-Id: <20180306105055.65210-3-ogawa.yasufumi@lab.ntt.co.jp> X-Mailer: git-send-email 2.13.1 In-Reply-To: <20180306105055.65210-1-ogawa.yasufumi@lab.ntt.co.jp> References: <20180306105055.65210-1-ogawa.yasufumi@lab.ntt.co.jp> Subject: [spp] [PATCH 02/13] controller: move connection threads X-BeenThere: spp@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: Soft Patch Panel List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 06 Mar 2018 10:51:10 -0000 From: Yasufumi Ogawa There are three classes in 'spp.py' for managing connection as threads. This update is for separating them from main file to improve maintainability. Signed-off-by: Yasufumi Ogawa --- src/controller/conn_thread.py | 259 ++++++++++++++++++++++++++++++++++++++++++ src/controller/spp.py | 243 +-------------------------------------- 2 files changed, 261 insertions(+), 241 deletions(-) create mode 100644 src/controller/conn_thread.py diff --git a/src/controller/conn_thread.py b/src/controller/conn_thread.py new file mode 100644 index 0000000..7ba3b00 --- /dev/null +++ b/src/controller/conn_thread.py @@ -0,0 +1,259 @@ +from Queue import Queue +import select +import socket +import spp_common +import threading +import traceback + +# Turn true if activate logger to debug remote command. +logger = None + +if logger is True: + import logging + logger = logging.getLogger(__name__) + handler = logging.StreamHandler() + handler.setLevel(logging.DEBUG) + formatter = logging.Formatter( + '%(asctime)s,[%(filename)s][%(name)s][%(levelname)s]%(message)s') + handler.setFormatter(formatter) + logger.setLevel(logging.DEBUG) + logger.addHandler(handler) + + +class ConnectionThread(threading.Thread): + """Manage connection between controller and secondary""" + + def __init__(self, client_id, conn): + super(ConnectionThread, self).__init__() + self.daemon = True + + self.client_id = client_id + self.conn = conn + self.stop_event = threading.Event() + self.conn_opened = False + + def stop(self): + self.stop_event.set() + + def run(self): + cmd_str = 'hello' + + # infinite loop so that function do not terminate and thread do not + # end. + while True: + try: + _, _, _ = select.select( + [self.conn, ], [self.conn, ], [], 5) + except select.error: + break + + # Sending message to connected secondary + try: + cmd_str = spp_common.MAIN2SEC[self.client_id].get(True) + self.conn.send(cmd_str) # send only takes string + except KeyError: + break + except Exception as excep: + print(excep, ",Error while sending msg in connectionthread()!") + break + + # Receiving from secondary + try: + # 1024 stands for bytes of data to be received + data = self.conn.recv(1024) + if data: + spp_common.SEC2MAIN[self.client_id].put( + "recv:%s:{%s}" % (str(self.conn.fileno()), data)) + else: + spp_common.SEC2MAIN[self.client_id].put( + "closing:" + str(self.conn)) + break + except Exception as excep: + print( + excep, ",Error while receiving msg in connectionthread()!") + break + + spp_common.SECONDARY_LIST.remove(self.client_id) + self.conn.close() + + +class AcceptThread(threading.Thread): + """Manage connection""" + + def __init__(self, host, port): + super(AcceptThread, self).__init__() + self.daemon = True + + # Creating secondary socket object + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + # Binding secondary socket to a address. bind() takes tuple of host + # and port. + self.sock.bind((host, port)) + + # Listening secondary at the address + self.sock.listen(spp_common.MAX_SECONDARY) + + self.stop_event = threading.Event() + self.sock_opened = False + + def getclientid(self, conn): + """Get client_id from client""" + + try: + conn.send("_get_client_id") + except KeyError: + return -1 + + data = conn.recv(1024) + if data is None: + return -1 + + if logger is not None: + logger.debug("data: %s" % data) + client_id = int(data.strip('\0')) + + if client_id < 0 or client_id > spp_common.MAX_SECONDARY: + logger.debug("Failed to get client_id: %d" % client_id) + return -1 + + found = 0 + for i in spp_common.SECONDARY_LIST: + if client_id == i: + found = 1 + break + + if found == 0: + return client_id + + # client_id in use, find a free one + free_client_id = -1 + for i in range(spp_common.MAX_SECONDARY): + found = -1 + for j in spp_common.SECONDARY_LIST: + if i == j: + found = i + break + if found == -1: + free_client_id = i + break + + if logger is not None: + logger.debug("Found free_client_id: %d" % free_client_id) + + if free_client_id < 0: + return -1 + + conn.send("_set_client_id %u" % free_client_id) + data = conn.recv(1024) + + return free_client_id + + def stop(self): + if self.sock_opened is True: + try: + self.sock.shutdown(socket.SHUT_RDWR) + except socket.error as excep: + print(excep, ", Error while closing sock in AcceptThread!") + traceback.print_exc() + self.sock.close() + self.stop_event.set() + + def run(self): + try: + while True: + # Accepting incoming connections + conn, _ = self.sock.accept() + + client_id = self.getclientid(conn) + if client_id < 0: + break + + # Creating new thread. + # Calling secondarythread function for this function and + # passing conn as argument. + spp_common.SECONDARY_LIST.append(client_id) + spp_common.MAIN2SEC[client_id] = Queue() + spp_common.SEC2MAIN[client_id] = Queue() + connection_thread = ConnectionThread(client_id, conn) + connection_thread.daemon = True + connection_thread.start() + + spp_common.SECONDARY_COUNT += 1 + except Exception as excep: + print(excep, ", Error in AcceptThread!") + traceback.print_exc() + self.sock_opened = False + self.sock.close() + + +class PrimaryThread(threading.Thread): + + def __init__(self, host, port): + super(PrimaryThread, self).__init__() + self.daemon = True + + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # Binding primary socket to a address. bind() takes tuple of host + # and port. + self.sock.bind((host, port)) + + # Listening primary at the address + self.sock.listen(1) # 5 denotes the number of clients can queue + + self.stop_event = threading.Event() + self.sock_opened = False + + def stop(self): + if self.sock_opened is True: + self.sock.shutdown(socket.SHUT_RDWR) + self.sock.close() + self.stop_event.set() + + def run(self): + cmd_str = '' + + while True: + # waiting for connection + spp_common.PRIMARY = False + conn, addr = self.sock.accept() + spp_common.PRIMARY = True + + while conn: + try: + _, _, _ = select.select([conn, ], [conn, ], [], 5) + except select.error: + break + + self.sock_opened = True + # Sending message to connected primary + try: + cmd_str = spp_common.MAIN2PRIMARY.get(True) + conn.send(cmd_str) # send only takes string + except KeyError: + break + except Exception as excep: + print( + excep, + ", Error while sending msg in primarythread()!") + break + + # Receiving from primary + try: + # 1024 stands for bytes of data to be received + data = conn.recv(1024) + if data: + spp_common.PRIMARY2MAIN.put( + "recv:%s:{%s}" % (str(addr), data)) + else: + spp_common.PRIMARY2MAIN.put("closing:" + str(addr)) + conn.close() + self.sock_opened = False + break + except Exception as excep: + print( + excep, + ", Error while receiving msg in primarythread()!") + break diff --git a/src/controller/spp.py b/src/controller/spp.py index 0515193..d0d7bc9 100644 --- a/src/controller/spp.py +++ b/src/controller/spp.py @@ -4,8 +4,8 @@ from __future__ import print_function import argparse -from Queue import Queue -import select +from conn_thread import AcceptThread +from conn_thread import PrimaryThread from shell import Shell import socket import SocketServer @@ -58,245 +58,6 @@ class CmdRequestHandler(SocketServer.BaseRequestHandler): self.request.send("") -class ConnectionThread(threading.Thread): - """Manage connection between controller and secondary""" - - def __init__(self, client_id, conn): - super(ConnectionThread, self).__init__() - self.daemon = True - - self.client_id = client_id - self.conn = conn - self.stop_event = threading.Event() - self.conn_opened = False - - def stop(self): - self.stop_event.set() - - def run(self): - cmd_str = 'hello' - - # infinite loop so that function do not terminate and thread do not - # end. - while True: - try: - _, _, _ = select.select( - [self.conn, ], [self.conn, ], [], 5) - except select.error: - break - - # Sending message to connected secondary - try: - cmd_str = spp_common.MAIN2SEC[self.client_id].get(True) - self.conn.send(cmd_str) # send only takes string - except KeyError: - break - except Exception as excep: - print(excep, ",Error while sending msg in connectionthread()!") - break - - # Receiving from secondary - try: - # 1024 stands for bytes of data to be received - data = self.conn.recv(1024) - if data: - spp_common.SEC2MAIN[self.client_id].put( - "recv:%s:{%s}" % (str(self.conn.fileno()), data)) - else: - spp_common.SEC2MAIN[self.client_id].put( - "closing:" + str(self.conn)) - break - except Exception as excep: - print( - excep, ",Error while receiving msg in connectionthread()!") - break - - spp_common.SECONDARY_LIST.remove(self.client_id) - self.conn.close() - - -class AcceptThread(threading.Thread): - """Manage connection""" - - def __init__(self, host, port): - super(AcceptThread, self).__init__() - self.daemon = True - - # Creating secondary socket object - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - # Binding secondary socket to a address. bind() takes tuple of host - # and port. - self.sock.bind((host, port)) - - # Listening secondary at the address - self.sock.listen(spp_common.MAX_SECONDARY) - - self.stop_event = threading.Event() - self.sock_opened = False - - def getclientid(self, conn): - """Get client_id from client""" - - try: - conn.send("_get_client_id") - except KeyError: - return -1 - - data = conn.recv(1024) - if data is None: - return -1 - - if logger is not None: - logger.debug("data: %s" % data) - client_id = int(data.strip('\0')) - - if client_id < 0 or client_id > spp_common.MAX_SECONDARY: - logger.debug("Failed to get client_id: %d" % client_id) - return -1 - - found = 0 - for i in spp_common.SECONDARY_LIST: - if client_id == i: - found = 1 - break - - if found == 0: - return client_id - - # client_id in use, find a free one - free_client_id = -1 - for i in range(spp_common.MAX_SECONDARY): - found = -1 - for j in spp_common.SECONDARY_LIST: - if i == j: - found = i - break - if found == -1: - free_client_id = i - break - - if logger is not None: - logger.debug("Found free_client_id: %d" % free_client_id) - - if free_client_id < 0: - return -1 - - conn.send("_set_client_id %u" % free_client_id) - data = conn.recv(1024) - - return free_client_id - - def stop(self): - if self.sock_opened is True: - try: - self.sock.shutdown(socket.SHUT_RDWR) - except socket.error as excep: - print(excep, ", Error while closing sock in AcceptThread!") - traceback.print_exc() - self.sock.close() - self.stop_event.set() - - def run(self): - try: - while True: - # Accepting incoming connections - conn, _ = self.sock.accept() - - client_id = self.getclientid(conn) - if client_id < 0: - break - - # Creating new thread. - # Calling secondarythread function for this function and - # passing conn as argument. - spp_common.SECONDARY_LIST.append(client_id) - spp_common.MAIN2SEC[client_id] = Queue() - spp_common.SEC2MAIN[client_id] = Queue() - connection_thread = ConnectionThread(client_id, conn) - connection_thread.daemon = True - connection_thread.start() - - spp_common.SECONDARY_COUNT += 1 - except Exception as excep: - print(excep, ", Error in AcceptThread!") - traceback.print_exc() - self.sock_opened = False - self.sock.close() - - -class PrimaryThread(threading.Thread): - - def __init__(self, host, port): - super(PrimaryThread, self).__init__() - self.daemon = True - - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - # Binding primary socket to a address. bind() takes tuple of host - # and port. - self.sock.bind((host, port)) - - # Listening primary at the address - self.sock.listen(1) # 5 denotes the number of clients can queue - - self.stop_event = threading.Event() - self.sock_opened = False - - def stop(self): - if self.sock_opened is True: - self.sock.shutdown(socket.SHUT_RDWR) - self.sock.close() - self.stop_event.set() - - def run(self): - cmd_str = '' - - while True: - # waiting for connection - spp_common.PRIMARY = False - conn, addr = self.sock.accept() - spp_common.PRIMARY = True - - while conn: - try: - _, _, _ = select.select([conn, ], [conn, ], [], 5) - except select.error: - break - - self.sock_opened = True - # Sending message to connected primary - try: - cmd_str = spp_common.MAIN2PRIMARY.get(True) - conn.send(cmd_str) # send only takes string - except KeyError: - break - except Exception as excep: - print( - excep, - ", Error while sending msg in primarythread()!") - break - - # Receiving from primary - try: - # 1024 stands for bytes of data to be received - data = conn.recv(1024) - if data: - spp_common.PRIMARY2MAIN.put( - "recv:%s:{%s}" % (str(addr), data)) - else: - spp_common.PRIMARY2MAIN.put("closing:" + str(addr)) - conn.close() - self.sock_opened = False - break - except Exception as excep: - print( - excep, - ", Error while receiving msg in primarythread()!") - break - - def main(argv): """main""" -- 2.13.1