From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mogw0732.ocn.ad.jp (mogw0732.ocn.ad.jp [153.149.232.33]) by dpdk.org (Postfix) with ESMTP id 2A3F8377E for ; Tue, 18 Jul 2017 20:58:07 +0200 (CEST) Received: from mf-smf-ucb011.ocn.ad.jp (mf-smf-ucb011.ocn.ad.jp [153.149.228.228]) by mogw0732.ocn.ad.jp (Postfix) with ESMTP id AA45F10801DA; Wed, 19 Jul 2017 03:58:05 +0900 (JST) Received: from mf-smf-ucb011.ocn.ad.jp (mf-smf-ucb011 [153.149.228.228]) by mf-smf-ucb011.ocn.ad.jp (Postfix) with ESMTP id 8EDC890022E; Wed, 19 Jul 2017 03:58:05 +0900 (JST) Received: from ntt.pod01.mv-mta-ucb022 (mv-mta-ucb022.ocn.ad.jp [153.149.142.85]) by mf-smf-ucb011.ocn.ad.jp (Switch-3.3.4/Switch-3.3.4) with ESMTP id v6IIw5IQ006457; Wed, 19 Jul 2017 03:58:05 +0900 Received: from smtp.ocn.ne.jp ([153.149.227.165]) by ntt.pod01.mv-mta-ucb022 with id mJy41v0033akymp01Jy4nY; Tue, 18 Jul 2017 18:58:05 +0000 Received: from localhost.localdomain (p3469148-ipngn19901marunouchi.tokyo.ocn.ne.jp [153.229.6.148]) by smtp.ocn.ne.jp (Postfix) with ESMTPA; Wed, 19 Jul 2017 03:58:04 +0900 (JST) From: ogawa.yasufumi@lab.ntt.co.jp To: spp@dpdk.org Cc: ferruh.yigit@intel.com, sy.jong.choi@intel.com, Yasufumi Ogawa Date: Wed, 19 Jul 2017 03:57:27 +0900 Message-Id: <20170718185729.76668-3-ogawa.yasufumi@lab.ntt.co.jp> X-Mailer: git-send-email 2.13.1 In-Reply-To: <20170718185729.76668-1-ogawa.yasufumi@lab.ntt.co.jp> References: <20170718185729.76668-1-ogawa.yasufumi@lab.ntt.co.jp> Subject: [spp] [PATCH 3/5] Add management port X-BeenThere: spp@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: Soft Patch Panel List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 18 Jul 2017 18:58:08 -0000 From: Yasufumi Ogawa - Management port is a TCP port for sending and receiving SPP commands from external functions. - Replace the getopt-based args parser with argparse to make it more understandable and easier to maintain. 
- Add descriptions to exception messages to make it easier to find where an error occurred. Signed-off-by: Yasufumi Ogawa --- src/spp.py | 211 ++++++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 168 insertions(+), 43 deletions(-) diff --git a/src/spp.py b/src/spp.py index c383820..99165ed 100755 --- a/src/spp.py +++ b/src/spp.py @@ -2,17 +2,67 @@ """Soft Patch Panel""" from __future__ import print_function -from Queue import Queue +from Queue import Queue, Empty from thread import start_new_thread from threading import Thread import cmd -import getopt +import argparse import select import socket import sys import re #import pdb; pdb.set_trace() +import SocketServer +import readline +import threading +import json + +from logging import getLogger,StreamHandler,Formatter,DEBUG +logger = getLogger(__name__) +handler = StreamHandler() +handler.setLevel(DEBUG) +formatter = Formatter('%(asctime)s - [%(name)s] - [%(levelname)s] - %(message)s') +handler.setFormatter(formatter) +logger.setLevel(DEBUG) +logger.addHandler(handler) + + +CMD_OK = "OK" +CMD_NG = "NG" +CMD_NOTREADY = "NOTREADY" +CMD_ERROR = "ERROR" + +RCMD_EXECUTE_QUEUE = Queue() +RCMD_RESULT_QUEUE = Queue() +REMOTE_COMMAND = "RCMD" + +class CmdRequestHandler(SocketServer.BaseRequestHandler): + """Request handler for getting message from remote entities""" + + CMD = None # contains a instance of Shell class + + def handle(self): + self.data = self.request.recv(1024).strip() + cur_thread = threading.currentThread() + print(cur_thread.getName()) + print(self.client_address[0]) + print(self.data) + if CmdRequestHandler.CMD is not None: + RCMD_EXECUTE_QUEUE.put(REMOTE_COMMAND) + CmdRequestHandler.CMD.onecmd(self.data) + ret = RCMD_RESULT_QUEUE.get() + if (ret is not None): + logger.debug("ret:%s" % ret) + self.request.send(ret) + else: + logger.debug("ret is none") + self.request.send("") + else: + logger.debug("CMD is None") + self.request.send("") + + class GrowingList(list): """GrowingList""" @@ -21,6 
+71,7 @@ class GrowingList(list): self.extend([None]*(index + 1 - len(self))) list.__setitem__(self, index, value) +# Maximum num of sock queues for secondaries MAX_SECONDARY = 16 # init @@ -55,19 +106,20 @@ def connectionthread(name, client_id, conn, m2s, s2m): except KeyError: break except Exception, excep: - print (str(excep)) + print(excep, ",Error while sending msg in connectionthread()!") break #Receiving from secondary try: data = conn.recv(1024) # 1024 stands for bytes of data to be received if data: - s2m.put("recv:" + str(conn.fileno()) + ":" + "{" + data + "}") + #s2m.put("recv:" + str(conn.fileno()) + ":" + "{" + data + "}") + s2m.put("recv:%s:{%s}" % (str(conn.fileno()), data)) else: s2m.put("closing:" + str(conn)) break except Exception, excep: - print (str(excep)) + print(excep, ",Error while receiving msg in connectionthread()!") break SECONDARY_LIST.remove(client_id) @@ -146,7 +198,7 @@ def acceptthread(sock, main2sec, sec2main): sec2main[client_id], )) SECONDARY_COUNT += 1 except Exception, excep: - print (str(excep)) + print(excep, ", Error in acceptthread()!") sock.close() def command_primary(command): @@ -154,24 +206,42 @@ def command_primary(command): if PRIMARY: MAIN2PRIMARY.put(command) - print (PRIMARY2MAIN.get(True)) + recv = PRIMARY2MAIN.get(True) + print (recv) + return CMD_OK, recv else: - print ("primary not started") + recv = "primary not started" + print (recv) + return CMD_NOTREADY, recv def command_secondary(sec_id, command): """Send command to secondary process with sec_id""" if sec_id in SECONDARY_LIST: MAIN2SEC[sec_id].put(command) - print (SEC2MAIN[sec_id].get(True)) + recv = SEC2MAIN[sec_id].get(True) + print (recv) + return CMD_OK, recv else: - print ("secondary id %d not exist" % sec_id) + message = "secondary id %d not exist" % sec_id + print(message) + return CMD_NOTREADY, message + +def get_status(): + secondary = [] + for i in SECONDARY_LIST: + secondary.append("%d" % i) + stat = { + "primary": "%d" % PRIMARY, + 
"secondary": secondary + } + return stat def print_status(): """Display information about connected clients""" print ("Soft Patch Panel Status :") - print ("primary: %d" % PRIMARY) + print ("primary: %d" % PRIMARY) # "primary: 1" if PRIMA == True print ("secondary count: %d" % len(SECONDARY_LIST)) for i in SECONDARY_LIST: print ("Connected secondary id: %d" % i) @@ -201,7 +271,7 @@ def primarythread(sock, main2primary, primary2main): except KeyError: break except Exception, excep: - print (str(excep)) + print(excep, ", Error while sending msg in primarythread()!") break #Receiving from primary @@ -214,7 +284,7 @@ def primarythread(sock, main2primary, primary2main): conn.close() break except Exception, excep: - print (str(excep)) + print(excep, ", Error while receiving msg in primarythread()!") break print ("primary communication thread end") @@ -268,6 +338,7 @@ class Shell(cmd.Cmd): prompt = 'spp > ' recorded_file = None + # TODO define pri_commands and sec_commands if there are difference COMMANDS = ['status', 'add', 'patch', 'ring', 'vhost', 'reset', 'exit', 'forward', 'stop', 'clear'] @@ -295,42 +366,67 @@ class Shell(cmd.Cmd): ] return completions + def response(self, result, message): + """Enqueue message from other than CLI""" + try: + rcmd = RCMD_EXECUTE_QUEUE.get(False) + except Empty: + return + + if (rcmd == REMOTE_COMMAND): + param = result + '\n' + message + RCMD_RESULT_QUEUE.put(param) + else: + logger.debug("unknown remote command = %s" % rcmd) + def do_status(self, _): """Display Soft Patch Panel Status""" print_status() + stat = get_status() + self.response(CMD_OK, json.dumps(stat)) def do_pri(self, command): """Send command to primary process""" if command and command in self.COMMANDS: - command_primary(command) + result, message = command_primary(command) + self.response(result, message) else: - print ("primary invalid command") + message = "primary invalid command" + print(message) + self.response(CMD_ERROR, ret) def do_sec(self, arg): """Send 
command to secondary process""" - # remove unwanted space to avoid invalid command error + # remove unwanted spaces to avoid invalid command error tmparg = re.sub(r'\s+', " ", arg) tmparg = re.sub(r'\s?;\s?', ";", tmparg) cmds = tmparg.split(';') if len(cmds) < 2: - print ("error") + message = "error" + print(message) + self.response(CMD_ERROR, message) elif str.isdigit(cmds[0]): sec_id = int(cmds[0]) if check_sec_cmds(cmds[1]): - command_secondary(sec_id, cmds[1]) + result, message = command_secondary(sec_id, cmds[1]) + self.response(result, message) else: - print ("invalid cmd") + message = "invalid cmd" + print(message) + self.response(CMD_ERROR, message) else: print (cmds[0]) print ("first %s" % cmds[1]) + self.response(CMD_ERROR, "invalid format") def do_record(self, arg): """Save future commands to filename: RECORD filename.cmd""" self.recorded_file = open(arg, 'w') + self.response(CMD_OK, "record") def do_playback(self, arg): """Playback commands from a file: PLAYBACK filename.cmd""" @@ -344,8 +440,11 @@ class Shell(cmd.Cmd): continue lines.append(line) self.cmdqueue.extend(lines) + self.response(CMD_OK, "playback") except IOError: - print ("Error: File does not exist.") + message = "Error: File does not exist." 
+ print(message) + self.response(CMD_NG, message) def precmd(self, line): line = line.lower() @@ -379,25 +478,34 @@ class Shell(cmd.Cmd): def main(argv): """main""" - # Defining server address and port - host = '' #'localhost' or '127.0.0.1' or '' are all same - - try: - opts, _ = getopt.getopt(argv, "p:s:h", ["help", "primary = ", "secondary"]) - except getopt.GetoptError: - print ('spp.py -p -s ') - sys.exit(2) - - for opt, arg in opts: - if opt in ("-h", "--help"): - print ('spp.py -p -s ') - sys.exit() - elif opt in ("-p", "--primary"): - primary_port = int(arg) - print ("primary port : %d" % primary_port) - elif opt in ("-s", "--secondary"): - secondary_port = int(arg) - print ('secondary port : %d' % secondary_port) + parser = argparse.ArgumentParser(description="SPP Controller") + + parser.add_argument( + "-p", "--pri-port", + type=int, default=5555, + help="primary port number") + parser.add_argument( + "-s", "--sec-port", + type=int, default=6666, + help="secondary port number") + parser.add_argument( + "-m", "--mng-port", + type=int, default=7777, + help="management port number") + parser.add_argument( + "-ip", "--ipaddr", + type=str, default='', #'localhost' or '127.0.0.1' or '' are all same + help="IP address") + args = parser.parse_args() + + host = args.ipaddr + primary_port = args.pri_port + secondary_port = args.sec_port + management_port = args.mng_port + + print("primary port : %d" % primary_port) + print('secondary port : %d' % secondary_port) + print('management port : %d' % management_port) #Creating primary socket object primary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -428,13 +536,30 @@ def main(argv): start_new_thread(acceptthread, (secondary_sock, MAIN2SEC, SEC2MAIN)) shell = Shell() + + # Run request handler as a TCP server thread + SocketServer.ThreadingTCPServer.allow_reuse_address = True + CmdRequestHandler.CMD = shell + command_server = SocketServer.ThreadingTCPServer((host, management_port),CmdRequestHandler) + + t 
= threading.Thread(target=command_server.serve_forever) + t.setDaemon(True) + t.start() + shell.cmdloop() shell = None - primary_sock.shutdown(socket.SHUT_RDWR) - primary_sock.close() - secondary_sock.shutdown(socket.SHUT_RDWR) - secondary_sock.close() + try: + primary_sock.shutdown(socket.SHUT_RDWR) + primary_sock.close() + except socket.error, excep: + print(excep, ", Error while closing primary_sock in main()!") + + try: + secondary_sock.shutdown(socket.SHUT_RDWR) + secondary_sock.close() + except socket.error, excep: + print(excep, ", Error while closing primary_sock in main()!") if __name__ == "__main__": -- 2.13.1