Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 87899718 authored by Myles Watson's avatar Myles Watson
Browse files

Direct HCI classic connection test

Bug: 147444951
Test: ./cert/run_cert_facade_only.sh
Change-Id: Ie075bbac29f2ac08d89b13d7a6c454f7f82bec2f
parent f979d618
Loading
Loading
Loading
Loading
+228 −0
Original line number Diff line number Diff line
@@ -349,3 +349,231 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass):
                lambda packet: b'\x3e\x13\x01\x00' in packet.payload)
            le_event_asserts.assert_event_occurs(
                lambda packet: b'\x3e\x13\x01\x00' in packet.event)

    def test_connection_dut_connects(self):
        self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE)
        with EventCallbackStream(self.device_under_test.hci.FetchEvents(empty_proto.Empty())) as hci_event_stream, \
            EventCallbackStream(self.device_under_test.hci.FetchAclPackets(empty_proto.Empty())) as acl_data_stream, \
            EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \
            EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream:

            address = hci_packets.Address()

            def get_address_from_complete(packet):
                packet_bytes = packet.payload
                if b'\x0e\x0a\x01\x09\x10' in packet_bytes:
                    nonlocal address
                    addr_view = hci_packets.ReadBdAddrCompleteView(
                        hci_packets.CommandCompleteView(
                            hci_packets.EventPacketView(
                                bt_packets.PacketViewLittleEndian(
                                    list(packet_bytes)))))
                    address = addr_view.GetBdAddr()
                    return True
                return False

            # CERT Enables scans and gets its address
            self.send_hal_hci_command(hci_packets.ReadBdAddrBuilder())

            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
            cert_hci_event_asserts.assert_event_occurs(
                get_address_from_complete)
            self.send_hal_hci_command(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

            # DUT Connects
            self.enqueue_hci_command(
                hci_packets.CreateConnectionBuilder(
                    address, 0x11, hci_packets.PageScanRepetitionMode.R0, 0x22,
                    hci_packets.ClockOffsetValid.VALID,
                    hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH),
                False)

            # Cert Accepts
            connection_request = None

            def get_connect_request(packet):
                if b'\x04\x0a' in packet.payload:
                    nonlocal connection_request
                    connection_request = hci_packets.ConnectionRequestView(
                        hci_packets.EventPacketView(
                            bt_packets.PacketViewLittleEndian(
                                list(packet.payload))))
                    return True
                return False

            # Cert Accepts
            cert_hci_event_asserts.assert_event_occurs(get_connect_request)
            self.send_hal_hci_command(
                hci_packets.AcceptConnectionRequestBuilder(
                    connection_request.GetBdAddr(),
                    hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))

            conn_handle = 0xfff

            def get_handle(packet_bytes):
                if b'\x03\x0b\x00' in packet_bytes:
                    nonlocal conn_handle
                    cc_view = hci_packets.ConnectionCompleteView(
                        hci_packets.EventPacketView(
                            bt_packets.PacketViewLittleEndian(
                                list(packet_bytes))))
                    conn_handle = cc_view.GetConnectionHandle()
                    return True
                return False

            def event_handle(packet):
                packet_bytes = packet.event
                return get_handle(packet_bytes)

            def payload_handle(packet):
                packet_bytes = packet.payload
                return get_handle(packet_bytes)

            hci_event_asserts = EventAsserts(hci_event_stream)

            cert_hci_event_asserts.assert_event_occurs(payload_handle)
            cert_handle = conn_handle
            conn_handle = 0xfff
            hci_event_asserts.assert_event_occurs(event_handle)
            dut_handle = conn_handle
            if dut_handle == 0xfff:
                logging.warning("Failed to get the DUT handle")
                return False
            if cert_handle == 0xfff:
                logging.warning("Failed to get the CERT handle")
                return False

            # Send ACL Data
            self.enqueue_acl_data(
                dut_handle, hci_packets.PacketBoundaryFlag.
                FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                hci_packets.BroadcastFlag.POINT_TO_POINT,
                bytes(b'This is just SomeAclData'))
            self.send_hal_acl_data(
                cert_handle, hci_packets.PacketBoundaryFlag.
                FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                hci_packets.BroadcastFlag.POINT_TO_POINT,
                bytes(b'This is just SomeMoreAclData'))

            acl_data_asserts = EventAsserts(acl_data_stream)
            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
            cert_acl_data_asserts.assert_event_occurs(
                lambda packet: b'SomeAclData' in packet.payload)
            acl_data_asserts.assert_event_occurs(
                lambda packet: b'SomeMoreAclData' in packet.data)

    def test_connection_cert_connects(self):
        self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE)
        self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST)
        with EventCallbackStream(self.device_under_test.hci.FetchEvents(empty_proto.Empty())) as hci_event_stream, \
            EventCallbackStream(self.device_under_test.hci.FetchAclPackets(empty_proto.Empty())) as acl_data_stream, \
            EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \
            EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream:

            address = hci_packets.Address()

            def get_address_from_complete(packet):
                packet_bytes = packet.event
                if b'\x0e\x0a\x01\x09\x10' in packet_bytes:
                    nonlocal address
                    addr_view = hci_packets.ReadBdAddrCompleteView(
                        hci_packets.CommandCompleteView(
                            hci_packets.EventPacketView(
                                bt_packets.PacketViewLittleEndian(
                                    list(packet_bytes)))))
                    address = addr_view.GetBdAddr()
                    return True
                return False

            # DUT Enables scans and gets its address
            self.enqueue_hci_command(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
            self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True)

            dut_hci_event_asserts = EventAsserts(hci_event_stream)
            dut_hci_event_asserts.assert_event_occurs(get_address_from_complete)

            # Cert Connects
            self.send_hal_hci_command(
                hci_packets.CreateConnectionBuilder(
                    address, 0x11, hci_packets.PageScanRepetitionMode.R0, 0x22,
                    hci_packets.ClockOffsetValid.VALID,
                    hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))

            # DUT Accepts
            connection_request = None

            def get_connect_request(packet):
                if b'\x04\x0a' in packet.event:
                    nonlocal connection_request
                    connection_request = hci_packets.ConnectionRequestView(
                        hci_packets.EventPacketView(
                            bt_packets.PacketViewLittleEndian(
                                list(packet.event))))
                    return True
                return False

            dut_hci_event_asserts.assert_event_occurs(get_connect_request)
            self.enqueue_hci_command(
                hci_packets.AcceptConnectionRequestBuilder(
                    connection_request.GetBdAddr(),
                    hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE),
                False)

            conn_handle = 0xfff

            def get_handle(packet_bytes):
                if b'\x03\x0b\x00' in packet_bytes:
                    nonlocal conn_handle
                    cc_view = hci_packets.ConnectionCompleteView(
                        hci_packets.EventPacketView(
                            bt_packets.PacketViewLittleEndian(
                                list(packet_bytes))))
                    conn_handle = cc_view.GetConnectionHandle()
                    return True
                return False

            def event_handle(packet):
                packet_bytes = packet.event
                return get_handle(packet_bytes)

            def payload_handle(packet):
                packet_bytes = packet.payload
                return get_handle(packet_bytes)

            hci_event_asserts = EventAsserts(hci_event_stream)

            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
            cert_hci_event_asserts.assert_event_occurs(payload_handle)
            cert_handle = conn_handle
            conn_handle = 0xfff
            hci_event_asserts.assert_event_occurs(event_handle)
            dut_handle = conn_handle
            if dut_handle == 0xfff:
                logging.warning("Failed to get the DUT handle")
                return False
            if cert_handle == 0xfff:
                logging.warning("Failed to get the CERT handle")
                return False

            # Send ACL Data
            self.enqueue_acl_data(
                dut_handle, hci_packets.PacketBoundaryFlag.
                FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                hci_packets.BroadcastFlag.POINT_TO_POINT,
                bytes(b'This is just SomeAclData'))
            self.send_hal_acl_data(
                cert_handle, hci_packets.PacketBoundaryFlag.
                FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                hci_packets.BroadcastFlag.POINT_TO_POINT,
                bytes(b'This is just SomeMoreAclData'))

            acl_data_asserts = EventAsserts(acl_data_stream)
            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
            cert_acl_data_asserts.assert_event_occurs(
                lambda packet: b'SomeAclData' in packet.payload)
            acl_data_asserts.assert_event_occurs(
                lambda packet: b'SomeMoreAclData' in packet.data)