From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from tama500.ecl.ntt.co.jp (tama500.ecl.ntt.co.jp [129.60.39.148]) by dpdk.org (Postfix) with ESMTP id 46205377E for ; Thu, 29 Jun 2017 11:19:52 +0200 (CEST) Received: from vc2.ecl.ntt.co.jp (vc2.ecl.ntt.co.jp [129.60.86.154]) by tama500.ecl.ntt.co.jp (8.13.8/8.13.8) with ESMTP id v5T9JpW4027278; Thu, 29 Jun 2017 18:19:51 +0900 Received: from vc2.ecl.ntt.co.jp (localhost [127.0.0.1]) by vc2.ecl.ntt.co.jp (Postfix) with ESMTP id 245425F631; Thu, 29 Jun 2017 18:19:51 +0900 (JST) Received: from jcms-pop21.ecl.ntt.co.jp (jcms-pop21.ecl.ntt.co.jp [129.60.87.134]) by vc2.ecl.ntt.co.jp (Postfix) with ESMTP id 17E3D5F64D; Thu, 29 Jun 2017 18:19:51 +0900 (JST) Received: from [IPv6:::1] (watercress.nslab.ecl.ntt.co.jp [129.60.13.73]) by jcms-pop21.ecl.ntt.co.jp (Postfix) with ESMTPSA id 124DE400C2B; Thu, 29 Jun 2017 18:19:51 +0900 (JST) From: Yasufumi Ogawa References: <894235f1-d439-8db2-73ab-55b16a474084@lab.ntt.co.jp> Message-ID: <16cdf992-bddd-8d79-b503-00f28f6b202c@lab.ntt.co.jp> Date: Thu, 29 Jun 2017 18:19:37 +0900 User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 Thunderbird/52.2.1 MIME-Version: 1.0 In-Reply-To: <894235f1-d439-8db2-73ab-55b16a474084@lab.ntt.co.jp> Content-Type: text/plain; charset=utf-8; format=flowed Content-Language: en-US Content-Transfer-Encoding: 7bit To: sy.jong.choi@intel.com, spp@dpdk.org X-TM-AS-MML: disable X-Mailman-Approved-At: Thu, 29 Jun 2017 14:25:23 +0200 Subject: Re: [spp] Proposal for adding port for external management tools X-BeenThere: spp@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: Soft Patch Panel List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 29 Jun 2017 09:19:53 -0000 Signed-off-by: Yasufumi Ogawa diff --git a/src/spp.py b/src/spp.py index b937b5a..54d7c67 100755 --- a/src/spp.py +++ b/src/spp.py @@ -2,14 +2,72 @@ """Soft Patch Panel""" from __future__ import print_function -from Queue import Queue +from Queue import Queue, Empty from thread import start_new_thread from threading import Thread import cmd -import getopt +import argparse import select import socket import sys +import re +#import pdb; pdb.set_trace() + +import SocketServer +import readline +import threading +import json + +logger = None + +# Uncomment if you use logger +#from logging import getLogger,StreamHandler,Formatter,DEBUG +#logger = getLogger(__name__) +#handler = StreamHandler() +#handler.setLevel(DEBUG) +#formatter = Formatter('%(asctime)s - [%(name)s] - [%(levelname)s] - %(message)s') +#handler.setFormatter(formatter) +#logger.setLevel(DEBUG) +#logger.addHandler(handler) + + +CMD_OK = "OK" +CMD_NG = "NG" +CMD_NOTREADY = "NOTREADY" +CMD_ERROR = "ERROR" + +RCMD_EXECUTE_QUEUE = Queue() +RCMD_RESULT_QUEUE = Queue() +REMOTE_COMMAND = "RCMD" + +class CmdRequestHandler(SocketServer.BaseRequestHandler): + """Request handler for getting message from remote entities""" + + CMD = None # contains a instance of Shell class + + def handle(self): + self.data = self.request.recv(1024).strip() + cur_thread = threading.currentThread() + print(cur_thread.getName()) + print(self.client_address[0]) + print(self.data) + if CmdRequestHandler.CMD is not None: + RCMD_EXECUTE_QUEUE.put(REMOTE_COMMAND) + CmdRequestHandler.CMD.onecmd(self.data) + ret = RCMD_RESULT_QUEUE.get() + if (ret is not None): + if logger != None: + logger.debug("ret:%s" % ret) + self.request.send(ret) + else: + if logger != None: + logger.debug("ret is none") + self.request.send("") + else: + if logger != None: + logger.debug("CMD is None") + self.request.send("") + class GrowingList(list): """GrowingList""" @@ -19,6 +77,7 @@ class GrowingList(list): self.extend([None]*(index + 1 - len(self))) list.__setitem__(self, index, value) +# Maximum num of sock queues for secondaries MAX_SECONDARY = 16 # init @@ -53,19 +112,20 @@ def connectionthread(name, client_id, conn, m2s, s2m): except KeyError: break except Exception, excep: - print (str(excep)) + print(excep, ",Error while sending msg in connectionthread()!") break #Receiving from secondary try: data = conn.recv(1024) # 1024 stands for bytes of data to be received if data: - s2m.put("recv:" + str(conn.fileno()) + ":" + "{" + data + "}") + #s2m.put("recv:" + str(conn.fileno()) + ":" + "{" + data + "}") + s2m.put("recv:%s:{%s}" % (str(conn.fileno()), data)) else: s2m.put("closing:" + str(conn)) break except Exception, excep: - print (str(excep)) + print(excep, ",Error while receiving msg in connectionthread()!") break SECONDARY_LIST.remove(client_id) @@ -144,7 +204,7 @@ def acceptthread(sock, main2sec, sec2main): sec2main[client_id], )) SECONDARY_COUNT += 1 except Exception, excep: - print (str(excep)) + print(excep, ", Error in acceptthread()!") sock.close() def command_primary(command): @@ -152,24 +212,42 @@ def command_primary(command): if PRIMARY: MAIN2PRIMARY.put(command) - print (PRIMARY2MAIN.get(True)) + recv = PRIMARY2MAIN.get(True) + print (recv) + return CMD_OK, recv else: - print ("primary not started") + recv = "primary not started" + print (recv) + return CMD_NOTREADY, recv def command_secondary(sec_id, command): """Send command to secondary process with sec_id""" if sec_id in SECONDARY_LIST: MAIN2SEC[sec_id].put(command) - print (SEC2MAIN[sec_id].get(True)) + recv = SEC2MAIN[sec_id].get(True) + print (recv) + return CMD_OK, recv else: - print ("secondary id %d not exist" % sec_id) + message = "secondary id %d not exist" % sec_id + print(message) + return CMD_NOTREADY, message + +def get_status(): + secondary = [] + for i in SECONDARY_LIST: + secondary.append("%d" % i) + stat = { + "primary": "%d" % PRIMARY, + "secondary": secondary + } + return stat def print_status(): """Display information about connected clients""" print ("Soft Patch Panel Status :") - print ("primary: %d" % PRIMARY) + print ("primary: %d" % PRIMARY) # "primary: 1" if PRIMA == True print ("secondary count: %d" % len(SECONDARY_LIST)) for i in SECONDARY_LIST: print ("Connected secondary id: %d" % i) @@ -199,20 +277,21 @@ def primarythread(sock, main2primary, primary2main): except KeyError: break except Exception, excep: - print (str(excep)) + print(excep, ", Error while sending msg in primarythread()!") break #Receiving from primary try: data = conn.recv(1024) # 1024 stands for bytes of data to be received if data: - primary2main.put("recv:" + str(addr) + ":" + "{" + data + "}") + #primary2main.put("recv:" + str(addr) + ":" + "{" + data + "}") + primary2main.put("recv:%s:{%s}" % (str(addr), data)) else: primary2main.put("closing:" + str(addr)) conn.close() break except Exception, excep: - print (str(excep)) + print(excep, ", Error while receiving msg in primarythread()!") break print ("primary communication thread end") @@ -259,6 +338,14 @@ def check_sec_cmds(cmds): return valid +def clean_sec_cmd(cmdstr): + """remove unwanted spaces to avoid invalid command error""" + + tmparg = re.sub(r'\s+', " ", cmdstr) + res = re.sub(r'\s?;\s?', ";", tmparg) + return res + + class Shell(cmd.Cmd): """SPP command prompt""" @@ -266,17 +353,19 @@ class Shell(cmd.Cmd): prompt = 'spp > ' recorded_file = None - COMMANDS = ['status', 'add', 'patch', 'ring', 'vhost', - 'reset', 'exit', 'forward', 'stop', 'clear'] + PRI_CMDS = ['status', 'exit', 'clear'] + SEC_CMDS = ['status', 'exit', 'forward', 'stop', 'add', 'patch', 'del'] + SEC_SUBCMDS = ['vhost', 'ring'] + BYE_CMDS = ['sec', 'all'] def complete_pri(self, text, line, begidx, endidx): """Completion for primary process commands""" if not text: - completions = self.COMMANDS[:] + completions = self.PRI_CMDS[:] else: completions = [p - for p in self.COMMANDS + for p in self.PRI_CMDS if p.startswith(text) ] return completions @@ -284,68 +373,152 @@ class Shell(cmd.Cmd): def complete_sec(self, text, line, begidx, endidx): """Completion for secondary process commands""" + try: + cleaned_line = clean_sec_cmd(line) + if len(cleaned_line.split()) == 1: + completions = [str(i)+";" for i in SECONDARY_LIST] + elif len(cleaned_line.split()) == 2: + if not (";" in cleaned_line): + tmplist = [str(i) for i in SECONDARY_LIST] + completions = [p+";" + for p in tmplist + if p.startswith(text) + ] + elif cleaned_line[-1] == ";": + completions = self.SEC_CMDS[:] + else: + seccmd = cleaned_line.split(";")[1] + if cleaned_line[-1] != " ": + completions = [p + for p in self.SEC_CMDS + if p.startswith(seccmd) + ] + elif ("add" in seccmd) or ("del" in seccmd): + completions = self.SEC_SUBCMDS[:] + else: + completions = [] + elif len(cleaned_line.split()) == 3: + subcmd = cleaned_line.split()[-1] + if ("add" == subcmd) or ("del" == subcmd): + completions = self.SEC_SUBCMDS[:] + else: + if cleaned_line[-1] == " ": + completions = [] + else: + completions = [p + for p in self.SEC_SUBCMDS + if p.startswith(subcmd) + ] + else: + completions = [] + return completions + except Exception, e: + print(len(cleaned_line.split())) + print(e) + + def complete_bye(self, text, line, begidx, endidx): + """Completion for bye commands""" + if not text: - completions = self.COMMANDS[:] + completions = self.BYE_CMDS[:] else: completions = [p - for p in self.COMMANDS + for p in self.BYE_CMDS if p.startswith(text) ] return completions + def response(self, result, message): + """Enqueue message from other than CLI""" + try: + rcmd = RCMD_EXECUTE_QUEUE.get(False) + except Empty: + return + + if (rcmd == REMOTE_COMMAND): + param = result + '\n' + message + RCMD_RESULT_QUEUE.put(param) + else: + if logger != None: + logger.debug("unknown remote command = %s" % rcmd) + def do_status(self, _): """Display Soft Patch Panel Status""" print_status() + stat = get_status() + self.response(CMD_OK, json.dumps(stat)) def do_pri(self, command): """Send command to primary process""" - if command and command in self.COMMANDS: - command_primary(command) + if command and command in self.PRI_CMDS: + result, message = command_primary(command) + self.response(result, message) else: - print ("primary invalid command") + message = "primary invalid command" + print(message) + self.response(CMD_ERROR, message) def do_sec(self, arg): """Send command to secondary process""" - cmds = arg.split(';') + # remove unwanted spaces to avoid invalid command error + tmparg = clean_sec_cmd(arg) + cmds = tmparg.split(';') if len(cmds) < 2: - print ("error") + message = "error" + print(message) + self.response(CMD_ERROR, message) elif str.isdigit(cmds[0]): sec_id = int(cmds[0]) if check_sec_cmds(cmds[1]): - command_secondary(sec_id, cmds[1]) + result, message = command_secondary(sec_id, cmds[1]) + self.response(result, message) else: - print ("invalid cmd") + message = "invalid cmd" + print(message) + self.response(CMD_ERROR, message) else: print (cmds[0]) print ("first %s" % cmds[1]) + self.response(CMD_ERROR, "invalid format") - def do_record(self, arg): + def do_record(self, fname): """Save future commands to filename: RECORD filename.cmd""" - self.recorded_file = open(arg, 'w') + if fname == '': + print("Record file is required!") + else: + self.recorded_file = open(fname, 'w') + self.response(CMD_OK, "record") - def do_playback(self, arg): + def do_playback(self, fname): """Playback commands from a file: PLAYBACK filename.cmd""" - - self.close() - try: - with open(arg) as recorded_file: - lines = [] - for line in recorded_file: - if line.strip().startswith("#"): - continue - lines.append(line) - self.cmdqueue.extend(lines) - except IOError: - print ("Error: File does not exist.") + + if fname == '': + print("Record file is required!") + else: + self.close() + try: + with open(fname) as recorded_file: + lines = [] + for line in recorded_file: + if line.strip().startswith("#"): + continue + lines.append(line) + self.cmdqueue.extend(lines) + self.response(CMD_OK, "playback") + except IOError: + message = "Error: File does not exist." + print(message) + self.response(CMD_NG, message) def precmd(self, line): line = line.lower() - if self.recorded_file and 'playback' not in line: - print(line, file=self.recorded_file) + if self.recorded_file: + if not (('playback' in line) or ('bye' in line)): + print(line, file=self.recorded_file) return line def close(self): @@ -370,30 +543,42 @@ class Shell(cmd.Cmd): self.close() return True + def main(argv): """main""" - # Defining server address and port - host = '' #'localhost' or '127.0.0.1' or '' are all same - - try: - opts, _ = getopt.getopt(argv, "p:s:h", ["help", "primary = ", "secondary"]) - except getopt.GetoptError: - print ('spp.py -p -s ') - sys.exit(2) - for opt, arg in opts: - if opt in ("-h", "--help"): - print ('spp.py -p -s ') - sys.exit() - elif opt in ("-p", "--primary"): - primary_port = int(arg) - print ("primary port : %d" % primary_port) - elif opt in ("-s", "--secondary"): - secondary_port = int(arg) - print ('secondary port : %d' % secondary_port) + parser = argparse.ArgumentParser(description="SPP Controller") + + parser.add_argument( + "-p", "--pri-port", + type=int, default=5555, + help="primary port number") + parser.add_argument( + "-s", "--sec-port", + type=int, default=6666, + help="secondary port number") + parser.add_argument( + "-m", "--mng-port", + type=int, default=7777, + help="management port number") + parser.add_argument( + "-ip", "--ipaddr", + type=str, default='', #'localhost' or '127.0.0.1' or '' are all same + help="IP address") + args = parser.parse_args() + + host = args.ipaddr + primary_port = args.pri_port + secondary_port = args.sec_port + management_port = args.mng_port + + print("primary port : %d" % primary_port) + print('secondary port : %d' % secondary_port) + print('management port : %d' % management_port) #Creating primary socket object primary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + primary_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #Binding primary socket to a address. bind() takes tuple of host and port. primary_sock.bind((host, primary_port)) @@ -408,6 +593,7 @@ def main(argv): #Creating secondary socket object secondary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + secondary_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #Binding secondary socket to a address. bind() takes tuple of host and port. secondary_sock.bind((host, secondary_port)) @@ -419,13 +605,31 @@ def main(argv): start_new_thread(acceptthread, (secondary_sock, MAIN2SEC, SEC2MAIN)) shell = Shell() + + # Run request handler as a TCP server thread + SocketServer.ThreadingTCPServer.allow_reuse_address = True + CmdRequestHandler.CMD = shell + command_server = SocketServer.ThreadingTCPServer((host, management_port),CmdRequestHandler) + + t = threading.Thread(target=command_server.serve_forever) + t.setDaemon(True) + t.start() + shell.cmdloop() shell = None - primary_sock.shutdown(socket.SHUT_RDWR) - primary_sock.close() - secondary_sock.shutdown(socket.SHUT_RDWR) - secondary_sock.close() + try: + primary_sock.shutdown(socket.SHUT_RDWR) + primary_sock.close() + except socket.error, excep: + print(excep, ", Error while closing primary_sock in main()!") + + try: + secondary_sock.shutdown(socket.SHUT_RDWR) + secondary_sock.close() + except socket.error, excep: + print(excep, ", Error while closing secondary_sock in main()!") + if __name__ == "__main__": main(sys.argv[1:])