Soft Patch Panel
 help / color / mirror / Atom feed
From: oda@valinux.co.jp
To: spp@dpdk.org
Subject: [spp] [PATCH v3 07/13] spp-ctl: add Controller class
Date: Fri,  5 Oct 2018 10:37:49 +0900	[thread overview]
Message-ID: <20181005013755.19838-8-oda@valinux.co.jp> (raw)
In-Reply-To: <20181005013755.19838-1-oda@valinux.co.jp>

From: Itsuro Oda <oda@valinux.co.jp>

Controller is the main class of spp-ctl. It sets up the connections with
SPP processes and initializes the WebServer class that receives requests.

Signed-off-by: Itsuro Oda <oda@valinux.co.jp>
---
 src/spp-ctl/spp_ctl.py | 158 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 158 insertions(+)
 create mode 100644 src/spp-ctl/spp_ctl.py

diff --git a/src/spp-ctl/spp_ctl.py b/src/spp-ctl/spp_ctl.py
new file mode 100644
index 0000000..e168747
--- /dev/null
+++ b/src/spp-ctl/spp_ctl.py
@@ -0,0 +1,158 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Nippon Telegraph and Telephone Corporation
+
+import eventlet
+eventlet.monkey_patch()
+
+import argparse
+import errno
+import json
+import logging
+import socket
+import subprocess
+
+import spp_proc
+import spp_webapi
+
+
+# Module-level logger, named after this module.
+LOG = logging.getLogger(__name__)
+
+
+# Number of bytes requested from a socket by each recv() call.
+MSG_SIZE = 4096
+
+
+class Controller(object):
+
+    def __init__(self, pri_port, sec_port, api_port):
+        self.web_server = spp_webapi.WebServer(self, api_port)
+        self.procs = {}
+        self.init_connection(pri_port, sec_port)
+
+    def start(self):
+        self.web_server.start()
+
+    def init_connection(self, pri_port, sec_port):
+        self.pri_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.pri_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.pri_sock.bind(('127.0.0.1', pri_port))
+        self.pri_sock.listen(1)
+        self.primary_listen_thread = eventlet.greenthread.spawn(
+            self.accept_primary)
+
+        self.sec_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sec_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.sec_sock.bind(('127.0.0.1', sec_port))
+        self.sec_sock.listen(1)
+        self.secondary_listen_thread = eventlet.greenthread.spawn(
+            self.accept_secondary)
+
+    def accept_primary(self):
+        while True:
+            conn, _ = self.pri_sock.accept()
+            proc = self.procs.get(spp_proc.ID_PRIMARY)
+            if proc is not None:
+                LOG.warning("spp_primary reconnect !")
+                with proc.sem:
+                    try:
+                        proc.conn.close()
+                    except Exception:
+                        pass
+                    proc.conn = conn
+                # NOTE: when spp_primary restart, all secondarys must be
+                # restarted. this is out of controle of spp-ctl.
+            else:
+                LOG.info("primary connected.")
+                self.procs[spp_proc.ID_PRIMARY] = spp_proc.PrimaryProc(conn)
+
+    def accept_secondary(self):
+        while True:
+            conn, _ = self.sec_sock.accept()
+            LOG.debug("sec accepted: get process id")
+            proc = self._get_proc(conn)
+            if proc is None:
+                LOG.error("get process id failed")
+                conn.close()
+                continue
+            old_proc = self.procs.get(proc.id)
+            if old_proc:
+                LOG.warning("%s(%d) reconnect !", old_proc.type, old_proc.id)
+                if old_proc.type != proc.type:
+                    LOG.warning("type changed ! new type: %s", proc.type)
+                with old_proc.sem:
+                    try:
+                        old_proc.conn.close()
+                    except Exception:
+                        pass
+            else:
+                LOG.info("%s(%d) connected.", proc.type, proc.id)
+            self.procs[proc.id] = proc
+
+    @staticmethod
+    def _continue_recv(conn):
+        try:
+            # must set non-blocking to recieve remining data not to happen
+            # blocking here.
+            # NOTE: usually MSG_DONTWAIT flag is used for this purpose but
+            # this flag is not supported under eventlet.
+            conn.setblocking(False)
+            data = b""
+            while True:
+                try:
+                    rcv_data = conn.recv(MSG_SIZE)
+                    data += rcv_data
+                    if len(rcv_data) < MSG_SIZE:
+                        break
+                except socket.error as e:
+                    if e.args[0] == errno.EAGAIN:
+                        # OK, no data remining. this happens when recieve data
+                        # length is just multiple of MSG_SIZE.
+                        break
+                    raise e
+            return data
+        finally:
+            conn.setblocking(True)
+
+    @staticmethod
+    def _send_command(conn, command):
+        conn.sendall(command.encode())
+        data = conn.recv(MSG_SIZE)
+        if data and len(data) == MSG_SIZE:
+            # could not receive data at once. recieve remining data.
+            data += self._continue_recv(conn)
+        if data:
+            data = data.decode()
+        return data
+
+    def _get_proc(self, conn):
+        # it is a bit ad hoc. send "_get_clinet_id" command and try to
+        # decode reply for each proc type. if success, that is the type.
+        data = self._send_command(conn, "_get_client_id")
+        for proc in [spp_proc.VfProc, spp_proc.NfvProc]:
+            sec_id = proc._decode_client_id(data)
+            if sec_id is not None:
+                return proc(sec_id, conn)
+
+    def get_processes(self):
+        procs = []
+        for proc in self.procs.values():
+            p = {"type": proc.type}
+            if proc.id != spp_proc.ID_PRIMARY:
+                p["client-id"] = proc.id
+            procs.append(p)
+        return procs
+
+
+def main():
+    parser = argparse.ArgumentParser(description="SPP Controller")
+    parser.add_argument("-p", dest='pri_port', type=int, default=5555,
+                        action='store', help="primary port")
+    parser.add_argument("-s", dest='sec_port', type=int, default=6666,
+                        action='store', help="secondary port")
+    parser.add_argument("-a", dest='api_port', type=int, default=7777,
+                        action='store', help="web api port")
+    args = parser.parse_args()
+
+    logging.basicConfig(level=logging.DEBUG)
+
+    controller = Controller(args.pri_port, args.sec_port, args.api_port)
+    controller.start()
-- 
2.17.1

  parent reply	other threads:[~2018-10-05  1:37 UTC|newest]

