Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ce7534ba authored by Zach Johnson's avatar Zach Johnson Committed by Automerger Merge Worker
Browse files

Merge changes Ic66d8fcd,I9ffbe874,I59e6926b,I1fb172f7,I0eb60ace, ... am: fd31d9cc

Original change: https://android-review.googlesource.com/c/platform/system/bt/+/1499266

Change-Id: I85ef1eb82c1763a19189ef1b5de7bcebb69328dc
parents e6c8ef92 fd31d9cc
Loading
Loading
Loading
Loading
+45 −10
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging
import bluetooth_packets_python3 as bt_packets
import logging

@@ -55,6 +56,32 @@ class HciMatchers(object):
            else:
                return complete

    @staticmethod
    def CommandStatus(opcode=None):
        return lambda msg: HciMatchers._is_matching_command_status(msg.payload, opcode)

    @staticmethod
    def ExtractMatchingCommandStatus(packet_bytes, opcode=None):
        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode)

    @staticmethod
    def _is_matching_command_status(packet_bytes, opcode=None):
        return HciMatchers._extract_matching_command_status(packet_bytes, opcode) is not None

    @staticmethod
    def _extract_matching_command_status(packet_bytes, opcode=None):
        event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_STATUS)
        if event is None:
            return None
        complete = hci_packets.CommandStatusView(event)
        if opcode is None or complete is None:
            return complete
        else:
            if complete.GetCommandOpCode() != opcode:
                return None
            else:
                return complete

    @staticmethod
    def EventWithCode(event_code):
        return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code)
@@ -86,10 +113,10 @@ class HciMatchers(object):

    @staticmethod
    def _extract_matching_le_event(packet_bytes, subevent_code):
        event = hci_packets.LeMetaEventView(
            HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT))
        if event is None:
        inner_event = HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT)
        if inner_event is None:
            return None
        event = hci_packets.LeMetaEventView(inner_event)
        if event.GetSubeventCode() != subevent_code:
            return None
        return event
@@ -104,12 +131,16 @@ class HciMatchers(object):

    @staticmethod
    def _extract_le_connection_complete(packet_bytes):
        event = hci_packets.LeConnectionCompleteView(
            HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE))
        if event is not None:
            return event
        return hci_packets.LeEnhancedConnectionCompleteView(
            HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE))
        inner_event = HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE)
        if inner_event is not None:
            return hci_packets.LeConnectionCompleteView(inner_event)

        inner_event = HciMatchers._extract_matching_le_event(packet_bytes,
                                                             hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
        if inner_event is not None:
            return hci_packets.LeEnhancedConnectionCompleteView(inner_event)

        return None

    @staticmethod
    def LogEventCode():
@@ -165,7 +196,11 @@ class HciMatchers(object):

    @staticmethod
    def LoopbackOf(packet):
        data = bytes(hci_packets.LoopbackCommandBuilder(packet).Serialize())
        return HciMatchers.Exactly(hci_packets.LoopbackCommandBuilder(packet))

    @staticmethod
    def Exactly(packet):
        data = bytes(packet.Serialize())
        return lambda event: data == event.payload


+167 −0
Original line number Diff line number Diff line
@@ -30,6 +30,26 @@ from bluetooth_packets_python3 import RawBuilder
from bluetooth_packets_python3.hci_packets import BroadcastFlag
from bluetooth_packets_python3.hci_packets import PacketBoundaryFlag
from bluetooth_packets_python3 import hci_packets
from cert.matchers import HciMatchers
from bluetooth_packets_python3.hci_packets import FilterDuplicates
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingLegacyParametersBuilder
from bluetooth_packets_python3.hci_packets import LegacyAdvertisingProperties
from bluetooth_packets_python3.hci_packets import PeerAddressType
from bluetooth_packets_python3.hci_packets import AdvertisingFilterPolicy
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingRandomAddressBuilder
from bluetooth_packets_python3.hci_packets import GapData
from bluetooth_packets_python3.hci_packets import GapDataType
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingDataBuilder
from bluetooth_packets_python3.hci_packets import Operation
from bluetooth_packets_python3.hci_packets import OwnAddressType
from bluetooth_packets_python3.hci_packets import LeScanningFilterPolicy
from bluetooth_packets_python3.hci_packets import Enable
from bluetooth_packets_python3.hci_packets import FragmentPreference
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingScanResponseBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedScanEnableBuilder
from bluetooth_packets_python3.hci_packets import EnabledSet
from bluetooth_packets_python3.hci_packets import OpCode


class PyHalAclConnection(IEventStream):
@@ -50,6 +70,47 @@ class PyHalAclConnection(IEventStream):
        return self.our_acl_stream.get_event_queue()


class PyHalAdvertisement(object):

    def __init__(self, handle, py_hal):
        self.handle = handle
        self.py_hal = py_hal

    def set_data(self, complete_name):
        data = GapData()
        data.data_type = GapDataType.COMPLETE_LOCAL_NAME
        data.data = list(bytes(complete_name))
        self.py_hal.send_hci_command(
            LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_DATA)

    def set_scan_response(self, shortened_name):
        data = GapData()
        data.data_type = GapDataType.SHORTENED_LOCAL_NAME
        data.data = list(bytes(shortened_name))
        self.py_hal.send_hci_command(
            LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_SCAN_RESPONSE)

    def start(self):
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = self.handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)

    def stop(self):
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = self.handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.DISABLED, [enabled_set]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)


