From mboxrd@z Thu Jan 1 00:00:00 1970
From: Juraj Linkeš <juraj.linkes@pantheon.tech>
To: thomas@monjalon.net, david.marchand@redhat.com, ronan.randles@intel.com,
 Honnappa.Nagarahalli@arm.com, ohilyard@iol.unh.edu, lijuan.tu@intel.com
Cc: dev@dpdk.org, Juraj Linkeš <juraj.linkes@pantheon.tech>
Subject: [RFC PATCH v1 04/10] dts: add basic node management methods
Date: Wed, 24 Aug 2022 16:24:48 +0000
Message-Id: <20220824162454.394285-5-juraj.linkes@pantheon.tech>
X-Mailer: git-send-email 2.25.1
In-Reply-To: <20220824162454.394285-1-juraj.linkes@pantheon.tech>
References: <20220824162454.394285-1-juraj.linkes@pantheon.tech>
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The nodes DTS works with are either a system under test node (where DPDK
runs) or a traffic generator node. The added methods are common to both
node types.

Signed-off-by: Juraj Linkeš <juraj.linkes@pantheon.tech>
---
 dts/framework/node.py | 395 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 387 insertions(+), 8 deletions(-)

diff --git a/dts/framework/node.py b/dts/framework/node.py
index e5c5454ebe..c08c79cca3 100644
--- a/dts/framework/node.py
+++ b/dts/framework/node.py
@@ -4,9 +4,13 @@
 # Copyright(c) 2022 University of New Hampshire
 #
 
+import dataclasses
+import re
+from abc import ABC
 from typing import Optional
 
-from .config import NodeConfiguration
+from framework.config import OS, NodeConfiguration
+
 from .logger import DTSLOG, getLogger
 from .settings import SETTINGS
 from .ssh_connection import SSHConnection
@@ -16,22 +20,41 @@
 """
 
 
-class Node(object):
+@dataclasses.dataclass(slots=True, frozen=True)
+class CPUCore:
+    thread: str
+    socket: str
+    core: int
+
+
+class Node(ABC):
     """
     Basic module for node management.
 
     This module implements methods that manage a node, such as information
     gathering (of CPU/PCI/NIC) and environment setup.
""" - _config: NodeConfiguration + name: str + skip_setup: bool + sessions: list[SSHConnection] + default_hugepages_cleared: bool + prefix_list: list[str] + cores: list[CPUCore] + number_of_cores: int logger: DTSLOG main_session: SSHConnection - name: str + _config: NodeConfiguration _other_sessions: list[SSHConnection] def __init__(self, node_config: NodeConfiguration): self._config = node_config self.name = node_config.name + self.skip_setup = SETTINGS.skip_setup + self.default_hugepages_cleared = False + self.prefix_list = [] + self.cores = [] + self.number_of_cores = 0 + self._dpdk_dir = None self.logger = getLogger(self.name) self.logger.info(f"Created node: {self.name}") @@ -42,22 +65,23 @@ def __init__(self, node_config: NodeConfiguration): self.get_username(), self.get_password(), ) + self._other_sessions = [] def get_ip_address(self) -> str: """ - Get SUT's ip address. + Get Node's ip address. """ return self._config.hostname def get_password(self) -> Optional[str]: """ - Get SUT's login password. + Get Node's login password. """ return self._config.password def get_username(self) -> str: """ - Get SUT's login username. + Get Node's login username. """ return self._config.user @@ -66,6 +90,7 @@ def send_expect( command: str, expected: str, timeout: float = SETTINGS.timeout, + alt_session: bool = False, verify: bool = False, trim_whitespace: bool = True, ) -> str | int: @@ -81,19 +106,373 @@ def send_expect( if trim_whitespace: expected = expected.strip() + if alt_session and len(self._other_sessions): + return self._other_sessions[0].send_expect( + command, expected, timeout, verify + ) + return self.main_session.send_expect(command, expected, timeout, verify) - def send_command(self, cmds: str, timeout: float = SETTINGS.timeout) -> str: + def send_command( + self, cmds: str, timeout: float = SETTINGS.timeout, alt_session: bool = False + ) -> str: """ Send commands to node and return string before timeout. """ + if alt_session and len(self._other_sessions): + return self._other_sessions[0].send_command(cmds, timeout) + return self.main_session.send_command(cmds, timeout) + def get_session_output(self, timeout: float = SETTINGS.timeout): + """ + Get session output message before timeout + """ + return self.main_session.get_session_before(timeout) + + def get_total_huge_pages(self): + """ + Get the huge page number of Node. + """ + huge_pages = self.send_expect( + "awk '/HugePages_Total/ { print $2 }' /proc/meminfo", "# ", alt_session=True + ) + if huge_pages != "": + return int(huge_pages.split()[0]) + return 0 + + def mount_huge_pages(self): + """ + Mount hugepage file system on Node. 
+        """
+        self.send_expect("umount `awk '/hugetlbfs/ { print $2 }' /proc/mounts`", "# ")
+        out = self.send_expect("awk '/hugetlbfs/ { print $2 }' /proc/mounts", "# ")
+        # only mount hugepages when no hugetlbfs is mounted
+        if not len(out):
+            self.send_expect("mkdir -p /mnt/huge", "# ")
+            self.send_expect("mount -t hugetlbfs nodev /mnt/huge", "# ")
+
+    def strip_hugepage_path(self):
+        mounts = self.send_expect("cat /proc/mounts |grep hugetlbfs", "# ")
+        infos = mounts.split()
+        if len(infos) >= 2:
+            return infos[1]
+        else:
+            return ""
+
+    def set_huge_pages(self, huge_pages, numa=""):
+        """
+        Set the number of hugepages.
+        """
+        page_size = self.send_expect(
+            "awk '/Hugepagesize/ {print $2}' /proc/meminfo", "# "
+        )
+
+        if not numa:
+            self.send_expect(
+                "echo %d > /sys/kernel/mm/hugepages/hugepages-%skB/nr_hugepages"
+                % (huge_pages, page_size),
+                "# ",
+                5,
+            )
+        else:
+            # hugepages are sometimes set on the kernel cmdline, so clear them first
+            if not self.default_hugepages_cleared:
+                self.send_expect(
+                    "echo 0 > /sys/kernel/mm/hugepages/hugepages-%skB/nr_hugepages"
+                    % (page_size),
+                    "# ",
+                    5,
+                )
+                self.default_hugepages_cleared = True
+
+            # some platforms do not support NUMA, e.g. a VM SUT
+            try:
+                self.send_expect(
+                    "echo %d > /sys/devices/system/node/%s/hugepages/hugepages-%skB/nr_hugepages"
+                    % (huge_pages, numa, page_size),
+                    "# ",
+                    5,
+                )
+            except Exception:
+                self.logger.warning("setting %d hugepages on %s failed" % (huge_pages, numa))
+                self.send_expect(
+                    "echo %d > /sys/kernel/mm/hugepages/hugepages-%skB/nr_hugepages"
+                    % (huge_pages, page_size),
+                    "# ",
+                    5,
+                )
+
+    def get_dpdk_pids(self, prefix_list, alt_session):
+        """
+        Get the PIDs of all DPDK applications on the Node and kill them.
+        """
+        file_directories = [
+            "/var/run/dpdk/%s/config" % file_prefix for file_prefix in prefix_list
+        ]
+        pids = []
+        pid_reg = r"p(\d+)"
+        for config_file in file_directories:
+            # Covers the case where the process is run as an unprivileged user and does not generate the file
+            isfile = self.send_expect(
+                "ls -l {}".format(config_file), "# ", 20, alt_session
+            )
+            if isfile:
+                cmd = "lsof -Fp %s" % config_file
+                out = self.send_expect(cmd, "# ", 20, alt_session)
+                if len(out):
+                    lines = out.split("\r\n")
+                    for line in lines:
+                        m = re.match(pid_reg, line)
+                        if m:
+                            pids.append(m.group(1))
+        for pid in pids:
+            self.send_expect("kill -9 %s" % pid, "# ", 20, alt_session)
+            self.get_session_output(timeout=2)
+
+        hugepage_info = [
+            "/var/run/dpdk/%s/hugepage_info" % file_prefix
+            for file_prefix in prefix_list
+        ]
+        for hugepage in hugepage_info:
+            # Covers the case where the process is run as an unprivileged user and does not generate the file
+            isfile = self.send_expect(
+                "ls -l {}".format(hugepage), "# ", 20, alt_session
+            )
+            if isfile:
+                cmd = "lsof -Fp %s" % hugepage
+                out = self.send_expect(cmd, "# ", 20, alt_session)
+                if len(out) and "No such file or directory" not in out:
+                    self.logger.warning("Some DPDK processes did not free their hugepages")
+                    self.logger.warning("**************************************")
+                    self.logger.warning(out)
+                    self.logger.warning("**************************************")
+
+        # remove the config directories
+        directories = ["/var/run/dpdk/%s" % file_prefix for file_prefix in prefix_list]
+        for directory in directories:
+            cmd = "rm -rf %s" % directory
+            self.send_expect(cmd, "# ", 20, alt_session)
+
+        # delete the hugepages on the mount path
+        if getattr(self, "hugepage_path", None):
+            for file_prefix in prefix_list:
+                cmd = "rm -rf %s/%s*" % (self.hugepage_path, file_prefix)
+                self.send_expect(cmd, "# ", 20, alt_session)
+
+    def kill_all(self, alt_session=True):
+        """
+        Kill all DPDK applications on the Node.
+        """
+        if "Traffic" in str(self):
+            self.logger.info("kill_all: called by TG")
+            pass
+        else:
+            if self.prefix_list:
+                self.logger.info("kill_all: called by SUT and prefix list has value.")
+                self.get_dpdk_pids(self.prefix_list, alt_session)
+                # reset prefix_list
+                self.prefix_list = []
+            else:
+                self.logger.info("kill_all: called by SUT and has no prefix list.")
+                out = self.send_command(
+                    "ls -l /var/run/dpdk |awk '/^d/ {print $NF}'",
+                    timeout=0.5,
+                    alt_session=True,
+                )
+                # the last line is the expect string, e.g. [PEXPECT]#
+                if out != "":
+                    dir_list = out.split("\r\n")
+                    self.get_dpdk_pids(dir_list[:-1], alt_session)
+
+    def get_os(self) -> OS:
+        return self._config.os
+
+    def init_core_list(self):
+        """
+        Load or create the core information of the Node.
+        """
+        if not self.cores or not self.number_of_cores:
+            self.init_core_list_uncached()
+
+    def init_core_list_uncached(self):
+        """
+        Scan the cores on the Node and create a core information list.
+        """
+        init_core_list_uncached = getattr(
+            self, "init_core_list_uncached_%s" % self.get_os()
+        )
+        init_core_list_uncached()
+
+    def init_core_list_uncached_linux(self):
+        """
+        Scan the cores on Linux and create a core information list.
+        """
+        self.cores = []
+
+        cpuinfo = self.send_expect(
+            r"lscpu -p=CPU,CORE,SOCKET,NODE|grep -v \#", "#", alt_session=True
+        )
+
+        cpuinfo = [i for i in cpuinfo.split() if re.match(r"^\d.+", i)]
+        # the core id reported for haswell cpus on cottonwood is not correct,
+        # so an additional coremap is needed for haswell cpus
+        core_id = 0
+        coremap = {}
+        for line in cpuinfo:
+            (thread, core, socket, node) = line.split(",")[0:4]
+
+            if core not in list(coremap.keys()):
+                coremap[core] = core_id
+                core_id += 1
+
+            if self._config.bypass_core0 and core == "0" and socket == "0":
+                self.logger.info("Core0 bypassed")
+                continue
+            if self._config.arch == "arm64" or self._config.arch == "ppc64":
+                self.cores.append(
+                    CPUCore(thread=thread, socket=node, core=coremap[core])
+                )
+            else:
+                self.cores.append(
+                    CPUCore(thread=thread, socket=socket, core=coremap[core])
+                )
+
+        self.number_of_cores = len(self.cores)
+
+    def get_core_list(self, config, socket=-1, from_last=False):
+        """
+        Get an lcore array according to a core config such as "all" or "1S/1C/1T".
+        The physical CPU socket can be specified with the "socket" parameter.
+        """
+        if config == "all":
+            cores = []
+            if socket != -1:
+                for core in self.cores:
+                    if int(core.socket) == socket:
+                        cores.append(core.thread)
+            else:
+                cores = [core.thread for core in self.cores]
+            return cores
+
+        m = re.match("([1234])S/([0-9]+)C/([12])T", config)
+
+        if m:
+            nr_sockets = int(m.group(1))
+            nr_cores = int(m.group(2))
+            nr_threads = int(m.group(3))
+
+            partial_cores = self.cores
+
+            # if no socket is specified, sockList will be [0,1] on a NUMA system;
+            # if one is specified, just use that socket
+            if socket < 0:
+                sockList = set([int(core.socket) for core in partial_cores])
+            else:
+                for n in partial_cores:
+                    if int(n.socket) == socket:
+                        sockList = [int(n.socket)]
+
+            if from_last:
+                sockList = list(sockList)[-nr_sockets:]
+            else:
+                sockList = list(sockList)[:nr_sockets]
+            partial_cores = [n for n in partial_cores if int(n.socket) in sockList]
+            thread_list = set([int(n.thread) for n in partial_cores])
+            thread_list = list(thread_list)
+
+            # filter usable cores into core_list
+            temp = []
+            for sock in sockList:
+                core_list = set(
+                    [int(n.core) for n in partial_cores if int(n.socket) == sock]
+                )
+                if from_last:
+                    core_list = list(core_list)[-nr_cores:]
+                else:
+                    core_list = list(core_list)[:nr_cores]
+                temp.extend(core_list)
+
+            core_list = temp
+
+            # if the system has fewer cores than requested, just use all cores in the socket
+            if len(core_list) < (nr_cores * nr_sockets):
+                partial_cores = self.cores
+                sockList = set([int(n.socket) for n in partial_cores])
+
+                if from_last:
+                    sockList = list(sockList)[-nr_sockets:]
+                else:
+                    sockList = list(sockList)[:nr_sockets]
+                partial_cores = [n for n in partial_cores if int(n.socket) in sockList]
+
+                temp = []
+                for sock in sockList:
+                    core_list = list(
+                        [int(n.thread) for n in partial_cores if int(n.socket) == sock]
+                    )
+                    if from_last:
+                        core_list = core_list[-nr_cores:]
+                    else:
+                        core_list = core_list[:nr_cores]
+                    temp.extend(core_list)
+
+                core_list = temp
+
+            partial_cores = [n for n in partial_cores if int(n.core) in core_list]
+            temp = []
+            if len(core_list) < nr_cores:
+                raise ValueError(
+                    "Cannot get the requested core configuration; "
+                    "requested {}, have {}".format(config, self.cores)
+                )
+            if len(sockList) < nr_sockets:
+                raise ValueError(
+                    "Cannot get the requested core configuration; "
+                    "requested {}, have {}".format(config, self.cores)
+                )
+            # recheck core_list and create the thread_list
+            i = 0
+            for sock in sockList:
+                coreList_aux = [
+                    int(core_list[n])
+                    for n in range((nr_cores * i), (nr_cores * i + nr_cores))
+                ]
+                for core in coreList_aux:
+                    thread_list = list(
+                        [
+                            int(n.thread)
+                            for n in partial_cores
+                            if ((int(n.core) == core) and (int(n.socket) == sock))
+                        ]
+                    )
+                    if from_last:
+                        thread_list = thread_list[-nr_threads:]
+                    else:
+                        thread_list = thread_list[:nr_threads]
+                    temp.extend(thread_list)
+                    thread_list = temp
+                i += 1
+            return list(map(str, thread_list))
+
+    def create_session(self, name: str) -> SSHConnection:
+        connection = SSHConnection(
+            self.get_ip_address(),
+            name,
+            getLogger(name, node=self.name),
+            self.get_username(),
+            self.get_password(),
+        )
+        self._other_sessions.append(connection)
+        return connection
+
     def node_exit(self) -> None:
         """
         Recover all resource before node exit
         """
         if self.main_session:
             self.main_session.close()
+        for session in self._other_sessions:
+            session.close()
         self.logger.logger_exit()
-- 
2.30.2
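
As a usage note for reviewers (not part of the patch): get_core_list()
accepts either "all" or a core-config string of the form "<n>S/<n>C/<n>T",
meaning sockets / physical cores per socket / threads per core. Below is a
minimal, self-contained sketch of just the parsing step; the helper name
parse_core_config is hypothetical and only mirrors the regex that
get_core_list() uses internally:

    import re

    def parse_core_config(config: str) -> tuple[int, int, int]:
        """Parse a DTS core config such as "1S/2C/1T" into
        (sockets, cores per socket, threads per core)."""
        m = re.match(r"([1234])S/([0-9]+)C/([12])T", config)
        if m is None:
            raise ValueError(f"malformed core config: {config!r}")
        return int(m.group(1)), int(m.group(2)), int(m.group(3))

    # "1S/2C/1T" requests 1 socket, 2 cores on that socket, 1 thread per core
    assert parse_core_config("1S/2C/1T") == (1, 2, 1)

With those three counts, get_core_list() filters self.cores (populated by
init_core_list()) down to the requested number of sockets, cores and
threads, taking them from the end of each list when from_last is set, and
returns the matching lcore ids as strings.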