# blob: 176ac286b0346b71dff13948b7cfae0511323be1 [file] [edit]
# SPDX-License-Identifier: (GPL-2.0 OR Linux-OpenIB)
# Copyright (c) 2020 NVIDIA Corporation . All rights reserved. See COPYING file
import errno
import time
import unittest

from pyverbs.libibverbs_enums import ibv_access_flags, ibv_qp_create_send_ops_flags, ibv_wr_opcode, \
    ibv_odp_transport_cap_bits, ibv_qp_attr_mask, ibv_qp_state
from pyverbs.providers.mlx5.mlx5dv import Mlx5QP
from pyverbs.pyverbs_error import PyverbsRDMAError

from tests.mlx5_base import Mlx5DcResources, Mlx5RDMATestCase, Mlx5DcStreamsRes
import tests.utils as u
class OdpDc(Mlx5DcResources):
    """
    DC resources whose MR is registered with the IBV_ACCESS_ON_DEMAND
    access flag.
    """

    def create_mr(self):
        """
        Register the MR with the on-demand access flag and store it in
        self.mr.
        :raises SkipTest: In case the registration fails with EOPNOTSUPP
        """
        odp_access = ibv_access_flags.IBV_ACCESS_ON_DEMAND
        try:
            self.mr = u.create_custom_mr(self, odp_access)
        except PyverbsRDMAError as ex:
            # Any failure other than "operation not supported" is a real
            # error and is propagated unchanged.
            if ex.error_code != errno.EOPNOTSUPP:
                raise ex
            raise unittest.SkipTest('Reg ODP MR is not supported')