class PyHal(Closable):

    def __init__(self, device):
@@ -67,6 +128,12 @@ class PyHal(Closable):
    def get_hci_event_stream(self):
        return self.hci_event_stream

    def wait_for_complete(self, opcode):
        assertThat(self.hci_event_stream).emits(HciMatchers.CommandComplete(opcode))

    def wait_for_status(self, opcode):
        assertThat(self.hci_event_stream).emits(HciMatchers.CommandStatus(opcode))

    def get_acl_stream(self):
        return self.acl_stream

@@ -86,6 +153,38 @@ class PyHal(Closable):
        assertThat(self.hci_event_stream).emits(read_bd_addr)
        return read_bd_addr.get().GetBdAddr()

    def set_random_le_address(self, addr):
        self.send_hci_command(hci_packets.LeSetRandomAddressBuilder(addr))
        self.wait_for_complete(OpCode.LE_SET_RANDOM_ADDRESS)

    def set_scan_parameters(self):
        phy_scan_params = hci_packets.PhyScanParameters()
        phy_scan_params.le_scan_interval = 6553
        phy_scan_params.le_scan_window = 6553
        phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE

        self.send_hci_command(
            hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                                           hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1,
                                                           [phy_scan_params]))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_PARAMETERS)

    def start_scanning(self):
        self.send_hci_command(
            hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
                                                       hci_packets.FilterDuplicates.DISABLED, 0, 0))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE)

    def stop_scanning(self):
        self.send_hci_command(
            hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.DISABLED,
                                                       hci_packets.FilterDuplicates.DISABLED, 0, 0))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE)

    def reset(self):
        self.send_hci_command(hci_packets.ResetBuilder())
        self.wait_for_complete(OpCode.RESET)

    def enable_inquiry_and_page_scan(self):
        self.send_hci_command(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN))

