Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 737bbcff authored by Myles Watson's avatar Myles Watson Committed by Hansong Zhang
Browse files

Neighbor: Check addresses in tests

Test: ./cert/run --host --test_filter=NeighborTest
Change-Id: I3c499dc690548e476964e5b9870b93e4589e3aa6
parent 7d49fe8b
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -89,7 +89,6 @@ class GdDevice(GdDeviceBase):
            self.grpc_channel)
        self.hci = hci_facade_pb2_grpc.HciLayerFacadeStub(self.grpc_channel)
        self.hci.register_for_events = self.__register_for_hci_events
        self.hci.new_event_stream = lambda: EventStream(self.hci.FetchEvents(empty_proto.Empty()))
        self.hci.send_command_with_complete = self.__send_hci_command_with_complete
        self.hci.send_command_with_status = self.__send_hci_command_with_status
        self.l2cap = l2cap_facade_pb2_grpc.L2capClassicModuleFacadeStub(
+80 −0
Original line number Diff line number Diff line
@@ -15,6 +15,8 @@
#   limitations under the License.

import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import hci_packets
from bluetooth_packets_python3.hci_packets import EventCode
from bluetooth_packets_python3 import l2cap_packets
from bluetooth_packets_python3.l2cap_packets import CommandCode, LeCommandCode
from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult
@@ -22,6 +24,84 @@ from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType
from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult


class HciMatchers(object):

    @staticmethod
    def CommandComplete(opcode=None, num_complete=1):
        return lambda msg: HciMatchers._is_matching_command_complete(msg.event, opcode, num_complete)

    @staticmethod
    def _is_matching_command_complete(packet_bytes, opcode=None,
                                      num_complete=1):
        hci_event = HciMatchers.extract_hci_event_with_code(
            packet_bytes, EventCode.COMMAND_COMPLETE)
        if hci_event is None:
            return False
        frame = hci_packets.CommandCompleteView(hci_event)
        return (opcode is None or frame.GetCommandOpCode() == opcode) and\
               frame.GetNumHciCommandPackets() == num_complete

    @staticmethod
    def extract_hci_event_with_code(packet_bytes, event_code=None):
        hci_event = hci_packets.EventPacketView(
            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
        if hci_event is None:
            return None
        if event_code is not None and hci_event.GetEventCode() != event_code:
            return None
        return hci_event


class NeighborMatchers(object):

    @staticmethod
    def InquiryResult(address):
        return lambda msg: NeighborMatchers._is_matching_inquiry_result(msg.packet, address)

    @staticmethod
    def _is_matching_inquiry_result(packet, address):
        hci_event = HciMatchers.extract_hci_event_with_code(
            packet, EventCode.INQUIRY_RESULT)
        if hci_event is None:
            return False
        inquiry_view = hci_packets.InquiryResultView(hci_event)
        if inquiry_view is None:
            return False
        results = inquiry_view.GetInquiryResults()
        return any((address == result.bd_addr for result in results))

    @staticmethod
    def InquiryResultwithRssi(address):
        return lambda msg: NeighborMatchers._is_matching_inquiry_result_with_rssi(msg.packet, address)

    @staticmethod
    def _is_matching_inquiry_result_with_rssi(packet, address):
        hci_event = HciMatchers.extract_hci_event_with_code(
            packet, EventCode.INQUIRY_RESULT_WITH_RSSI)
        if hci_event is None:
            return False
        inquiry_view = hci_packets.InquiryResultWithRssiView(hci_event)
        if inquiry_view is None:
            return False
        results = inquiry_view.GetInquiryResults()
        return any((address == result.address for result in results))

    @staticmethod
    def ExtendedInquiryResult(address):
        return lambda msg: NeighborMatchers._is_matching_extended_inquiry_result(msg.packet, address)

    @staticmethod
    def _is_matching_extended_inquiry_result(packet, address):
        hci_event = HciMatchers.extract_hci_event_with_code(
            packet, EventCode.EXTENDED_INQUIRY_RESULT)
        if hci_event is None:
            return False
        extended_view = hci_packets.ExtendedInquiryResultView(hci_event)
        if extended_view is None:
            return False
        return address == extended_view.GetAddress()


class L2capMatchers(object):

    @staticmethod
+2 −1
Original line number Diff line number Diff line
@@ -67,7 +67,8 @@ class PyHci(Closable):
            hci_packets.EventCode.CONNECTION_COMPLETE,
            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)

        self.event_stream = self.device.hci.new_event_stream()
        self.event_stream = EventStream(
            self.device.hci.FetchEvents(empty_proto.Empty()))
        self.acl_stream = EventStream(
            self.device.hci.FetchAclPackets(empty_proto.Empty()))

+60 −107
Original line number Diff line number Diff line
@@ -14,19 +14,16 @@
#   See the License for the specific language governing permissions and
#   limitations under the License.

import os
import sys
import logging
from datetime import timedelta

from cert.gd_base_test import GdBaseTestClass
from cert.event_stream import EventStream
from google.protobuf import empty_pb2 as empty_proto
from facade import rootservice_pb2 as facade_rootservice
from hci.facade import facade_pb2 as hci_facade
from hci.facade import controller_facade_pb2 as controller_facade
from cert.matchers import HciMatchers, NeighborMatchers
from cert.py_hci import PyHci
from cert.truth import assertThat
from neighbor.cert.py_neighbor import PyNeighbor
from neighbor.facade import facade_pb2 as neighbor_facade
from bluetooth_packets_python3 import hci_packets
import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3.hci_packets import OpCode


class NeighborTest(GdBaseTestClass):
@@ -34,25 +31,30 @@ class NeighborTest(GdBaseTestClass):
    def setup_class(self):
        super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI')

    def register_for_dut_event(self, event_code):
        msg = hci_facade.EventCodeMsg(code=int(event_code))
        self.dut.hci.RegisterEventHandler(msg)

    def register_for_event(self, event_code):
        msg = hci_facade.EventCodeMsg(code=int(event_code))
        self.cert.hci.RegisterEventHandler(msg)

    def register_for_le_event(self, event_code):
        msg = hci_facade.LeSubeventCodeMsg(code=int(event_code))
        self.cert.hci.RegisterLeEventHandler(msg)
    def setup_test(self):
        super().setup_test()
        self.cert_hci = PyHci(self.cert)
        self.cert_hci.send_command_with_complete(
            hci_packets.WriteScanEnableBuilder(
                hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
        self.cert_name = b'Im_A_Cert'
        self.cert_address = self.cert_hci.read_own_address()
        self.cert_name += b'@' + self.cert_address.encode('utf8')
        self.dut_neighbor = PyNeighbor(self.dut)

    def teardown_test(self):
        self.cert_hci.close()
        super().teardown_test()

    def _set_name(self):
        padded_name = self.cert_name
        while len(padded_name) < 248:
            padded_name = padded_name + b'\0'
        self.cert_hci.send_command_with_complete(
            hci_packets.WriteLocalNameBuilder(padded_name))

    def enqueue_hci_command(self, command, expect_complete):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        if (expect_complete):
            self.cert.hci.EnqueueCommandWithComplete(cmd)
        else:
            self.cert.hci.EnqueueCommandWithStatus(cmd)
        assertThat(self.cert_hci.get_event_stream()).emits(
            HciMatchers.CommandComplete(OpCode.WRITE_LOCAL_NAME))

    def test_inquiry_from_dut(self):
        inquiry_msg = neighbor_facade.InquiryMsg(
@@ -60,101 +62,52 @@ class NeighborTest(GdBaseTestClass):
            result_mode=neighbor_facade.ResultMode.STANDARD,
            length_1_28s=3,
            max_results=0)
        with EventStream(self.dut.neighbor.SetInquiryMode(
                inquiry_msg)) as inquiry_event_stream:
            self.enqueue_hci_command(
        session = self.dut_neighbor.set_inquiry_mode(inquiry_msg)
        self.cert_hci.send_command_with_complete(
            hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
            inquiry_event_stream.assert_event_occurs(
                lambda msg: b'\x02\x0f' in msg.packet
                # Expecting an HCI Event (code 0x02, length 0x0f)
            )
                hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
        assertThat(session).emits(
            NeighborMatchers.InquiryResult(self.cert_address),
            timeout=timedelta(seconds=10))

    def test_inquiry_rssi_from_dut(self):
        inquiry_msg = neighbor_facade.InquiryMsg(
            inquiry_mode=neighbor_facade.DiscoverabilityMode.GENERAL,
            result_mode=neighbor_facade.ResultMode.RSSI,
            length_1_28s=3,
            length_1_28s=6,
            max_results=0)
        with EventStream(self.dut.neighbor.SetInquiryMode(
                inquiry_msg)) as inquiry_event_stream:
            self.enqueue_hci_command(
        session = self.dut_neighbor.set_inquiry_mode(inquiry_msg)
        self.cert_hci.send_command_with_complete(
            hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
            inquiry_event_stream.assert_event_occurs(
                lambda msg: b'\x22\x0f' in msg.packet
                # Expecting an HCI Event (code 0x22, length 0x0f)
            )
                hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
        assertThat(session).emits(
            NeighborMatchers.InquiryResultwithRssi(self.cert_address),
            timeout=timedelta(seconds=10))

    def test_inquiry_extended_from_dut(self):
        name_string = b'Im_A_Cert'
        self._set_name()
        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(name_string))
        gap_name.data = list(bytes(self.cert_name))
        gap_data = list([gap_name])

        self.enqueue_hci_command(
        self.cert_hci.send_command_with_complete(
            hci_packets.WriteExtendedInquiryResponseBuilder(
                hci_packets.FecRequired.NOT_REQUIRED, gap_data), True)
                hci_packets.FecRequired.NOT_REQUIRED, gap_data))
        inquiry_msg = neighbor_facade.InquiryMsg(
            inquiry_mode=neighbor_facade.DiscoverabilityMode.GENERAL,
            result_mode=neighbor_facade.ResultMode.EXTENDED,
            length_1_28s=3,
            length_1_28s=8,
            max_results=0)
        with EventStream(self.dut.neighbor.SetInquiryMode(
                inquiry_msg)) as inquiry_event_stream:
            self.enqueue_hci_command(
        session = self.dut_neighbor.set_inquiry_mode(inquiry_msg)
        self.cert_hci.send_command_with_complete(
            hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
            inquiry_event_stream.assert_event_occurs(
                lambda msg: name_string in msg.packet)
                hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
        assertThat(session).emits(
            NeighborMatchers.ExtendedInquiryResult(self.cert_address),
            timeout=timedelta(seconds=10))

    def test_remote_name(self):
        self.register_for_dut_event(
            hci_packets.EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION)

        with EventStream(self.cert.hci.FetchEvents(empty_proto.Empty())) as hci_event_stream, \
            EventStream(self.dut.neighbor.GetRemoteNameEvents(empty_proto.Empty())) as name_event_stream:

            cert_name = b'Im_A_Cert'
            padded_name = cert_name
            while len(padded_name) < 248:
                padded_name = padded_name + b'\0'
            self.enqueue_hci_command(
                hci_packets.WriteLocalNameBuilder(padded_name), True)

            hci_event_stream.assert_event_occurs(
                lambda msg: b'\x0e\x04\x01\x13\x0c' in msg.event)

            address = hci_packets.Address()

            def get_address_from_complete(packet):
                packet_bytes = packet.event
                if b'\x0e\x0a\x01\x09\x10' in packet_bytes:
                    nonlocal address
                    addr_view = hci_packets.ReadBdAddrCompleteView(
                        hci_packets.CommandCompleteView(
                            hci_packets.EventPacketView(
                                bt_packets.PacketViewLittleEndian(
                                    list(packet_bytes)))))
                    address = addr_view.GetBdAddr()
                    return True
                return False

            # DUT Enables scans and gets its address
            self.enqueue_hci_command(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
            self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True)

            hci_event_stream.assert_event_occurs(get_address_from_complete)

            cert_address = address.encode('utf8')

            self.dut.neighbor.ReadRemoteName(
                neighbor_facade.RemoteNameRequestMsg(
                    address=cert_address,
                    page_scan_repetition_mode=1,
                    clock_offset=0x6855))
            name_event_stream.assert_event_occurs(
                lambda msg: cert_name in msg.name)
        self._set_name()
        session = self.dut_neighbor.get_remote_name(self.cert_address)
        session.verify_name(self.cert_name)
+91 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from datetime import timedelta

from bluetooth_packets_python3 import hci_packets
from cert.event_stream import EventStream
from cert.event_stream import IEventStream
from cert.closable import Closable
from cert.closable import safeClose
from cert.truth import assertThat
from google.protobuf import empty_pb2 as empty_proto
from hci.facade import facade_pb2 as hci_facade
from neighbor.facade import facade_pb2 as neighbor_facade


class InquirySession(Closable, IEventStream):

    def __init__(self, device, inquiry_msg):
        self.inquiry_event_stream = EventStream(
            device.neighbor.SetInquiryMode(inquiry_msg))

    def get_event_queue(self):
        return self.inquiry_event_stream.get_event_queue()

    def close(self):
        safeClose(self.inquiry_event_stream)


class GetRemoteNameSession(Closable):

    def __init__(self, device):
        self.remote_name_stream = EventStream(
            device.neighbor.GetRemoteNameEvents(empty_proto.Empty()))

    def verify_name(self, name):
        assertThat(self.remote_name_stream).emits(
            lambda msg: bytes(name) in msg.name, timeout=timedelta(seconds=10))

    def close(self):
        safeClose(self.remote_name_stream)


class PyNeighbor(object):

    def __init__(self, device):
        self.device = device
        self.remote_host_supported_features_notification_registered = False

    def set_inquiry_mode(self, inquiry_msg):
        """
        Set the inquiry mode and return a session which can be used for event queue assertion
        """
        return InquirySession(self.device, inquiry_msg)

    def _register_remote_host_supported_features_notification(self):
        """
        REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION event will be sent when a device sends remote name request
        """
        if self.remote_host_supported_features_notification_registered:
            return
        msg = hci_facade.EventCodeMsg(
            code=int(hci_packets.EventCode.
                     REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION))
        self.device.hci.RegisterEventHandler(msg)
        self.remote_host_supported_features_notification_registered = True

    def get_remote_name(self, remote_address):
        """
        Get the remote name and return a session which can be used for event queue assertion
        """
        self._register_remote_host_supported_features_notification()
        self.device.neighbor.ReadRemoteName(
            neighbor_facade.RemoteNameRequestMsg(
                address=remote_address.encode('utf8'),
                page_scan_repetition_mode=1,
                clock_offset=0x6855))
        return GetRemoteNameSession(self.device)
Loading