Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit cebe3655 authored by Hansong Zhang's avatar Hansong Zhang
Browse files

Fix L2CAP cert and remove legacy HCI cert

Test: run_cert.sh
Change-Id: I792ece4a241b4733c3a4e496f8deb20e2096a109
parent c4ec18e6
Loading
Loading
Loading
Loading
+1 −2
Original line number Diff line number Diff line
SimpleHciTest
SimpleL2capTest
+0 −2
Original line number Diff line number Diff line
@@ -5,5 +5,3 @@
# Run normal facade to cert API tests
PYTHONPATH=$PYTHONPATH:$ANDROID_BUILD_TOP/out/host/linux-x86/lib64 python3.8 `which act.py` -c $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd/cert/host_only_config.json -tf $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd/cert/cert_testcases -tp $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd
# Run new facade to facade API tests
PYTHONPATH=$PYTHONPATH:$ANDROID_BUILD_TOP/out/host/linux-x86/lib64 python3.8 `which act.py` -c $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd/cert/host_only_config_facade_only.json -tf $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd/cert/cert_testcases_facade_only -tp $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd
 No newline at end of file
+1 −1
Original line number Diff line number Diff line
@@ -207,7 +207,7 @@ class HciHalHostRootcanal : public HciHal {
    RUN_NO_INTR(received_size = recv(sock_fd_, buf, kH4HeaderSize, 0));
    ASSERT_LOG(received_size != -1, "Can't receive from socket: %s", strerror(errno));
    if (received_size == 0) {
      LOG_WARN("Can't read H4 header.");
      LOG_WARN("Can't read H4 header. EOF received");
      raise(SIGINT);
      return;
    }
+0 −357
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from __future__ import print_function

import logging
import os
import sys
sys.path.append(os.environ['ANDROID_BUILD_TOP'] + '/packages/modules/Bluetooth/system/gd')

from cert.gd_base_test import GdBaseTestClass
from cert.event_callback_stream import EventCallbackStream
from cert.event_asserts import EventAsserts
from cert import rootservice_pb2 as cert_rootservice_pb2
from facade import common_pb2
from facade import rootservice_pb2 as facade_rootservice_pb2
from google.protobuf import empty_pb2
from hci import facade_pb2 as hci_facade_pb2
from hci.cert import api_pb2 as hci_cert_pb2
from hci.cert import api_pb2_grpc as hci_cert_pb2_grpc


class SimpleHciTest(GdBaseTestClass):

    def setup_test(self):
        self.device_under_test = self.gd_devices[0]
        self.cert_device = self.gd_cert_devices[0]

        self.device_under_test.rootservice.StartStack(
            facade_rootservice_pb2.StartStackRequest(
                module_under_test=facade_rootservice_pb2.BluetoothModule.Value(
                    'HCI'),))
        self.cert_device.rootservice.StartStack(
            cert_rootservice_pb2.StartStackRequest(
                module_to_test=cert_rootservice_pb2.BluetoothModule.Value(
                    'HCI'),))

        self.device_under_test.wait_channel_ready()
        self.cert_device.wait_channel_ready()

        self.device_under_test.hci.SetPageScanMode(
            hci_facade_pb2.PageScanMode(enabled=True))
        self.cert_device.hci.SetPageScanMode(
            hci_cert_pb2.PageScanMode(enabled=True))

        dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress(
            empty_pb2.Empty()).address
        self.device_under_test.address = dut_address
        cert_address = self.cert_device.controller_read_only_property.ReadLocalAddress(
            empty_pb2.Empty()).address
        self.cert_device.address = cert_address

        self.dut_address = common_pb2.BluetoothAddress(
            address=self.device_under_test.address)
        self.cert_address = common_pb2.BluetoothAddress(
            address=self.cert_device.address)

    def teardown_test(self):
        self.device_under_test.rootservice.StopStack(
            facade_rootservice_pb2.StopStackRequest())
        self.cert_device.rootservice.StopStack(
            cert_rootservice_pb2.StopStackRequest())

    def test_none_event(self):
        with EventCallbackStream(
                self.device_under_test.hci.FetchConnectionComplete(
                    empty_pb2.Empty())) as connection_complete_stream:
            connection_complete_asserts = EventAsserts(
                connection_complete_stream)
            connection_complete_asserts.assert_none()

    def _connect_from_dut(self):
        logging.debug("_connect_from_dut")
        policy = hci_cert_pb2.IncomingConnectionPolicy(
            remote=self.dut_address, accepted=True)
        self.cert_device.hci.SetIncomingConnectionPolicy(policy)

        with EventCallbackStream(
                self.device_under_test.hci.FetchConnectionComplete(
                    empty_pb2.Empty())) as dut_connection_complete_stream:
            dut_connection_complete_asserts = EventAsserts(
                dut_connection_complete_stream)
            self.device_under_test.hci.Connect(self.cert_address)
            dut_connection_complete_asserts.assert_event_occurs(
                lambda event: self._get_handle(event))

    def _disconnect_from_dut(self):
        logging.debug("_disconnect_from_dut")
        with EventCallbackStream(
                self.device_under_test.hci.FetchDisconnection(
                    empty_pb2.Empty())) as dut_disconnection_stream:
            dut_disconnection_asserts = EventAsserts(dut_disconnection_stream)
            self.device_under_test.hci.Disconnect(self.cert_address)
            dut_disconnection_asserts.assert_event_occurs(
                lambda event: event.remote.address == self.cert_device.address)

    def _get_handle(self, event):
        if event.remote.address == self.cert_device.address:
            self.connection_handle = event.connection_handle
            return True
        return False

    def test_connect_disconnect_send_acl(self):
        self._connect_from_dut()

        with EventCallbackStream(
                self.cert_device.hci.FetchAclData(
                    empty_pb2.Empty())) as cert_acl_stream:
            cert_acl_asserts = EventAsserts(cert_acl_stream)
            acl_data = hci_facade_pb2.AclData(
                remote=self.cert_address, payload=b'123')
            self.device_under_test.hci.SendAclData(acl_data)
            self.device_under_test.hci.SendAclData(acl_data)
            self.device_under_test.hci.SendAclData(acl_data)
            cert_acl_asserts.assert_event_occurs(
                lambda packet: b'123' in packet.payload and packet.remote == self.dut_address
            )

        self._disconnect_from_dut()

    def test_connect_disconnect_receive_acl(self):
        self._connect_from_dut()

        with EventCallbackStream(
                self.device_under_test.hci.FetchAclData(
                    empty_pb2.Empty())) as dut_acl_stream:
            dut_acl_asserts = EventAsserts(dut_acl_stream)
            acl_data = hci_cert_pb2.AclData(
                remote=self.dut_address, payload=b'123')
            self.cert_device.hci.SendAclData(acl_data)
            self.cert_device.hci.SendAclData(acl_data)
            self.cert_device.hci.SendAclData(acl_data)
            dut_acl_asserts.assert_event_occurs(
                lambda packet: b'123' in packet.payload and packet.remote == self.cert_address
            )

        self._disconnect_from_dut()

    def test_reject_connection_request(self):
        with EventCallbackStream(
                self.device_under_test.hci.FetchConnectionFailed(
                    empty_pb2.Empty())) as dut_connection_failed_stream:
            dut_connection_failed_asserts = EventAsserts(
                dut_connection_failed_stream)
            self.device_under_test.hci.Connect(self.cert_address)
            dut_connection_failed_asserts.assert_event_occurs(
                lambda event: event.remote == self.cert_address)

    def test_send_classic_security_command(self):
        self._connect_from_dut()

        with EventCallbackStream(
                self.device_under_test.hci_classic_security.
                FetchCommandCompleteEvent(
                    empty_pb2.Empty())) as dut_command_complete_stream:
            dut_command_complete_asserts = EventAsserts(
                dut_command_complete_stream)

            self.device_under_test.hci.AuthenticationRequested(
                self.cert_address)

            # Link request
            self.device_under_test.hci_classic_security.LinkKeyRequestNegativeReply(
                self.cert_address)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x040c)

            # Pin code request
            message = hci_facade_pb2.PinCodeRequestReplyMessage(
                remote=self.cert_address,
                len=4,
                pin_code=bytes("1234", encoding="ASCII"))
            self.device_under_test.hci_classic_security.PinCodeRequestReply(
                message)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x040d)
            self.device_under_test.hci_classic_security.PinCodeRequestNegativeReply(
                self.cert_address)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x040e)

            # IO capability request
            message = hci_facade_pb2.IoCapabilityRequestReplyMessage(
                remote=self.cert_address,
                io_capability=0,
                oob_present=0,
                authentication_requirements=0)
            self.device_under_test.hci_classic_security.IoCapabilityRequestReply(
                message)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x042b)

            # message = hci_facade_pb2.IoCapabilityRequestNegativeReplyMessage(
            #     remote=self.cert_address,
            #     reason=1
            # )
            # # link_layer_controller.cc(447)] Check failed: security_manager_.GetAuthenticationAddress() == peer
            # self.device_under_test.hci_classic_security.IoCapabilityRequestNegativeReply(message)

            # User confirm request
            self.device_under_test.hci_classic_security.UserConfirmationRequestReply(
                self.cert_address)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x042c)

            message = hci_facade_pb2.LinkKeyRequestReplyMessage(
                remote=self.cert_address,
                link_key=bytes(
                    "4C68384139F574D836BCF34E9DFB01BF", encoding="ASCII"))
            self.device_under_test.hci_classic_security.LinkKeyRequestReply(
                message)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x040b)

            self.device_under_test.hci_classic_security.UserConfirmationRequestNegativeReply(
                self.cert_address)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x042d)

            # User passkey request
            message = hci_facade_pb2.UserPasskeyRequestReplyMessage(
                remote=self.cert_address,
                passkey=999999,
            )
            self.device_under_test.hci_classic_security.UserPasskeyRequestReply(
                message)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x042e)

            self.device_under_test.hci_classic_security.UserPasskeyRequestNegativeReply(
                self.cert_address)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x042f)

            # Remote OOB data request
            message = hci_facade_pb2.RemoteOobDataRequestReplyMessage(
                remote=self.cert_address,
                c=
                b'\x19\x20\x21\x22\x23\x24\x25\x26\x19\x20\x21\x22\x23\x24\x25\x26',
                r=
                b'\x30\x31\x32\x33\x34\x35\x36\x37\x30\x31\x32\x33\x34\x35\x36\x37',
            )
            self.device_under_test.hci_classic_security.RemoteOobDataRequestReply(
                message)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x0430)
            self.device_under_test.hci_classic_security.RemoteOobDataRequestNegativeReply(
                self.cert_address)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x0433)

            # Read/Write/Delete link key
            message = hci_facade_pb2.ReadStoredLinkKeyMessage(
                remote=self.cert_address,
                read_all_flag=0,
            )
            self.device_under_test.hci_classic_security.ReadStoredLinkKey(
                message)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x0c0d)

            message = hci_facade_pb2.WriteStoredLinkKeyMessage(
                num_keys_to_write=1,
                remote=self.cert_address,
                link_keys=bytes(
                    "4C68384139F574D836BCF34E9DFB01BF", encoding="ASCII"),
            )
            self.device_under_test.hci_classic_security.WriteStoredLinkKey(
                message)

            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x0c11)

            message = hci_facade_pb2.DeleteStoredLinkKeyMessage(
                remote=self.cert_address,
                delete_all_flag=0,
            )
            self.device_under_test.hci_classic_security.DeleteStoredLinkKey(
                message)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x0c12)

            # Refresh Encryption Key
            message = hci_facade_pb2.RefreshEncryptionKeyMessage(
                connection_handle=self.connection_handle,)
            self.device_under_test.hci_classic_security.RefreshEncryptionKey(
                message)

            # Read/Write Simple Pairing Mode
            self.device_under_test.hci_classic_security.ReadSimplePairingMode(
                empty_pb2.Empty())
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x0c55)

            message = hci_facade_pb2.WriteSimplePairingModeMessage(
                simple_pairing_mode=1,)
            self.device_under_test.hci_classic_security.WriteSimplePairingMode(
                message)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x0c56)

            # Read local oob data
            self.device_under_test.hci_classic_security.ReadLocalOobData(
                empty_pb2.Empty())
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x0c57)

            # Send keypress notification
            message = hci_facade_pb2.SendKeypressNotificationMessage(
                remote=self.cert_address,
                notification_type=1,
            )
            self.device_under_test.hci_classic_security.SendKeypressNotification(
                message)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x0c60)

            # Read local oob extended data
            self.device_under_test.hci_classic_security.ReadLocalOobExtendedData(
                empty_pb2.Empty())
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x0c7d)

            # Read Encryption key size
            message = hci_facade_pb2.ReadEncryptionKeySizeMessage(
                connection_handle=self.connection_handle,)
            self.device_under_test.hci_classic_security.ReadEncryptionKeySize(
                message)
            dut_command_complete_asserts.assert_event_occurs(
                lambda event: event.command_opcode == 0x1408)

        self._disconnect_from_dut()

    def test_internal_hci_command(self):
        self._connect_from_dut()
        self.device_under_test.hci.TestInternalHciCommands(empty_pb2.Empty())
        self.device_under_test.hci.TestInternalHciLeCommands(empty_pb2.Empty())
        self._disconnect_from_dut()

    def test_classic_connection_management_command(self):
        self._connect_from_dut()
        self.device_under_test.hci.TestClassicConnectionManagementCommands(
            self.cert_address)
        self._disconnect_from_dut()
