blob: aa43faf8bbc5c97f5a8d9f5d27d2bf41066397e8 [file]
#!/usr/bin/env python3
# SPDX-License-Identifier: Linux-OpenIB
# Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES
# PYTHON_ARGCOMPLETE_OK
from __future__ import annotations
import argparse
import collections
import copy
import importlib
import inspect
import itertools
import json
import os
import re
import subprocess
import sys
import tempfile
from abc import ABC, abstractmethod
from base64 import b64encode, b64decode
from typing import *
from zlib import compress, decompress
DEVDIR = os.environ.get("RDMA_TOPO_DEVDIR", "/sys/bus/pci/devices/")
BDF_RE = re.compile(r"^([0-9a-f]+?):([0-9a-f]{2}?):([0-9a-f]{2}?)\.([0-9a-f])$")
KERNEL_ACS_ISOLATED = "xx111x1"
pci_vendors = {
"MELLANOX": 0x15B3,
"NVIDIA": 0x10DE,
}
PCI_EXT_CAP_ID_ACS = 0x000D
PCI_EXT_CAP_ID_ATS = 0x000F
PCI_VPD_LRDT = 0x80 # Large Resource Data Type flag
PCI_VPD_END_SMALL = 0x78 # Small Resource End Tag
PCI_VPD_END_LARGE = 0x79 # Large Resource End Tag
PCI_VPD_LRDT_ID = 0x82 # Identifier String
PCI_VPD_LRDT_RO = 0x90 # VPD-R (Read-Only)
class CommandError(Exception):
pass
TOPO_NOT_SUPPORTED = CommandError("No supported topology detected")
def yesno(b: bool) -> str:
return "yes" if b else "no"
class SysfsDevice(object):
REQUIRED_KEYS = ["realpath", "config", "modalias"]
ENCODED_KEYS = ["config", "vpd"]
@property
def realpath(self) -> str:
return self.data["realpath"]
@property
def config(self) -> bytes:
return self.data["config"]
@property
def iommu_group(self) -> Optional[int]:
return self.data.get("iommu_group", None)
@property
def modalias(self) -> str:
return self.data["modalias"]
@property
def numa_node(self) -> Optional[int]:
return self.data.get("numa_node", None)
@property
def vpd(self) -> Optional[bytes]:
return self.data.get("vpd", None)
@property
def subsystems(self) -> Optional[Dict[str, List[str]]]:
return self.data.get("subsystems", None)
@property
def id(self) -> str:
return os.path.basename(self.data["realpath"])
def __init__(self, id: str):
def read(*parts: str) -> bytes:
with open(os.path.join(devdir, *parts), "rb") as F:
return F.read()
def string(b: bytes) -> str:
return b.decode("ascii").strip()
def subsystems() -> Dict[str, List[str]]:
res: Dict[str, List[str]] = collections.defaultdict(list)
for fn in os.listdir(devdir):
if fn in {"drm", "infiniband", "net", "nvme"}:
res[fn].extend(os.listdir(os.path.join(devdir, fn)))
return dict(res)
def iommu_group() -> int:
return int(
os.path.basename(os.readlink(os.path.join(devdir, "iommu_group")))
)
devdir = os.path.join(DEVDIR, id)
readers = {
"realpath": lambda: os.path.realpath(devdir),
"config": lambda: read("config"),
"iommu_group": iommu_group,
"modalias": lambda: string(read("modalias")),
"numa_node": lambda: int(string(read("numa_node"))),
"vpd": lambda: read("vpd"),
"subsystems": subsystems,
}
self.data: Dict[str, Any] = {}
for k, reader in readers.items():
try:
self.data[k] = reader()
except FileNotFoundError as e:
if k in SysfsDevice.REQUIRED_KEYS:
raise CommandError(f"Missing required sysfs path: {e.filename}")
self.data[k] = None
except PermissionError as e:
raise CommandError(
f"Cannot read sysfs path: {e.filename}. Are you root?"
)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> SysfsDevice:
obj = object.__new__(cls)
obj.data = copy.deepcopy(data)
for k in SysfsDevice.REQUIRED_KEYS:
if k not in obj.data or obj.data[k] is None:
raise ValueError(f"Missing required key '{k}'")
for k in SysfsDevice.ENCODED_KEYS:
if k in obj.data and obj.data[k] is not None:
try:
obj.data[k] = decompress(b64decode(obj.data[k]))
except Exception as e:
raise ValueError(f"Invalid encoded value for key '{k}': {e}")
return obj
def to_dict(self) -> Dict[str, Any]:
res = copy.deepcopy(self.data)
for k in SysfsDevice.ENCODED_KEYS:
if k not in res:
continue
if res[k] is not None:
res[k] = b64encode(compress(res[k])).decode("ascii")
else:
del res[k]
return res
def parse_vpd(vpd: Optional[bytes]) -> Tuple[Optional[str], Optional[str]]:
"""Parse VPD name and V3 UUID"""
if vpd is None:
return None, None
name = None
v3 = None
def items(data: bytes) -> Generator[Tuple[int, bytes]]:
while len(data) > 0:
tag = data[0]
if tag in [PCI_VPD_END_SMALL, PCI_VPD_END_LARGE]:
break
if tag & PCI_VPD_LRDT:
offset = 3
if len(data) < 3:
break
length = int.from_bytes(data[1:3], "little")
else:
offset = 1
length = tag & 0x07
if length > len(data) - offset:
break
yield (tag, data[offset : offset + length])
data = data[offset + length :]
def keywords(data: bytes) -> Generator[Tuple[str, bytes]]:
while len(data) >= 4:
length = int(data[2])
if length > len(data) - 3:
break
yield (data[:2].decode("ascii"), data[3 : 3 + length])
data = data[3 + length :]
try:
for tag, item in items(vpd):
if tag == PCI_VPD_LRDT_ID:
name = item.decode("ascii").strip()
if tag == PCI_VPD_LRDT_RO:
for keyword, value in keywords(item):
if keyword == "V3":
v3 = value.decode("ascii")
except UnicodeDecodeError:
pass
return (v3, name)
def parse_ext_cap(config: bytes, cap_id: int) -> Optional[bytes]:
"""Parse an extended capability from the PCI configuration space"""
if len(config) < 0x104:
return None
offset = 0x100
while offset and offset < len(config) - 4:
header = int.from_bytes(config[offset : offset + 4], "little")
next_offset = (header >> 20) & 0xFFC
if next_offset == 0 or next_offset <= offset + 4 or next_offset > len(config):
next_offset = len(config)
if (header & 0xFFFF) == cap_id:
return config[offset + 4 : next_offset]
if next_offset == len(config):
break
offset = next_offset
return None
def parse_acs_ctrl(config: bytes) -> Optional[int]:
"""Parse the ACS control register from the PCI configuration space"""
raw = parse_ext_cap(config, PCI_EXT_CAP_ID_ACS)
if raw is None or len(raw) < 4:
return None
return int.from_bytes(raw[2:4], "little")
def has_ats_cap(config: bytes) -> bool:
"""True if the device exposes an ATS capability"""
return parse_ext_cap(config, PCI_EXT_CAP_ID_ATS) is not None
def PCI_VDEVICE(vendor: str, device_id: int) -> re.Pattern:
"""Match a Vendor and device ID"""
vendor_id = pci_vendors[vendor]
return re.compile(rf"^pci:v{vendor_id:08X}d{device_id:08X}.*$")
def PCI_DEVICE_CLASS(cid: int) -> re.Pattern:
"""Match by exact programming class using the int coding from the kernel"""
class_id = (cid >> 16) & 0xFF
subclass_id = (cid >> 8) & 0xFF
progif = cid & 0xFF
return re.compile(rf"^pci:.*bc{class_id:02X}sc{subclass_id:02X}i{progif:02X}.*$")
def PCI_NVGPU() -> re.Pattern:
"""Match all NVIDIA GPUs"""
vendor_id = pci_vendors["NVIDIA"]
class_id = 0x03
return re.compile(rf"^pci:v{vendor_id:08X}.*bc{class_id:02X}.*$")
# Table of modalias matches to the device_type string.
# Order is important. The first matching device type is used.
pci_device_types = {
PCI_VDEVICE("NVIDIA", 0x22B1): "grace_rp", # NVIDIA Grace PCI Root Port Bridge
PCI_VDEVICE("NVIDIA", 0x22B2): "grace_rp", # NVIDIA Grace PCI Root Port Bridge
PCI_VDEVICE("NVIDIA", 0x22B8): "grace_rp", # NVIDIA Grace PCI Root Port Bridge
PCI_VDEVICE("MELLANOX", 0x1021): "cx_nic", # ConnectX-7
PCI_VDEVICE("MELLANOX", 0x1023): "cx_nic", # ConnectX-8
PCI_VDEVICE("MELLANOX", 0xA2DC): "bf3_nic", # BlueField-3
PCI_VDEVICE("MELLANOX", 0x2100): "cx_dma", # ConnectX-8 DMA Controller
PCI_VDEVICE("MELLANOX", 0x197B): "bf3_switch", # USP/DSP of a BF3 switch
PCI_VDEVICE("MELLANOX", 0x197C): "cx_switch", # USP/DSP of a CX switch
PCI_VDEVICE("MELLANOX", 0x1979): "cx_switch", # USP/DSP of a CX switch
PCI_DEVICE_CLASS(0x010802): "nvme",
PCI_NVGPU(): "nvgpu",
PCI_DEVICE_CLASS(0x060400): "bridge", # Generic PCI-PCI bridge / Root Port
}
dump_ignored = [
PCI_DEVICE_CLASS(0x060000), # Generic system peripheral
PCI_DEVICE_CLASS(0x060100), # ISA bridge
PCI_DEVICE_CLASS(0x080700), # Non-Essential Instrumentation
PCI_DEVICE_CLASS(0x088000), # System peripheral
PCI_DEVICE_CLASS(0x110100), # Performance counters
PCI_DEVICE_CLASS(0x130000), # Non-Essential Instrumentation
]
class PCIBDF(
collections.namedtuple("PCIBDF", ["segment", "bus", "device", "function"])
):
"""Bus Device Function for a PCI device"""
def as_pci(self):
return f"{self.segment}:{self.bus}:{self.device}.{self.function}"
def __str__(self):
return self.as_pci()
def __repr__(self):
return f"PCIBDF({self.segment}, {self.bus}, {self.device}, {self.function})"
def to_pcibdf(s: str) -> Optional[PCIBDF]:
g = BDF_RE.match(s)
if not g:
return None
return PCIBDF(*g.groups())
class PCIDevice(object):
device_type = ""
vpd_v3: Optional[str] = None
vpd_name: Optional[str] = None
parent: PCIDevice = None
def __init__(self, bdf: PCIBDF, sysfs_device: SysfsDevice):
self.bdf = bdf
self.sysfs_device = sysfs_device
self.iommu_group = self.sysfs_device.iommu_group
self.numa_node = self.sysfs_device.numa_node
self.modalias = self.sysfs_device.modalias
parent = os.path.basename(os.path.dirname(self.sysfs_device.realpath))
self.parent_bdf = to_pcibdf(parent)
for k, v in pci_device_types.items():
if k.match(self.modalias):
if self.parent_bdf is None and v == "bridge":
v = "generic_rp"
self.device_type = v
break
self.children: Set[PCIDevice] = set()
self.has_ats = False
def finish_loading(self):
"""Do more expensive parsing operations"""
if self.device_type == "cx_nic" or self.device_type == "cx_dma":
self.vpd_v3, self.vpd_name = parse_vpd(self.sysfs_device.vpd)
if "switch" in self.device_type or self.device_type.endswith("_rp"):
self.has_acs = self.get_acs_ctrl() is not None
if self.device_type == "cx_nic":
self.has_ats = has_ats_cap(self.sysfs_device.config)
def iterdownstream(self) -> Generator[PCIDevice, None, None]:
"""Iterate over all downstream devices of this device recursively"""
for pdev in self.children:
yield pdev
yield from pdev.iterdownstream()
def iterfulltree(self):
for pdev in self.iterupstream_path():
if not pdev.parent:
yield from pdev.iterdownstream()
def iterupstream_path(self):
"""Iterate over each step along the upstream path from the devices
parent to the root."""
pdev = self.parent
while pdev:
yield pdev
pdev = pdev.parent
def __repr__(self):
return f"PCIDevice({self.bdf})"
def get_acs_ctrl(self):
"""Read the ACS control register from the PCI configuration space"""
return parse_acs_ctrl(self.sysfs_device.config)
def get_subsystems(self):
"""Return a list of subsystem the PCI device is connected to"""
return self.sysfs_device.subsystems or {}
class NVCX_Complex(ABC):
@property
@abstractmethod
def primary_nic(self) -> PCIDevice:
"""Primary ConnectX PF for this complex."""
pass
@abstractmethod
def compute_acs(self, virt: Optional[bool]) -> Dict[PCIDevice, str]:
"""Computes the ACS values for this complex.
Used to implement commands which check and/or set ACS values.
"""
pass
@abstractmethod
def to_dict(self) -> Dict[str, Any]:
"""Returns a JSON-serializable dictionary which represents this complex.
Used to implement topology dump command with `-j / --json` flag.
Output format should be maintained for backwards compatibility.
"""
pass
@abstractmethod
def check(self, virt: Optional[bool]) -> bool:
"""Runs additional checks on this complex.
Returns True if all checks pass, False otherwise.
Used to implement the `check` command.
"""
pass
@abstractmethod
def __str__(self) -> str:
"""Returns a string representation of this complex.
Used to implement the `topo` command.
"""
pass
class NVCX_DMA_Complex(NVCX_Complex):
"""Hold the related PCI functions together. A complex includes a CX PF, a CX
DMA function, an GPU and related PCI switches in the DMA function
segment."""
def __init__(self, cx_pfs: Set[PCIDevice], cx_dma: PCIDevice, nvgpu: PCIDevice):
self.cx_pfs = cx_pfs - {cx_dma}
self.cx_pf = sorted(self.cx_pfs, key=lambda x: x.bdf)[0]
self.cx_dma = cx_dma
self.nvgpu = nvgpu
# Identify the switch ports that are part of the shared path that
# handles the P2P traffic
self.shared_usp = self.__find_shared_usp()
for pdev in self.cx_dma.iterupstream_path():
if pdev in self.shared_usp.children:
self.cx_dma_dsp = pdev
for pdev in self.nvgpu.iterupstream_path():
if pdev in self.shared_usp.children:
self.nvgpu_dsp = pdev
# There can be a NVMe device connected to the CX NIC as well. For NVMe
# it is best to match with GPUs on the same socket, so a NUMA aware
# approach would be fine, but also the GPU/NIC/NVMe could be
# consistently paired based on the physical layout.
self.nvmes: Set[PCIDevice] = set()
for pdev in self.cx_pf.iterfulltree():
if pdev.device_type == "nvme":
self.nvmes.add(pdev)
@property
def primary_nic(self) -> PCIDevice:
return self.cx_pf
def __find_shared_usp(self) -> PCIDevice:
"""Find the USP that is shared by both devices, the immediate downstream
bus is the point in the topology where P2P traffic will switch from an
upstream to downstream direction."""
common_path = set(self.cx_dma.iterupstream_path()).intersection(
set(self.nvgpu.iterupstream_path())
)
assert common_path
for pdev in self.cx_dma.iterupstream_path():
if pdev in common_path:
assert pdev.device_type == "cx_switch"
for i in pdev.children:
assert i.device_type == "cx_switch"
return pdev
def compute_acs(self, _: Optional[bool]) -> Dict[PCIDevice, str]:
acs: Dict[PCIDevice, str] = {}
# For the DSP in the shared switch toward the CX8 DMA Direct interface:
# Enable these bits:
# bit-4 : ACS Upstream Forwarding
# bit-3 : ACS P2P Completion Redirect
# bit-0 : ACS Source Validation
# Disable these bits:
# bit-2 : ACS P2P Request Redirect
assert self.cx_dma_dsp.has_acs
acs[self.cx_dma_dsp] = "xx110x1"
# For the DSP in the shared switch toward the GPU:
# Enable the following bits:
# bit-4 : ACS Upstream Forwarding
# bit-2 : ACS P2P Request Redirect
# bit-0 : ACS Source Validation
# Disable the following bits:
# bit-3 : ACS P2P Completion Redirect
assert self.nvgpu_dsp.has_acs
acs[self.nvgpu_dsp] = "xx101x1"
# Disable ACS SV on the root port, this forces the entire segment
# into one iommu_group and avoids kernel bugs building groups for
# irregular ACS.
for pdev in self.cx_dma_dsp.iterupstream_path():
if not pdev.parent:
assert pdev.has_acs
acs[pdev] = "xx111x0"
return acs
def to_dict(self) -> Dict[str, Any]:
res = {
"rdma_nic_pf_bdf": str(self.cx_pf.bdf),
"rdma_dma_bdf": str(self.cx_dma.bdf),
"gpu_bdf": str(self.nvgpu.bdf),
"subsystems": {},
}
devname = self.cx_pf.vpd_name
if devname:
res["rdma_nic_vpd_name"] = self.cx_pf.vpd_name
if self.cx_pf.numa_node is not None:
res["numa_node"] = self.cx_pf.numa_node
if self.nvmes:
res["nvme_bdf"] = str(next(iter(self.nvmes)).bdf)
for pdev in sorted(
itertools.chain(self.cx_pfs, [self.nvgpu, self.cx_dma], self.nvmes),
key=lambda x: x.bdf,
):
subsys = pdev.get_subsystems()
if subsys:
res["subsystems"][str(pdev.bdf)] = {
subsys: list(devs) for subsys, devs in subsys.items()
}
return res
def __str__(self):
res = f"RDMA NIC={self.cx_pf.bdf}, GPU={self.nvgpu.bdf}, RDMA DMA Function={self.cx_dma.bdf}\n"
devname = self.cx_pf.vpd_name
if devname:
res += f"\t{devname}\n"
if self.cx_pf.numa_node is not None:
res += f"\tNUMA Node: {self.cx_pf.numa_node}\n"
if len(self.cx_pfs):
res += print_list("NIC PCI device", [str(I.bdf) for I in self.cx_pfs])
subsystems: Dict[str, Set[str]] = collections.defaultdict(set)
for pdev in itertools.chain(self.cx_pfs, [self.nvgpu, self.cx_dma], self.nvmes):
for k, v in pdev.get_subsystems().items():
subsystems[k].update(v)
res += print_list("RDMA device", subsystems["infiniband"])
res += print_list("Net device", subsystems["net"])
res += print_list("DRM device", subsystems["drm"])
res += print_list("NVMe device", subsystems["nvme"])
return res[:-1]
def check(self, _: Optional[bool]) -> bool:
# Correct iommu_groups are required to avoid NVreg_GrdmaPciTopoCheckOverride
if (
self.cx_dma.iommu_group == self.nvgpu.iommu_group
and self.cx_dma.iommu_group is not None
):
check_ok(
f"Kernel iommu_group for DMA {self.cx_dma.bdf} and GPU {self.nvgpu.bdf} are both {self.cx_dma.iommu_group}"
)
return True
check_fail(
f"Kernel iommu_group for DMA {self.cx_dma.bdf} and GPU {self.nvgpu.bdf} are not equal {self.cx_dma.iommu_group} != {self.nvgpu.iommu_group}"
)
return False
class NVCX_Inline_Complex(NVCX_Complex):
def __init__(
self,
root_port: PCIDevice,
shared_usp: PCIDevice,
cx_pf: PCIDevice,
nvgpu: PCIDevice,
):
self.root_port = root_port
self.cx_pf = cx_pf
self.nvgpu = nvgpu
self.cx_pf_dsp = None
self.nvgpu_dsp = None
for dsp in shared_usp.children:
for pdev in dsp.iterdownstream():
if pdev.device_type == "cx_nic":
if self.cx_pf_dsp is not None:
raise ValueError(
f"Multiple CX NIC DSPs under the same shared switch not supported"
)
self.cx_pf_dsp = dsp
break
if pdev.device_type == "nvgpu":
if self.nvgpu_dsp is not None:
raise ValueError(
f"Multiple GPU DSPs under the same shared switch not supported"
)
self.nvgpu_dsp = dsp
break
if not self.cx_pf_dsp:
raise ValueError(f"CX NIC DSP not found in the topology")
if not self.nvgpu_dsp:
raise ValueError(f"GPU DSP not found in the topology")
@property
def primary_nic(self) -> PCIDevice:
return self.cx_pf
def compute_acs(self, virt: Optional[bool]) -> Dict[PCIDevice, str]:
if not self.cx_pf_dsp.has_acs:
raise CommandError(f"CX NIC DSP {self.cx_pf_dsp.bdf} lacks ACS")
if not self.nvgpu_dsp.has_acs:
raise CommandError(f"GPU DSP {self.nvgpu_dsp.bdf} lacks ACS")
if not self.root_port.has_acs:
raise CommandError(f"Root port {self.root_port.bdf} lacks ACS")
if virt is None:
raise CommandError("Unexpected: Could not determine virt mode")
if virt:
return {
# The DSPs of the NIC which is non DD in the shared switch should
# have the following enabled:
# bit-6 : ACS Direct Translated P2P
# bit-4 : ACS Upstream Forwarding
# bit-3 : ACS P2P Completion Redirect
# bit-2 : ACS P2P Request Redirect
# bit-0 : ACS Source Validation
self.cx_pf_dsp: "1x111x1",
# The DSPs of the GPU in the shared switch and the RP of the NIC/GPU
# should have the following enabled, matching the kernel default:
# bit-4 : ACS Upstream Forwarding
# bit-3 : ACS P2P Completion Redirect
# bit-2 : ACS P2P Request Redirect
# bit-0 : ACS Source Validation
self.nvgpu_dsp: KERNEL_ACS_ISOLATED,
self.root_port: KERNEL_ACS_ISOLATED,
}
else:
return {
# The DSPs of both the NIC and GPU in the shared switch and
# RPs of the NIC/GPU should have the following disabled:
# bit-4 : ACS Upstream Forwarding
# bit-3 : ACS P2P Completion Redirect
# bit-2 : ACS P2P Request Redirect
# bit-0 : ACS Source Validation
self.cx_pf_dsp: "xx000x0",
self.nvgpu_dsp: "xx000x0",
self.root_port: "xx000x0",
}
def to_dict(self) -> Dict[str, Any]:
res = {
"rdma_nic_pf_bdf": str(self.cx_pf.bdf),
"gpu_bdf": str(self.nvgpu.bdf),
"subsystems": {},
}
devname = self.cx_pf.vpd_name
if devname:
res["rdma_nic_vpd_name"] = self.cx_pf.vpd_name
if self.cx_pf.numa_node is not None:
res["numa_node"] = self.cx_pf.numa_node
if self.cx_pf.has_ats:
res["rdma_nic_ats"] = self.cx_pf.has_ats
for pdev in sorted(
itertools.chain([self.cx_pf, self.nvgpu]),
key=lambda x: x.bdf,
):
subsys = pdev.get_subsystems()
if subsys:
res["subsystems"][str(pdev.bdf)] = {
subsys: list(devs) for subsys, devs in subsys.items()
}
return res
def __check_ats(self, virt: bool) -> bool:
status = "available" if self.cx_pf.has_ats else "not available"
msg = f"ATS capability for {self.cx_pf.device_type} {self.cx_pf.bdf} is {status}"
if self.cx_pf.has_ats != virt:
check_fail(msg)
return False
check_ok(msg)
return True
def __check_iommu_group(self, virt: bool) -> bool:
cxpf = f"{self.cx_pf.device_type} {self.cx_pf.bdf}"
nvgpu = f"{self.nvgpu.device_type} {self.nvgpu.bdf}"
prefix = f"Kernel iommu_group for {cxpf} and {nvgpu}"
equal = f"equal {self.cx_pf.iommu_group} == {self.nvgpu.iommu_group}"
not_equal = f"not equal {self.cx_pf.iommu_group} != {self.nvgpu.iommu_group}"
if virt:
if self.cx_pf.iommu_group is None:
check_fail(f"Kernel iommu_group is missing for {cxpf}")
return False
if self.nvgpu.iommu_group is None:
check_fail(f"Kernel iommu_group is missing for {nvgpu}")
return False
if self.cx_pf.iommu_group == self.nvgpu.iommu_group:
check_fail(f"{prefix} are {equal}")
return False
check_ok(f"{prefix} are {not_equal}")
return True
else:
if self.cx_pf.iommu_group is None and self.nvgpu.iommu_group is None:
check_ok(f"{prefix} are not set")
return True
if self.cx_pf.iommu_group != self.nvgpu.iommu_group:
check_fail(f"{prefix} are {not_equal}")
return False
check_ok(f"{prefix} are {equal}")
return True
def check(self, virt: Optional[bool]) -> bool:
assert virt is not None
res_ats = self.__check_ats(virt)
res_iommu_group = self.__check_iommu_group(virt)
return res_ats and res_iommu_group
def __str__(self):
res = f"RDMA NIC={self.cx_pf.bdf}, GPU={self.nvgpu.bdf}\n"
devname = self.cx_pf.vpd_name
if devname:
res += f"\t{devname}\n"
if self.cx_pf.numa_node is not None:
res += f"\tNUMA Node: {self.cx_pf.numa_node}\n"
res += f"\tNIC ATS: {yesno(self.cx_pf.has_ats)}\n"
subsystems: Dict[str, Set[str]] = collections.defaultdict(set)
for pdev in [self.cx_pf, self.nvgpu]:
for k, v in pdev.get_subsystems().items():
subsystems[k].update(v)
res += print_list("RDMA device", subsystems["infiniband"])
res += print_list("Net device", subsystems["net"])
res += print_list("DRM device", subsystems["drm"])
res += print_list("NVMe device", subsystems["nvme"])
return res[:-1]
def check_parent(pdev: PCIDevice, parent_type: str):
if not pdev or not pdev.parent:
return None
if pdev.parent.device_type != parent_type:
return None
return pdev.parent
class PCITopo(object):
"""Load the PCI topology from sysfs and organize it"""
def __init__(
self,
sysfs_dump: Optional[str] = None,
virt: Optional[bool] = None,
autodetect_virt: bool = True,
):
if sysfs_dump:
sysfs_devices = self.__parse_dump(sysfs_dump)
else:
sysfs_devices = [SysfsDevice(fn) for fn in os.listdir(DEVDIR)]
self.devices = self.__load_devices(sysfs_devices)
self.nvcxs: List[NVCX_Complex] = []
self.has_cx_dma = any(
pdev.device_type == "cx_dma" for pdev in self.devices.values()
)
self.has_gpu_and_nic = False
if self.has_cx_dma and virt is not None:
raise CommandError(
"--virt / --no-virt is not supported on DMA-based topologies"
)
self.virt = virt
self._autodetect_virt = autodetect_virt
if not self.has_cx_dma:
found = {
"cx_switch": False,
"nvgpu": False,
"cx_nic": False,
}
for pdev in self.devices.values():
if pdev.device_type not in found.keys():
continue
found[pdev.device_type] = True
self.has_gpu_and_nic = all(found.values())
if not self.has_gpu_and_nic:
return
for pdev in self.devices.values():
pdev.finish_loading()
self.__build_topo()
def __parse_dump(self, filename: str) -> List[SysfsDevice]:
res: List[SysfsDevice] = []
try:
with open(filename, "rt") as F:
data = json.load(F)
if not isinstance(data, list):
raise ValueError(f"Expected list, got '{type(data).__name__}'")
num_items = len(data)
for i, item in enumerate(data):
if not isinstance(item, dict):
raise ValueError(
f"Item {i}/{num_items}: Expected dictionary, got '{type(item).__name__}'"
)
try:
res.append(SysfsDevice.from_dict(item))
except Exception as e:
raise ValueError(f"Item {i}/{num_items}: {e}") from e
return res
except (json.JSONDecodeError, ValueError) as e:
raise CommandError(f"Invalid sysfs dump file: {e}")
except (FileNotFoundError, PermissionError) as e:
raise CommandError(f"Failed to read sysfs dump file: {e}")
def __load_devices(self, sysfs_devices: List[SysfsDevice]):
res: Dict[PCIBDF, PCIDevice] = {}
for sdev in sysfs_devices:
bdf = to_pcibdf(sdev.id)
if not bdf:
continue
assert bdf not in res
res[bdf] = PCIDevice(bdf, sdev)
return res
def __get_nvcx_complex(self, cx_dma: PCIDevice):
"""Match the topology for the switch complex using a CX DMA function and a
single GPU. It has two nested switches:
RP --> SW -> CX_DMA
-> SW -> GPU
"""
assert cx_dma.device_type == "cx_dma"
if not cx_dma.vpd_v3:
raise ValueError(f"CX DMA function {cx_dma} does not have a VPD V3 UUID")
# The DMA and PF are matched using the UUID from the VPD
cx_pfs = self.vpd_v3s.get(cx_dma.vpd_v3)
if cx_pfs is None:
raise ValueError(
f"CX DMA function {cx_dma} does not have a matching PF, V3 UUID matching failed"
)
return None
# Path from the DMA to the root port
cx_dma_dsp = check_parent(cx_dma, "cx_switch")
cx_usp = check_parent(cx_dma_dsp, "cx_switch")
grace_rp = check_parent(cx_usp, "grace_rp")
if not grace_rp:
raise ValueError(
f"CX DMA function {cx_dma} has an unrecognized upstream path"
)
# Path from the GPU to the root port
nvgpus = [
pdev for pdev in grace_rp.iterdownstream() if pdev.device_type == "nvgpu"
]
if len(nvgpus) != 1:
raise ValueError(f"CX DMA function {cx_dma} does not have a nearby GPU")
nvgpu = nvgpus[0]
nvgpu_dsp2 = check_parent(nvgpu, "cx_switch")
nvgpu_usp2 = check_parent(nvgpu_dsp2, "cx_switch")
nvgpu_dsp1 = check_parent(nvgpu_usp2, "cx_switch")
if cx_usp != check_parent(nvgpu_dsp1, "cx_switch"):
raise ValueError(
f"CX DMA function {cx_dma} has an unrecognized upstream path from the GPU"
)
# Sanity check there is nothing unexpected in the topology
alldevs = {
cx_dma,
cx_dma_dsp,
cx_usp,
nvgpu,
nvgpu_dsp2,
nvgpu_usp2,
nvgpu_dsp1,
}
topodevs = set(grace_rp.iterdownstream())
if alldevs != topodevs:
raise ValueError(
f"CX DMA function {cx_dma} has unexpected PCI devices in the topology"
)
return NVCX_DMA_Complex(cx_pfs, cx_dma, nvgpu)
def __get_nvcx_inline_complex(self, nvgpu: PCIDevice):
"""Match the topology for the inline complex using a GPU and a CX NIC.
RP --> SW -> CX_NIC
-> SW -> GPU
"""
assert nvgpu.device_type == "nvgpu"
nvgpu_dsp2 = check_parent(nvgpu, "cx_switch")
nvgpu_usp2 = check_parent(nvgpu_dsp2, "cx_switch")
nvgpu_dsp1 = check_parent(nvgpu_usp2, "cx_switch")
shared_usp1 = check_parent(nvgpu_dsp1, "cx_switch")
if not shared_usp1:
raise ValueError(f"GPU {nvgpu} has an unrecognized upstream path")
for pdev in shared_usp1.iterupstream_path():
if pdev.device_type == "generic_rp":
root_port = pdev
break
else:
raise ValueError(
f"Could not find root port for shared USP {shared_usp1.bdf}"
)
for pdev in shared_usp1.iterdownstream():
if pdev.device_type == "cx_nic":
cx_nic = pdev
break
else:
raise ValueError(f"GPU {nvgpu} does not have a nearby CX NIC")
return NVCX_Inline_Complex(root_port, shared_usp1, cx_nic, nvgpu)
def __auto_detect_virt(self) -> bool:
"""Auto-detect if virtualization will be used on this system"""
first = self.nvcxs[0].primary_nic.has_ats
if not all(nvcx.primary_nic.has_ats == first for nvcx in self.nvcxs):
raise CommandError(
"Could not auto-detect virtualization: CX NICs have different ATS settings"
)
return first
def __build_topo(self):
"""Collect cross-device information together and build the NVCX_Complex
objects for the cx_dma functions"""
self.vpd_v3s: Dict[str, Set[PCIDevice]] = collections.defaultdict(set)
for pdev in self.devices.values():
if pdev.parent_bdf and pdev.parent_bdf in self.devices:
pdev.parent = self.devices[pdev.parent_bdf]
pdev.parent.children.add(pdev)
# Many PCI functions may share the same V3
if pdev.vpd_v3:
self.vpd_v3s[pdev.vpd_v3].add(pdev)
if self.has_cx_dma:
for pdev in self.devices.values():
if pdev.device_type == "cx_dma":
nvcx = self.__get_nvcx_complex(pdev)
self.nvcxs.append(nvcx)
elif self.has_gpu_and_nic:
for pdev in self.devices.values():
if pdev.device_type == "nvgpu":
nvcx = self.__get_nvcx_inline_complex(pdev)
self.nvcxs.append(nvcx)
if self.has_gpu_and_nic and len(self.nvcxs) > 0:
if self.virt is None and self._autodetect_virt:
self.virt = self.__auto_detect_virt()
self.nvcxs.sort(key=lambda x: x.primary_nic.bdf)
@property
def supported(self) -> bool:
"""True if the system has a topology that is supported by the rdma_topo tool"""
return (self.has_cx_dma or self.has_gpu_and_nic) and len(self.nvcxs) > 0
def compute_acs(self):
"""Return a dictionary of PCI devices and the ACS mask the device should
have"""
acs: Dict[PCIDevice, str] = {}
for nvcx in self.nvcxs:
acs.update(nvcx.compute_acs(self.virt))
# Enable, using kernel default, or disable ACS on all other CX
# bridges and Grace RP based on the virt parameter or if the topology
# has CX DMA functions.
#
# To enable (matches kernel default):
# bit-4 : ACS Upstream Forwarding
# bit-3 : ACS P2P Completion Redirect
# bit-2 : ACS P2P Request Redirect
# bit-0 : ACS Source Validation
for pdev in self.devices.values():
if (
pdev not in acs
and ("switch" in pdev.device_type or "grace_rp" in pdev.device_type)
and pdev.has_acs
):
acs[pdev] = (
KERNEL_ACS_ISOLATED if self.has_cx_dma or self.virt else "xx000x0"
)
return acs
def add_sysfs_dump_argument(parser):
parser.add_argument(
"-F",
"--sysfs-dump-file",
action="store",
default=None,
dest="sysfs_dump",
help="Use a file produced by the rdma_topo dump command as input",
)
def add_virt_argument(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--virt",
action=argparse.BooleanOptionalAction,
default=None,
dest="virt",
help="Whether virtualization will be used on this system. Auto-detect if not set.",
)
# -------------------------------------------------------------------
def print_list(title: str, items: list[str]):
if not items:
return ""
if len(items) > 1:
title = title + "s"
list_str = ", ".join(sorted(items))
return f"\t{title}: {list_str}\n"
def args_topology(parser):
parser.add_argument(
"-j",
"--json",
action="store_true",
dest="json",
help="Output in machine readable JSON format",
)
add_sysfs_dump_argument(parser)
def topo_json(topo: PCITopo):
import json
jtop = []
for nvcx in topo.nvcxs:
jtop.append(nvcx.to_dict())
print(json.dumps(jtop, indent=4))
def cmd_topology(args):
"""List the ConnectX NICs in the system with the corresponding NIC
function, associated GPU, and, optionally, DMA Direct function."""
topo = PCITopo(args.sysfs_dump, virt=None, autodetect_virt=False)
if not topo.supported:
raise TOPO_NOT_SUPPORTED
if args.json:
return topo_json(topo)
for nvcx in topo.nvcxs:
print(nvcx)
cmd_topology.__aliases__ = ("topo",)
# -------------------------------------------------------------------
def update_file(fn: str, new_content: str):
"""Make fn have new_content. If fn already has new_content nothing is
done."""
try:
with open(fn, "rt") as F:
old = F.read()
if old == new_content:
return False
except FileNotFoundError:
pass
with tempfile.NamedTemporaryFile(dir=os.path.dirname(fn), mode="wt") as F:
F.write(new_content)
F.flush()
os.chmod(F.name, 0o644)
try:
os.link(F.name, fn)
except FileExistsError:
os.unlink(fn)
os.link(F.name, fn)
return True
def args_write_grub_acs(parser):
parser.add_argument(
"-n",
"--dry-run",
action="store_true",
dest="dry_run",
help="Output the grub configuration to stdout and make no changes",
)
parser.add_argument(
"--output",
action="store",
default="/etc/default/grub.d/config-acs.cfg",
help="Grub dropin file to use for the kernel command line",
)
add_virt_argument(parser)
def cmd_write_grub_acs(args):
"""Generate a grub dropin file to have the kernel commandline set the
required ACS flags during system boot. This is the recommended way to
configure ACS on systems but requires a compatible kernel.
If the system does not have any need of ACS flags the dropin file will be
removed. This command is intended for Debian style systems with a
/etc/default/grub.d and update-grub command."""
topo = PCITopo(None, args.virt)
if not topo.supported:
if args.dry_run:
raise TOPO_NOT_SUPPORTED
if os.path.exists(args.output):
print(
f"W: Found ACS drop-in file {args.output} but the system does not have a supported topology. Deleting file."
)
os.unlink(args.output)
return
acs = topo.compute_acs()
config_acs = [
f"{acs}@{pdev.bdf}"
for pdev, acs in sorted(acs.items(), key=lambda x: x[0].bdf)
if acs != KERNEL_ACS_ISOLATED
]
acs_arg = ";".join(config_acs)
grub_conf = [
f"# Generated by {sys.argv[0]} do not change. ACS settings for RDMA GPU Direct",
f'GRUB_CMDLINE_LINUX="$GRUB_CMDLINE_LINUX pci=config_acs=\\"{acs_arg}\\""',
]
grub_conf = "\n".join(grub_conf)
if args.dry_run:
print(grub_conf)
return
try:
os.makedirs(os.path.dirname(args.output))
except FileExistsError:
pass
if update_file(args.output, grub_conf + "\n"):
subprocess.check_call(["update-grub"])
# -------------------------------------------------------------------
def combine_acs(cur_acs, new_acs):
for idx, val in enumerate(new_acs[::-1]):
if val == "1":
cur_acs = cur_acs | (1 << idx)
elif val == "0":
cur_acs = cur_acs & (0xFFFF ^ (1 << idx))
return cur_acs
def args_setpci_acs(parser):
parser.add_argument(
"-n",
"--dry-run",
action="store_true",
dest="dry_run",
help="Output the setpci commands to stdout and make no changes",
)
add_virt_argument(parser)
def cmd_setpci_acs(args):
"""Execute a series of set_pci commands that will immediately change the ACS
settings to the required values. This is compatible with older kernels, but
is not recommended. The kernel must boot with ACS enabled and the GPU driver
must have the NVreg_GrdmaPciTopoCheckOverride=1 reg key set to disable
safety checks that old kernels cannot support.
NOTE: In this configuration unprivileged userspace can trigger platform RAS
failures, use with caution!
"""
topo = PCITopo(None, args.virt)
if not topo.supported:
raise TOPO_NOT_SUPPORTED
acs = topo.compute_acs()
cmds: List[List[str]] = []
for pdev, acs in sorted(acs.items(), key=lambda x: x[0].bdf):
cur_acs = pdev.get_acs_ctrl()
if cur_acs is None:
raise CommandError(
f"Could not read ACS control register for {pdev.device_type} {pdev.bdf}"
)
new_acs = combine_acs(cur_acs, acs)
if new_acs == cur_acs:
continue
cmd = ["setpci", "-r", "-s", str(pdev.bdf), f"ECAP_ACS+0x6.w={new_acs:04x}"]
cmds.append(cmd)
if args.dry_run:
for cmd in cmds:
print(" ".join(cmd))
return
for cmd in cmds:
subprocess.check_call(cmd)
# -------------------------------------------------------------------
def args_check(parser):
add_sysfs_dump_argument(parser)
add_virt_argument(parser)
def check_ok(msg: str):
print(f"OK\t{msg}")
def check_fail(msg: str):
print(f"FAIL\t{msg}")
def cmd_check(args):
"""Check that the running kernel and PCI environment are setup correctly for
GPU Direct with ConnectX DMA Direct PCI functions."""
topo = PCITopo(args.sysfs_dump, args.virt)
if not topo.supported:
raise TOPO_NOT_SUPPORTED
if topo.has_cx_dma:
check_ok("All ConnectX DMA functions have correct PCI topology")
elif topo.has_gpu_and_nic:
check_ok("All NIC/GPU complexes have correct PCI topology")
fatal = False
acs = topo.compute_acs()
for pdev, acs in sorted(acs.items(), key=lambda x: x[0].bdf):
cur_acs = pdev.get_acs_ctrl()
if cur_acs is None:
check_fail(
f"Could not read ACS control register for {pdev.device_type} {pdev.bdf}"
)
fatal = True
continue
new_acs = combine_acs(cur_acs, acs)
if new_acs == cur_acs:
check_ok(
f"ACS for {pdev.device_type} {pdev.bdf} has correct values {cur_acs:07b} = {acs}"
)
else:
check_fail(
f"ACS for {pdev.device_type} {pdev.bdf} has incorrect values {cur_acs:07b} != {acs}, (0x{cur_acs:x} != 0x{new_acs:x})"
)
fatal = True
for nvcx in topo.nvcxs:
if not nvcx.check(topo.virt):
fatal = True
if fatal:
sys.exit(100)
# -------------------------------------------------------------------
def args_dump(parser):
pass
def cmd_dump(args) -> None:
"""Dump the PCI topology to a file that can be used as input"""
sd_json: List[Dict[str, Any]] = []
for fn in sorted(os.listdir(DEVDIR)):
sd = SysfsDevice(fn)
if any(d.match(sd.modalias) for d in dump_ignored):
continue
sd_json.append(sd.to_dict())
json.dump(sd_json, sys.stdout, indent=4)
# -------------------------------------------------------------------
def load_all_commands(name):
module = importlib.import_module(name)
for k in dir(module):
fn = getattr(module, k)
argsfn = getattr(module, "args_" + k[4:], None)
if argsfn is None or not k.startswith("cmd_") or not inspect.isfunction(fn):
continue
yield (k, fn, argsfn)
def get_cmd_aliases(fn):
if hasattr(fn, "__aliases__"):
return fn.__aliases__
return ()
def main():
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="""NVIDIA ConnectX GPU Direct ACS tool for Direct NIC platforms
This tool is used to view and control the PCI Access Control Flags (ACS) related
to the Direct NIC topology on supported NVIDIA platforms with ConnectX and
Blackwell family GPUs.
Direct NIC platforms have a unique multipath PCI topology where the ConnectX
has a main PCI function and a related DMA Direct function linked to the GPU.
This platform requires specific ACS flags in the PCI topology for reliable
operation, this tool helps users generate ACS settings for the local system.
""",
)
subparsers = parser.add_subparsers(title="Sub Commands", dest="command")
subparsers.required = True
commands = [I for I in load_all_commands(__name__)]
commands.sort()
# build sub parsers for all the loaded commands
for k, fn, argsfn in commands:
sparser = subparsers.add_parser(
k[4:].replace("_", "-"), aliases=get_cmd_aliases(fn), help=fn.__doc__
)
sparser.required = True
argsfn(sparser)
sparser.set_defaults(func=fn)
try:
import argcomplete
argcomplete.autocomplete(parser)
except ImportError:
pass
# argparse will set 'func' to the cmd_* that executes this command
args = parser.parse_args()
try:
args.func(args)
except CommandError as e:
print(f"E: {e}")
sys.exit(100)
main()