| import unittest |
| import struct |
| import errno |
| import json |
| import os |
| |
| from pyverbs.providers.mlx5.mlx5dv_mkey import Mlx5Mkey, Mlx5MrInterleaved, \ |
| Mlx5MkeyConfAttr, Mlx5SigCrc, Mlx5SigBlockDomain, Mlx5SigBlockAttr |
| from pyverbs.providers.mlx5.mlx5dv_crypto import Mlx5CryptoLoginAttr, Mlx5DEK, \ |
| Mlx5DEKInitAttr, Mlx5CryptoAttr, Mlx5CryptoLogin, Mlx5CryptoExtLoginAttr |
| from pyverbs.providers.mlx5.mlx5dv import Mlx5Context, Mlx5DVContextAttr, \ |
| Mlx5DVQPInitAttr, Mlx5QP |
| from pyverbs.pyverbs_error import PyverbsRDMAError, PyverbsUserError |
| from tests.mlx5_base import Mlx5RDMATestCase, Mlx5PyverbsAPITestCase |
| import pyverbs.providers.mlx5.mlx5_enums as dve |
| from pyverbs.wr import SGE, SendWR, RecvWR |
| from pyverbs.qp import QPInitAttrEx, QPCap |
| from tests.base import RCResources |
| from pyverbs.pd import PD |
| import pyverbs.enums as e |
| import tests.utils as u |
| |
# Opaque value passed to Mlx5DEKInitAttr in the DEK management test and compared against the
# opaque reported by querying the created DEK.
DEK_OPAQUE = b'dek'
| |
| """ |
| Crypto operations require specific input from the user, e.g. the wrapped credential that the |
| device is configured with. The input should be provided in JSON format in a file at this path: |
| "/tmp/mlx5_crypto_test.txt". |
| The user can also point to a different file by setting this environment variable to its path: |
| MLX5_CRYPTO_TEST_INFO |
| |
| This doc describes the input options with examples of the format: |
| Mandatory: |
| Wrapped credential: The wrapped credential that was configured in the device. |
| Wrapped key: Wrapped key for the DEK creation. The key should be encrypted using the KEK |
| (Key Encryption Key). |
| Optional: |
| Wrapped 256 bits key: Wrapped key for the DEK creation when a key size of 256 bits is required. |
| If not provided, the test will only use the 128 bits key. |
| Encrypted data for 512 of 'c': If the user wants data validation, the expected encrypted data |
| for a plain text of 512 bytes of the character 'c' must be |
| provided. If not provided, data validation will be skipped. |
| |
| Example of the content of such a file: |
| [{"credential": [8704278040424473809, 4403447855848063568, 13892768337045135232, |
| 5942481448427925932, 171338997253969038, 5703425261028721211], |
| "wrapped_key": [contains 5 integers of 64bits each], |
| "plaintext_key": [contains 8 integers of 32bits each], |
| "plaintext_256_bits_key": [contains 16 integers of 32bits each], |
| "encrypted_data_for_512_c": [contains 64 integers of 64bits each], |
| "wrapped_256_bits_key": [contains 9 integers of 64bits each]}] |
| """ |
| |
| |
def check_crypto_caps(dev_name, is_wrapped_dek_mode, multi_block_support=False):
    """
    Skip the calling test unless the device reports the requested crypto capabilities.
    The device is opened by name, its mlx5 crypto caps are queried, and unittest.SkipTest
    is raised when the crypto selftest failed, when AES-XTS is not supported, when
    multi-block was requested but is missing, or when the device DEK mode (wrapped vs.
    plaintext) does not match the requested one.
    :param dev_name: The device name.
    :param is_wrapped_dek_mode: True when wrapped_dek and False when plaintext.
    :param multi_block_support: If True, check for multi-block support.
    """
    ctx = Mlx5Context(Mlx5DVContextAttr(), name=dev_name)
    caps = ctx.query_mlx5_device().crypto_caps

    selftest_failures = caps['failed_selftests']
    if selftest_failures:
        raise unittest.SkipTest(f'The device crypto selftest failed ({selftest_failures})')

    has_single_block = \
        dve.MLX5DV_CRYPTO_ENGINES_CAP_AES_XTS_SINGLE_BLOCK & caps['crypto_engines']
    has_multi_block = \
        dve.MLX5DV_CRYPTO_ENGINES_CAP_AES_XTS_MULTI_BLOCK & caps['crypto_engines']
    if not (has_single_block or has_multi_block):
        raise unittest.SkipTest('The device crypto engines does not support AES')
    if multi_block_support and not has_multi_block:
        raise unittest.SkipTest('The device crypto engines does not support multi blocks')
    if has_multi_block:
        # Multi-block without single-block is treated as an inconsistent capability report.
        assert has_single_block, \
            'The device crypto engines do not support single block but support multi blocks'

    import_method = caps['wrapped_import_method']
    # Truthy only when wrapped import of AES-XTS keys is supported AND the wrapped crypto
    # flag is set; the flags are read only if the first condition holds.
    wrapped_dek_ready = \
        (dve.MLX5DV_CRYPTO_WRAPPED_IMPORT_METHOD_CAP_AES_XTS & import_method) and \
        (dve.MLX5DV_CRYPTO_CAPS_WRAPPED_CRYPTO_OPERATIONAL & caps['flags'])
    if is_wrapped_dek_mode:
        if not wrapped_dek_ready:
            raise unittest.SkipTest('The device does not support wrapped DEK')
    elif wrapped_dek_ready:
        raise unittest.SkipTest('The device does not support plaintext DEK')