@@ -114,3 +213,71 @@ class PyHal(Closable):

        handle = connection_complete.get().GetConnectionHandle()
        return PyHalAclConnection(handle, self.acl_stream, self.device)

    def initiate_le_connection(self, remote_addr):
        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
        phy_scan_params.scan_interval = 0x60
        phy_scan_params.scan_window = 0x30
        phy_scan_params.conn_interval_min = 0x18
        phy_scan_params.conn_interval_max = 0x28
        phy_scan_params.conn_latency = 0
        phy_scan_params.supervision_timeout = 0x1f4
        phy_scan_params.min_ce_length = 0
        phy_scan_params.max_ce_length = 0
        self.send_hci_command(
            hci_packets.LeExtendedCreateConnectionBuilder(
                hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params]))
        self.wait_for_status(OpCode.LE_EXTENDED_CREATE_CONNECTION)

    def add_to_connect_list(self, remote_addr):
        self.send_hci_command(
            hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, remote_addr))

    def initiate_le_connection_by_connect_list(self, remote_addr):
        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
        phy_scan_params.scan_interval = 0x60
        phy_scan_params.scan_window = 0x30
        phy_scan_params.conn_interval_min = 0x18
        phy_scan_params.conn_interval_max = 0x28
        phy_scan_params.conn_latency = 0
        phy_scan_params.supervision_timeout = 0x1f4
        phy_scan_params.min_ce_length = 0
        phy_scan_params.max_ce_length = 0
        self.send_hci_command(
            hci_packets.LeExtendedCreateConnectionBuilder(
                hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params]))

    def complete_le_connection(self):
        connection_complete = HciCaptures.LeConnectionCompleteCapture()
        assertThat(self.hci_event_stream).emits(connection_complete)

        handle = connection_complete.get().GetConnectionHandle()
        return PyHalAclConnection(handle, self.acl_stream, self.device)

    def create_advertisement(self,
                             handle,
                             own_address,
                             properties=LegacyAdvertisingProperties.ADV_IND,
                             min_interval=400,
                             max_interval=450,
                             channel_map=7,
                             own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS,
                             peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                             peer_address='00:00:00:00:00:00',
                             filter_policy=AdvertisingFilterPolicy.ALL_DEVICES,
                             tx_power=0xF8,
                             sid=1,
                             scan_request_notification=Enable.DISABLED):

        self.send_hci_command(
            LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map,
                                                            own_address_type, peer_address_type, peer_address,
                                                            filter_policy, tx_power, sid, scan_request_notification))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_PARAMETERS)

        self.send_hci_command(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_RANDOM_ADDRESS)

        return PyHalAdvertisement(handle, self)
+75 −0
Original line number Diff line number Diff line
@@ -24,6 +24,26 @@ from cert.captures import HciCaptures
from bluetooth_packets_python3 import hci_packets
from cert.truth import assertThat
from hci.facade import facade_pb2 as hci_facade
from cert.matchers import HciMatchers
from bluetooth_packets_python3.hci_packets import FilterDuplicates
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingLegacyParametersBuilder
from bluetooth_packets_python3.hci_packets import LegacyAdvertisingProperties
from bluetooth_packets_python3.hci_packets import PeerAddressType
from bluetooth_packets_python3.hci_packets import AdvertisingFilterPolicy
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingRandomAddressBuilder
from bluetooth_packets_python3.hci_packets import GapData
from bluetooth_packets_python3.hci_packets import GapDataType
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingDataBuilder
from bluetooth_packets_python3.hci_packets import Operation
from bluetooth_packets_python3.hci_packets import OwnAddressType
from bluetooth_packets_python3.hci_packets import LeScanningFilterPolicy
from bluetooth_packets_python3.hci_packets import Enable
from bluetooth_packets_python3.hci_packets import FragmentPreference
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingScanResponseBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedScanEnableBuilder
from bluetooth_packets_python3.hci_packets import EnabledSet
from bluetooth_packets_python3.hci_packets import OpCode


class PyHciAclConnection(IEventStream):
@@ -51,6 +71,38 @@ class PyHciAclConnection(IEventStream):
        return self.our_acl_stream.get_event_queue()


class PyHciAdvertisement(object):

    def __init__(self, handle, py_hci):
        self.handle = handle
        self.py_hci = py_hci

    def set_data(self, complete_name):
        data = GapData()
        data.data_type = GapDataType.COMPLETE_LOCAL_NAME
        data.data = list(bytes(complete_name))
        self.py_hci.send_command_with_complete(
            LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))

    def set_scan_response(self, shortened_name):
        data = GapData()
        data.data_type = GapDataType.SHORTENED_LOCAL_NAME
        data.data = list(bytes(shortened_name))
        self.py_hci.send_command_with_complete(
            LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))

    def start(self):
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = self.handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.py_hci.send_command_with_complete(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))
        assertThat(self.py_hci.get_event_stream()).emits(
            HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))


class PyHci(Closable):

    event_stream = None
