| import unittest |
| import random |
| import errno |
| |
| from pyverbs.qp import QPCap, QPInitAttrEx, QPAttr, QPEx, QP |
| from pyverbs.pyverbs_error import PyverbsError, PyverbsRDMAError |
| from pyverbs.mr import MW, MWBindInfo |
| from pyverbs.base import inc_rkey |
| from tests.utils import wc_status_to_str |
| import pyverbs.enums as e |
| |
| from tests.base import UDResources, RCResources, RDMATestCase, XRCResources |
| import tests.utils as u |
| |
class QpExUDSend(UDResources):
    """UD resources whose QP is built through ``u.create_qp_ex``.

    The QP is requested with the ``IBV_QP_EX_WITH_SEND`` flag.
    """

    def create_qps(self):
        """Create the UD QP via the extended-QP helper with SEND requested."""
        send_ops = e.IBV_QP_EX_WITH_SEND
        u.create_qp_ex(self, e.IBV_QPT_UD, send_ops)
| |
| |
class QpExRCSend(RCResources):
    """RC resources whose QP is built through ``u.create_qp_ex``.

    The QP is requested with the ``IBV_QP_EX_WITH_SEND`` flag.
    """

    def create_qps(self):
        """Create the RC QP via the extended-QP helper with SEND requested."""
        qp_type, send_ops = e.IBV_QPT_RC, e.IBV_QP_EX_WITH_SEND
        u.create_qp_ex(self, qp_type, send_ops)
| |
| |
class QpExXRCSend(XRCResources):
    """XRC resources pairing plain XRC receive QPs with extended send QPs.

    Send QPs are obtained from ``u.create_qp_ex`` with the
    ``IBV_QP_EX_WITH_SEND`` flag.
    """

    def create_qps(self):
        """Create ``self.qp_count`` receive/send QP pairs.

        For every pair the receive QP is appended to ``self.rqp_lst``, the
        send QP to ``self.sqp_lst``, their QP numbers (receive, send) to
        ``self.qps_num`` and a random 24-bit value to ``self.psns``.
        """
        remote_rw = e.IBV_ACCESS_REMOTE_WRITE | e.IBV_ACCESS_REMOTE_READ
        rx_attr = QPAttr(port_num=self.ib_port)
        rx_attr.pkey_index = 0
        for _ in range(self.qp_count):
            rx_init = QPInitAttrEx(
                qp_type=e.IBV_QPT_XRC_RECV,
                comp_mask=e.IBV_QP_INIT_ATTR_XRCD,
                xrcd=self.xrcd,
            )
            # Re-applied on every iteration, exactly as the attribute object
            # is reused for each receive QP.
            rx_attr.qp_access_flags = remote_rw
            rx_qp = QP(self.ctx, rx_init, rx_attr)
            self.rqp_lst.append(rx_qp)

            tx_qp = u.create_qp_ex(self, e.IBV_QPT_XRC_SEND,
                                   e.IBV_QP_EX_WITH_SEND)
            self.sqp_lst.append(tx_qp)

            qp_numbers = (rx_qp.qp_num, tx_qp.qp_num)
            self.qps_num.append(qp_numbers)
            self.psns.append(random.getrandbits(24))
| |
| |
class QpExUDSendImm(UDResources):
    """UD resources whose QP is built through ``u.create_qp_ex``.

    The QP is requested with the ``IBV_QP_EX_WITH_SEND_WITH_IMM`` flag.
    """

    def create_qps(self):
        """Create the UD QP via the extended-QP helper with SEND_WITH_IMM."""
        send_ops = e.IBV_QP_EX_WITH_SEND_WITH_IMM
        u.create_qp_ex(self, e.IBV_QPT_UD, send_ops)
| |
| |
class QpExRCSendImm(RCResources):
    """RC resources whose QP is built through ``u.create_qp_ex``.

    The QP is requested with the ``IBV_QP_EX_WITH_SEND_WITH_IMM`` flag.
    """

    def create_qps(self):
        """Create the RC QP via the extended-QP helper with SEND_WITH_IMM."""
        qp_type, send_ops = e.IBV_QPT_RC, e.IBV_QP_EX_WITH_SEND_WITH_IMM
        u.create_qp_ex(self, qp_type, send_ops)
