From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from tama500.ecl.ntt.co.jp (tama500.ecl.ntt.co.jp [129.60.39.148]) by dpdk.org (Postfix) with ESMTP id AB6615F14 for ; Thu, 18 Oct 2018 13:25:33 +0200 (CEST) Received: from vc2.ecl.ntt.co.jp (vc2.ecl.ntt.co.jp [129.60.86.154]) by tama500.ecl.ntt.co.jp (8.13.8/8.13.8) with ESMTP id w9IBPWZM009273; Thu, 18 Oct 2018 20:25:32 +0900 Received: from vc2.ecl.ntt.co.jp (localhost [127.0.0.1]) by vc2.ecl.ntt.co.jp (Postfix) with ESMTP id 5393D639489; Thu, 18 Oct 2018 20:25:32 +0900 (JST) Received: from localhost.localdomain (unknown [129.60.13.51]) by vc2.ecl.ntt.co.jp (Postfix) with ESMTP id 34EAE63947C; Thu, 18 Oct 2018 20:25:32 +0900 (JST) From: ogawa.yasufumi@lab.ntt.co.jp To: spp@dpdk.org, ferruh.yigit@intel.com, ogawa.yasufumi@lab.ntt.co.jp Date: Thu, 18 Oct 2018 20:25:11 +0900 Message-Id: <20181018112518.77556-3-ogawa.yasufumi@lab.ntt.co.jp> X-Mailer: git-send-email 2.13.1 In-Reply-To: <20181018112518.77556-1-ogawa.yasufumi@lab.ntt.co.jp> References: <20181018112518.77556-1-ogawa.yasufumi@lab.ntt.co.jp> X-TM-AS-MML: disable Subject: [spp] [PATCH 2/9] controller: change controller to use spp-ctl X-BeenThere: spp@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: Soft Patch Panel List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 18 Oct 2018 11:25:35 -0000 From: Yasufumi Ogawa SPP controller manages primary and secondary processes, and creates TCP sessions while launching. Other processes are not allowed to manage SPP processes directly without using the management port. It is difficult to support detailed operations when managing from another process via the management port. It is better to manage SPP by spp-ctl and to change SPP controller into a client, because this allows SPP to be managed from multiple controllers via well-defined REST APIs. This update is to change SPP controller to a client of spp-ctl. 
* Remove connection thread classes and queues because SPP controller no longer needs to have sessions. * Discard management port support because spp-ctl plays the role. * Use SppCtlClient for sending requests to spp-ctl. * Change parsers for responses from primary and secondary processes to spp-ctl. * Remove arguments IP address and ports of primary and secondary processes. * Add SppCtlClient class for requests and responses. Signed-off-by: Yasufumi Ogawa --- src/controller/conn_thread.py | 251 ------------------------------------ src/controller/shell.py | 291 +++++++++++++++++++++++++----------------- src/controller/spp.py | 89 ------------- src/controller/spp_common.py | 40 ------ 4 files changed, 173 insertions(+), 498 deletions(-) delete mode 100644 src/controller/conn_thread.py diff --git a/src/controller/conn_thread.py b/src/controller/conn_thread.py deleted file mode 100644 index ff0697e..0000000 --- a/src/controller/conn_thread.py +++ /dev/null @@ -1,251 +0,0 @@ -#!/usr/bin/env python -# SPDX-License-Identifier: BSD-3-Clause -# Copyright(c) 2015-2016 Intel Corporation - -from __future__ import absolute_import - -from queue import Queue -import select -import socket -from . import spp_common -from .spp_common import logger -import threading -import traceback - - -class ConnectionThread(threading.Thread): - """Manage connection between controller and secondary""" - - def __init__(self, client_id, conn): - super(ConnectionThread, self).__init__() - self.daemon = True - - self.client_id = client_id - self.conn = conn - self.stop_event = threading.Event() - self.conn_opened = False - - def stop(self): - self.stop_event.set() - - def run(self): - cmd_str = '' - - # infinite loop so that function do not terminate and thread do not - # end. 
- while True: - try: - _, _, _ = select.select( - [self.conn, ], [self.conn, ], [], 5) - except select.error: - break - - # Sending message to connected secondary - try: - cmd_str = spp_common.MAIN2SEC[self.client_id].get(True) - self.conn.send(cmd_str) # send only takes string - except KeyError: - break - except Exception as excep: - print(excep, ",Error while sending msg in connectionthread()!") - break - - # Receiving from secondary - try: - data = self.conn.recv(spp_common.SOCK_BUF_SIZE) - if data: - msg = "%s" % data.decode('utf-8') - spp_common.SEC2MAIN[self.client_id].put(msg) - else: - spp_common.SEC2MAIN[self.client_id].put( - "closing:" + str(self.conn)) - break - except Exception as excep: - print( - excep, ",Error while receiving msg in connectionthread()!") - break - - spp_common.SECONDARY_LIST.remove(self.client_id) - self.conn.close() - - -class AcceptThread(threading.Thread): - """Manage connection""" - - def __init__(self, host, port): - super(AcceptThread, self).__init__() - self.daemon = True - - # Creating secondary socket object - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - # Binding secondary socket to a address. bind() takes tuple of host - # and port. 
- self.sock.bind((host, port)) - - # Listening secondary at the address - self.sock.listen(spp_common.MAX_SECONDARY) - - self.stop_event = threading.Event() - self.sock_opened = False - - def getclientid(self, conn): - """Get client_id from client""" - - try: - conn.send(b'_get_client_id') - except KeyError: - return -1 - - data = conn.recv(spp_common.SOCK_BUF_SIZE) - if data is None: - return -1 - - if logger is not None: - logger.debug("data: %s" % data) - client_id = int(data.decode('utf-8').strip('\0')) - - if client_id < 0 or client_id > spp_common.MAX_SECONDARY: - logger.debug("Failed to get client_id: %d" % client_id) - return -1 - - found = 0 - for i in spp_common.SECONDARY_LIST: - if client_id == i: - found = 1 - break - - if found == 0: - return client_id - - # client_id in use, find a free one - free_client_id = -1 - for i in range(spp_common.MAX_SECONDARY): - found = -1 - for j in spp_common.SECONDARY_LIST: - if i == j: - found = i - break - if found == -1: - free_client_id = i - break - - if logger is not None: - logger.debug("Found free_client_id: %d" % free_client_id) - - if free_client_id < 0: - return -1 - - msg = "_set_client_id %u" % free_client_id - conn.send(msg.encode('utf-8')) - data = conn.recv(spp_common.SOCK_BUF_SIZE) - - return free_client_id - - def stop(self): - if self.sock_opened is True: - try: - self.sock.shutdown(socket.SHUT_RDWR) - except socket.error as excep: - print(excep, ", Error while closing sock in AcceptThread!") - traceback.print_exc() - self.sock.close() - self.stop_event.set() - - def run(self): - try: - while True: - # Accepting incoming connections - conn, _ = self.sock.accept() - - client_id = self.getclientid(conn) - if client_id < 0: - break - - # Creating new thread. - # Calling secondarythread function for this function and - # passing conn as argument. 
- spp_common.SECONDARY_LIST.append(client_id) - spp_common.MAIN2SEC[client_id] = Queue() - spp_common.SEC2MAIN[client_id] = Queue() - connection_thread = ConnectionThread(client_id, conn) - connection_thread.daemon = True - connection_thread.start() - - spp_common.SECONDARY_COUNT += 1 - except Exception as excep: - print(excep, ", Error in AcceptThread!") - traceback.print_exc() - self.sock_opened = False - self.sock.close() - - -class PrimaryThread(threading.Thread): - - def __init__(self, host, port): - super(PrimaryThread, self).__init__() - self.daemon = True - - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - # Binding primary socket to a address. bind() takes tuple of host - # and port. - self.sock.bind((host, port)) - - # Listening primary at the address - self.sock.listen(1) # 5 denotes the number of clients can queue - - self.stop_event = threading.Event() - self.sock_opened = False - - def stop(self): - if self.sock_opened is True: - self.sock.shutdown(socket.SHUT_RDWR) - self.sock.close() - self.stop_event.set() - - def run(self): - cmd_str = '' - - while True: - # waiting for connection - spp_common.PRIMARY = False - conn, addr = self.sock.accept() - spp_common.PRIMARY = True - - while conn: - try: - _, _, _ = select.select([conn, ], [conn, ], [], 5) - except select.error: - break - - self.sock_opened = True - # Sending message to connected primary - try: - cmd_str = spp_common.MAIN2PRIMARY.get(True) - conn.send(cmd_str) # send only takes string - except KeyError: - break - except Exception as excep: - print( - excep, - ", Error while sending msg in primarythread()!") - break - - # Receiving from primary - try: - data = conn.recv(spp_common.SOCK_BUF_SIZE) - if data: - spp_common.PRIMARY2MAIN.put( - data.decode('utf-8').strip('\0')) - else: - spp_common.PRIMARY2MAIN.put('{"status": "closed"}') - conn.close() - self.sock_opened = False - break - except Exception as excep: - 
print( - excep, - ", Error while receiving msg in primarythread()!") - break diff --git a/src/controller/shell.py b/src/controller/shell.py index 3e0ca00..1363fc5 100644 --- a/src/controller/shell.py +++ b/src/controller/shell.py @@ -5,13 +5,12 @@ from __future__ import absolute_import import cmd -import json import os -from queue import Empty import re import readline from .shell_lib import common from . import spp_common +from . import spp_ctl_client from .spp_common import logger import subprocess from . import topo @@ -20,17 +19,13 @@ from . import topo class Shell(cmd.Cmd, object): """SPP command prompt.""" + # TODO(yasufum) move hist_file to $HOME as default hist_file = '.spp_history' intro = 'Welcome to the spp. Type help or ? to list commands.\n' prompt = 'spp > ' recorded_file = None - CMD_OK = "OK" - CMD_NG = "NG" - CMD_NOTREADY = "NOTREADY" - CMD_ERROR = "ERROR" - PORT_TYPES = ['phy', 'ring', 'vhost', 'pcap', 'nullpmd'] PRI_CMDS = ['status', 'exit', 'clear'] @@ -40,6 +35,8 @@ class Shell(cmd.Cmd, object): HIST_EXCEPT = ['bye', 'exit', 'history', 'redo'] + rest_common_error_codes = [400, 404, 500] + PLUGIN_DIR = 'command' subgraphs = {} topo_size = '60%' @@ -50,6 +47,10 @@ class Shell(cmd.Cmd, object): else: readline.write_history_file(hist_file) + def __init__(self): + cmd.Cmd.__init__(self) + self.spp_ctl_cli = spp_ctl_client.SppCtlClient() + def default(self, line): """Define defualt behaviour. 
@@ -78,17 +79,16 @@ class Shell(cmd.Cmd, object): try: for line in open(self.hist_file): - l = line.strip() - if not (l.split(' ')[0] in self.HIST_EXCEPT): - entries.append(l) + line_s = line.strip() + if not (line_s.split(' ')[0] in self.HIST_EXCEPT): + entries.append(line_s) f = open(self.hist_file, "w+") contents = '\n'.join(entries) contents += '\n' f.write(contents) f.close() except IOError: - print('Error: Cannot open history file "%s"' % - self.hist_file) + print('Error: Cannot open history file "%s"' % self.hist_file) def close_all_secondary(self): """Terminate all secondary processes.""" @@ -98,37 +98,39 @@ class Shell(cmd.Cmd, object): tmp_list.append(i) for i in tmp_list: self.command_secondary(i, 'exit') - spp_common.SECONDARY_COUNT = 0 - - def get_status(self): - """Return the status of SPP processes. - - Show the number of each of SPP processes running on. - - spp > status - Soft Patch Panel Status : - primary: 1 - secondary count: 2 - """ - - secondary = [] - for i in spp_common.SECONDARY_LIST: - secondary.append("%d" % i) - stat = { - # PRIMARY is 1 if it is running - "primary": "%d" % spp_common.PRIMARY, - "secondary": secondary - } - return stat def print_status(self): """Display information about connected clients.""" - print("Soft Patch Panel Status :") - print("primary: %d" % spp_common.PRIMARY) # it is 1 if PRIMA == True - print("secondary count: %d" % len(spp_common.SECONDARY_LIST)) - for i in spp_common.SECONDARY_LIST: - print("Connected secondary id: %d" % i) + res = self.spp_ctl_cli.get('processes') + if res is not None: + if res.status_code == 200: + proc_objs = res.json() + pri_obj = None + sec_obj = {} + sec_obj['nfv'] = [] + + for proc_obj in proc_objs: + if proc_obj['type'] == 'primary': + pri_obj = proc_obj + elif proc_obj['type'] == 'nfv': + sec_obj['nfv'].append(proc_obj) + + print('- primary:') + if pri_obj is not None: + print(' - status: running') + else: + print(' - status: not running') + + print('- secondary:') + print(' - 
processes:') + for obj in sec_obj['nfv']: + print(' %d: %s:%s' % ( + obj['client-id'], obj['type'], obj['client-id'])) + elif res.status_code in self.rest_common_error_codes: + pass + else: + print('Error: unknown response.') def print_pri_status(self, json_obj): """Parse SPP primary's status and print. @@ -170,7 +172,7 @@ class Shell(cmd.Cmd, object): ... """ - if json_obj.has_key('phy_ports'): + if 'phy_ports' in json_obj: print('Physical Ports:') print(' ID rx tx tx_drop mac_addr') for pports in json_obj['phy_ports']: @@ -178,7 +180,7 @@ class Shell(cmd.Cmd, object): pports['id'], pports['rx'], pports['tx'], pports['tx_drop'], pports['eth'])) - if json_obj.has_key('ring_ports'): + if 'ring_ports' in json_obj: print('Ring Ports:') print(' ID rx tx rx_drop rx_drop') for rports in json_obj['ring_ports']: @@ -186,7 +188,7 @@ class Shell(cmd.Cmd, object): rports['id'], rports['rx'], rports['tx'], rports['rx_drop'], rports['tx_drop'])) - def print_sec_status(self, msg): + def print_sec_status(self, json_obj): """Parse and print message from SPP secondary. Print status received from secondary. @@ -203,55 +205,135 @@ class Shell(cmd.Cmd, object): {"client-id":1,...,"patches":[{"src":"phy:0"...},...]}'\x00.. 
""" - msg = msg.replace("\x00", "") # Clean received msg - - try: - sec_attr = json.loads(msg) - print('- status: %s' % sec_attr['status']) - print('- ports:') - for port in sec_attr['ports']: - dst = None - for patch in sec_attr['patches']: - if patch['src'] == port: - dst = patch['dst'] - - if dst is None: - print(' - %s' % port) - else: - print(' - %s -> %s' % (port, dst)) - except ValueError as err: - print('Invalid format: {0}.'.format(err)) - print("'%s'" % msg) + sec_attr = json_obj + print('- status: %s' % sec_attr['status']) + print('- ports:') + for port in sec_attr['ports']: + dst = None + for patch in sec_attr['patches']: + if patch['src'] == port: + dst = patch['dst'] + + if dst is None: + print(' - %s' % port) + else: + print(' - %s -> %s' % (port, dst)) def command_primary(self, command): """Send command to primary process""" - if spp_common.PRIMARY: - spp_common.MAIN2PRIMARY.put(command.encode('utf-8')) - recv = spp_common.PRIMARY2MAIN.get(True) - json_obj = json.loads(recv) - self.print_pri_status(json_obj) - return self.CMD_OK, recv + if command == 'status': + res = self.spp_ctl_cli.get('primary/status') + if res is not None: + if res.status_code == 200: + self.print_pri_status(res.json()) + elif res.status_code in self.rest_common_error_codes: + pass + else: + print('Error: unknown response.') + + elif command == 'clear': + res = self.spp_ctl_cli.delete('primary/status') + if res is not None: + if res.status_code == 204: + print('Clear port statistics.') + elif res.status_code in self.rest_common_error_codes: + pass + else: + print('Error: unknown response.') + + elif command == 'exit': + print('"pri; exit" is deprecated.') + else: - recv = "primary not started" - print(recv) - return self.CMD_NOTREADY, recv + print('Invalid pri command!') def command_secondary(self, sec_id, command): - """Send command to secondary process with sec_id""" + """Send command to secondary process.""" + + cmd = command.split(' ')[0] + params = command.split(' ')[1:] + 
+ if cmd == 'status': + res = self.spp_ctl_cli.get('nfvs/%d' % sec_id) + if res is not None: + if res.status_code == 200: + self.print_sec_status(res.json()) + elif res.status_code in self.rest_common_error_codes: + pass + else: + print('Error: unknown response.') + + elif cmd == 'add': + req_params = {'action': 'add', 'port': params[0]} + res = self.spp_ctl_cli.put('nfvs/%d/ports' % sec_id, req_params) + if res is not None: + if res.status_code == 204: + print('Add %s.' % params[0]) + elif res.status_code in self.rest_common_error_codes: + pass + else: + print('Error: unknown response.') + + elif cmd == 'del': + req_params = {'action': 'del', 'port': params[0]} + res = self.spp_ctl_cli.put('nfvs/%d/ports' % sec_id, req_params) + if res is not None: + if res.status_code == 204: + print('Delete %s.' % params[0]) + elif res.status_code in self.rest_common_error_codes: + pass + else: + print('Error: unknown response.') - if sec_id in spp_common.SECONDARY_LIST: - spp_common.MAIN2SEC[sec_id].put(command.encode('utf-8')) - recv = spp_common.SEC2MAIN[sec_id].get(True) - if command == 'status': - self.print_sec_status(recv) + elif cmd == 'forward' or cmd == 'stop': + if cmd == 'forward': + req_params = {'action': 'start'} + elif cmd == 'stop': + req_params = {'action': 'stop'} + else: + print('Unknown command. 
"forward" or "stop"?') + + res = self.spp_ctl_cli.put('nfvs/%d/forward' % sec_id, req_params) + if res is not None: + if res.status_code == 204: + if cmd == 'forward': + print('Start forwarding.') + else: + print('Stop forwarding.') + elif res.status_code in self.rest_common_error_codes: + pass + else: + print('Error: unknown response.') + + elif cmd == 'patch': + if params[0] == 'reset': + res = self.spp_ctl_cli.delete('nfvs/%d/patches' % sec_id) + if res is not None: + if res.status_code == 204: + print('Clear all of patches.') + elif res.status_code in self.rest_common_error_codes: + pass + else: + print('Error: unknown response.') else: - print(recv) - return self.CMD_OK, recv + req_params = {'src': params[0], 'dst': params[1]} + res = self.spp_ctl_cli.put( + 'nfvs/%d/patches' % sec_id, req_params) + if res is not None: + if res.status_code == 204: + print('Patch ports (%s -> %s).' % ( + params[0], params[1])) + elif res.status_code in self.rest_common_error_codes: + pass + else: + print('Error: unknown response.') + + elif cmd == 'exit': + print('do nothing.') + else: - message = "secondary id %d not exist" % sec_id - print(message) - return self.CMD_NOTREADY, message + print('Invalid command "%s".' 
% cmd) def is_patched_ids_valid(self, id1, id2, delim=':'): """Check if port IDs are valid @@ -275,6 +357,9 @@ class Shell(cmd.Cmd, object): def check_sec_cmds(self, cmds): """Validate secondary commands before sending""" + # TODO(yasufum) change to return True or False, or None + # instead of 0 or 1 + level1 = ['status', 'exit', 'forward', 'stop'] level2 = ['add', 'patch', 'del'] patch_args = ['reset'] @@ -313,21 +398,6 @@ class Shell(cmd.Cmd, object): res = re.sub(r'\s?;\s?', ";", tmparg) return res - def response(self, result, message): - """Enqueue message from other than CLI""" - - try: - rcmd = spp_common.RCMD_EXECUTE_QUEUE.get(False) - except Empty: - return - - if (rcmd == spp_common.REMOTE_COMMAND): - param = result + '\n' + message - spp_common.RCMD_RESULT_QUEUE.put(param.encode('utf-8')) - else: - if logger is not None: - logger.debug("unknown remote command = %s" % rcmd) - def precmd(self, line): """Called before running a command @@ -357,8 +427,6 @@ class Shell(cmd.Cmd, object): """ self.print_status() - stat = self.get_status() - self.response(self.CMD_OK, json.dumps(stat)) def do_pri(self, command): """Send a command to primary process. @@ -376,12 +444,10 @@ class Shell(cmd.Cmd, object): logger.info("Receive pri command: '%s'" % command) if command and (command in self.PRI_CMDS): - result, message = self.command_primary(command) - self.response(result, message) + self.command_primary(command) else: message = "Invalid pri command: '%s'" % command print(message) - self.response(self.CMD_ERROR, message) def complete_pri(self, text, line, begidx, endidx): """Completion for primary process commands""" @@ -416,22 +482,18 @@ class Shell(cmd.Cmd, object): tmparg = self.clean_cmd(arg) cmds = tmparg.split(';') if len(cmds) < 2: - message = "error" + message = "'sec' requires an ID and ';' before command." 
print(message) - self.response(self.CMD_ERROR, message) elif str.isdigit(cmds[0]): sec_id = int(cmds[0]) if self.check_sec_cmds(cmds[1]): - result, message = self.command_secondary(sec_id, cmds[1]) - self.response(result, message) + self.command_secondary(sec_id, cmds[1]) else: message = "invalid cmd" print(message) - self.response(self.CMD_ERROR, message) else: print(cmds[0]) print("first %s" % cmds[1]) - self.response(self.CMD_ERROR, "invalid format") def complete_sec(self, text, line, begidx, endidx): """Completion for secondary process commands""" @@ -493,7 +555,6 @@ class Shell(cmd.Cmd, object): print("Record file is required!") else: self.recorded_file = open(fname, 'w') - self.response(self.CMD_OK, "record") def complete_record(self, text, line, begidx, endidx): return common.compl_common(text, line) @@ -519,11 +580,9 @@ class Shell(cmd.Cmd, object): lines.append("# %s" % line) lines.append(line) self.cmdqueue.extend(lines) - self.response(self.CMD_OK, "playback") except IOError: message = "Error: File does not exist." print(message) - self.response(self.CMD_NG, message) def complete_playback(self, text, line, begidx, endidx): return common.compl_common(text, line) @@ -661,8 +720,7 @@ class Shell(cmd.Cmd, object): cmd_options = ' '.join(cmd_ary) eval('self.do_%s(cmd_options)' % cmd) except IOError: - print('Error: Cannot open history file "%s"' % - self.hist_file) + print('Error: Cannot open history file "%s"' % self.hist_file) def do_history(self, arg): """Show command history. 
@@ -694,13 +752,12 @@ class Shell(cmd.Cmd, object): cnt = 1 for line in f: - l = line.strip() - print(hist_format % (cnt, l)) + line_s = line.strip() + print(hist_format % (cnt, line_s)) cnt += 1 f.close() except IOError: - print('Error: Cannot open history file "%s"' % - self.hist_file) + print('Error: Cannot open history file "%s"' % self.hist_file) def complete_cat(self, text, line, begidx, endidx): return common.compl_common(text, line) @@ -877,7 +934,6 @@ class Shell(cmd.Cmd, object): if len(spp_common.SECONDARY_LIST) == 0: message = "secondary not exist" print(message) - self.response(self.CMD_NOTREADY, message) else: tp = topo.Topo( spp_common.SECONDARY_LIST, @@ -898,7 +954,6 @@ class Shell(cmd.Cmd, object): else: print("Usage: topo dst [ftype]") return False - self.response(self.CMD_OK, json.dumps(res_ary)) def complete_topo(self, text, line, begidx, endidx): """Complete topo command diff --git a/src/controller/spp.py b/src/controller/spp.py index 80b1fab..373bb93 100644 --- a/src/controller/spp.py +++ b/src/controller/spp.py @@ -3,110 +3,21 @@ # Copyright(c) 2015-2016 Intel Corporation from __future__ import absolute_import -# from __future__ import print_function import argparse -from .conn_thread import AcceptThread -from .conn_thread import PrimaryThread from .shell import Shell -import socket -import socketserver -from . 
import spp_common -from .spp_common import logger import sys -import threading -import traceback - - -class CmdRequestHandler(socketserver.BaseRequestHandler): - """Request handler for getting message from remote entities""" - - CMD = None # contains a instance of Shell class - - def handle(self): - self.data = self.request.recv(spp_common.SOCK_BUF_SIZE).strip() - cur_thread = threading.currentThread() - print(cur_thread.getName()) - print(self.client_address[0]) - print(self.data) - if CmdRequestHandler.CMD is not None: - spp_common.RCMD_EXECUTE_QUEUE.put(spp_common.REMOTE_COMMAND) - CmdRequestHandler.CMD.onecmd(self.data) - ret = spp_common.RCMD_RESULT_QUEUE.get() - if (ret is not None): - if logger is not None: - logger.debug("ret:%s" % ret) - self.request.send(ret) - else: - if logger is not None: - logger.debug("ret is none") - self.request.send("") - else: - if logger is not None: - logger.debug("CMD is None") - self.request.send("") def main(argv): - """main""" parser = argparse.ArgumentParser(description="SPP Controller") - - parser.add_argument( - "-p", "--pri-port", - type=int, default=5555, - help="primary port number") - parser.add_argument( - "-s", "--sec-port", - type=int, default=6666, - help="secondary port number") - parser.add_argument( - "-m", "--mng-port", - type=int, default=7777, - help="management port number") - parser.add_argument( - "-ip", "--ipaddr", - type=str, default='', # 'localhost' or '127.0.0.1' or '' are all same - help="IP address") args = parser.parse_args() - host = args.ipaddr - primary_port = args.pri_port - secondary_port = args.sec_port - management_port = args.mng_port - - print("primary port : %d" % primary_port) - print('secondary port : %d' % secondary_port) - print('management port : %d' % management_port) - - primary_thread = PrimaryThread(host, primary_port) - primary_thread.start() - - accept_thread = AcceptThread(host, secondary_port) - accept_thread.start() - shell = Shell() - - # Run request handler as a TCP 
server thread - socketserver.ThreadingTCPServer.allow_reuse_address = True - CmdRequestHandler.CMD = shell - command_server = socketserver.ThreadingTCPServer( - (host, management_port), CmdRequestHandler) - - t = threading.Thread(target=command_server.serve_forever) - t.setDaemon(True) - t.start() - shell.cmdloop() shell = None - try: - primary_thread.stop() - accept_thread.stop() - except socket.error as excep: - print(excep, ", Error while terminating threads in main()!") - traceback.print_exc() - if __name__ == "__main__": diff --git a/src/controller/spp_common.py b/src/controller/spp_common.py index 0cac2d9..0986918 100644 --- a/src/controller/spp_common.py +++ b/src/controller/spp_common.py @@ -4,7 +4,6 @@ import logging import os -from queue import Queue # Setup logger object logger = logging.getLogger(__name__) @@ -20,49 +19,10 @@ handler.setFormatter(formatter) logger.setLevel(logging.DEBUG) logger.addHandler(handler) -PRIMARY = '' SECONDARY_LIST = [] -# Initialize primary comm channel -MAIN2PRIMARY = Queue() -PRIMARY2MAIN = Queue() - # Maximum num of sock queues for secondaries MAX_SECONDARY = 16 -# Should be as same as MSG_SIZE in src/shared/common.h -SOCK_BUF_SIZE = 2048 - -PRIMARY = '' -SECONDARY_COUNT = 0 - -REMOTE_COMMAND = "RCMD" -RCMD_EXECUTE_QUEUE = Queue() -RCMD_RESULT_QUEUE = Queue() - delim_node = '_' delim_label = ':' - - -class GrowingList(list): - """Growing List - - Custom list type for appending index over the range which is - similar to ruby's Array. Empty index is filled with 'None'. - It is used to contain queues for secondaries with any sec ID. - - >>> gl = GrowingList() - >>> gl.[3] = 0 - >>> gl - [None, None, None, 0] - """ - - def __setitem__(self, index, value): - if index >= len(self): - self.extend([None]*(index + 1 - len(self))) - list.__setitem__(self, index, value) - - -# init secondary comm channel list -MAIN2SEC = GrowingList() -SEC2MAIN = GrowingList() -- 2.13.1