| |
| |
def require_crypto_login_details(instance):
    """
    Load the crypto test details and store them on the test instance.
    The details are read from '/tmp/mlx5_crypto_test.txt', or from the path given in the
    MLX5_CRYPTO_TEST_INFO environment variable when it is set. The file must hold a
    non-empty JSON list whose first element is an object; that first element is stored
    in instance.crypto_details.
    If the file doesn't exist, is not valid JSON or doesn't have that shape, the test
    is skipped.
    :param instance: The test instance.
    """
    crypto_file = os.environ.get('MLX5_CRYPTO_TEST_INFO', '/tmp/mlx5_crypto_test.txt')
    try:
        with open(crypto_file, 'r') as f:
            test_details = json.load(f)
    except json.JSONDecodeError:
        raise unittest.SkipTest(f'The crypto data in {crypto_file} must be in Json format')
    except FileNotFoundError:
        raise unittest.SkipTest(f'Crypto login details must be supplied in {crypto_file}')
    # Valid JSON of the wrong shape (e.g. "[]" or a bare object) is a user input problem,
    # so skip with a clear message instead of failing with IndexError/KeyError.
    if not isinstance(test_details, list) or not test_details \
            or not isinstance(test_details[0], dict):
        raise unittest.SkipTest(
            f'The crypto data in {crypto_file} must be a non-empty Json list of objects')
    instance.crypto_details = test_details[0]
| |
| |
def requires_crypto_support(is_wrapped_dek_mode, multi_block_support=False):
    """
    Decorator factory for test methods that need crypto input and device crypto support.
    Before the decorated test runs, the crypto details are loaded onto the test instance
    (require_crypto_login_details) and the device capabilities are checked
    (check_crypto_caps); either step may raise unittest.SkipTest.
    :param is_wrapped_dek_mode: True when wrapped_dek and False when plaintext.
    :param multi_block_support: If True, check for multi-block support.
    :return: A decorator for a test method that takes only the test instance.
    """
    import functools

    def outer(func):
        # Keep the test's name and docstring so reports describe the real test, not 'inner'.
        @functools.wraps(func)
        def inner(instance):
            require_crypto_login_details(instance)
            check_crypto_caps(instance.dev_name, is_wrapped_dek_mode, multi_block_support)
            return func(instance)
        return inner
    return outer