| |
| |
class QpExXRCSendImm(XRCResources):
    """XRC resources pairing plain XRC receive QPs with extended send QPs.

    Send QPs are obtained from ``u.create_qp_ex`` with the
    ``IBV_QP_EX_WITH_SEND_WITH_IMM`` flag.
    """

    def create_qps(self):
        """Create ``self.qp_count`` receive/send QP pairs.

        For every pair the receive QP is appended to ``self.rqp_lst``, the
        send QP to ``self.sqp_lst``, their QP numbers (receive, send) to
        ``self.qps_num`` and a random 24-bit value to ``self.psns``.
        """
        remote_rw = e.IBV_ACCESS_REMOTE_WRITE | e.IBV_ACCESS_REMOTE_READ
        rx_attr = QPAttr(port_num=self.ib_port)
        rx_attr.pkey_index = 0
        for _ in range(self.qp_count):
            rx_init = QPInitAttrEx(
                qp_type=e.IBV_QPT_XRC_RECV,
                comp_mask=e.IBV_QP_INIT_ATTR_XRCD,
                xrcd=self.xrcd,
            )
            # Re-applied on every iteration, exactly as the attribute object
            # is reused for each receive QP.
            rx_attr.qp_access_flags = remote_rw
            rx_qp = QP(self.ctx, rx_init, rx_attr)
            self.rqp_lst.append(rx_qp)

            tx_qp = u.create_qp_ex(self, e.IBV_QPT_XRC_SEND,
                                   e.IBV_QP_EX_WITH_SEND_WITH_IMM)
            self.sqp_lst.append(tx_qp)

            qp_numbers = (rx_qp.qp_num, tx_qp.qp_num)
            self.qps_num.append(qp_numbers)
            self.psns.append(random.getrandbits(24))
| |
| |
class QpExRCFlush(RCResources):
    """RC resources for flush tests: extended QP plus a flush-capable MR."""

    # Class-level defaults; tests in this file override them per instance
    # (``client.ptype`` / ``client.level``) before calling ``u.flush_traffic``.
    ptype = e.IBV_FLUSH_GLOBAL
    level = e.IBV_FLUSH_RANGE

    def create_qps(self):
        """Create the RC QP with the FLUSH and RDMA_WRITE flags requested."""
        send_ops = e.IBV_QP_EX_WITH_FLUSH | e.IBV_QP_EX_WITH_RDMA_WRITE
        u.create_qp_ex(self, e.IBV_QPT_RC, send_ops)

    def create_mr(self):
        """Register ``self.mr`` with FLUSH_GLOBAL and REMOTE_WRITE access.

        Raises:
            unittest.SkipTest: if registration fails with ``EINVAL``.
            PyverbsRDMAError: re-raised for any other error code.
        """
        access = e.IBV_ACCESS_FLUSH_GLOBAL | e.IBV_ACCESS_REMOTE_WRITE
        try:
            self.mr = u.create_custom_mr(self, access)
        except PyverbsRDMAError as ex:
            if ex.error_code != errno.EINVAL:
                raise ex
            raise unittest.SkipTest('Create mr with IBV_ACCESS_FLUSH_GLOBAL access flag is not supported in kernel')
| |
| |
class QpExRCAtomicWrite(RCResources):
    """RC resources for atomic-write tests: extended QP plus a writable MR."""

    def create_qps(self):
        """Create the RC QP with ``IBV_QP_EX_WITH_ATOMIC_WRITE`` requested."""
        send_ops = e.IBV_QP_EX_WITH_ATOMIC_WRITE
        u.create_qp_ex(self, e.IBV_QPT_RC, send_ops)

    def create_mr(self):
        """Register ``self.mr`` with ``IBV_ACCESS_REMOTE_WRITE`` passed in."""
        access = e.IBV_ACCESS_REMOTE_WRITE
        self.mr = u.create_custom_mr(self, access)
| |
| |
class QpExRCRDMAWrite(RCResources):
    """RC resources for RDMA-write tests: extended QP plus a writable MR."""

    def create_qps(self):
        """Create the RC QP with ``IBV_QP_EX_WITH_RDMA_WRITE`` requested."""
        send_ops = e.IBV_QP_EX_WITH_RDMA_WRITE
        u.create_qp_ex(self, e.IBV_QPT_RC, send_ops)

    def create_mr(self):
        """Register ``self.mr`` with ``IBV_ACCESS_REMOTE_WRITE`` passed in."""
        access = e.IBV_ACCESS_REMOTE_WRITE
        self.mr = u.create_custom_mr(self, access)
| |
| |
class QpExRCRDMAWriteImm(RCResources):
    """RC resources for RDMA-write-with-immediate tests."""

    def create_qps(self):
        """Create the RC QP with ``IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM``."""
        send_ops = e.IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM
        u.create_qp_ex(self, e.IBV_QPT_RC, send_ops)

    def create_mr(self):
        """Register ``self.mr`` with ``IBV_ACCESS_REMOTE_WRITE`` passed in."""
        access = e.IBV_ACCESS_REMOTE_WRITE
        self.mr = u.create_custom_mr(self, access)