Thread overview: 33+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-09-12 23:25 [spp] [PATCH] spp-ctl: SPP controller with Web API Itsuro ODA
2018-09-18 10:00 ` Yasufumi Ogawa
2018-09-18 21:40   ` Itsuro ODA
2018-10-05  1:37 ` [spp] [PATCH v3 00/13] " oda
2018-10-05  1:37   ` [spp] [PATCH v3 01/13] docs: add overview of spp-ctl oda
2018-10-05  1:37   ` [spp] [PATCH v3 02/13] docs: add API reference " oda
2018-10-05  1:37   ` [spp] [PATCH v3 03/13] docs: add index " oda
2018-10-05  1:37   ` [spp] [PATCH v3 04/13] project: add requirements.txt for spp-ctl oda
2018-10-05  1:37   ` [spp] [PATCH v3 05/13] docs: add spp-ctl to index of doc root oda
2018-10-05  1:37   ` [spp] [PATCH v3 06/13] spp-ctl: add entry point oda
2018-10-05  1:37   ` oda [this message]
2018-10-05  1:37   ` [spp] [PATCH v3 08/13] spp-ctl: add web API handler oda
2018-10-05  1:37   ` [spp] [PATCH v3 09/13] spp-ctl: add spp command interfaces oda
2018-10-05  1:37   ` [spp] [PATCH v3 10/13] spp-ctl: update parsing spp_nfv status oda
2018-10-05  1:37   ` [spp] [PATCH v3 11/13] docs: add request examples of spp-ctl oda
2018-10-05  1:37   ` [spp] [PATCH v3 12/13] docs: correct directives " oda
2018-10-05  1:37   ` [spp] [PATCH v3 13/13] docs: add labels and captions for tables oda
2018-10-05  3:57 ` [spp] [PATCH v4 00/14] spp-ctl: SPP controller with Web API oda
2018-10-05  3:57   ` [spp] [PATCH v4 01/14] docs: add overview of spp-ctl oda
2018-10-05  3:57   ` [spp] [PATCH v4 02/14] docs: add API reference " oda
2018-10-05  3:57   ` [spp] [PATCH v4 03/14] docs: add index " oda
2018-10-05  3:57   ` [spp] [PATCH v4 04/14] project: add requirements.txt for spp-ctl oda
2018-10-05  3:57   ` [spp] [PATCH v4 05/14] docs: add spp-ctl to index of doc root oda
2018-10-05  3:57   ` [spp] [PATCH v4 06/14] spp-ctl: add entry point oda
2018-10-05  3:57   ` [spp] [PATCH v4 07/14] spp-ctl: add Controller class oda
2018-10-05  3:57   ` [spp] [PATCH v4 08/14] spp-ctl: add web API handler oda
2018-10-05  3:57   ` [spp] [PATCH v4 09/14] spp-ctl: add spp command interfaces oda
2018-10-05  3:57   ` [spp] [PATCH v4 10/14] spp-ctl: update parsing spp_nfv status oda
2018-10-05  3:57   ` [spp] [PATCH v4 11/14] docs: add request examples of spp-ctl oda
2018-10-05  3:57   ` [spp] [PATCH v4 12/14] docs: correct directives " oda
2018-10-05  3:57   ` [spp] [PATCH v4 13/14] docs: add labels and captions for tables oda
2018-10-05  3:57   ` [spp] [PATCH v4 14/14] spp-ctl: fix incorrect URL oda
2018-10-09  2:01   ` [spp] [PATCH v4 00/14] spp-ctl: SPP controller with Web API Yasufumi Ogawa

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20181005013755.19838-8-oda@valinux.co.jp \
    --to=oda@valinux.co.jp \
    --cc=spp@dpdk.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).