class DCTest(Mlx5RDMATestCase):
    """
    Traffic tests over mlx5 DC QPs: send, RDMA write and atomic flows, AH to
    QP mapping, ODP registered memory and DC streams error recovery.
    """
    # Polling budget used while waiting for a client QP to report the ERR
    # state: number of state queries and the pause between two queries.
    QP_ERR_POLL_ATTEMPTS = 3
    QP_ERR_POLL_INTERVAL_SEC = 0.1

    def setUp(self):
        """
        Run the base class set up and reset the per-test state.
        """
        super().setUp()
        self.iters = 10
        self.server = None
        self.client = None
        self.traffic_args = None

    def sync_remote_attr(self):
        """
        Exchange the remote attributes between the server and the client.
        On top of the base class exchange, each side stores the DCT QP number
        of its peer in remote_dct_num.
        """
        super().sync_remote_attr()
        self.client.remote_dct_num = self.server.dct_qp.qp_num
        self.server.remote_dct_num = self.client.dct_qp.qp_num

    def _send_traffic(self):
        """
        Run SEND traffic between the players using the new post send API.
        """
        u.traffic(**self.traffic_args, new_send=True,
                  send_op=ibv_wr_opcode.IBV_WR_SEND)

    def _rdma_write_traffic(self):
        """
        Run RDMA_WRITE traffic between the players using the new post send
        API.
        """
        u.rdma_traffic(**self.traffic_args, new_send=True,
                       send_op=ibv_wr_opcode.IBV_WR_RDMA_WRITE)

    def _create_restricted_stream_players(self):
        """
        Create DC streams players whose MR access is limited to
        IBV_ACCESS_LOCAL_WRITE, so RDMA_WRITE traffic is expected to fail with
        a remote access error.
        """
        send_ops = ibv_qp_create_send_ops_flags.IBV_QP_EX_WITH_RDMA_WRITE
        self.create_players(Mlx5DcStreamsRes, qp_count=2,
                            send_ops_flags=send_ops,
                            mr_access=ibv_access_flags.IBV_ACCESS_LOCAL_WRITE)

    def _expect_remote_access_error(self):
        """
        Run RDMA_WRITE traffic and assert that it fails with a
        PyverbsRDMAError reporting a remote access error.
        """
        with self.assertRaisesRegex(PyverbsRDMAError, r'Remote access error'):
            self._rdma_write_traffic()

    def _client_qp_state(self, qp_idx):
        """
        Query the current state of a client QP.
        :param qp_idx: Index of the QP in the client's QPs list
        :return: The cur_qp_state value of the queried QP attributes
        """
        qp_attr, _ = self.client.qps[qp_idx].query(ibv_qp_attr_mask.IBV_QP_STATE)
        return qp_attr.cur_qp_state

    def _wait_for_client_qp_err(self, qp_idx):
        """
        Poll the state of a client QP until it reports the ERR state.
        :param qp_idx: Index of the QP in the client's QPs list
        :return: True if the QP reported ERR within QP_ERR_POLL_ATTEMPTS
                 queries, False otherwise
        """
        for attempt in range(self.QP_ERR_POLL_ATTEMPTS):
            if attempt:
                # QP state update to ERR takes time after the errors occur,
                # so pause between consecutive queries instead of polling
                # back to back.
                time.sleep(self.QP_ERR_POLL_INTERVAL_SEC)
            if self._client_qp_state(qp_idx) == ibv_qp_state.IBV_QPS_ERR:
                return True
        return False

    def test_dc_rdma_write(self):
        """
        Run RDMA_WRITE traffic over DC QPs.
        """
        send_ops = ibv_qp_create_send_ops_flags.IBV_QP_EX_WITH_RDMA_WRITE
        self.create_players(Mlx5DcResources, qp_count=2, send_ops_flags=send_ops)
        self._rdma_write_traffic()

    def test_dc_send(self):
        """
        Run SEND traffic over DC QPs.
        """
        send_ops = ibv_qp_create_send_ops_flags.IBV_QP_EX_WITH_SEND
        self.create_players(Mlx5DcResources, qp_count=2, send_ops_flags=send_ops)
        self._send_traffic()

    def test_dc_atomic(self):
        """
        Run atomic fetch and add traffic over DC QPs. The max_dc_rd_atom
        value reported by each side's device is passed as that side's number
        of WRs.
        """
        send_ops = ibv_qp_create_send_ops_flags.IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD
        self.create_players(Mlx5DcResources, qp_count=2, send_ops_flags=send_ops)
        client_wr = self.client.ctx.query_mlx5_device().max_dc_rd_atom
        server_wr = self.server.ctx.query_mlx5_device().max_dc_rd_atom
        u.atomic_traffic(**self.traffic_args, new_send=True,
                         send_op=ibv_wr_opcode.IBV_WR_ATOMIC_FETCH_AND_ADD,
                         client_wr=client_wr, server_wr=server_wr)

    def test_dc_ah_to_qp_mapping(self):
        """
        Map the client's global AH to the first server QP number and run SEND
        traffic afterwards.
        :raises SkipTest: In case mapping an AH to a QP fails with EOPNOTSUPP
        """
        send_ops = ibv_qp_create_send_ops_flags.IBV_QP_EX_WITH_SEND
        self.create_players(Mlx5DcResources, qp_count=2, send_ops_flags=send_ops)
        client_ah = u.get_global_ah(self.client, self.gid_index, self.ib_port)
        try:
            Mlx5QP.map_ah_to_qp(client_ah, self.server.qps[0].qp_num)
        except PyverbsRDMAError as ex:
            if ex.error_code == errno.EOPNOTSUPP:
                raise unittest.SkipTest('Mapping AH to QP is not supported')
            raise
        self._send_traffic()

    def check_odp_dc_support(self):
        """
        Check if the device supports ODP with DC.
        Both the SEND and the SRQ receive ODP capabilities must be set in the
        dc_odp_caps reported by the server's device.
        :raises SkipTest: In case ODP is not supported with DC
        """
        dc_odp_caps = self.server.ctx.query_mlx5_device().dc_odp_caps
        required_odp_caps = ibv_odp_transport_cap_bits.IBV_ODP_SUPPORT_SEND | \
            ibv_odp_transport_cap_bits.IBV_ODP_SUPPORT_SRQ_RECV
        if (required_odp_caps & dc_odp_caps) != required_odp_caps:
            raise unittest.SkipTest('ODP is not supported using DC')

    def test_odp_dc_traffic(self):
        """
        Run SEND traffic over DC QPs that use ODP MRs.
        :raises SkipTest: In case ODP is not supported with DC
        """
        send_ops_flag = ibv_qp_create_send_ops_flags.IBV_QP_EX_WITH_SEND
        self.create_players(OdpDc, qp_count=2, send_ops_flags=send_ops_flag)
        # The capability check queries the server's context, so it can only
        # run once the players exist.
        self.check_odp_dc_support()
        self._send_traffic()

    def test_dc_rdma_write_stream(self):
        """
        Check good flow of DCS.
        The stream_id used for the DCS test is calculated by setting the same
        stream id twice for the WRs and increasing it afterwards. This goes on
        in a loop, and once stream_id exceeds the number of concurrent
        streams + 1 it returns to 1.
        :raises SkipTest: In case DCI is not supported with HW
        """
        send_ops = ibv_qp_create_send_ops_flags.IBV_QP_EX_WITH_RDMA_WRITE
        self.create_players(Mlx5DcStreamsRes, qp_count=2, send_ops_flags=send_ops)
        self._rdma_write_traffic()

    def test_dc_stream_qp_recovery(self):
        """
        Test DC QP error state transition with stream channel error
        accumulation.
        Creates DC QPs with restricted MR access and generates remote access
        errors via RDMA_WRITE operations. Verifies that the QP transitions to
        the ERR state after enough channels entered error mode, then resets
        the QPs of both sides and validates that SEND traffic works again.
        :raises PyverbsRDMAError: In case the QP does not reach the ERR state
        """
        self._create_restricted_stream_players()
        qp_idx = 0
        error_threshold = self.client.dcis[qp_idx]['errored']
        self._send_traffic()
        for _ in range(error_threshold):
            self._expect_remote_access_error()
        if not self._wait_for_client_qp_err(qp_idx):
            raise PyverbsRDMAError(f'QP is not in ERR state after {error_threshold} errors')
        # Recover all the QPs, client side first and then the server side.
        for player in (self.client, self.server):
            for idx in range(player.qp_count):
                player.reset_qp(idx)
        self._send_traffic()

    def test_dc_stream_ids_recovery(self):
        """
        Test DC stream ID reset functionality after remote access errors.
        Creates DC QPs with restricted MR access and generates remote access
        errors via RDMA_WRITE operations. After each error, resets the stream
        ID and verifies that the QP did not move to the ERR state.
        Validates that normal SEND traffic continues to work after the stream
        resets.
        :raises PyverbsRDMAError: In case the QP is in the ERR state after a
                                  stream ID reset
        """
        self._create_restricted_stream_players()
        qp_idx = 0
        error_threshold = self.client.dcis[qp_idx]['errored']
        self._send_traffic()
        for _ in range(error_threshold):
            self._expect_remote_access_error()
            self.client.dci_reset_stream_id(qp_idx)
            if self._client_qp_state(qp_idx) == ibv_qp_state.IBV_QPS_ERR:
                raise PyverbsRDMAError('QP is in ERR state after reset stream id')
        self._send_traffic()