| |
| |
class Mlx5CryptoResources(RCResources):
    """
    RC resources that use an mlx5 DV QP and own two mkeys (wire_enc_mkey and
    mem_enc_mkey) created with the requested mkey flags.
    """
    def __init__(self, dev_name, ib_port, gid_index, dv_send_ops_flags=0,
                 mkey_create_flags=dve.MLX5DV_MKEY_INIT_ATTR_FLAGS_INDIRECT,
                 msg_size=1024):
        """
        :param dev_name: Device name to be used.
        :param ib_port: IB port of the device to use.
        :param gid_index: Which GID index to use.
        :param dv_send_ops_flags: mlx5 DV send ops flags for the QP.
        :param mkey_create_flags: Flags used when creating the mkeys.
        :param msg_size: Message size passed to the base resources.
        """
        # These attributes are assigned before the base initializer runs; the QP creation
        # helpers below read them (presumably invoked from the base class - verify there).
        self.send_ops_flags = e.IBV_QP_EX_WITH_SEND
        self.max_inline_data = 512
        self.mkey_create_flags = mkey_create_flags
        self.dv_send_ops_flags = dv_send_ops_flags
        super().__init__(dev_name, ib_port, gid_index, msg_size=msg_size)
        self.create_mkeys()

    def create_mkeys(self):
        """
        Create the two single-entry mkeys; skip the test when mkey creation is unsupported.
        """
        try:
            self.wire_enc_mkey = Mlx5Mkey(self.pd, self.mkey_create_flags, max_entries=1)
            self.mem_enc_mkey = Mlx5Mkey(self.pd, self.mkey_create_flags, max_entries=1)
        except PyverbsRDMAError as ex:
            if ex.error_code != errno.EOPNOTSUPP:
                raise ex
            raise unittest.SkipTest('Create Mkey is not supported')

    def create_qp_cap(self):
        """
        :return: QPCap sized by num_msgs with the configured max inline data.
        """
        depth = self.num_msgs
        return QPCap(max_send_wr=depth, max_recv_wr=depth,
                     max_inline_data=self.max_inline_data)

    def create_qp_init_attr(self):
        """
        :return: QPInitAttrEx for an RC QP that shares one CQ for send and receive and
                 carries the PD and the send ops flags.
        """
        return QPInitAttrEx(
            cap=self.create_qp_cap(), pd=self.pd, scq=self.cq, rcq=self.cq,
            qp_type=e.IBV_QPT_RC, send_ops_flags=self.send_ops_flags,
            comp_mask=e.IBV_QP_INIT_ATTR_PD | e.IBV_QP_INIT_ATTR_SEND_OPS_FLAGS)

    def create_qps(self):
        """
        Create one Mlx5QP with the DV send ops flags and record it, its number and a
        zero PSN; skip the test when the DV QP is unsupported.
        """
        try:
            dv_attr = Mlx5DVQPInitAttr(
                comp_mask=dve.MLX5DV_QP_INIT_ATTR_MASK_SEND_OPS_FLAGS,
                send_ops_flags=self.dv_send_ops_flags)
            dv_qp = Mlx5QP(self.ctx, self.create_qp_init_attr(), dv_attr)
            self.qps.append(dv_qp)
            self.qps_num.append(dv_qp.qp_num)
            self.psns.append(0)
        except PyverbsRDMAError as ex:
            if ex.error_code != errno.EOPNOTSUPP:
                raise ex
            raise unittest.SkipTest('Create Mlx5DV QP is not supported')