+33 −24
Original line number Diff line number Diff line
@@ -149,7 +149,7 @@ class SimpleL2capTest(GdBaseTestClass):
                    ChannelRetransmissionFlowControlConfig(
                        mode=self.retransmission_mode)))

        self.handle_connection_request = handle_connection_request
        self.handle_connection_response = handle_connection_response
        event_callback_stream.register_callback(
            self.handle_connection_response, matcher_fn=is_connection_response)

@@ -204,7 +204,7 @@ class SimpleL2capTest(GdBaseTestClass):
        self.cert_device.l2cap.SetupLink(
            l2cap_cert_pb2.SetupLinkRequest(remote=self.dut_address))
        event_asserts.assert_event_occurs(
            lambda log: log.HasField("link_up") and log.remote == self.dut_address
            lambda log: log.HasField("link_up") and log.link_up.remote == self.dut_address
        )

    def _open_channel(self, event_asserts, scid=0x0101, psm=0x33):
@@ -213,7 +213,11 @@ class SimpleL2capTest(GdBaseTestClass):
        self.cert_device.l2cap.SendConnectionRequest(
            l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm))
        event_asserts.assert_event_occurs(
            lambda log: is_configuration_response(log) and scid == log.scid)
            lambda log: is_configuration_response(log) and scid == log.configuration_response.scid
        )

        # Allow some time for channel creation on facade side after configuration response is received.
        time.sleep(0.5)

    def test_connect(self):
        with EventCallbackStream(
@@ -260,7 +264,7 @@ class SimpleL2capTest(GdBaseTestClass):
            l2cap_event_asserts.assert_event_occurs(
                lambda log : log.HasField("data_packet") and \
                             log.data_packet.channel == 2 and \
                             basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b"123")
                             log.data_packet.payload == b"123")

            self.device_under_test.l2cap.SendDynamicChannelPacket(
                l2cap_facade_pb2.DynamicChannelPacket(
@@ -461,7 +465,7 @@ class SimpleL2capTest(GdBaseTestClass):
                    EXTENDED_FEATURES,
                    signal_id=signal_id))

            l2cap_event_asserts_alt.assert_event_occurs_at_most_times(
            l2cap_event_asserts_alt.assert_event_occurs_at_most(
                is_information_response, 1)

            expected_log_type = l2cap_cert_pb2.InformationRequestType.EXTENDED_FEATURES
@@ -508,14 +512,15 @@ class SimpleL2capTest(GdBaseTestClass):
            self.cert_device.l2cap.SendSFrame(
                l2cap_cert_pb2.SFrame(
                    channel=self.scid_dcid_map[scid], req_seq=3, s=0))

            l2cap_event_asserts.assert_event_occurs(
                lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc'
                lambda log: print(log) or log.HasField("data_packet") and log.data_packet.channel == scid and basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b'abc'
            )
            l2cap_event_asserts.assert_event_occurs(
                lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc'
                lambda log: print(log) or log.HasField("data_packet") and log.data_packet.channel == scid and basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b'abc'
            )
            l2cap_event_asserts.assert_event_occurs(
                lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc'
                lambda log: print(log) or log.HasField("data_packet") and log.data_packet.channel == scid and basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b'abc'
            )
            l2cap_event_asserts_alt.assert_event_occurs_at_most(
                lambda log: log.HasField("data_packet"), 3)
@@ -559,7 +564,6 @@ class SimpleL2capTest(GdBaseTestClass):
                self.cert_device.l2cap.FetchL2capLog(
                    empty_pb2.Empty())) as l2cap_log_stream:
            l2cap_event_asserts = EventAsserts(l2cap_log_stream)
            self._register_callbacks(l2cap_log_stream)
            self._setup_link(l2cap_event_asserts)

            signal_id = 3