| |
| |
class QpExRCRDMARead(RCResources):
    """RC resources for RDMA-read tests: extended QP plus a readable MR."""

    def create_qps(self):
        """Create the RC QP with ``IBV_QP_EX_WITH_RDMA_READ`` requested."""
        send_ops = e.IBV_QP_EX_WITH_RDMA_READ
        u.create_qp_ex(self, e.IBV_QPT_RC, send_ops)

    def create_mr(self):
        """Register ``self.mr`` with ``IBV_ACCESS_REMOTE_READ`` passed in."""
        access = e.IBV_ACCESS_REMOTE_READ
        self.mr = u.create_custom_mr(self, access)
| |
| |
class QpExRCAtomicCmpSwp(RCResources):
    """RC resources for atomic compare-and-swap tests.

    The QP is requested with ``IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP`` and the MR
    is registered with ``IBV_ACCESS_REMOTE_ATOMIC`` passed to
    ``u.create_custom_mr``.
    """

    def create_qps(self):
        """Create the RC QP via the extended-QP helper."""
        u.create_qp_ex(self, e.IBV_QPT_RC,
                       e.IBV_QP_EX_WITH_ATOMIC_CMP_AND_SWP)

    def create_mr(self):
        """Register ``self.mr`` with remote-atomic access.

        Done in the ``create_mr`` hook, like the other resource classes in
        this file, rather than as a side effect of ``create_qps``; that way
        no MR is registered first only to be replaced.
        """
        self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_ATOMIC)
| |
| |
class QpExRCAtomicFetchAdd(RCResources):
    """RC resources for atomic fetch-and-add tests.

    The QP is requested with ``IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD`` and the
    MR is registered with ``IBV_ACCESS_REMOTE_ATOMIC`` passed to
    ``u.create_custom_mr``.
    """

    def create_qps(self):
        """Create the RC QP via the extended-QP helper."""
        u.create_qp_ex(self, e.IBV_QPT_RC,
                       e.IBV_QP_EX_WITH_ATOMIC_FETCH_AND_ADD)

    def create_mr(self):
        """Register ``self.mr`` with remote-atomic access.

        Done in the ``create_mr`` hook, like the other resource classes in
        this file, rather than as a side effect of ``create_qps``; that way
        no MR is registered first only to be replaced.
        """
        self.mr = u.create_custom_mr(self, e.IBV_ACCESS_REMOTE_ATOMIC)
| |
| |
class QpExRCBindMw(RCResources):
    """RC resources for the memory-window bind test."""

    def create_qps(self):
        """Create the RC QP with the RDMA_WRITE and BIND_MW flags requested."""
        send_ops = e.IBV_QP_EX_WITH_RDMA_WRITE | e.IBV_QP_EX_WITH_BIND_MW
        u.create_qp_ex(self, e.IBV_QPT_RC, send_ops)

    def create_mr(self):
        """Register ``self.mr`` with REMOTE_WRITE and MW_BIND access passed in."""
        access = e.IBV_ACCESS_REMOTE_WRITE | e.IBV_ACCESS_MW_BIND
        self.mr = u.create_custom_mr(self, access)