| |
| |
class Mlx5CryptoAPITest(Mlx5PyverbsAPITestCase):
    """
    Test the mlx5 crypto login and DEK management APIs.
    """
    def __init__(self, methodName='runTest'):
        super().__init__(methodName)
        # Set by require_crypto_login_details (via requires_crypto_support) before the
        # test body runs.
        self.crypto_details = None

    def verify_create_dek_out_of_login_session(self):
        """
        Verify that creating a DEK outside of a crypto login session is not permitted.
        """
        with self.assertRaises(PyverbsRDMAError) as ex:
            Mlx5DEK(self.ctx, self.dek_init_attr)
        self.assertEqual(ex.exception.error_code, errno.EINVAL)

    def verify_login_state(self, expected_state):
        """
        Query the session login state and verify that it's as expected.
        :param expected_state: The login state the context is expected to report.
        """
        state = Mlx5Context.query_login_state(self.ctx)
        self.assertEqual(state, expected_state)

    def verify_login_twice(self):
        """
        Verify that when there is already a login session alive the second login
        fails.
        """
        with self.assertRaises(PyverbsRDMAError) as ex:
            Mlx5Context.crypto_login(self.ctx, self.login_attr)
        self.assertEqual(ex.exception.error_code, errno.EEXIST)

    def verify_dek_opaque(self):
        """
        Query the DEK and verify that its opaque is as expected.
        """
        dek_attr = self.dek.query()
        self.assertEqual(dek_attr.opaque, DEK_OPAQUE)

    @requires_crypto_support(is_wrapped_dek_mode=True)
    def test_mlx5_dek_management(self):
        """
        Test crypto login and DEK management APIs.
        The test also checks that invalid actions are not permitted, e.g. creating a
        DEK outside of a login session in wrapped DEK mode.
        """
        try:
            self.pd = PD(self.ctx)
            cred_bytes = struct.pack('!6Q', *self.crypto_details['credential'])
            key = struct.pack('!5Q', *self.crypto_details['wrapped_key'])
            self.dek_init_attr = \
                Mlx5DEKInitAttr(self.pd, key=key,
                                key_size=dve.MLX5DV_CRYPTO_KEY_SIZE_128,
                                key_purpose=dve.MLX5DV_CRYPTO_KEY_PURPOSE_AES_XTS,
                                opaque=DEK_OPAQUE)
            self.verify_create_dek_out_of_login_session()
            self.verify_login_state(dve.MLX5DV_CRYPTO_LOGIN_STATE_NO_LOGIN)

            # Login to crypto session
            self.login_attr = Mlx5CryptoLoginAttr(cred_bytes)
            Mlx5Context.crypto_login(self.ctx, self.login_attr)
            self.verify_login_state(dve.MLX5DV_CRYPTO_LOGIN_STATE_VALID)
            self.verify_login_twice()
            self.dek = Mlx5DEK(self.ctx, self.dek_init_attr)
            self.verify_dek_opaque()
            self.dek.close()

            # Logout from crypto session
            Mlx5Context.crypto_logout(self.ctx)
            self.verify_login_state(dve.MLX5DV_CRYPTO_LOGIN_STATE_NO_LOGIN)
        except PyverbsRDMAError as ex:
            if ex.error_code == errno.EOPNOTSUPP:
                raise unittest.SkipTest('Create crypto elements is not supported')
            # Any other RDMA error is a real failure; re-raise it unchanged.
            raise
