From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mail.valinux.co.jp (mail.valinux.co.jp [210.128.90.3]) by dpdk.org (Postfix) with ESMTP id 25519255 for ; Sun, 23 Sep 2018 04:36:22 +0200 (CEST) Received: from localhost (localhost [127.0.0.1]) by mail.valinux.co.jp (Postfix) with ESMTP id 4FA2DB3C73 for ; Sun, 23 Sep 2018 11:36:21 +0900 (JST) X-Virus-Scanned: Debian amavisd-new at valinux.co.jp Received: from mail.valinux.co.jp ([127.0.0.1]) by localhost (mail.valinux.co.jp [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id np31gxBCJwNX for ; Sun, 23 Sep 2018 11:36:21 +0900 (JST) Received: from [127.0.0.1] (vagw.valinux.co.jp [210.128.90.14]) (using TLSv1 with cipher ECDHE-RSA-AES256-SHA (256/256 bits)) (No client certificate requested) by mail.valinux.co.jp (Postfix) with ESMTPS id 3A707B3C5D for ; Sun, 23 Sep 2018 11:36:21 +0900 (JST) Date: Sun, 23 Sep 2018 11:36:21 +0900 From: Itsuro ODA To: spp@dpdk.org In-Reply-To: <20180923112233.2D71.277DD91C@valinux.co.jp> References: <20180923112233.2D71.277DD91C@valinux.co.jp> Message-Id: <20180923113621.2D90.277DD91C@valinux.co.jp> MIME-Version: 1.0 Content-Type: text/plain; charset="US-ASCII" Content-Transfer-Encoding: 7bit X-Mailer: Becky! ver. 2.71.01 [ja] Subject: [spp] [PATCH v2 7/9] spp-ctl: SPP controller with Web API X-BeenThere: spp@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: Soft Patch Panel List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Sun, 23 Sep 2018 02:36:22 -0000 From: Itsuro Oda Signed-off-by: Itsuro Oda --- src/spp-ctl/spp_ctl.py | 158 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 src/spp-ctl/spp_ctl.py diff --git a/src/spp-ctl/spp_ctl.py b/src/spp-ctl/spp_ctl.py new file mode 100644 index 0000000..e168747 --- /dev/null +++ b/src/spp-ctl/spp_ctl.py @@ -0,0 +1,158 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2018 Nippon Telegraph and Telephone Corporation + +import eventlet +eventlet.monkey_patch() + +import argparse +import errno +import json +import logging +import socket +import subprocess + +import spp_proc +import spp_webapi + + +LOG = logging.getLogger(__name__) + + +MSG_SIZE = 4096 + + +class Controller(object): + + def __init__(self, pri_port, sec_port, api_port): + self.web_server = spp_webapi.WebServer(self, api_port) + self.procs = {} + self.init_connection(pri_port, sec_port) + + def start(self): + self.web_server.start() + + def init_connection(self, pri_port, sec_port): + self.pri_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.pri_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.pri_sock.bind(('127.0.0.1', pri_port)) + self.pri_sock.listen(1) + self.primary_listen_thread = eventlet.greenthread.spawn( + self.accept_primary) + + self.sec_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sec_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sec_sock.bind(('127.0.0.1', sec_port)) + self.sec_sock.listen(1) + self.secondary_listen_thread = eventlet.greenthread.spawn( + self.accept_secondary) + + def accept_primary(self): + while True: + conn, _ = self.pri_sock.accept() + proc = self.procs.get(spp_proc.ID_PRIMARY) + if proc is not None: + LOG.warning("spp_primary reconnect !") + with proc.sem: + try: + proc.conn.close() + except Exception: + pass + proc.conn = conn + # NOTE: when spp_primary restart, all secondarys must be + # restarted. this is out of controle of spp-ctl. + else: + LOG.info("primary connected.") + self.procs[spp_proc.ID_PRIMARY] = spp_proc.PrimaryProc(conn) + + def accept_secondary(self): + while True: + conn, _ = self.sec_sock.accept() + LOG.debug("sec accepted: get process id") + proc = self._get_proc(conn) + if proc is None: + LOG.error("get process id failed") + conn.close() + continue + old_proc = self.procs.get(proc.id) + if old_proc: + LOG.warning("%s(%d) reconnect !", old_proc.type, old_proc.id) + if old_proc.type != proc.type: + LOG.warning("type changed ! new type: %s", proc.type) + with old_proc.sem: + try: + old_proc.conn.close() + except Exception: + pass + else: + LOG.info("%s(%d) connected.", proc.type, proc.id) + self.procs[proc.id] = proc + + @staticmethod + def _continue_recv(conn): + try: + # must set non-blocking to recieve remining data not to happen + # blocking here. + # NOTE: usually MSG_DONTWAIT flag is used for this purpose but + # this flag is not supported under eventlet. + conn.setblocking(False) + data = b"" + while True: + try: + rcv_data = conn.recv(MSG_SIZE) + data += rcv_data + if len(rcv_data) < MSG_SIZE: + break + except socket.error as e: + if e.args[0] == errno.EAGAIN: + # OK, no data remining. this happens when recieve data + # length is just multiple of MSG_SIZE. + break + raise e + return data + finally: + conn.setblocking(True) + + @staticmethod + def _send_command(conn, command): + conn.sendall(command.encode()) + data = conn.recv(MSG_SIZE) + if data and len(data) == MSG_SIZE: + # could not receive data at once. recieve remining data. + data += self._continue_recv(conn) + if data: + data = data.decode() + return data + + def _get_proc(self, conn): + # it is a bit ad hoc. send "_get_clinet_id" command and try to + # decode reply for each proc type. if success, that is the type. + data = self._send_command(conn, "_get_client_id") + for proc in [spp_proc.VfProc, spp_proc.NfvProc]: + sec_id = proc._decode_client_id(data) + if sec_id is not None: + return proc(sec_id, conn) + + def get_processes(self): + procs = [] + for proc in self.procs.values(): + p = {"type": proc.type} + if proc.id != spp_proc.ID_PRIMARY: + p["client-id"] = proc.id + procs.append(p) + return procs + + +def main(): + parser = argparse.ArgumentParser(description="SPP Controller") + parser.add_argument("-p", dest='pri_port', type=int, default=5555, + action='store', help="primary port") + parser.add_argument("-s", dest='sec_port', type=int, default=6666, + action='store', help="secondary port") + parser.add_argument("-a", dest='api_port', type=int, default=7777, + action='store', help="web api port") + args = parser.parse_args() + + logging.basicConfig(level=logging.DEBUG) + + controller = Controller(args.pri_port, args.sec_port, args.api_port) + controller.start() -- 2.17.0 -- Itsuro ODA