@@ -139,3 +191,26 @@ class PyHci(Closable):
        if self.acl_stream is None:
            raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
        return PyHciAclConnection(handle, self.acl_stream, self.device)

    def create_advertisement(self,
                             handle,
                             own_address,
                             properties=LegacyAdvertisingProperties.ADV_IND,
                             min_interval=400,
                             max_interval=450,
                             channel_map=7,
                             own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS,
                             peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                             peer_address='00:00:00:00:00:00',
                             filter_policy=AdvertisingFilterPolicy.ALL_DEVICES,
                             tx_power=0xF8,
                             sid=1,
                             scan_request_notification=Enable.DISABLED):

        self.send_command_with_complete(
            LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map,
                                                            own_address_type, peer_address_type, peer_address,
                                                            filter_policy, tx_power, sid, scan_request_notification))

        self.send_command_with_complete(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address))
        return PyHciAdvertisement(handle, self)
+52 −203

File changed.

Preview size limit exceeded, changes collapsed.

+14 −79
Original line number Diff line number Diff line
@@ -215,52 +215,10 @@ class DirectHciTest(GdBaseTestClass):
                                              OwnAddressType.RANDOM_DEVICE_ADDRESS, AddressType.RANDOM_DEVICE_ADDRESS,
                                              '0D:05:04:03:02:01', 1, [phy_scan_params]))

        # DUT Advertises
        advertising_handle = 0
        self.dut_hci.send_command_with_complete(
            LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                LegacyAdvertisingProperties.ADV_IND,
                400,
                450,
                7,
                OwnAddressType.RANDOM_DEVICE_ADDRESS,
                PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                '00:00:00:00:00:00',
                AdvertisingFilterPolicy.ALL_DEVICES,
                0xF8,
                1,  #SID
                Enable.DISABLED  # Scan request notification
            ))

        self.dut_hci.send_command_with_complete(
            LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0D:05:04:03:02:01'))

        gap_name = GapData()
        gap_name.data_type = GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_The_DUT'))

        self.dut_hci.send_command_with_complete(
            LeSetExtendedAdvertisingDataBuilder(advertising_handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))

        gap_short_name = GapData()
        gap_short_name.data_type = GapDataType.SHORTENED_LOCAL_NAME
        gap_short_name.data = list(bytes(b'Im_The_D'))

        self.dut_hci.send_command_with_complete(
            LeSetExtendedAdvertisingScanResponseBuilder(advertising_handle, Operation.COMPLETE_ADVERTISEMENT,
                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))

        enabled_set = EnabledSet()
        enabled_set.advertising_handle = advertising_handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.dut_hci.send_command_with_complete(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))

        # Check for success of Enable
        assertThat(self.dut_hci.get_event_stream()).emits(
            HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))
        advertisement = self.dut_hci.create_advertisement(0, '0D:05:04:03:02:01')
        advertisement.set_data(b'Im_The_DUT')
        advertisement.set_scan_response(b'Im_The_D')
        advertisement.start()

        (dut_handle, cert_handle) = self._verify_le_connection_complete()

@@ -286,39 +244,16 @@ class DirectHciTest(GdBaseTestClass):
                                              OwnAddressType.RANDOM_DEVICE_ADDRESS, AddressType.RANDOM_DEVICE_ADDRESS,
                                              'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params]))

        # CERT Advertises
        advertising_handle = 1
        self.cert_hal.send_hci_command(
            LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                LegacyAdvertisingProperties.ADV_IND,
                512,
                768,
                7,
                OwnAddressType.RANDOM_DEVICE_ADDRESS,
                PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                'A6:A5:A4:A3:A2:A1',
                AdvertisingFilterPolicy.ALL_DEVICES,
                0x7F,
                0,  # SID
                Enable.DISABLED  # Scan request notification
            ))

        self.cert_hal.send_hci_command(
            LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))

        gap_name = GapData()
        gap_name.data_type = GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_A_Cert'))

        self.cert_hal.send_hci_command(
            LeSetExtendedAdvertisingDataBuilder(advertising_handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = 1
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.cert_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))
        advertisement = self.cert_hal.create_advertisement(
            1,
            '0C:05:04:03:02:01',
            min_interval=512,
            max_interval=768,
            peer_address='A6:A5:A4:A3:A2:A1',
            tx_power=0x7f,
            sid=0)
        advertisement.set_data(b'Im_A_Cert')
        advertisement.start()

        # LeConnectionComplete
        self._verify_le_connection_complete()