| |
| |
class Mlx5CryptoTrafficTest(Mlx5RDMATestCase):
    """
    Test the mlx5 crypto APIs.
    """
    def setUp(self):
        super().setUp()
        self.iters = 10
        # Set by require_crypto_login_details (via requires_crypto_support) before the
        # test body runs.
        self.crypto_details = None
        self.validate_data = False
        self.is_multi_block = False
        self.msg_size = 1024
        self.key_size = dve.MLX5DV_CRYPTO_KEY_SIZE_128

    def create_client_dek(self):
        """
        Log in to a crypto session on the client context and create a wrapped DEK
        using the client resources.
        """
        cred_bytes = struct.pack('!6Q', *self.crypto_details['credential'])
        log_attr = Mlx5CryptoLoginAttr(cred_bytes)
        Mlx5Context.crypto_login(self.client.ctx, log_attr)
        self._create_wrapped_dek()

    def create_client_wrapped_dek_login_obj(self):
        """
        Create CryptoExtLoginAttr and a crypto login object, then create a wrapped DEK
        that references that login object.
        """
        cred_bytes = struct.pack('!6Q', *self.crypto_details['credential'])
        # 48 is the credential length in bytes ('!6Q' packs 6 * 8 bytes).
        log_attr = Mlx5CryptoExtLoginAttr(cred_bytes, 48)
        crypto_login_obj = Mlx5CryptoLogin(self.client.ctx, log_attr)
        comp_mask = dve.MLX5DV_DEK_INIT_ATTR_CRYPTO_LOGIN
        self._create_wrapped_dek(comp_mask, crypto_login_obj)

    def _create_wrapped_dek(self, comp_mask=0, crypto_login_obj=None):
        """
        Create wrapped DEK using the client resources.
        :param comp_mask: DEK init attr comp mask; the login object is passed only when
                          it equals MLX5DV_DEK_INIT_ATTR_CRYPTO_LOGIN.
        :param crypto_login_obj: Crypto login object to attach to the DEK, if any.
        """
        # Pack only the key that matches the requested key size.
        if self.key_size == dve.MLX5DV_CRYPTO_KEY_SIZE_256:
            key = struct.pack('!9Q', *self.crypto_details['wrapped_256_bits_key'])
        else:
            key = struct.pack('!5Q', *self.crypto_details['wrapped_key'])

        login_kwargs = {}
        if comp_mask == dve.MLX5DV_DEK_INIT_ATTR_CRYPTO_LOGIN:
            login_kwargs = {'comp_mask': comp_mask, 'crypto_login': crypto_login_obj}
        self.dek_attr = Mlx5DEKInitAttr(self.client.pd, key=key,
                                        key_size=self.key_size,
                                        key_purpose=dve.MLX5DV_CRYPTO_KEY_PURPOSE_AES_XTS,
                                        **login_kwargs)

        self.dek = Mlx5DEK(self.client.ctx, self.dek_attr)

    def create_client_plaintext_dek(self):
        """
        Create a plaintext DEK using the client resources.
        """
        if self.key_size == dve.MLX5DV_CRYPTO_KEY_SIZE_256:
            key = struct.pack('16I', *self.crypto_details['plaintext_256_bits_key'])
        else:
            key = struct.pack('8I', *self.crypto_details['plaintext_key'])
        self.dek_attr = Mlx5DEKInitAttr(self.client.pd, key=key,
                                        key_size=self.key_size,
                                        comp_mask=dve.MLX5DV_DEK_INIT_ATTR_CRYPTO_LOGIN,
                                        crypto_login=None)
        self.dek = Mlx5DEK(self.client.ctx, self.dek_attr)

    def reg_client_mkey(self, signature=False):
        """
        Configure both client mkeys with crypto attributes. Each mkey covers one half of
        the client MR: wire_enc_mkey the first half (encrypt on TX), mem_enc_mkey the
        second half.
        :param signature: True if signature configuration requested.
        """
        num_of_configuration = 4 if signature else 3
        # Integer division: these values are byte offsets/lengths handed to SGE.
        half_size = self.client.msg_size // 2
        initial_tweak = struct.pack('!2Q', 0, 0)
        sign_crypto_order = dve.MLX5DV_SIGNATURE_CRYPTO_ORDER_SIGNATURE_BEFORE_CRYPTO_ON_TX
        for mkey in [self.client.wire_enc_mkey, self.client.mem_enc_mkey]:
            is_wire_mkey = mkey == self.client.wire_enc_mkey
            self.client.qp.wr_start()
            self.client.qp.wr_flags = e.IBV_SEND_SIGNALED | e.IBV_SEND_INLINE
            offset = 0 if is_wire_mkey else half_size
            sge = SGE(self.client.mr.buf + offset, half_size, self.client.mr.lkey)
            self.client.qp.wr_mkey_configure(mkey, num_of_configuration, Mlx5MkeyConfAttr())
            self.client.qp.wr_set_mkey_access_flags(e.IBV_ACCESS_LOCAL_WRITE)
            self.client.qp.wr_set_mkey_layout_list([sge])
            if signature:
                self.configure_mkey_signature()
            crypto_attr = Mlx5CryptoAttr(crypto_standard=dve.MLX5DV_CRYPTO_STANDARD_AES_XTS,
                                         encrypt_on_tx=is_wire_mkey,
                                         signature_crypto_order=sign_crypto_order,
                                         data_unit_size=dve.MLX5DV_BLOCK_SIZE_512,
                                         dek=self.dek, initial_tweak=initial_tweak)
            self.client.qp.wr_set_mkey_crypto(crypto_attr)
            self.client.qp.wr_complete()
            u.poll_cq(self.client.cq)

    def configure_mkey_signature(self):
        """
        Configure an mkey with signature attributes (CRC32 on the wire domain, 512 bytes
        block size).
        """
        sig_crc = Mlx5SigCrc(crc_type=dve.MLX5DV_SIG_CRC_TYPE_CRC32, seed=0xFFFFFFFF)
        sig_block_domain = Mlx5SigBlockDomain(sig_type=dve.MLX5DV_SIG_TYPE_CRC, crc=sig_crc,
                                              block_size=dve.MLX5DV_BLOCK_SIZE_512)
        sig_attr = Mlx5SigBlockAttr(wire=sig_block_domain,
                                    check_mask=dve.MLX5DV_SIG_MASK_CRC32)
        self.client.qp.wr_set_mkey_sig_block(sig_attr)

    def get_send_wr(self, player, wire_encryption):
        """
        Build a send WR with one SGE that starts at offset 0 of the selected mkey and
        spans half of the message size.
        :param player: The resources object that owns the mkeys.
        :param wire_encryption: True to use wire_enc_mkey, False to use mem_enc_mkey.
        :return: The send WR.
        """
        mkey = player.wire_enc_mkey if wire_encryption else player.mem_enc_mkey
        sge = SGE(0, player.msg_size // 2, mkey.lkey)
        return SendWR(opcode=e.IBV_WR_SEND, num_sge=1, sg=[sge])

    def get_recv_wr(self, player, wire_encryption):
        """
        Build a receive WR that targets one half of the player's MR: the first half when
        wire_encryption is True, the second half otherwise.
        :param player: The resources object that owns the MR.
        :param wire_encryption: Selects which half of the MR receives the data.
        :return: The receive WR.
        """
        half_size = player.msg_size // 2
        offset = 0 if wire_encryption else half_size
        sge = SGE(player.mr.buf + offset, half_size, player.mr.lkey)
        return RecvWR(sg=[sge], num_sge=1)

    def prepare_validate_data(self):
        """
        Fill the client MR for data validation: plain 'c' characters in the first half
        and the user supplied expected encrypted data in the second half.
        """
        data_size = self.msg_size // 2
        self.client.mr.write('c' * data_size, data_size)
        if self.is_multi_block:
            encrypted_data = struct.pack('!128Q', *self.crypto_details['encrypted_data_for_1024_c'])
        else:
            encrypted_data = struct.pack('!64Q', *self.crypto_details['encrypted_data_for_512_c'])
        self.client.mr.write(encrypted_data, data_size, offset=data_size)

    def validate_crypto_data(self):
        """
        Validate the server MR data. Verify that the encryption/decryption works well:
        each half of the client MR must equal the opposite half of the server MR.
        """
        data_size = self.msg_size // 2
        send_msg = self.client.mr.read(self.msg_size, 0)
        recv_msg = self.server.mr.read(self.msg_size, 0)
        self.assertEqual(send_msg[0:data_size], recv_msg[data_size:self.msg_size])
        self.assertEqual(send_msg[data_size:self.msg_size], recv_msg[0:data_size])

    def init_data_validation(self):
        """
        Enable data validation when the crypto details hold the expected encrypted data
        that matches the current mode (512 bytes for single block, 1024 for multi block).
        """
        if 'encrypted_data_for_512_c' in self.crypto_details and not self.is_multi_block:
            self.validate_data = True
        elif 'encrypted_data_for_1024_c' in self.crypto_details and self.is_multi_block:
            self.validate_data = True

    def traffic(self):
        """
        Perform RC traffic using the configured mkeys. Every iteration posts two
        receives on the server and two sends on the client (one per mkey) and polls
        both CQs. Data is validated once at the end when validation is enabled.
        """
        if self.validate_data:
            self.prepare_validate_data()
        for _ in range(self.iters):
            self.server.qp.post_recv(self.get_recv_wr(self.server, wire_encryption=True))
            self.server.qp.post_recv(self.get_recv_wr(self.server, wire_encryption=False))
            self.client.qp.post_send(self.get_send_wr(self.client, wire_encryption=True))
            self.client.qp.post_send(self.get_send_wr(self.client, wire_encryption=False))
            u.poll_cq(self.client.cq, count=2)
            u.poll_cq(self.server.cq, count=2)
        if self.validate_data:
            self.validate_crypto_data()

    def run_crypto_dek_test(self, create_dek_func):
        """
        Creates players and a DEK, using the given function, for different test
        cases, and runs traffic.
        :param create_dek_func: Function that creates a DEK.
        """
        self.init_data_validation()
        mkey_flags = dve.MLX5DV_MKEY_INIT_ATTR_FLAGS_CRYPTO | \
            dve.MLX5DV_MKEY_INIT_ATTR_FLAGS_INDIRECT
        self.create_players(Mlx5CryptoResources,
                            dv_send_ops_flags=dve.MLX5DV_QP_EX_WITH_MKEY_CONFIGURE,
                            mkey_create_flags=mkey_flags, msg_size=self.msg_size)
        create_dek_func()
        self.reg_client_mkey()
        self.traffic()

    @requires_crypto_support(is_wrapped_dek_mode=True)
    def test_mlx5_crypto_mkey_old_api(self):
        """
        Create Mkeys, register a memory layout using the mkeys, configure
        crypto attributes on it and then run traffic.
        """
        self.run_crypto_dek_test(self.create_client_dek)

    @requires_crypto_support(is_wrapped_dek_mode=True)
    def test_mlx5_crypto_wrapped_dek(self):
        """
        Create Mkeys, register a memory layout using the mkeys,
        configure crypto attributes on it using login object API and then run
        traffic.
        Use wrapped DEK with new API.
        """
        self.run_crypto_dek_test(self.create_client_wrapped_dek_login_obj)

    @requires_crypto_support(is_wrapped_dek_mode=False)
    def test_mlx5_crypto_plaintext_dek(self):
        """
        Create Mkeys, register a memory layout using the mkeys,
        configure crypto attributes on it using login object API and then run
        traffic.
        Use plaintext DEK.
        """
        self.run_crypto_dek_test(self.create_client_plaintext_dek)

    @requires_crypto_support(is_wrapped_dek_mode=True)
    def test_mlx5_crypto_signature_mkey(self):
        """
        Create Mkeys, register a memory layout using this mkey, configure
        crypto and signature attributes on it and then perform traffic using
        this mkey.
        A 256 bits key is used when the crypto details provide one.
        """
        if 'wrapped_256_bits_key' in self.crypto_details:
            self.key_size = dve.MLX5DV_CRYPTO_KEY_SIZE_256
        mkey_flags = dve.MLX5DV_MKEY_INIT_ATTR_FLAGS_CRYPTO | \
            dve.MLX5DV_MKEY_INIT_ATTR_FLAGS_INDIRECT | \
            dve.MLX5DV_MKEY_INIT_ATTR_FLAGS_BLOCK_SIGNATURE
        self.create_players(Mlx5CryptoResources,
                            dv_send_ops_flags=dve.MLX5DV_QP_EX_WITH_MKEY_CONFIGURE,
                            mkey_create_flags=mkey_flags)
        self.create_client_dek()
        self.reg_client_mkey(signature=True)
        self.traffic()

    @requires_crypto_support(is_wrapped_dek_mode=False, multi_block_support=True)
    def test_mlx5_plaintext_dek_multi_block(self):
        """
        Run the plaintext DEK traffic test in multi-block mode with 2048 bytes messages.
        """
        self.is_multi_block = True
        self.msg_size = 2048
        self.run_crypto_dek_test(self.create_client_plaintext_dek)

    @requires_crypto_support(is_wrapped_dek_mode=True, multi_block_support=True)
    def test_mlx5_wrapped_dek_multi_block(self):
        """
        Run the wrapped DEK (login object API) traffic test in multi-block mode with
        2048 bytes messages.
        """
        self.is_multi_block = True
        self.msg_size = 2048
        self.run_crypto_dek_test(self.create_client_wrapped_dek_login_obj)