| |
| |
class QpExTestCase(RDMATestCase):
    """ Run traffic using the new post send API. """

    def setUp(self):
        """Run the base set-up and fix the iteration count at 100."""
        super().setUp()
        self.iters = 100

    def test_qp_ex_ud_send(self):
        """SEND over UD QPs created with the extended API."""
        self.create_players(QpExUDSend)
        u.traffic(**self.traffic_args, new_send=True, send_op=e.IBV_WR_SEND)

    def test_qp_ex_ud_zero_size(self):
        """SEND over UD QPs with both players' message size set to zero."""
        self.create_players(QpExUDSend)
        self.client.msg_size = 0
        self.server.msg_size = 0
        u.traffic(**self.traffic_args, new_send=True, send_op=e.IBV_WR_SEND)

    def test_qp_ex_rc_send(self):
        """SEND over RC QPs created with the extended API."""
        self.create_players(QpExRCSend)
        u.traffic(**self.traffic_args, new_send=True, send_op=e.IBV_WR_SEND)

    def test_qp_ex_xrc_send(self):
        """SEND over XRC QPs created with the extended API."""
        self.create_players(QpExXRCSend)
        u.xrc_traffic(self.client, self.server, send_op=e.IBV_WR_SEND)

    def test_qp_ex_ud_send_imm(self):
        """SEND_WITH_IMM over UD QPs created with the extended API."""
        self.create_players(QpExUDSendImm)
        u.traffic(**self.traffic_args, new_send=True,
                  send_op=e.IBV_WR_SEND_WITH_IMM)

    def test_qp_ex_rc_send_imm(self):
        """SEND_WITH_IMM over RC QPs created with the extended API."""
        self.create_players(QpExRCSendImm)
        u.traffic(**self.traffic_args, new_send=True,
                  send_op=e.IBV_WR_SEND_WITH_IMM)

    def test_qp_ex_xrc_send_imm(self):
        """SEND_WITH_IMM over XRC QPs created with the extended API."""
        self.create_players(QpExXRCSendImm)
        u.xrc_traffic(self.client, self.server,
                      send_op=e.IBV_WR_SEND_WITH_IMM)

    def test_qp_ex_rc_flush(self):
        """Flush traffic must complete successfully.

        Runs once with the class-default ``level`` and once after setting the
        client's ``level`` to ``IBV_FLUSH_MR``; the first completion of each
        run must report ``IBV_WC_SUCCESS``.
        """
        self.create_players(QpExRCFlush)
        wcs = u.flush_traffic(**self.traffic_args, new_send=True,
                              send_op=e.IBV_WR_FLUSH)
        if wcs[0].status != e.IBV_WC_SUCCESS:
            raise PyverbsError(f'Unexpected {wc_status_to_str(wcs[0].status)}')

        self.client.level = e.IBV_FLUSH_MR
        wcs = u.flush_traffic(**self.traffic_args, new_send=True,
                              send_op=e.IBV_WR_FLUSH)
        if wcs[0].status != e.IBV_WC_SUCCESS:
            raise PyverbsError(f'Unexpected {wc_status_to_str(wcs[0].status)}')

    def test_qp_ex_rc_flush_type_violate(self):
        """A flush with a placement type changed on the client must fail.

        The client's ``ptype`` is set to ``IBV_FLUSH_PERSISTENT`` while the MR
        was registered with ``IBV_ACCESS_FLUSH_GLOBAL``; the first completion
        is expected to carry ``IBV_WC_REM_ACCESS_ERR``.
        """
        self.create_players(QpExRCFlush)
        self.client.ptype = e.IBV_FLUSH_PERSISTENT
        wcs = u.flush_traffic(**self.traffic_args, new_send=True,
                              send_op=e.IBV_WR_FLUSH)
        if wcs[0].status != e.IBV_WC_REM_ACCESS_ERR:
            raise PyverbsError(f'Expected errors {wc_status_to_str(e.IBV_WC_REM_ACCESS_ERR)} - got {wc_status_to_str(wcs[0].status)}')

    def test_qp_ex_rc_atomic_write(self):
        """ATOMIC_WRITE over RC with an 8-byte message size on both sides."""
        self.create_players(QpExRCAtomicWrite)
        self.client.msg_size = 8
        self.server.msg_size = 8
        u.rdma_traffic(**self.traffic_args,
                       new_send=True, send_op=e.IBV_WR_ATOMIC_WRITE)

    def test_qp_ex_rc_rdma_write(self):
        """RDMA_WRITE over RC QPs created with the extended API."""
        self.create_players(QpExRCRDMAWrite)
        u.rdma_traffic(**self.traffic_args,
                       new_send=True, send_op=e.IBV_WR_RDMA_WRITE)

    def test_qp_ex_rc_rdma_write_imm(self):
        """RDMA_WRITE_WITH_IMM over RC QPs created with the extended API."""
        self.create_players(QpExRCRDMAWriteImm)
        u.traffic(**self.traffic_args,
                  new_send=True, send_op=e.IBV_WR_RDMA_WRITE_WITH_IMM)

    def test_qp_ex_rc_rdma_write_zero_size(self):
        """RDMA_WRITE over RC with both players' message size set to zero."""
        self.create_players(QpExRCRDMAWrite)
        self.client.msg_size = 0
        self.server.msg_size = 0
        u.rdma_traffic(**self.traffic_args,
                       new_send=True, send_op=e.IBV_WR_RDMA_WRITE)

    def test_qp_ex_rc_rdma_read(self):
        """RDMA_READ over RC after filling the server MR with 's' bytes."""
        self.create_players(QpExRCRDMARead)
        self.server.mr.write('s' * self.server.msg_size, self.server.msg_size)
        u.rdma_traffic(**self.traffic_args,
                       new_send=True, send_op=e.IBV_WR_RDMA_READ)

    def test_qp_ex_rc_rdma_read_zero_size(self):
        """RDMA_READ over RC with both players' message size set to zero."""
        self.create_players(QpExRCRDMARead)
        self.client.msg_size = 0
        self.server.msg_size = 0
        # msg_size is zero here, so this writes an empty payload.
        self.server.mr.write('s' * self.server.msg_size, self.server.msg_size)
        u.rdma_traffic(**self.traffic_args,
                       new_send=True, send_op=e.IBV_WR_RDMA_READ)

    def test_qp_ex_rc_atomic_cmp_swp(self):
        """ATOMIC_CMP_AND_SWP over RC on an 8-byte buffer."""
        self.create_players(QpExRCAtomicCmpSwp)
        self.client.msg_size = 8  # Atomics operate on 64-bit operands
        self.server.msg_size = 8
        self.server.mr.write('s' * 8, 8)
        u.atomic_traffic(**self.traffic_args,
                         new_send=True, send_op=e.IBV_WR_ATOMIC_CMP_AND_SWP)

    def test_qp_ex_rc_atomic_fetch_add(self):
        """ATOMIC_FETCH_AND_ADD over RC on an 8-byte buffer."""
        self.create_players(QpExRCAtomicFetchAdd)
        self.client.msg_size = 8  # Atomics operate on 64-bit operands
        self.server.msg_size = 8
        self.server.mr.write('s' * 8, 8)
        u.atomic_traffic(**self.traffic_args,
                         new_send=True, send_op=e.IBV_WR_ATOMIC_FETCH_AND_ADD)

    def test_qp_ex_rc_bind_mw(self):
        """
        Verify bind memory window operation using the new post_send API.
        Instead of checking through regular pingpong style traffic, we'll
        do as follows:
        - Register an MR with remote write access
        - Bind a MW without remote write permission to the MR
        - Verify that remote write fails
        Since it's a unique flow, it's an integral part of that test rather
        than a utility method.
        """
        self.create_players(QpExRCBindMw)
        client_sge = u.get_send_elements(self.client, False)[1]
        # Allocate the MW *before* opening a work-request batch, so that a
        # skip (or any allocation failure) never leaves wr_start() without a
        # matching wr_complete()/wr_abort() on the server QP.
        try:
            mw = MW(self.server.pd, mw_type=e.IBV_MW_TYPE_2)
        except PyverbsRDMAError as ex:
            if ex.error_code == errno.EOPNOTSUPP:
                raise unittest.SkipTest('Memory Window allocation is not supported')
            raise
        bind_info = MWBindInfo(self.server.mr, self.server.mr.buf,
                               self.server.mr.length,
                               e.IBV_ACCESS_LOCAL_WRITE)
        new_key = inc_rkey(mw.rkey)
        # Bind the MW
        self.server.qp.wr_start()
        self.server.qp.wr_id = 0x123
        self.server.qp.wr_flags = e.IBV_SEND_SIGNALED
        self.server.qp.wr_bind_mw(mw, new_key, bind_info)
        self.server.qp.wr_complete()
        u.poll_cq(self.server.cq)
        # Verify that remote write fails
        self.client.qp.wr_start()
        self.client.qp.wr_id = 0x124
        self.client.qp.wr_flags = e.IBV_SEND_SIGNALED
        self.client.qp.wr_rdma_write(new_key, self.server.mr.buf)
        self.client.qp.wr_set_sge(client_sge)
        self.client.qp.wr_complete()
        # The completion status is inspected manually below, so the
        # underscore-prefixed poller is used here instead of u.poll_cq.
        wcs = u._poll_cq(self.client.cq)
        if wcs[0].status != e.IBV_WC_REM_ACCESS_ERR:
            raise PyverbsRDMAError(f'Completion status is {wc_status_to_str(wcs[0].status)}')

    def test_post_receive_qp_state_bad_flow(self):
        """Run ``u.post_rq_state_bad_flow`` against extended UD QPs."""
        self.create_players(QpExUDSend)
        u.post_rq_state_bad_flow(self)

    def test_post_send_qp_state_bad_flow(self):
        """Run ``u.post_sq_state_bad_flow`` against extended UD QPs."""
        self.create_players(QpExUDSend)
        u.post_sq_state_bad_flow(self)

    def test_full_rq_bad_flow(self):
        """Run ``u.full_rq_bad_flow`` against extended UD QPs."""
        self.create_players(QpExUDSend)
        u.full_rq_bad_flow(self)

    def test_rq_with_larger_sgl_bad_flow(self):
        """Run ``u.create_rq_with_larger_sgl_bad_flow`` against extended UD QPs."""
        self.create_players(QpExUDSend)
        u.create_rq_with_larger_sgl_bad_flow(self)