| # SPDX-License-Identifier: (GPL-2.0 OR Linux-OpenIB) |
| # Copyright (c) 2019 Mellanox Technologies, Inc. All rights reserved. See COPYING file |
| """ |
| Provide some useful helper function for pyverbs rdmacm' tests. |
| """ |
| import sys |
| from tests.utils import validate, poll_cq, get_send_elements, get_recv_wr |
| from tests.base_rdmacm import AsyncCMResources, SyncCMResources |
| from pyverbs.cmid import CMEvent, AddrInfo, JoinMCAttrEx |
| from pyverbs.pyverbs_error import PyverbsError, PyverbsRDMAError |
| import pyverbs.cm_enums as ce |
| from pyverbs.addr import AH |
| import pyverbs.enums as e |
| import abc |
| import errno |
| |
| GRH_SIZE = 40 |
| MULTICAST_QPN = 0xffffff |
| REJECT_MSG = 'connection rejected' |
| |
| |
| class CMConnection(abc.ABC): |
| """ |
| RDMA CM base abstract connection class. The class contains the rdmacm |
| resources and other methods to easily establish a connection and run |
| traffic using the rdmacm resources. Each type of connection or traffic |
| should inherit from this class and implement the necessary methods such as |
| connection establishment and traffic. |
| """ |
| def __init__(self, syncer=None, notifier=None): |
| """ |
| Initializes a connection object. |
| :param syncer: Barrier object to sync between all the test processes. |
| :param notifier: Queue object to pass objects between the connection |
| sides. |
| """ |
| self.syncer = syncer |
| self.notifier = notifier |
| self.cm_res = None |
| |
| def rdmacm_traffic(self, server=None, multicast=False): |
| """ |
| Run rdmacm traffic. This method runs the compatible traffic flow |
| depending on the CMResources. If self.with_ext_qp is set the traffic |
| will go through the external QP. |
| :param server: Run as server. |
| :param multicast: Run multicast traffic. |
| """ |
| server = server if server is not None else self.cm_res.passive |
| if self.cm_res.with_ext_qp: |
| if server: |
| self._ext_qp_server_traffic() |
| else: |
| self._ext_qp_client_traffic() |
| elif multicast: |
| if server: |
| self._cmid_server_multicast_traffic() |
| else: |
| self._cmid_client_multicast_traffic() |
| else: |
| if server: |
| self._cmid_server_traffic() |
| else: |
| self._cmid_client_traffic() |
| |
| def remote_traffic(self, passive, remote_op='write'): |
| """ |
| Run rdmacm remote traffic. This method runs RDMA remote traffic from |
| the active to the passive. |
| :param passive: If True, run as server. |
| :param remote_op: 'write'/'read', The type of the RDMA remote operation. |
| """ |
| msg_size = self.cm_res.msg_size |
| if passive: |
| self.cm_res.mr.write((msg_size) * 's', msg_size) |
| mr_details = (self.cm_res.mr.rkey, self.cm_res.mr.buf) |
| self.notifier.put(mr_details) |
| self.syncer.wait() |
| self.syncer.wait() |
| if remote_op == 'write': |
| msg_received = self.cm_res.mr.read(msg_size, 0) |
| validate(msg_received, True, msg_size) |
| else: |
| self.cm_res.mr.write((msg_size) * 'c', msg_size) |
| self.syncer.wait() |
| rkey, remote_addr = self.notifier.get() |
| cmid = self.cm_res.cmid |
| post_func = cmid.post_write if remote_op == 'write' else \ |
| cmid.post_read |
| for _ in range(self.cm_res.num_msgs): |
| post_func(self.cm_res.mr, msg_size, remote_addr, rkey, |
| flags=e.IBV_SEND_SIGNALED) |
| cmid.get_send_comp() |
| self.syncer.wait() |
| if remote_op == 'read': |
| msg_received = self.cm_res.mr.read(msg_size, 0) |
| validate(msg_received, False, msg_size) |
| |
| def _ext_qp_server_traffic(self): |
| """ |
| RDMACM server side traffic function which sends and receives a message, |
| and then validates the received message. This traffic method uses the CM |
| external QP and CQ for send, recv and get_completion. |
| :return: None |
| """ |
| recv_wr = get_recv_wr(self.cm_res) |
| self.cm_res.qp.post_recv(recv_wr) |
| self.syncer.wait() |
| for _ in range(self.cm_res.num_msgs): |
| poll_cq(self.cm_res.cq) |
| self.cm_res.qp.post_recv(recv_wr) |
| msg_received = self.cm_res.mr.read(self.cm_res.msg_size, 0) |
| validate(msg_received, self.cm_res.passive, self.cm_res.msg_size) |
| send_wr = get_send_elements(self.cm_res, self.cm_res.passive)[0] |
| self.cm_res.qp.post_send(send_wr) |
| poll_cq(self.cm_res.cq) |
| |
| def _ext_qp_client_traffic(self): |
| """ |
| RDMACM client side traffic function which sends and receives a message, |
| and then validates the received message. This traffic method uses the CM |
| external QP and CQ for send, recv and get_completion. |
| :return: None |
| """ |
| recv_wr = get_recv_wr(self.cm_res) |
| self.syncer.wait() |
| for _ in range(self.cm_res.num_msgs): |
| send_wr = get_send_elements(self.cm_res, self.cm_res.passive)[0] |
| self.cm_res.qp.post_send(send_wr) |
| poll_cq(self.cm_res.cq) |
| self.cm_res.qp.post_recv(recv_wr) |
| poll_cq(self.cm_res.cq) |
| msg_received = self.cm_res.mr.read(self.cm_res.msg_size, 0) |
| validate(msg_received, self.cm_res.passive, self.cm_res.msg_size) |
| |
| def _cmid_server_traffic(self): |
| """ |
| RDMACM server side traffic function which sends and receives a message, |
| and then validates the received message. This traffic method uses the |
| RDMACM API for send, recv and get_completion. |
| :return: None |
| """ |
| grh_offset = GRH_SIZE if self.cm_res.qp_type == e.IBV_QPT_UD else 0 |
| send_msg = (self.cm_res.msg_size + grh_offset) * 's' |
| cmid = self.cm_res.child_id |
| for _ in range(self.cm_res.num_msgs): |
| cmid.post_recv(self.cm_res.mr) |
| self.syncer.wait() |
| self.syncer.wait() |
| wc = cmid.get_recv_comp() |
| msg_received = self.cm_res.mr.read(self.cm_res.msg_size, grh_offset) |
| validate(msg_received, True, self.cm_res.msg_size) |
| if self.cm_res.port_space == ce.RDMA_PS_TCP: |
| self.cm_res.mr.write(send_msg, self.cm_res.msg_size) |
| cmid.post_send(self.cm_res.mr) |
| else: |
| ah = AH(cmid.pd, wc=wc, port_num=self.cm_res.ib_port, grh=self.cm_res.mr.buf) |
| rqpn = self.cm_res.remote_qpn |
| self.cm_res.mr.write(send_msg, self.cm_res.msg_size + GRH_SIZE) |
| cmid.post_ud_send(self.cm_res.mr, ah, rqpn=rqpn, |
| length=self.cm_res.msg_size) |
| cmid.get_send_comp() |
| self.syncer.wait() |
| |
| def _cmid_client_traffic(self): |
| """ |
| RDMACM client side traffic function which sends and receives a message, |
| and then validates the received message. This traffic method uses the |
| RDMACM API for send, recv and get_completion. |
| :return: None |
| """ |
| grh_offset = GRH_SIZE if self.cm_res.qp_type == e.IBV_QPT_UD else 0 |
| send_msg = (self.cm_res.msg_size + grh_offset) * 'c' |
| cmid = self.cm_res.cmid |
| for _ in range(self.cm_res.num_msgs): |
| self.cm_res.mr.write(send_msg, self.cm_res.msg_size + grh_offset) |
| self.syncer.wait() |
| if self.cm_res.port_space == ce.RDMA_PS_TCP: |
| cmid.post_send(self.cm_res.mr) |
| else: |
| ah = AH(cmid.pd, attr=self.cm_res.ud_params.ah_attr) |
| cmid.post_ud_send(self.cm_res.mr, ah, rqpn=self.cm_res.ud_params.qp_num, |
| length=self.cm_res.msg_size) |
| cmid.get_send_comp() |
| cmid.post_recv(self.cm_res.mr) |
| self.syncer.wait() |
| self.syncer.wait() |
| cmid.get_recv_comp() |
| msg_received = self.cm_res.mr.read(self.cm_res.msg_size, grh_offset) |
| validate(msg_received, False, self.cm_res.msg_size) |
| |
| def _cmid_server_multicast_traffic(self): |
| """ |
| RDMACM server side multicast traffic function which receives a message, |
| and then validates its data. |
| """ |
| for _ in range(self.cm_res.num_msgs): |
| self.cm_res.cmid.post_recv(self.cm_res.mr) |
| self.syncer.wait() |
| self.syncer.wait() |
| self.cm_res.cmid.get_recv_comp() |
| msg_received = self.cm_res.mr.read(self.cm_res.msg_size, GRH_SIZE) |
| validate(msg_received, True, self.cm_res.msg_size) |
| |
| def _cmid_client_multicast_traffic(self): |
| """ |
| RDMACM client side multicast traffic function which sends a message to |
| the multicast group. |
| """ |
| send_msg = (self.cm_res.msg_size + GRH_SIZE) * 'c' |
| for _ in range(self.cm_res.num_msgs): |
| self.cm_res.mr.write(send_msg, self.cm_res.msg_size + GRH_SIZE) |
| self.syncer.wait() |
| ah = AH(self.cm_res.cmid.pd, attr=self.cm_res.ud_params.ah_attr) |
| self.cm_res.cmid.post_ud_send(self.cm_res.mr, ah, rqpn=MULTICAST_QPN, |
| length=self.cm_res.msg_size) |
| self.cm_res.cmid.get_send_comp() |
| self.syncer.wait() |
| |
| def event_handler(self, expected_event=None): |
| """ |
| Handle and execute corresponding API for RDMACM events of asynchronous |
| communication. |
| :param expected_event: The user expected event. |
| :return: None |
| """ |
| cm_event = CMEvent(self.cm_res.cmid.event_channel) |
| if cm_event.event_type == ce.RDMA_CM_EVENT_CONNECT_REQUEST: |
| self.cm_res.create_child_id(cm_event) |
| elif cm_event.event_type in [ce.RDMA_CM_EVENT_ESTABLISHED, |
| ce.RDMA_CM_EVENT_MULTICAST_JOIN]: |
| self.cm_res.set_ud_params(cm_event) |
| if expected_event and expected_event != cm_event.event_type: |
| raise PyverbsError('Expected this event: {}, got this event: {}'. |
| format(expected_event, cm_event.event_str())) |
| if expected_event == ce.RDMA_CM_EVENT_REJECTED: |
| assert cm_event.private_data[:len(REJECT_MSG)].decode() == REJECT_MSG, \ |
| f'CM event data ({cm_event.private_data}) is different than the expected ({REJECT_MSG})' |
| cm_event.ack_cm_event() |
| |
| @abc.abstractmethod |
| def establish_connection(self): |
| pass |
| |
| @abc.abstractmethod |
| def disconnect(self): |
| pass |
| |
| |
| class CMAsyncConnection(CMConnection): |
| """ |
| Implement RDMACM connection management for asynchronous CMIDs. It includes |
| connection establishment, disconnection and other methods such as traffic. |
| """ |
| def __init__(self, ip_addr, syncer=None, notifier=None, passive=False, |
| num_conns=1, qp_timeout=-1, reject_conn=False, **kwargs): |
| """ |
| Init the CMConnection and then init the AsyncCMResources. |
| :param ip_addr: IP address to use. |
| :param syncer: Barrier object to sync between all the test processes. |
| :param notifier: Queue object to pass objects between the connection |
| sides. |
| :param passive: Indicate if it's a passive side. |
| :param num_conns: Number of connections. |
| :param qp_timeout: Value of the QP timeout. |
| :param reject_conn: True if the server will reject the connection. |
| :param kwargs: Arguments used to initialize the CM resources. For more |
| info please check CMResources. |
| """ |
| super(CMAsyncConnection, self).__init__(syncer=syncer, notifier=notifier) |
| self.num_conns = num_conns |
| self.create_cm_res(ip_addr, passive=passive, **kwargs) |
| self.qp_timeout = qp_timeout |
| self.reject_conn = reject_conn |
| |
| def create_cm_res(self, ip_addr, passive, **kwargs): |
| self.cm_res = AsyncCMResources(addr=ip_addr, passive=passive, **kwargs) |
| if passive: |
| self.cm_res.create_cmid() |
| else: |
| for i in range(self.num_conns): |
| self.cm_res.create_cmid(i) |
| |
| def join_to_multicast(self, mc_addr=None, src_addr=None, extended=False): |
| """ |
| Join the CMID to multicast group. |
| :param mc_addr: The multicast IP address. |
| :param src_addr: The CMIDs source address. |
| :param extended: Use the join_multicast_ex API. |
| """ |
| self.cm_res.cmid.bind_addr(self.cm_res.ai) |
| resolve_addr_info = AddrInfo(src=src_addr, dst=mc_addr) |
| self.cm_res.cmid.resolve_addr(resolve_addr_info) |
| self.event_handler(expected_event=ce.RDMA_CM_EVENT_ADDR_RESOLVED) |
| self.cm_res.create_qp() |
| mc_addr_info = AddrInfo(src=mc_addr) |
| if not extended: |
| self.cm_res.cmid.join_multicast(addr=mc_addr_info) |
| else: |
| flags = ce.RDMA_MC_JOIN_FLAG_FULLMEMBER |
| comp_mask = ce.RDMA_CM_JOIN_MC_ATTR_ADDRESS | \ |
| ce.RDMA_CM_JOIN_MC_ATTR_JOIN_FLAGS |
| mcattr = JoinMCAttrEx(addr=mc_addr_info, comp_mask=comp_mask, |
| join_flags=flags) |
| self.cm_res.cmid.join_multicast(mc_attr=mcattr) |
| self.event_handler(expected_event=ce.RDMA_CM_EVENT_MULTICAST_JOIN) |
| self.cm_res.create_mr() |
| |
| def leave_multicast(self, mc_addr=None): |
| """ |
| Leave multicast group. |
| :param mc_addr: The multicast IP address. |
| """ |
| mc_addr_info = AddrInfo(src=mc_addr) |
| self.cm_res.cmid.leave_multicast(mc_addr_info) |
| |
| def establish_connection(self): |
| """ |
| Establish RDMACM connection between two Async CMIDs. |
| """ |
| if self.cm_res.passive: |
| self.cm_res.cmid.bind_addr(self.cm_res.ai) |
| self.cm_res.cmid.listen() |
| for conn_idx in range(self.num_conns): |
| if self.cm_res.passive: |
| self.syncer.wait() |
| self.event_handler(expected_event=ce.RDMA_CM_EVENT_CONNECT_REQUEST) |
| self.cm_res.create_qp(conn_idx=conn_idx) |
| if self.qp_timeout >= 0: |
| self.set_qp_timeout(self.cm_res.child_ids[conn_idx], self.qp_timeout) |
| if self.cm_res.with_ext_qp: |
| self.set_cmids_qp_ece(self.cm_res.passive) |
| self.cm_res.modify_ext_qp_to_rts(conn_idx=conn_idx) |
| self.set_cmid_ece(self.cm_res.passive) |
| child_id = self.cm_res.child_ids[conn_idx] |
| if self.reject_conn: |
| child_id.reject(REJECT_MSG.encode()) |
| return |
| child_id.accept(self.cm_res.create_conn_param(conn_idx=conn_idx)) |
| if self.qp_timeout >= 0: |
| attr, _ = child_id.query_qp(e.IBV_QP_TIMEOUT) |
| assert self.qp_timeout == attr.timeout |
| if self.cm_res.port_space == ce.RDMA_PS_TCP: |
| self.event_handler(expected_event=ce.RDMA_CM_EVENT_ESTABLISHED) |
| else: |
| cmid = self.cm_res.cmids[conn_idx] |
| cmid.resolve_addr(self.cm_res.ai) |
| self.event_handler(expected_event=ce.RDMA_CM_EVENT_ADDR_RESOLVED) |
| self.syncer.wait() |
| cmid.resolve_route() |
| self.event_handler(expected_event=ce.RDMA_CM_EVENT_ROUTE_RESOLVED) |
| self.cm_res.create_qp(conn_idx=conn_idx) |
| if self.qp_timeout >= 0: |
| self.set_qp_timeout(self.cm_res.cmid, self.qp_timeout) |
| if self.cm_res.with_ext_qp: |
| self.set_cmid_ece(self.cm_res.passive) |
| cmid.connect(self.cm_res.create_conn_param(conn_idx=conn_idx)) |
| if self.cm_res.with_ext_qp: |
| self.event_handler(expected_event=\ |
| ce.RDMA_CM_EVENT_CONNECT_RESPONSE) |
| self.set_cmids_qp_ece(self.cm_res.passive) |
| self.cm_res.modify_ext_qp_to_rts(conn_idx=conn_idx) |
| cmid.establish() |
| else: |
| if self.reject_conn: |
| self.event_handler(expected_event=ce.RDMA_CM_EVENT_REJECTED) |
| return |
| self.event_handler(expected_event=ce.RDMA_CM_EVENT_ESTABLISHED) |
| if self.qp_timeout >= 0: |
| attr, _ = self.cm_res.cmid.query_qp(e.IBV_QP_TIMEOUT) |
| assert self.qp_timeout == attr.timeout |
| self.cm_res.create_mr() |
| self.sync_qp_numbers() |
| |
| def set_qp_timeout(self, cm_id, ack_timeout): |
| cm_id.set_option(ce.RDMA_OPTION_ID, ce.RDMA_OPTION_ID_ACK_TIMEOUT, ack_timeout, 1) |
| |
| def sync_qp_numbers(self): |
| """ |
| Sync the QP numbers of the connections sides. |
| """ |
| if self.cm_res.passive: |
| self.syncer.wait() |
| self.notifier.put(self.cm_res.my_qp_number()) |
| self.syncer.wait() |
| self.cm_res.remote_qpn = self.notifier.get() |
| else: |
| self.syncer.wait() |
| self.cm_res.remote_qpn = self.notifier.get() |
| self.notifier.put(self.cm_res.my_qp_number()) |
| self.syncer.wait() |
| |
| def disconnect(self): |
| """ |
| Disconnect the connection. |
| """ |
| if self.cm_res.port_space == ce.RDMA_PS_TCP: |
| if self.cm_res.passive: |
| for child_id in self.cm_res.child_ids.values(): |
| child_id.disconnect() |
| else: |
| self.event_handler(expected_event=ce.RDMA_CM_EVENT_DISCONNECTED) |
| for cmid in self.cm_res.cmids.values(): |
| cmid.disconnect() |
| |
| def set_cmid_ece(self, passive): |
| """ |
| Set the local CMIDs ECE. The ECE is taken from the CMIDs QP ECE. |
| :param passive: Indicates if this CMID is participate as passive in |
| this connection. |
| """ |
| cmid = self.cm_res.child_id if passive else self.cm_res.cmid |
| try: |
| ece = self.cm_res.qp.query_ece() |
| cmid.set_local_ece(ece) |
| except PyverbsRDMAError as ex: |
| if ex.error_code != errno.EOPNOTSUPP: |
| raise ex |
| |
| def set_cmids_qp_ece(self, passive): |
| """ |
| Set the CMIDs QP ECE. |
| :param passive: Indicates if this CMID is participate as passive in |
| this connection. |
| """ |
| cmid = self.cm_res.child_id if passive else self.cm_res.cmid |
| try: |
| ece = cmid.get_remote_ece() |
| self.cm_res.qp.set_ece(ece) |
| except PyverbsRDMAError as ex: |
| if ex.error_code != errno.EOPNOTSUPP: |
| raise ex |
| |
| class CMSyncConnection(CMConnection): |
| """ |
| Implement RDMACM connection management for synchronous CMIDs. It includes |
| connection establishment, disconnection and other methods such as traffic. |
| """ |
| def __init__(self, ip_addr, syncer=None, notifier=None, passive=False, **kwargs): |
| """ |
| Init the CMConnection and then init the SyncCMResources. |
| :param ip_addr: IP address to use. |
| :param syncer: Barrier object to sync between all the test processes. |
| :param notifier: Queue object to pass objects between the connection |
| sides. |
| :param passive: Indicate if it's a passive side. |
| :param kwargs: Arguments used to initialize the CM resources. For more |
| info please check CMResources. |
| """ |
| super(CMSyncConnection, self).__init__(syncer=syncer, notifier=notifier) |
| self.create_cm_res(ip_addr, passive=passive, **kwargs) |
| |
| def create_cm_res(self, ip_addr, passive, **kwargs): |
| self.cm_res = SyncCMResources(addr=ip_addr, passive=passive, **kwargs) |
| self.cm_res.create_cmid() |
| |
| def establish_connection(self): |
| """ |
| Establish RDMACM connection between two Sync CMIDs. |
| """ |
| if self.cm_res.passive: |
| self.cm_res.cmid.listen() |
| self.syncer.wait() |
| self.cm_res.create_child_id() |
| self.cm_res.child_id.accept() |
| self.cm_res.create_mr() |
| else: |
| self.syncer.wait() |
| self.cm_res.cmid.connect() |
| self.cm_res.create_mr() |
| |
| def disconnect(self): |
| """ |
| Disconnect the connection. |
| """ |
| if self.cm_res.port_space == ce.RDMA_PS_TCP: |
| if self.cm_res.passive: |
| self.cm_res.child_id.disconnect() |
| else: |
| self.cm_res.cmid.disconnect() |