@@ -570,12 +574,6 @@ class SimpleL2capTest(GdBaseTestClass):
            self.device_under_test.l2cap.SetDynamicChannel(
                l2cap_facade_pb2.SetEnableDynamicChannelRequest(
                    psm=psm, retransmission_mode=mode))
            self.cert_device.l2cap.SendConnectionRequest(
                l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm))

            l2cap_log_stream.unregister_callback(
                self.handle_Connection_response,
                matcher_fn=is_configuration_response)

            def handle_connection_response(log):
                log = log.connection_response
@@ -593,10 +591,6 @@ class SimpleL2capTest(GdBaseTestClass):
            l2cap_log_stream.register_callback(
                handle_connection_response, matcher_fn=is_connection_response)

            l2cap_log_stream.unregister_callback(
                self.handle_configuration_request,
                matcher_fn=is_configuration_request)

            def handle_configuration_request(log):
                log = log.configuration_request
                if log.dcid not in self.scid_dcid_map:
@@ -617,6 +611,23 @@ class SimpleL2capTest(GdBaseTestClass):
                handle_configuration_request,
                matcher_fn=is_configuration_request)

            def handle_information_request(log):
                log = log.information_request
                self.cert_device.l2cap.SendInformationResponse(
                    l2cap_cert_pb2.InformationResponse(
                        type=log.type, signal_id=log.signal_id))

            l2cap_log_stream.register_callback(
                handle_information_request, matcher_fn=is_information_request)

            self.cert_device.l2cap.SendConnectionRequest(
                l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm))

            l2cap_event_asserts.assert_event_occurs(
                lambda log: is_configuration_response(log) and scid == log.configuration_response.scid
            )
            time.sleep(0.5)

            self.cert_device.l2cap.SendIFrame(
                l2cap_cert_pb2.IFrame(
                    channel=self.scid_dcid_map[scid],
@@ -630,9 +641,8 @@ class SimpleL2capTest(GdBaseTestClass):
                    tx_seq=(self.tx_window - 1),
                    sar=0))
            l2cap_event_asserts.assert_event_occurs(
                lambda log: log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x05\x01'
                lambda log: print(log) or log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x05\x01'
            )

            self.cert_device.l2cap.SendSFrame(
                l2cap_cert_pb2.SFrame(
                    channel=self.scid_dcid_map[scid], req_seq=0, p=1, s=0))
@@ -674,9 +684,8 @@ class SimpleL2capTest(GdBaseTestClass):
                l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm))

            l2cap_event_asserts.assert_event_occurs(
                lambda log: is_configuration_request(log) and \
                lambda log: is_configuration_response(log) and \
                    log.configuration_response.scid == scid and\
                    log.configuration_response.result == log.l2cap_cert_pb2.ConfigurationResult.SUCCESS and \
                    log.HasField("retransmission_config") and \
                    log.configuration_response.result == l2cap_cert_pb2.ConfigurationResult.SUCCESS and \
                    log.configuration_response.retransmission_config.mode
                        == l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM)
Loading