Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0cb78d93 authored by bidsharma's avatar bidsharma
Browse files

GD: Create remote address with correct type

Bug: 230123996
Test: cert/run LeAclManagerTest
Tag: #refactor

Change-Id: I72e0b3822a934cc22c975ffbd487af957e49fdb8
parent b4a88c7d
Loading
Loading
Loading
Loading
+65 −0
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.gd.cert.py_hci import PyHci, PyHciAdvertisement
from blueberry.tests.gd.cert.py_le_acl_manager import PyLeAclManager
from blueberry.facade import common_pb2 as common
from blueberry.facade.hci import le_acl_manager_facade_pb2 as le_acl_manager_facade
from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade
from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade
from blueberry.facade.hci import hci_facade_pb2 as hci_facade
@@ -95,6 +96,46 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        cert_le_acl = self.cert_hci.incoming_le_connection()
        return dut_le_acl, cert_le_acl

    def cert_advertises_resolvable(self):
        self.cert_hci.add_device_to_resolving_list(hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                                                   self.dut_public_address,
                                                   b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f',
                                                   b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f')

        # Cert Advertises
        advertising_handle = 0
        py_hci_adv = PyHciAdvertisement(advertising_handle, self.cert_hci)

        self.cert_hci.create_advertisement(
            advertising_handle,
            self.cert_random_address,
            hci_packets.LegacyAdvertisingProperties.ADV_IND,
            own_address_type=hci_packets.OwnAddressType.RESOLVABLE_OR_PUBLIC_ADDRESS,
            peer_address=self.dut_public_address,
            peer_address_type=hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS)

        py_hci_adv.set_data(b'Im_A_Cert')
        py_hci_adv.set_scan_response(b'Im_A_C')
        py_hci_adv.start()

    def dut_connects_cert_resolvable(self):
        self.dut.hci_le_acl_manager.AddDeviceToResolvingList(
            le_acl_manager_facade.IrkMsg(
                peer=common.BluetoothAddressWithType(
                    address=common.BluetoothAddress(address=bytes(self.cert_public_address, "utf-8")),
                    type=int(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS)),
                peer_irk=b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f',
                local_irk=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f',
            ))

        dut_le_acl = self.dut_le_acl_manager.connect_to_remote(
            remote_addr=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=bytes(self.cert_public_address, "utf-8")),
                type=int(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS)))

        cert_le_acl = self.cert_hci.incoming_le_connection()
        return dut_le_acl, cert_le_acl

    def send_receive_and_check(self, dut_le_acl, cert_le_acl):
        self.enqueue_acl_data(cert_le_acl.handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                              hci_packets.BroadcastFlag.POINT_TO_POINT,
@@ -128,6 +169,9 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        dut_le_acl, cert_le_acl = self.dut_connects()

        assertThat(cert_le_acl.handle).isNotNone()
        assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_public_address)
        assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_random_address)
        assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address)
@@ -135,6 +179,27 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_dut_connects_resolvable_address_public(self):
        privacy_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
            rotation_irk=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f',
            minimum_rotation_time=7 * 60 * 1000,
            maximum_rotation_time=15 * 60 * 1000)
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy)
        self.cert_advertises_resolvable()
        dut_le_acl, cert_le_acl = self.dut_connects_cert_resolvable()

        assertThat(cert_le_acl.handle).isNotNone()
        assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_public_address)
        assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_random_address)
        assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_public_address)
        assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS)

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_dut_connects_non_resolvable_address(self):
        privacy_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_NON_RESOLVABLE_ADDRESS,
+13 −8
Original line number Diff line number Diff line
@@ -365,15 +365,20 @@ struct le_impl : public bluetooth::hci::LeAddressManagerCallback {
      on_le_connection_canceled_on_pause();
      return;
    }
    AddressWithType remote_address(address, peer_address_type);
    // The address added to the connect list is the one that callbacks
    // should be sent for, given that is the address passed in
    // call to BluetoothDevice#connectGatt and tied to app layer callbacks.
    if (!peer_resolvable_address.IsEmpty() &&
        is_device_in_connect_list(AddressWithType(peer_resolvable_address, AddressType::RANDOM_DEVICE_ADDRESS))) {
      LOG_INFO("Using resolvable address");
      remote_address = AddressWithType(peer_resolvable_address, AddressType::RANDOM_DEVICE_ADDRESS);

    AddressType remote_address_type;

    switch (peer_address_type) {
      case AddressType::PUBLIC_DEVICE_ADDRESS:
      case AddressType::PUBLIC_IDENTITY_ADDRESS:
        remote_address_type = AddressType::PUBLIC_DEVICE_ADDRESS;
        break;
      case AddressType::RANDOM_DEVICE_ADDRESS:
      case AddressType::RANDOM_IDENTITY_ADDRESS:
        remote_address_type = AddressType::RANDOM_DEVICE_ADDRESS;
        break;
    }
    AddressWithType remote_address(address, remote_address_type);

    on_common_le_connection_complete(remote_address);
    if (status == ErrorCode::UNKNOWN_CONNECTION) {