From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id 411A545E06; Mon, 2 Dec 2024 16:09:48 +0100 (CET) Received: from mails.dpdk.org (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id 30C17402DB; Mon, 2 Dec 2024 16:09:42 +0100 (CET) Received: from mgamail.intel.com (mgamail.intel.com [198.175.65.16]) by mails.dpdk.org (Postfix) with ESMTP id 919F1400D6 for ; Mon, 2 Dec 2024 16:09:39 +0100 (CET) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=intel.com; i=@intel.com; q=dns/txt; s=Intel; t=1733152180; x=1764688180; h=from:to:subject:date:message-id:in-reply-to:references: mime-version:content-transfer-encoding; bh=7SshB8eOTutQXL3Fll19V96tsH1X6/JrRjxCPb/NReM=; b=b+bYJLQ2drWBF9gNGb5dqWZngDBbPzrhBgwkDiCCbiNsQHi5ia42uthw 9nMI+lIoZOJRnqyUihdxppxJmgIMjTcqK+OuK8gdj2lLjLd6qIY1C2j19 9xpERdzS3gKVrRsVxgwYLaACiqsBit9koBH+TlPjhGbYq1LxmJfTNDmHM 0V7FxoZIiMwf7PaLz4I9zo804xTcxMas1X8Vhd7/qwyZzFnd5aOaaLJym zmpa4YdDvN/KJEsHPVjnyp+XgbKHd3Z8aACwDGXrz9yc5QbG+FD3kVSem w5falIZHfxF1HWGofoEhEwwDNXZDQFQkoX96Gh1/E7pW+sDwwpOFfGh8P A==; X-CSE-ConnectionGUID: X6VBBuQpQ9iOJDvaiOj1gQ== X-CSE-MsgGUID: KL70A3KuTWGCNIQhtbC7fg== X-IronPort-AV: E=McAfee;i="6700,10204,11274"; a="33459126" X-IronPort-AV: E=Sophos;i="6.12,202,1728975600"; d="scan'208";a="33459126" Received: from fmviesa001.fm.intel.com ([10.60.135.141]) by orvoesa108.jf.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 02 Dec 2024 07:09:40 -0800 X-CSE-ConnectionGUID: 4nfcI163Q42644TZUmWebQ== X-CSE-MsgGUID: KihI8ptMRUSqMBA0eIba/A== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="6.12,202,1728975600"; d="scan'208";a="124069541" Received: from silpixa00401119.ir.intel.com ([10.55.129.167]) by fmviesa001.fm.intel.com with ESMTP; 02 Dec 2024 07:09:37 -0800 From: Anatoly Burakov To: dev@dpdk.org, Robin Jarry Subject: [PATCH v1 1/1] usertools/devbind: update coding style Date: Mon, 2 Dec 2024 15:09:34 +0000 Message-ID: <3fb9cafaad635d583e20a02610c7c2c9cb7e2771.1733151400.git.anatoly.burakov@intel.com> X-Mailer: git-send-email 2.43.5 In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Devbind is one of the oldest tools in DPDK, and is written in a way that uses a lot of string matching, no type safety, lots of global variables, and has a few inconsistencies in the way it handles data (such as differences between lspci calls and parsing in different circumstances). This patch is a nigh complete rewrite of devbind, with full 100% feature and command-line compatibility with the old version, albeit with a few differences in formatting and error messages. All file handling code has also been replaced with context managers. What's different from old code: - Full PEP-484 compliance - Formatted with Ruff - Much better structured code - Clean and consistent control flow - More comments - Better error handling - Fewer lspci calls - Unified lspci parsing - Using /sys/bus/pci/drivers as a source of truth about kernel modules - Check for iproute2 package - Deprecate --status-dev in favor of optional --status argument Signed-off-by: Anatoly Burakov --- usertools/dpdk-devbind.py | 1736 +++++++++++++++++++++---------------- 1 file changed, 968 insertions(+), 768 deletions(-) diff --git a/usertools/dpdk-devbind.py b/usertools/dpdk-devbind.py index f2a2a9a12f..fe4b60a541 100755 --- a/usertools/dpdk-devbind.py +++ b/usertools/dpdk-devbind.py @@ -1,705 +1,898 @@ #!/usr/bin/env python3 # SPDX-License-Identifier: BSD-3-Clause -# Copyright(c) 2010-2014 Intel Corporation -# +# Copyright(c) 2010-2024 Intel Corporation -import sys +import argparse +import glob +import grp import os +import pwd import subprocess -import argparse -import platform - -from glob import glob -from os.path import exists, basename -from os.path import join as path_join - -# The PCI base class for all devices -network_class = {'Class': '02', 'Vendor': None, 'Device': None, - 'SVendor': None, 'SDevice': None} -acceleration_class = {'Class': '12', 'Vendor': None, 'Device': None, - 'SVendor': None, 'SDevice': None} -ifpga_class = {'Class': '12', 'Vendor': '8086', 'Device': '0b30', - 'SVendor': None, 'SDevice': None} -encryption_class = {'Class': '10', 'Vendor': None, 'Device': None, - 'SVendor': None, 'SDevice': None} -intel_processor_class = {'Class': '0b', 'Vendor': '8086', 'Device': None, - 'SVendor': None, 'SDevice': None} -cavium_sso = {'Class': '08', 'Vendor': '177d', 'Device': 'a04b,a04d', - 'SVendor': None, 'SDevice': None} -cavium_fpa = {'Class': '08', 'Vendor': '177d', 'Device': 'a053', - 'SVendor': None, 'SDevice': None} -cavium_pkx = {'Class': '08', 'Vendor': '177d', 'Device': 'a0dd,a049', - 'SVendor': None, 'SDevice': None} -cavium_tim = {'Class': '08', 'Vendor': '177d', 'Device': 'a051', - 'SVendor': None, 'SDevice': None} -cavium_zip = {'Class': '12', 'Vendor': '177d', 'Device': 'a037', - 'SVendor': None, 'SDevice': None} -avp_vnic = {'Class': '05', 'Vendor': '1af4', 'Device': '1110', - 'SVendor': None, 'SDevice': None} - -cnxk_bphy = {'Class': '08', 'Vendor': '177d', 'Device': 'a089', - 'SVendor': None, 'SDevice': None} -cnxk_bphy_cgx = {'Class': '08', 'Vendor': '177d', 'Device': 'a059,a060', - 'SVendor': None, 'SDevice': None} -cnxk_dma = {'Class': '08', 'Vendor': '177d', 'Device': 'a081', - 'SVendor': None, 'SDevice': None} -cnxk_inl_dev = {'Class': '08', 'Vendor': '177d', 'Device': 'a0f0,a0f1', - 'SVendor': None, 'SDevice': None} - -hisilicon_dma = {'Class': '08', 'Vendor': '19e5', 'Device': 'a122', - 'SVendor': None, 'SDevice': None} -odm_dma = {'Class': '08', 'Vendor': '177d', 'Device': 'a08c', - 'SVendor': None, 'SDevice': None} - -intel_dlb = {'Class': '0b', 'Vendor': '8086', 'Device': '270b,2710,2714', - 'SVendor': None, 'SDevice': None} -intel_ioat_bdw = {'Class': '08', 'Vendor': '8086', - 'Device': '6f20,6f21,6f22,6f23,6f24,6f25,6f26,6f27,6f2e,6f2f', - 'SVendor': None, 'SDevice': None} -intel_ioat_skx = {'Class': '08', 'Vendor': '8086', 'Device': '2021', - 'SVendor': None, 'SDevice': None} -intel_ioat_icx = {'Class': '08', 'Vendor': '8086', 'Device': '0b00', - 'SVendor': None, 'SDevice': None} -intel_idxd_spr = {'Class': '08', 'Vendor': '8086', 'Device': '0b25', - 'SVendor': None, 'SDevice': None} -intel_ntb_skx = {'Class': '06', 'Vendor': '8086', 'Device': '201c', - 'SVendor': None, 'SDevice': None} -intel_ntb_icx = {'Class': '06', 'Vendor': '8086', 'Device': '347e', - 'SVendor': None, 'SDevice': None} - -cnxk_sso = {'Class': '08', 'Vendor': '177d', 'Device': 'a0f9,a0fa', - 'SVendor': None, 'SDevice': None} -cnxk_npa = {'Class': '08', 'Vendor': '177d', 'Device': 'a0fb,a0fc', - 'SVendor': None, 'SDevice': None} -cn9k_ree = {'Class': '08', 'Vendor': '177d', 'Device': 'a0f4', - 'SVendor': None, 'SDevice': None} - -virtio_blk = {'Class': '01', 'Vendor': "1af4", 'Device': '1001,1042', - 'SVendor': None, 'SDevice': None} - -cnxk_ml = {'Class': '08', 'Vendor': '177d', 'Device': 'a092', - 'SVendor': None, 'SDevice': None} - -network_devices = [network_class, cavium_pkx, avp_vnic, ifpga_class] -baseband_devices = [acceleration_class] -crypto_devices = [encryption_class, intel_processor_class] -dma_devices = [cnxk_dma, hisilicon_dma, - intel_idxd_spr, intel_ioat_bdw, intel_ioat_icx, intel_ioat_skx, - odm_dma] -eventdev_devices = [cavium_sso, cavium_tim, intel_dlb, cnxk_sso] -mempool_devices = [cavium_fpa, cnxk_npa] -compress_devices = [cavium_zip] -regex_devices = [cn9k_ree] -ml_devices = [cnxk_ml] -misc_devices = [cnxk_bphy, cnxk_bphy_cgx, cnxk_inl_dev, - intel_ntb_skx, intel_ntb_icx, - virtio_blk] - -# global dict ethernet devices present. Dictionary indexed by PCI address. -# Each device within this is itself a dictionary of device properties -devices = {} -# list of supported DPDK drivers -dpdk_drivers = ["igb_uio", "vfio-pci", "uio_pci_generic"] -# list of currently loaded kernel modules -loaded_modules = None - -# command-line arg flags -b_flag = None -status_flag = False -force_flag = False -noiommu_flag = False -args = [] - - -# check if this system has NUMA support -def is_numa(): - return os.path.exists('/sys/devices/system/node') - - -# check if a specific kernel module is loaded -def module_is_loaded(module): - global loaded_modules - - if module == 'vfio_pci': - module = 'vfio-pci' - - if loaded_modules: - return module in loaded_modules - - # Get list of sysfs modules (both built-in and dynamically loaded) - sysfs_path = '/sys/module/' - - # Get the list of directories in sysfs_path - sysfs_mods = [m for m in os.listdir(sysfs_path) - if os.path.isdir(os.path.join(sysfs_path, m))] - - # special case for vfio_pci (module is named vfio-pci, - # but its .ko is named vfio_pci) - sysfs_mods = [a if a != 'vfio_pci' else 'vfio-pci' for a in sysfs_mods] - - loaded_modules = sysfs_mods - - # add built-in modules as loaded - release = platform.uname().release - filename = os.path.join("/lib/modules/", release, "modules.builtin") - if os.path.exists(filename): - try: - with open(filename) as f: - loaded_modules += [os.path.splitext(os.path.basename(mod))[0] for mod in f] - except IOError: - print("Warning: cannot read list of built-in kernel modules") - - return module in loaded_modules - - -def check_modules(): - '''Checks that igb_uio is loaded''' - global dpdk_drivers - - # list of supported modules - mods = [{"Name": driver, "Found": False} for driver in dpdk_drivers] - - # first check if module is loaded - for mod in mods: - if module_is_loaded(mod["Name"]): - mod["Found"] = True - - # check if we have at least one loaded module - if True not in [mod["Found"] for mod in mods] and b_flag is not None: - print("Warning: no supported DPDK kernel modules are loaded", file=sys.stderr) - - # change DPDK driver list to only contain drivers that are loaded - dpdk_drivers = [mod["Name"] for mod in mods if mod["Found"]] - - -def has_driver(dev_id): - '''return true if a device is assigned to a driver. False otherwise''' - return "Driver_str" in devices[dev_id] - - -def get_pci_device_details(dev_id, probe_lspci): - '''This function gets additional details for a PCI device''' - device = {} - - if probe_lspci: - extra_info = subprocess.check_output(["lspci", "-vmmks", dev_id]).splitlines() - # parse lspci details - for line in extra_info: - if not line: - continue - name, value = line.decode("utf8").split("\t", 1) - name = name.strip(":") + "_str" - device[name] = value - # check for a unix interface name - device["Interface"] = "" - for base, dirs, _ in os.walk("/sys/bus/pci/devices/%s/" % dev_id): - if "net" in dirs: - device["Interface"] = \ - ",".join(os.listdir(os.path.join(base, "net"))) - break - # check if a port is used for ssh connection - device["Ssh_if"] = False - device["Active"] = "" - - return device - - -def clear_data(): - '''This function clears any old data''' - global devices - devices = {} - - -def get_device_details(devices_type): - '''This function populates the "devices" dictionary. The keys used are - the pci addresses (domain:bus:slot.func). The values are themselves - dictionaries - one for each NIC.''' - global devices - global dpdk_drivers - - # first loop through and read details for all devices - # request machine readable format, with numeric IDs and String - dev = {} - dev_lines = subprocess.check_output(["lspci", "-Dvmmnnk"]).splitlines() - for dev_line in dev_lines: - if not dev_line: - if device_type_match(dev, devices_type): - # Replace "Driver" with "Driver_str" to have consistency of - # of dictionary key names - if "Driver" in dev.keys(): - dev["Driver_str"] = dev.pop("Driver") - if "Module" in dev.keys(): - dev["Module_str"] = dev.pop("Module") - # use dict to make copy of dev - devices[dev["Slot"]] = dict(dev) - # Clear previous device's data - dev = {} - else: - name, value = dev_line.decode("utf8").split("\t", 1) - value_list = value.rsplit(' ', 1) - if value_list: - # String stored in _str - dev[name.rstrip(":") + '_str'] = value_list[0] - # Numeric IDs - dev[name.rstrip(":")] = value_list[len(value_list) - 1] \ - .rstrip("]").lstrip("[") - - if devices_type == network_devices: - # check what is the interface if any for an ssh connection if - # any to this host, so we can mark it later. - ssh_if = [] - route = subprocess.check_output(["ip", "-o", "route"]) - # filter out all lines for 169.254 routes - route = "\n".join(filter(lambda ln: not ln.startswith("169.254"), - route.decode().splitlines())) - rt_info = route.split() - for i in range(len(rt_info) - 1): - if rt_info[i] == "dev": - ssh_if.append(rt_info[i + 1]) - - # based on the basic info, get extended text details - for d in devices.keys(): - if not device_type_match(devices[d], devices_type): - continue - - # get additional info and add it to existing data - devices[d] = devices[d].copy() - # No need to probe lspci - devices[d].update(get_pci_device_details(d, False).items()) - - if devices_type == network_devices: - for _if in ssh_if: - if _if in devices[d]["Interface"].split(","): - devices[d]["Ssh_if"] = True - devices[d]["Active"] = "*Active*" - break - - # add igb_uio to list of supporting modules if needed - if "Module_str" in devices[d]: - for driver in dpdk_drivers: - if driver not in devices[d]["Module_str"]: - devices[d]["Module_str"] = \ - devices[d]["Module_str"] + ",%s" % driver - else: - devices[d]["Module_str"] = ",".join(dpdk_drivers) - - # make sure the driver and module strings do not have any duplicates - if has_driver(d): - modules = devices[d]["Module_str"].split(",") - if devices[d]["Driver_str"] in modules: - modules.remove(devices[d]["Driver_str"]) - devices[d]["Module_str"] = ",".join(modules) - - -def device_type_match(dev, devices_type): - for i in range(len(devices_type)): - param_count = len( - [x for x in devices_type[i].values() if x is not None]) - match_count = 0 - if dev["Class"][0:2] == devices_type[i]["Class"]: - match_count = match_count + 1 - for key in devices_type[i].keys(): - if key != 'Class' and devices_type[i][key]: - value_list = devices_type[i][key].split(',') - for value in value_list: - if value.strip(' ') == dev[key]: - match_count = match_count + 1 - # count must be the number of non None parameters to match - if match_count == param_count: - return True - return False - - -def dev_id_from_dev_name(dev_name): - '''Take a device "name" - a string passed in by user to identify a NIC - device, and determine the device id - i.e. the domain:bus:slot.func - for - it, which can then be used to index into the devices array''' - - # check if it's already a suitable index - if dev_name in devices: - return dev_name - # check if it's an index just missing the domain part - if "0000:" + dev_name in devices: - return "0000:" + dev_name - - # check if it's an interface name, e.g. eth1 - for d in devices.keys(): - if dev_name in devices[d]["Interface"].split(","): - return devices[d]["Slot"] - # if nothing else matches - error - raise ValueError("Unknown device: %s. " - "Please specify device in \"bus:slot.func\" format" % dev_name) - - -def unbind_one(dev_id, force): - '''Unbind the device identified by "dev_id" from its current driver''' - dev = devices[dev_id] - if not has_driver(dev_id): - print("Notice: %s %s %s is not currently managed by any driver" % - (dev["Slot"], dev["Device_str"], dev["Interface"]), file=sys.stderr) - return - - # prevent us disconnecting ourselves - if dev["Ssh_if"] and not force: - print("Warning: routing table indicates that interface %s is active. " - "Skipping unbind" % dev_id, file=sys.stderr) - return - - # write to /sys to unbind - filename = "/sys/bus/pci/drivers/%s/unbind" % dev["Driver_str"] +import sys +import typing as T + +# the following list of modules is supported by DPDK +DPDK_KERNEL_MODULES = {"igb_uio", "vfio-pci", "uio_pci_generic"} + +# pattern matching criteria for various devices and devices classes. keys are entries in lspci, +# while values, if present are further matches for lspci criteria. values can be either strings or +# list of strings, in which case any match is sufficient. +StrOrList = T.Union[str, T.List[str]] +DeviceMatchPattern = T.Dict[str, StrOrList] +CLASS_NETWORK: DeviceMatchPattern = { + "Class": "02", +} +CLASS_ACCELERATION: DeviceMatchPattern = { + "Class": "12", +} +CLASS_IFPGA: DeviceMatchPattern = { + "Class": "12", + "Vendor": "8086", + "Device": "0b30", +} +CLASS_ENCRYPTION: DeviceMatchPattern = { + "Class": "10", +} +CLASS_INTEL_PROCESSOR: DeviceMatchPattern = { + "Class": "0b", + "Vendor": "8086", +} +DEVICE_CAVIUM_SSO: DeviceMatchPattern = { + "Class": "08", + "Vendor": "177d", + "Device": ["a04b", "a04d"], +} +DEVICE_CAVIUM_FPA: DeviceMatchPattern = { + "Class": "08", + "Vendor": "177d", + "Device": "a053", +} +DEVICE_CAVIUM_PKX: DeviceMatchPattern = { + "Class": "08", + "Vendor": "177d", + "Device": ["a0dd", "a049"], +} +DEVICE_CAVIUM_TIM: DeviceMatchPattern = { + "Class": "08", + "Vendor": "177d", + "Device": "a051", +} +DEVICE_CAVIUM_ZIP: DeviceMatchPattern = { + "Class": "12", + "Vendor": "177d", + "Device": "a037", +} +DEVICE_AVP_VNIC: DeviceMatchPattern = { + "Class": "05", + "Vendor": "1af4", + "Device": "1110", +} +DEVICE_CNXK_BPHY: DeviceMatchPattern = { + "Class": "08", + "Vendor": "177d", + "Device": "a089", +} +DEVICE_CNXK_BPHY_CGX: DeviceMatchPattern = { + "Class": "08", + "Vendor": "177d", + "Device": ["a059", "a060"], +} +DEVICE_CNXK_DMA: DeviceMatchPattern = { + "Class": "08", + "Vendor": "177d", + "Device": "a081", +} +DEVICE_CNXK_INL_DEV: DeviceMatchPattern = { + "Class": "08", + "Vendor": "177d", + "Device": ["a0f0", "a0f1"], +} +DEVICE_HISILICON_DMA: DeviceMatchPattern = { + "Class": "08", + "Vendor": "19e5", + "Device": "a122", +} +DEVICE_ODM_DMA: DeviceMatchPattern = { + "Class": "08", + "Vendor": "177d", + "Device": "a08c", +} +DEVICE_INTEL_DLB: DeviceMatchPattern = { + "Class": "0b", + "Vendor": "8086", + "Device": ["270b", "2710", "2714"], +} +DEVICE_INTEL_IOAT_BDW: DeviceMatchPattern = { + "Class": "08", + "Vendor": "8086", + "Device": [ + "6f20", + "6f21", + "6f22", + "6f23", + "6f24", + "6f25", + "6f26", + "6f27", + "6f2e", + "6f2f", + ], +} +DEVICE_INTEL_IOAT_SKX: DeviceMatchPattern = { + "Class": "08", + "Vendor": "8086", + "Device": "2021", +} +DEVICE_INTEL_IOAT_ICX: DeviceMatchPattern = { + "Class": "08", + "Vendor": "8086", + "Device": "0b00", +} +DEVICE_INTEL_IDXD_SPR: DeviceMatchPattern = { + "Class": "08", + "Vendor": "8086", + "Device": "0b25", +} +DEVICE_INTEL_NTB_SKX: DeviceMatchPattern = { + "Class": "06", + "Vendor": "8086", + "Device": "201c", +} +DEVICE_INTEL_NTB_ICX: DeviceMatchPattern = { + "Class": "06", + "Vendor": "8086", + "Device": "347e", +} +DEVICE_CNXK_SSO: DeviceMatchPattern = { + "Class": "08", + "Vendor": "177d", + "Device": ["a0f9", "a0fa"], +} +DEVICE_CNXK_NPA: DeviceMatchPattern = { + "Class": "08", + "Vendor": "177d", + "Device": ["a0fb", "a0fc"], +} +DEVICE_CN9K_REE: DeviceMatchPattern = { + "Class": "08", + "Vendor": "177d", + "Device": "a0f4", +} +DEVICE_VIRTIO_BLK: DeviceMatchPattern = { + "Class": "01", + "Vendor": "1af4", + "Device": ["1001", "1042"], +} +DEVICE_CNXK_ML: DeviceMatchPattern = { + "Class": "08", + "Vendor": "177d", + "Device": "a092", +} + +# device types as recognized by devbind +NETWORK_DEVICES = [CLASS_NETWORK, CLASS_IFPGA, DEVICE_CAVIUM_PKX, DEVICE_AVP_VNIC] +BASEDBAND_DEVICES = [CLASS_ACCELERATION] +CRYPTO_DEVICES = [CLASS_ENCRYPTION, CLASS_INTEL_PROCESSOR] +DMA_DEVICES = [ + DEVICE_CNXK_DMA, + DEVICE_HISILICON_DMA, + DEVICE_INTEL_IDXD_SPR, + DEVICE_INTEL_IOAT_BDW, + DEVICE_INTEL_IOAT_ICX, + DEVICE_INTEL_IOAT_SKX, + DEVICE_ODM_DMA, +] +EVENTDEV_DEVICES = [ + DEVICE_CAVIUM_SSO, + DEVICE_CAVIUM_TIM, + DEVICE_INTEL_DLB, + DEVICE_CNXK_SSO, +] +MEMPOOL_DEVICES = [DEVICE_CAVIUM_FPA, DEVICE_CNXK_NPA] +COMPRESS_DEVICES = [DEVICE_CAVIUM_ZIP] +REGEX_DEVICES = [DEVICE_CN9K_REE] +ML_DEVICES = [DEVICE_CNXK_ML] +MISC_DEVICES = [ + DEVICE_CNXK_BPHY, + DEVICE_CNXK_BPHY_CGX, + DEVICE_CNXK_INL_DEV, + DEVICE_INTEL_NTB_SKX, + DEVICE_INTEL_NTB_ICX, + DEVICE_VIRTIO_BLK, +] + + +class DevbindError(Exception): + """Generic error to be displayed by devbind.""" + + def __init__(self, message: str): + super().__init__(message) + self.message = message + + def __str__(self) -> str: + return self.message + + +def category_key_match(key: str, value: str, pattern: StrOrList) -> bool: + """Check if value matches the pattern according to key match rules.""" + # if pattern is a list of strings, recurse and check each item + if isinstance(pattern, list): + return any( + category_key_match(key, value, pattern_item) for pattern_item in pattern + ) + # pattern is a single string, use single string match rules + if key == "Class": + # special case for Class: it has to match from the beginning + return value.startswith(pattern) + # default case: exact match + return value == pattern + + +def parse_lspci_line(line: str) -> T.Dict[str, str]: + """Parse lspci line and return a dictionary.""" + # the format can be either: + # key: value + # or + # key: string representation [value] + # we want to store both because we may want to display both + res: T.Dict[str, str] = {} + name, value = line.split("\t", 1) + name = name.strip().rstrip(":") + value = value.strip() + # does this value have string representation? + value_list = value.rsplit(" ", 1) + if len(value_list) > 1: + value_str, value = value_list + # store string representation + res[name + "_str"] = value_str + # strip out brackets + value = value.strip("[]") + res[name] = value + + return res + + +def fmt_key_val(name: str, value: str) -> str: + """Generate a devbind device printout string for a particular value.""" + # if there's a name provided, include it in the output + if name and value: + return f"{name}={value}" + # otherwise just print the value + return value + + +def resolve_pci_glob(dev: str) -> T.List[str]: + """Returns a list of PCI devices matching a glob pattern.""" + pci_sysfs_path = "/sys/bus/pci/devices" + for _glob in [dev, "0000:" + dev]: + paths = [ + os.path.basename(path) + for path in glob.glob(os.path.join(pci_sysfs_path, _glob)) + ] + if paths: + return paths + return [dev] + + +def check_installed(program: str, package: str) -> None: + """Check if a program is installed.""" + if subprocess.call( + ["which", program], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ): + raise DevbindError(f"'{program}' not found - please install '{package}'.") + + +def read_output_lines(args: T.List[str]) -> T.List[str]: + """Run a subprocess, collect its output, and return it as a list of lines.""" try: - f = open(filename, "a") - except OSError as err: - sys.exit("Error: unbind failed for %s - Cannot open %s: %s" % - (dev_id, filename, err)) - f.write(dev_id) - f.close() + output = subprocess.check_output(args).decode("utf-8") + except subprocess.CalledProcessError as e: + raise DevbindError(f"Error running '{' '.join(args)}': {e}") from e + return output.splitlines() -def bind_one(dev_id, driver, force): - '''Bind the device given by "dev_id" to the driver "driver". If the device - is already bound to a different driver, it will be unbound first''' - dev = devices[dev_id] - saved_driver = None # used to rollback any unbind in case of failure +def read_routed_interfaces() -> T.List[str]: + """Find interfaces with active routes.""" + route_info = [ + token.strip() + # read 'ip -o route' output + for line in read_output_lines(["ip", "-o", "route"]) + # skip routes that are not of interest to us + if not line.startswith("169.254") + # split into individual tokens + for token in line.split() + ] + routed_ifs: T.List[str] = [] + # find devices with active routes + for node, iface in zip(route_info, route_info[1:]): + if node == "dev": + routed_ifs.append(iface) + return routed_ifs - # prevent disconnection of our ssh session - if dev["Ssh_if"] and not force: - print("Warning: routing table indicates that interface %s is active. " - "Not modifying" % dev_id, file=sys.stderr) - return - # unbind any existing drivers we don't want - if has_driver(dev_id): - if dev["Driver_str"] == driver: - print("Notice: %s already bound to driver %s, skipping" % - (dev_id, driver), file=sys.stderr) - return - saved_driver = dev["Driver_str"] - unbind_one(dev_id, force) - dev["Driver_str"] = "" # clear driver string +def sysfs_read_pci_drivers() -> T.List[str]: + """Gather all PCI modules loaded on the system.""" + return os.listdir("/sys/bus/pci/drivers") - # For kernels >= 3.15 driver_override can be used to specify the driver - # for a device rather than relying on the driver to provide a positive - # match of the device. The existing process of looking up - # the vendor and device ID, adding them to the driver new_id, - # will erroneously bind other devices too which has the additional burden - # of unbinding those devices - if driver in dpdk_drivers: - filename = "/sys/bus/pci/devices/%s/driver_override" % dev_id - if exists(filename): - try: - f = open(filename, "w") - except OSError as err: - print("Error: bind failed for %s - Cannot open %s: %s" - % (dev_id, filename, err), file=sys.stderr) - return - try: - f.write("%s" % driver) - f.close() - except OSError as err: - print("Error: bind failed for %s - Cannot write driver %s to " - "PCI ID: %s" % (dev_id, driver, err), file=sys.stderr) - return - # For kernels < 3.15 use new_id to add PCI id's to the driver - else: - filename = "/sys/bus/pci/drivers/%s/new_id" % driver - try: - f = open(filename, "w") - except OSError as err: - print("Error: bind failed for %s - Cannot open %s: %s" - % (dev_id, filename, err), file=sys.stderr) - return - try: - # Convert Device and Vendor Id to int to write to new_id - f.write("%04x %04x" % (int(dev["Vendor"], 16), - int(dev["Device"], 16))) - f.close() - except OSError as err: - print("Error: bind failed for %s - Cannot write new PCI ID to " - "driver %s: %s" % (dev_id, driver, err), file=sys.stderr) - return - - # do the bind by writing to /sys - filename = "/sys/bus/pci/drivers/%s/bind" % driver - try: - f = open(filename, "a") - except OSError as err: - print("Error: bind failed for %s - Cannot open %s: %s" - % (dev_id, filename, err), file=sys.stderr) - if saved_driver is not None: # restore any previous driver - bind_one(dev_id, saved_driver, force) - return - try: - f.write(dev_id) - f.close() - except OSError as err: - # for some reason, closing dev_id after adding a new PCI ID to new_id - # results in IOError. however, if the device was successfully bound, - # we don't care for any errors and can safely ignore IOError - tmp = get_pci_device_details(dev_id, True) - if "Driver_str" in tmp and tmp["Driver_str"] == driver: - return - print("Error: bind failed for %s - Cannot bind to driver %s: %s" - % (dev_id, driver, err), file=sys.stderr) - if saved_driver is not None: # restore any previous driver - bind_one(dev_id, saved_driver, force) - return - - # For kernels > 3.15 driver_override is used to bind a device to a driver. - # Before unbinding it, overwrite driver_override with empty string so that - # the device can be bound to any other driver - filename = "/sys/bus/pci/devices/%s/driver_override" % dev_id - if exists(filename): - try: - f = open(filename, "w") - except OSError as err: - sys.exit("Error: unbind failed for %s - Cannot open %s: %s" - % (dev_id, filename, err)) - try: - f.write("\00") - f.close() - except OSError as err: - sys.exit("Error: unbind failed for %s - Cannot write %s: %s" - % (dev_id, filename, err)) +def sysfs_device_get_path(dev: str, path: str) -> str: + """Construct path in device sysfs directory.""" + return os.path.join("/sys/bus/pci/devices", dev, path) -def unbind_all(dev_list, force=False): - """Unbind method, takes a list of device locations""" - - if dev_list[0] == "dpdk": - for d in devices.keys(): - if "Driver_str" in devices[d]: - if devices[d]["Driver_str"] in dpdk_drivers: - unbind_one(devices[d]["Slot"], force) - return - - try: - dev_list = map(dev_id_from_dev_name, dev_list) - except ValueError as ex: - print(ex) - sys.exit(1) - for d in dev_list: - unbind_one(d, force) +def sysfs_driver_get_path(driver: str, path: str) -> str: + """Construct path in driver sysfs directory.""" + return os.path.join("/sys/bus/pci/drivers", driver, path) -def has_iommu(): - """Check if IOMMU is enabled on system""" +def sysfs_iommu_enabled() -> bool: + """Check if IOMMU is enabled on the system.""" return len(os.listdir("/sys/class/iommu")) > 0 -def check_noiommu_mode(): - """Check and enable the noiommu mode for VFIO drivers""" - global noiommu_flag - filename = "/sys/module/vfio/parameters/enable_unsafe_noiommu_mode" - +def sysfs_enable_unsafe_noiommu() -> None: + """Enable unsafe no-IOMMU mode.""" + fname = "/sys/module/vfio/parameters/enable_unsafe_noiommu_mode" try: - with open(filename, "r") as f: - value = f.read(1) - if value in ("1", "y" ,"Y"): - return - except OSError as err: - sys.exit(f"Error: failed to check unsafe noiommu mode - Cannot open {filename}: {err}") - - if not noiommu_flag: - sys.exit("Error: IOMMU support is disabled, use --noiommu-mode for binding in noiommu mode") - + with open(fname, "r", encoding="utf-8") as f: + val = f.read() + if val in ["1", "Y", "y"]: + # already enabled + return + except OSError as e: + raise DevbindError(f"Cannot read unsafe no IOMMU mode status: {e}") from e try: - with open(filename, "w") as f: + with open(fname, "w", encoding="utf-8") as f: f.write("1") - except OSError as err: - sys.exit(f"Error: failed to enable unsafe noiommu mode - Cannot open {filename}: {err}") - print("Warning: enabling unsafe no IOMMU mode for VFIO drivers") + except OSError as e: + raise DevbindError(f"Cannot write unsafe no IOMMU mode status: {e}") from e + print( + "Warning: IOMMU is not enabled, enabling unsafe no-IOMMU mode for VFIO drivers." + ) -def bind_all(dev_list, driver, force=False): - """Bind method, takes a list of device locations""" - global devices +def sysfs_get_vfio_device(dev: str) -> str: + """Get VFIO device file for a PCI device.""" + iommu_grp_base_path = sysfs_device_get_path(dev, "iommu_group") + # extract group number from base path + iommu_grp = os.path.basename(os.readlink(iommu_grp_base_path)) + # find VFIO device corresponding to this IOMMU group + return os.path.join("/dev/vfio", iommu_grp) - # a common user error is to forget to specify the driver the devices need to - # be bound to. check if the driver is a valid device, and if it is, show - # a meaningful error. + +def device_vfio_set_ownership(dev: str, uid: int, gid: int) -> None: + """Set device ownership.""" + try: + os.chown(sysfs_get_vfio_device(dev), uid, gid) + except OSError as e: + raise DevbindError(f"Failed to set device ownership for {dev}: {e}") from e + + +class Device: + """Thin wrapper around a device dict read from lspci.""" + + def __init__(self, dev_dict: T.Dict[str, str]): + self._dev_dict = dev_dict + self.slot = self._dev_dict["Slot"] + + # find kernel interfaces for this device + self._update_interfaces() + + def __str__(self) -> str: + return self.slot + + def _set_value(self, key: str, value: StrOrList): + """Generic setter for different fields.""" + if value: + # value can be a list or a string + if isinstance(value, list): + self._dev_dict[key] = ",".join(value) + else: + self._dev_dict[key] = value + elif key in self._dev_dict: + # delete key if value is empty + del self._dev_dict[key] + + def _get_str(self, key: str) -> str: + """Generic getter for string fields.""" + if key in self._dev_dict: + return self._dev_dict[key] + return "" + + def _get_list(self, key: str) -> T.List[str]: + """Generic getter for list fields.""" + if key in self._dev_dict: + return [m.strip() for m in self._dev_dict[key].split(",")] + return [] + + def _update_interfaces(self): + """Update interfaces for this device.""" + sysfs_path = sysfs_device_get_path(self.slot, "net") + try: + self.interfaces = os.listdir(sysfs_path) + except OSError: + pass + + def update(self): + """Update device information from lspci.""" + lspci_output = read_output_lines(["lspci", "-vmmnnks", self.slot]) + for line in lspci_output: + if not line: + continue + self._dev_dict.update(parse_lspci_line(line)) + self._update_interfaces() + + def get_str(self, key: str) -> str: + """Get string value as it appears in the device dictionary.""" + return self._get_str(key) + + def match(self, pattern: DeviceMatchPattern) -> bool: + """Check if this device matches the pattern.""" + for key, match_pattern in pattern.items(): + if key not in self._dev_dict: + return False + value = self._dev_dict[key] + if not category_key_match(key, value, match_pattern): + return False + return True + + @property + def driver(self) -> str: + """Get driver bound for this device.""" + return self._get_str("Driver") + + @driver.setter + def driver(self, driver: str): + """Set driver for this device.""" + self._set_value("Driver", driver) + + @property + def modules(self) -> T.List[str]: + """Get compatible modules for this device.""" + return self._get_list("Module") + + @modules.setter + def modules(self, modules: T.List[str]): + """Set compatible modules for this device.""" + self._set_value("Module", modules) + + @property + def interfaces(self) -> T.List[str]: + """Get interfaces for this device.""" + return self._get_list("Interface") + + @interfaces.setter + def interfaces(self, interfaces: T.List[str]): + """Set interfaces for this device.""" + self._set_value("Interface", interfaces) + + @property + def active_interface(self) -> bool: + """Return active interface information.""" + return bool(self._get_str("Active")) + + @active_interface.setter + def active_interface(self, active: bool): + """Set active interface information.""" + self._set_value("Active", "*Active*" if active else "") + + +def read_devices_from_lspci() -> T.Iterable[Device]: + """Read devices from lspci.""" + lspci_output = read_output_lines(["lspci", "-Dvmmnnk"]) + cur_device: T.Dict[str, str] = {} + for line in lspci_output: + if not line: + dev = Device(cur_device) + yield dev + cur_device = {} + else: + cur_device.update(parse_lspci_line(line)) + + +class Devbind(object): + """Class to cover various devbind-related operations and data.""" + + def __init__(self) -> None: + self.uio_modules: T.List[str] + # all detected PCI devices, keyed by PCI D:B:D.F + self.pci_devices: T.Dict[str, Device] + # matched categories of devices + self.network_devices: T.List[Device] = [] + self.baseband_devices: T.List[Device] = [] + self.crypto_devices: T.List[Device] = [] + self.dma_devices: T.List[Device] = [] + self.eventdev_devices: T.List[Device] = [] + self.mempool_devices: T.List[Device] = [] + self.compress_devices: T.List[Device] = [] + self.regex_devices: T.List[Device] = [] + self.ml_devices: T.List[Device] = [] + self.misc_devices: T.List[Device] = [] + # whether to use driver_override + self.use_driver_override: bool + # gather all loaded kernel modules + self.loaded_pci_modules = sysfs_read_pci_drivers() + # find which ones are UIO modules + self._scan_uio_modules() + # gather all routed interfaces + self._routed_interfaces = read_routed_interfaces() + # initialize data + self._scan_pci_devices() + + def _categorize_pci_device(self, dev: Device) -> None: + pattern_to_list = [ + (NETWORK_DEVICES, self.network_devices), + (BASEDBAND_DEVICES, self.baseband_devices), + (CRYPTO_DEVICES, self.crypto_devices), + (DMA_DEVICES, self.dma_devices), + (EVENTDEV_DEVICES, self.eventdev_devices), + (MEMPOOL_DEVICES, self.mempool_devices), + (COMPRESS_DEVICES, self.compress_devices), + (REGEX_DEVICES, self.regex_devices), + (ML_DEVICES, self.ml_devices), + (MISC_DEVICES, self.misc_devices), + ] + for patterns, devices in pattern_to_list: + for pattern in patterns: + if dev.match(pattern): + devices.append(dev) + break + # special case: if this was a network device, find if any interfaces are active. non-network + # interfaces will not have any interfaces at all, so it's safe to check all devices + if any(iface in self._routed_interfaces for iface in dev.interfaces): + dev.active_interface = True + + def _scan_uio_modules(self) -> None: + loaded = set(self.loaded_pci_modules) + supported = set(DPDK_KERNEL_MODULES) + self.uio_modules = list(loaded & supported) + + def add_uio_modules(self, dev: Device) -> None: + """Add loaded UIO modules to list of available modules.""" + # add UIO modules to list of supported modules + modules = set(dev.modules + self.uio_modules) + + # make sure driver and module string do not have any duplicates + if dev.driver in modules: + modules.remove(dev.driver) + + # update list of compatible modules + dev.modules = list(modules) + + def _scan_pci_devices(self) -> None: + """Find all devices on the system.""" + self.pci_devices = {} + + for dev in read_devices_from_lspci(): + # categorize device + self._categorize_pci_device(dev) + + # fixup module and driver fields + self.add_uio_modules(dev) + + # save the device in common list + self.pci_devices[dev.slot] = dev + + # do we use driver_override? + self.use_driver_override = os.path.exists( + sysfs_device_get_path(dev.slot, "driver_override") + ) + + def resolve_device(self, devstr: str) -> str: + """Try to resolve a device into a PCI D:B:D:F.""" + if not devstr: + raise ValueError( + f"Unknown device '{devstr}'. Please specify device in 'bus:slot.func' format." + ) + # is this already a valid device? + if devstr in self.pci_devices: + return devstr + # can we append domain to it? + if "0000:" + devstr in self.pci_devices: + return "0000:" + devstr + # can we find a network interface name? + for dev in self.network_devices: + if devstr in dev.interfaces: + return dev.slot + # we can't figure out what this is + raise ValueError( + f"Unknown device '{devstr}'. Please specify device in 'bus:slot.func' format." + ) + + def unbind(self, dbdf: str, force: bool) -> None: + """Unbind one device from its current driver.""" + dev = self.pci_devices[dbdf] + + # are we allowed to unbind this device? + if dev.active_interface and not force: + print( + f"Warning: routing table indicates that interface {dev} is active. " + "Not modifying.", + file=sys.stderr, + ) + return + + # is this device bound to anything? + cur_drv = dev.driver + if not cur_drv: + print( + f"Notice: {dev} is not currently managed by any driver, skipping", + file=sys.stderr, + ) + return + + # we're OK, unbind + print(f"Unbinding {dev} from {cur_drv}...") + + try: + unbind_path = sysfs_driver_get_path(cur_drv, "unbind") + with open(unbind_path, "w", encoding="utf-8") as f: + f.write(dbdf) + except OSError as e: + raise DevbindError(f"Unbind failed for {dev}: {e}") from e + # update device state + dev.update() + self.add_uio_modules(dev) + + def bind(self, dbdf: str, driver: str, force: bool) -> None: + """Bind one device to the specified driver.""" + dev = self.pci_devices[dbdf] + rollback_driver = "" + override_path = sysfs_device_get_path(dev.slot, "driver_override") + new_id_path = sysfs_driver_get_path(driver, "new_id") + bind_path = sysfs_driver_get_path(driver, "bind") + + # are we allowed to bind this device? + if dev.active_interface and not force: + print( + f"Warning: routing table indicates that interface {dev} is active. " + "Not modifying.", + file=sys.stderr, + ) + return + + # check if device is bound to anything + rollback_driver = dev.driver + if driver == rollback_driver: + print( + f"Notice: {dev} is already bound to driver {driver}, skipping", + file=sys.stderr, + ) + return + self.unbind(dbdf, force) + + print(f"Binding {dev} to {driver}...") + + # For kernels >= 3.15 driver_override can be used to specify the driver for a device rather + # than relying on the driver to provide a positive match of the device. The existing + # process of looking up the vendor and device ID, adding them to the driver new_id, will + # erroneously bind other devices too which has the additional burden of unbinding those + # devices. + + # are we binding to UIO module? + if driver in self.uio_modules: + try: + if self.use_driver_override: + with open(override_path, "w", encoding="utf-8") as f: + f.write(driver) + else: + # For kernels < 3.15 use new_id to add PCI id's to the driver + with open(new_id_path, "w", encoding="utf-8") as f: + ven_id = int(dev.get_str("Vendor"), 16) + dev_id = int(dev.get_str("Device"), 16) + f.write(f"{ven_id:04x} {dev_id:04x}") + except OSError as e: + raise DevbindError(f"Bind failed for {dev}: {e}") from e + + try: + with open(bind_path, "a", encoding="utf-8") as f: + f.write(dbdf) + dev.update() + self.add_uio_modules(dev) + except OSError as e: + # for some reason, closing fd after adding a new PCI ID to new_id results in IOError on + # some kernel versions. however, if the device was successfully bound, we don't care + # for any errors and can safely proceed. + + # update device from lspci and fixup modules + dev.update() + self.add_uio_modules(dev) + + # check if the correct driver is bound + cur_drv = dev.driver + + if cur_drv == driver: + # we're OK, ignore error and don't rollback + rollback_driver = "" + elif cur_drv: + # bound to wrong driver + print( + f"Error: bind failed for {dev}: bound to unexpected driver {cur_drv}", + file=sys.stderr, + ) + else: + # bind failed + print(f"Error: bind failed for {dev}: {e}", file=sys.stderr) + if rollback_driver: + self.bind(dbdf, rollback_driver, force) + + # For kernels > 3.15 driver_override is used to bind a device to a driver. Before unbinding + # it, overwrite driver_override with empty string so that the device can be bound to any + # other driver + if self.use_driver_override: + try: + with open(override_path, "w", encoding="utf-8") as f: + f.write("\00") + except OSError as e: + raise DevbindError(f"Bind failed for {dev}: {e}") from e + + +class DevbindCtx: + """POD class to keep commmand-line arguments and context.""" + + def __init__(self) -> None: + self.status = False + self.bind = False + self.status_group: str + self.driver: str + self.devices: T.List[str] + self.force: bool + self.noiommu: bool + self.vfio_uid: int + self.vfio_gid: int + + self.devbind: Devbind + + +def bind_devices(ctx: DevbindCtx) -> None: + """Bind devices to the specified driver.""" + devbind = ctx.devbind + use_vfio = ctx.driver == "vfio-pci" + + # a common user error is to forget to specify the driver the devices need to be bound to. check + # if the driver is a valid device, and if it is, show a meaningful error. try: - dev_id_from_dev_name(driver) - # if we've made it this far, this means that the "driver" was a valid - # device string, so it's probably not a valid driver name. - sys.exit("Error: Driver '%s' does not look like a valid driver. " - "Did you forget to specify the driver to bind devices to?" % driver) + devbind.resolve_device(ctx.driver) + # if we got here, the driver is a valid device, which is an error + raise DevbindError(f"""\ +Driver '{ctx.driver}' does not look like a valid driver. Did you +forget to specify the driver to bind the devices to?""") except ValueError: - # driver generated error - it's not a valid device ID, so all is well + # driver generated error - it's not a valid device pass - # check if we're attempting to bind to a driver that isn't loaded - if not module_is_loaded(driver.replace('-', '_')): - sys.exit("Error: Driver '%s' is not loaded." % driver) - + # validate all devices try: - dev_list = map(dev_id_from_dev_name, dev_list) - except ValueError as ex: - sys.exit(ex) + ctx.devices = [devbind.resolve_device(dev) for dev in ctx.devices] + except ValueError as e: + raise DevbindError(str(e)) from e + + # do we want to bind or unbind? + if not ctx.driver: + # unbind devices + for dbdf in ctx.devices: + devbind.unbind(dbdf, ctx.force) + return + + # validate driver + if ctx.driver not in devbind.loaded_pci_modules: + raise DevbindError(f"Driver '{ctx.driver}' is not loaded.") # check for IOMMU support - if driver == "vfio-pci" and not has_iommu(): - check_noiommu_mode() + if use_vfio and not sysfs_iommu_enabled(): + sysfs_enable_unsafe_noiommu() - for d in dev_list: - bind_one(d, driver, force) + # bind all devices + for dbdf in ctx.devices: + devbind.bind(dbdf, ctx.driver, ctx.force) + # if we're binding to vfio-pci, set IOMMU user/group ownership if one was specified + if use_vfio and (ctx.vfio_uid != -1 or ctx.vfio_gid != -1): + device_vfio_set_ownership(dbdf, ctx.vfio_uid, ctx.vfio_gid) - # For kernels < 3.15 when binding devices to a generic driver - # (i.e. one that doesn't have a PCI ID table) using new_id, some devices - # that are not bound to any other driver could be bound even if no one has - # asked them to. hence, we check the list of drivers again, and see if - # some of the previously-unbound devices were erroneously bound. - if not exists("/sys/bus/pci/devices/%s/driver_override" % d): - for d in devices.keys(): + # For kernels < 3.15 when binding devices to a generic driver (i.e. one that doesn't have a PCI + # ID table) using new_id, some devices that are not bound to any other driver could be bound + # even if no one has asked them to. hence, we check the list of drivers again, and see if some + # of the previously-unbound devices were erroneously bound. + if not devbind.use_driver_override: + for dbdf, dev in devbind.pci_devices.items(): # skip devices that were already bound or that we know should be bound - if "Driver_str" in devices[d] or d in dev_list: + if dev.driver or dbdf in ctx.devices: continue - - # update information about this device - devices[d] = dict(devices[d].items() - + get_pci_device_details(d, True).items()) - + dev.update() + devbind.add_uio_modules(dev) # check if updated information indicates that the device was bound - if "Driver_str" in devices[d]: - unbind_one(d, force) + if dev.driver: + devbind.unbind(dbdf, ctx.force) -def display_devices(title, dev_list, extra_params=None): - '''Displays to the user the details of a list of devices given in - "dev_list". The "extra_params" parameter, if given, should contain a string - with %()s fields in it for replacement by the named fields in each - device's dictionary.''' - strings = [] # this holds the strings to print. We sort before printing - print("\n%s" % title) - print("=" * len(title)) - if not dev_list: - strings.append("") - else: - for dev in dev_list: - if extra_params is not None: - strings.append("%s '%s %s' %s" % (dev["Slot"], - dev["Device_str"], - dev["Device"], - extra_params % dev)) - else: - strings.append("%s '%s'" % (dev["Slot"], dev["Device_str"])) - # sort before printing, so that the entries appear in PCI order +def print_status_section(title: str, section_devs: T.List[Device]) -> None: + """Prints subsection of device status (e.g. only kernel devices).""" + # we will sort strings before printing + strings: T.List[str] = [] + + # generate device strings + for dev in section_devs: + # construct strings + devstr = f'{dev.get_str("Device_str")} {dev.get_str("Device")}' + strs = [ + dev.slot, + f"'{devstr}'", + fmt_key_val("drv", dev.get_str("Driver")), + fmt_key_val("unused", dev.get_str("Module")), + fmt_key_val("if", dev.get_str("Interface")), + fmt_key_val("numa_node", dev.get_str("NUMANode")), + fmt_key_val("", dev.get_str("Active")), + ] + # filter out empty strings and join + strings.append(" ".join(filter(None, strs))) strings.sort() - print("\n".join(strings)) # print one per line - - -def show_device_status(devices_type, device_name, if_field=False): - global dpdk_drivers - kernel_drv = [] - dpdk_drv = [] - no_drv = [] - - # split our list of network devices into the three categories above - for d in devices.keys(): - if device_type_match(devices[d], devices_type): - if not has_driver(d): - no_drv.append(devices[d]) - continue - if devices[d]["Driver_str"] in dpdk_drivers: - dpdk_drv.append(devices[d]) - else: - kernel_drv.append(devices[d]) - - n_devs = len(dpdk_drv) + len(kernel_drv) + len(no_drv) - - # don't bother displaying anything if there are no devices - if n_devs == 0: - msg = "No '%s' devices detected" % device_name - print("") + print(f"{title}") + print("=" * len(title)) + print("\n".join(strings)) + print() + + +def print_status_group( + ctx: DevbindCtx, group_name: str, group_devs: T.List[Device] +) -> None: + """Print status for a specific device group.""" + # do we have any devices at all? + if not group_devs: + msg = f"No {group_name} devices found." print(msg) - print("".join('=' * len(msg))) + print("=" * len(msg)) + print() return - print_numa = is_numa() - - # print each category separately, so we can clearly see what's used by DPDK - if dpdk_drv: - extra_param = "drv=%(Driver_str)s unused=%(Module_str)s" - if print_numa: - extra_param = "numa_node=%(NUMANode)s " + extra_param - display_devices("%s devices using DPDK-compatible driver" % device_name, - dpdk_drv, extra_param) - if kernel_drv: - extra_param = "drv=%(Driver_str)s unused=%(Module_str)s" - if if_field: - extra_param = "if=%(Interface)s " + extra_param - if print_numa: - extra_param = "numa_node=%(NUMANode)s " + extra_param - display_devices("%s devices using kernel driver" % device_name, - kernel_drv, extra_param) - if no_drv: - extra_param = "unused=%(Module_str)s" - if print_numa: - extra_param = "numa_node=%(NUMANode)s " + extra_param - display_devices("Other %s devices" % device_name, no_drv, extra_param) - - -def show_status(): - '''Function called when the script is passed the "--status" option. - Displays to the user what devices are bound to the igb_uio driver, the - kernel driver or to no driver''' - - if status_dev in ["net", "all"]: - show_device_status(network_devices, "Network", if_field=True) - - if status_dev in ["baseband", "all"]: - show_device_status(baseband_devices, "Baseband") - - if status_dev in ["crypto", "all"]: - show_device_status(crypto_devices, "Crypto") - - if status_dev in ["dma", "all"]: - show_device_status(dma_devices, "DMA") - - if status_dev in ["event", "all"]: - show_device_status(eventdev_devices, "Eventdev") - - if status_dev in ["mempool", "all"]: - show_device_status(mempool_devices, "Mempool") - - if status_dev in ["compress", "all"]: - show_device_status(compress_devices, "Compress") - - if status_dev in ["misc", "all"]: - show_device_status(misc_devices, "Misc (rawdev)") - - if status_dev in ["regex", "all"]: - show_device_status(regex_devices, "Regex") - - if status_dev in ["ml", "all"]: - show_device_status(ml_devices, "ML") - - -def pci_glob(arg): - '''Returns a list containing either: - * List of PCI B:D:F matching arg, using shell wildcards e.g. 80:04.* - * Only the passed arg if matching list is empty''' - sysfs_path = "/sys/bus/pci/devices" - for _glob in [arg, '0000:' + arg]: - paths = [basename(path) for path in glob(path_join(sysfs_path, _glob))] - if paths: - return paths - return [arg] - - -def parse_args(): - '''Parses the command-line arguments given by the user and takes the - appropriate action for each''' - global b_flag - global status_flag - global status_dev - global force_flag - global noiommu_flag - global args + devbind = ctx.devbind + + # split out all devices into three groups: kernel, non-kernel, and unbound + kernel: T.List[Device] = [] + dpdk: T.List[Device] = [] + unbound: T.List[Device] = [] + + for dev in group_devs: + driver = dev.driver + if driver in devbind.uio_modules: + dpdk.append(dev) + elif driver: + kernel.append(dev) + else: + unbound.append(dev) + + # print out each group + if dpdk: + print_status_section(f"{group_name} devices using DPDK-compatible driver", dpdk) + if kernel: + print_status_section(f"{group_name} devices using kernel driver", kernel) + if unbound: + print_status_section(f"Other {group_name} devices", unbound) + + +def print_status(ctx: DevbindCtx) -> None: + """Print status of all devices.""" + status_groups = [ + ("Network", "net", ctx.devbind.network_devices), + ("Baseband", "baseband", ctx.devbind.baseband_devices), + ("Crypto", "crypto", ctx.devbind.crypto_devices), + ("DMA", "dma", ctx.devbind.dma_devices), + ("Eventdev", "event", ctx.devbind.eventdev_devices), + ("Mempool", "mempool", ctx.devbind.mempool_devices), + ("Compress", "compress", ctx.devbind.compress_devices), + ("Misc (rawdev)", "misc", ctx.devbind.misc_devices), + ("Regex", "regex", ctx.devbind.regex_devices), + ("ML", "ml", ctx.devbind.ml_devices), + ] + for group_name, group_type, group_devs in status_groups: + if ctx.status_group in ["all", group_type]: + print_status_group(ctx, group_name, group_devs) + + +def parse_args() -> DevbindCtx: + """Parse command-line arguments into devbind context.""" parser = argparse.ArgumentParser( - description='Utility to bind and unbind devices from Linux kernel', + description="Utility to bind and unbind devices from Linux kernel", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: @@ -709,7 +902,7 @@ def parse_args(): %(prog)s --status To display current network device status: - %(prog)s --status-dev net + %(prog)s --status net To bind eth1 from the current driver and move to use vfio-pci %(prog)s --bind=vfio-pci eth1 @@ -719,137 +912,144 @@ def parse_args(): To bind 0000:02:00.0 and 0000:02:00.1 to the ixgbe kernel driver %(prog)s -b ixgbe 02:00.0 02:00.1 -""") +""", + ) parser.add_argument( - '-s', - '--status', - action='store_true', - help="Print the current status of all known devices.") + "-s", + "--status", + # backwards compatibility + "--status-dev", + # None if flag was not specified + default=None, + # "all" if flag was specified without arguments + const="all", + # otherwise, match against the choice table + nargs="?", + choices=[ + "baseband", + "compress", + "crypto", + "dma", + "event", + "mempool", + "misc", + "net", + "regex", + "ml", + "all", + ], + help="Print the status of device group (default: all devices).", + ) + bind_action = parser.add_mutually_exclusive_group() + bind_action.add_argument( + "-b", + "--bind", + metavar="DRIVER", + help='Select the driver to use ("none" to unbind the device)', + ) + bind_action.add_argument( + "-u", + "--unbind", + action="store_true", + help='Unbind a device (equivalent to "-b none")', + ) parser.add_argument( - '--status-dev', - help="Print the status of given device group.", - choices=['baseband', 'compress', 'crypto', 'dma', 'event', - 'mempool', 'misc', 'net', 'regex', 'ml']) - bind_group = parser.add_mutually_exclusive_group() - bind_group.add_argument( - '-b', - '--bind', - metavar='DRIVER', - help="Select the driver to use or \"none\" to unbind the device") - bind_group.add_argument( - '-u', - '--unbind', - action='store_true', - help="Unbind a device (equivalent to \"-b none\")") + "--noiommu-mode", + action="store_true", + help="If IOMMU is not available, enable no IOMMU mode for VFIO drivers", + ) parser.add_argument( - '--noiommu-mode', - action='store_true', - help="If IOMMU is not available, enable no IOMMU mode for VFIO drivers") + "--force", + action="store_true", + help="""\ +Override restriction on binding devices in use by Linux. WARNING: This can lead +to loss of network connection and should be used with caution. +""", + ) parser.add_argument( - '--force', - action='store_true', - help=""" -Override restriction on binding devices in use by Linux" -WARNING: This can lead to loss of network connection and should be used with caution. -""") + "-G", + "--gid", + type=lambda g: grp.getgrnam(g).gr_gid, + default=-1, + help="For VFIO, specify the group ID to set IOMMU group ownership", + ) parser.add_argument( - 'devices', - metavar='DEVICE', - nargs='*', - help=""" + "-U", + "--uid", + type=lambda u: pwd.getpwnam(u).pw_uid, + default=-1, + help="For VFIO, specify the user ID to set IOMMU group ownership", + ) + parser.add_argument( + "devices", + metavar="DEVICE", + nargs="*", + help="""\ Device specified as PCI "domain:bus:slot.func" syntax or "bus:slot.func" syntax. For devices bound to Linux kernel drivers, they may be referred to by interface name. -""") +""", + ) opt = parser.parse_args() - if opt.status_dev: - status_flag = True - status_dev = opt.status_dev + ctx = DevbindCtx() + if opt.status: - status_flag = True - status_dev = "all" - if opt.force: - force_flag = True - if opt.noiommu_mode: - noiommu_flag = True - if opt.bind: - b_flag = opt.bind - elif opt.unbind: - b_flag = "none" - args = opt.devices - - if not b_flag and not status_flag: - print("Error: No action specified for devices. " - "Please give a --bind, --ubind or --status option", - file=sys.stderr) + ctx.status = True + ctx.status_group = opt.status + if opt.bind or opt.unbind: + ctx.bind = True + ctx.driver = "" if opt.unbind else opt.bind + # support any capitalization for binding to "none" + if ctx.driver.lower() == "none": + ctx.driver = "" + if not ctx.status and not ctx.bind: + print("Error: No action specified.", file=sys.stderr) parser.print_usage() sys.exit(1) - if b_flag and not args: - print("Error: No devices specified.", file=sys.stderr) + ctx.noiommu = opt.noiommu_mode + ctx.force = opt.force + ctx.devices = opt.devices + ctx.vfio_uid = opt.uid + ctx.vfio_gid = opt.gid + + # if status is displayed, devices shouldn't be passed + if not ctx.bind and ctx.devices: + print("Error: Devices should not be specified with --status action.") parser.print_usage() sys.exit(1) + # if bind is used, devices should be passed + elif ctx.bind and not ctx.devices: + print("Error: No devices specified for --bind/--unbind action.") + parser.print_usage() + sys.exit(1) + return ctx - # resolve any PCI globs in the args - new_args = [] - for arg in args: - new_args.extend(pci_glob(arg)) - args = new_args - - -def do_arg_actions(): - '''do the actual action requested by the user''' - global b_flag - global status_flag - global force_flag - global args - if b_flag in ["none", "None"]: - unbind_all(args, force_flag) - elif b_flag is not None: - bind_all(args, b_flag, force_flag) - if status_flag: - if b_flag is not None: - clear_data() - # refresh if we have changed anything - get_device_details(network_devices) - get_device_details(baseband_devices) - get_device_details(crypto_devices) - get_device_details(dma_devices) - get_device_details(eventdev_devices) - get_device_details(mempool_devices) - get_device_details(compress_devices) - get_device_details(regex_devices) - get_device_details(ml_devices) - get_device_details(misc_devices) - show_status() +def _main(): + ctx = parse_args() + # initialize devbind data + ctx.devbind = Devbind() -def main(): - '''program main function''' - # check if lspci is installed, suppress any output - with open(os.devnull, 'w') as devnull: - ret = subprocess.call(['which', 'lspci'], - stdout=devnull, stderr=devnull) - if ret != 0: - sys.exit("'lspci' not found - please install 'pciutils'") - parse_args() - check_modules() - clear_data() - get_device_details(network_devices) - get_device_details(baseband_devices) - get_device_details(crypto_devices) - get_device_details(dma_devices) - get_device_details(eventdev_devices) - get_device_details(mempool_devices) - get_device_details(compress_devices) - get_device_details(regex_devices) - get_device_details(ml_devices) - get_device_details(misc_devices) - do_arg_actions() + if ctx.bind: + # resolve any PCI globs in devices + ctx.devices = [d for dev in ctx.devices for d in resolve_pci_glob(dev)] + bind_devices(ctx) + print() + if ctx.status: + print_status(ctx) if __name__ == "__main__": - main() + try: + # check if lspci and ip are installed before doing anything + check_installed("lspci", "pciutils") + check_installed("ip", "iproute2") + + # run the main function + _main() + except DevbindError as e: + sys.exit(f"Error: {e}") -- 2.43.5