Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 7d23cee7 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge changes If61ab95a,Id9b7d89a

* changes:
  Add PyHci helpers for page/scan and reading own address
  Add PyHci, to simplify interactions with HCI (not HCI_INTERFACES)
parents 35cf46c2 3d223791
Loading
Loading
Loading
Loading
+22 −37
Original line number Diff line number Diff line
@@ -31,6 +31,7 @@ from bluetooth_packets_python3 import hci_packets
from captures import ReadBdAddrCompleteCapture
from captures import ConnectionCompleteCapture
from captures import ConnectionRequestCapture
from py_hci import PyHci


class AclManagerTest(GdFacadeOnlyBaseTestClass):
@@ -52,21 +53,11 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            hci_packets.EventCode.CONNECTION_COMPLETE,
            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)

        with self.cert.hci.new_event_stream() as cert_hci_event_stream, \
            EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
        with PyHci(self.cert) as cert_hci, \
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            # CERT Enables scans and gets its address
            self.cert.hci.send_command_with_complete(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

            self.cert.hci.send_command_with_complete(
                hci_packets.ReadBdAddrBuilder())

            read_bd_addr = ReadBdAddrCompleteCapture()
            assertThat(cert_hci_event_stream).emits(read_bd_addr)
            cert_address = read_bd_addr.get().GetBdAddr()
            cert_hci.enable_inquiry_and_page_scan()
            cert_address = cert_hci.read_own_address()

            with EventStream(
                    self.dut.hci_acl_manager.CreateConnection(
@@ -78,16 +69,18 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):

                # Cert Accepts
                connection_request = ConnectionRequestCapture()
                assertThat(cert_hci_event_stream).emits(connection_request)
                assertThat(
                    cert_hci.get_event_stream()).emits(connection_request)

                self.cert.hci.send_command_with_status(
                cert_hci.send_command_with_status(
                    hci_packets.AcceptConnectionRequestBuilder(
                        connection_request.get().GetBdAddr(),
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))

                # Cert gets ConnectionComplete with a handle and sends ACL data
                connection_complete = ConnectionCompleteCapture()
                assertThat(cert_hci_event_stream).emits(connection_complete)
                assertThat(
                    cert_hci.get_event_stream()).emits(connection_complete)
                cert_handle = connection_complete.get().GetConnectionHandle()

                self.enqueue_acl_data(
@@ -110,7 +103,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                            b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT'
                        )))

                assertThat(cert_acl_data_stream).emits(
                assertThat(cert_hci.get_acl_stream()).emits(
                    lambda packet: b'SomeMoreAclData' in packet.data)
                assertThat(acl_data_stream).emits(
                    lambda packet: b'SomeAclData' in packet.payload)
@@ -121,8 +114,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            hci_packets.EventCode.CONNECTION_COMPLETE,
            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)

        with self.cert.hci.new_event_stream() as cert_hci_event_stream, \
            EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
        with PyHci(self.cert) as cert_hci, \
            EventStream(self.dut.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

@@ -133,7 +125,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                neighbor_facade.EnableMsg(enabled=True))

            # Cert connects
            self.cert.hci.send_command_with_status(
            cert_hci.send_command_with_status(
                hci_packets.CreateConnectionBuilder(
                    dut_address.decode('utf-8'),
                    0xcc18,  # Packet Type
@@ -155,7 +147,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                    )))

            connection_complete = ConnectionCompleteCapture()
            assertThat(cert_hci_event_stream).emits(connection_complete)
            assertThat(cert_hci.get_event_stream()).emits(connection_complete)
            cert_handle = connection_complete.get().GetConnectionHandle()

            self.enqueue_acl_data(
@@ -165,7 +157,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                bytes(
                    b'\x26\x00\x07\x00This is just SomeAclData from the Cert'))

            assertThat(cert_acl_data_stream).emits(
            assertThat(cert_hci.get_acl_stream()).emits(
                lambda packet: b'SomeMoreAclData' in packet.data)
            assertThat(acl_data_stream).emits(
                lambda packet: b'SomeAclData' in packet.payload)
@@ -176,21 +168,12 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            hci_packets.EventCode.CONNECTION_COMPLETE,
            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)

        with self.cert.hci.new_event_stream() as cert_hci_event_stream, \
            EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
        with PyHci(self.cert) as cert_hci, \
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            # CERT Enables scans and gets its address
            self.cert.hci.send_command_with_complete(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

            self.cert.hci.send_command_with_complete(
                hci_packets.ReadBdAddrBuilder())

            read_bd_addr = ReadBdAddrCompleteCapture()
            assertThat(cert_hci_event_stream).emits(read_bd_addr)
            cert_address = read_bd_addr.get().GetBdAddr()
            cert_hci.enable_inquiry_and_page_scan()
            cert_address = cert_hci.read_own_address()

            with EventStream(
                    self.dut.hci_acl_manager.CreateConnection(
@@ -202,15 +185,17 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):

                # Cert Accepts
                connection_request = ConnectionRequestCapture()
                assertThat(cert_hci_event_stream).emits(connection_request)
                self.cert.hci.send_command_with_status(
                assertThat(
                    cert_hci.get_event_stream()).emits(connection_request)
                cert_hci.send_command_with_status(
                    hci_packets.AcceptConnectionRequestBuilder(
                        connection_request.get().GetBdAddr(),
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))

                # Cert gets ConnectionComplete with a handle and sends ACL data
                connection_complete = ConnectionCompleteCapture()
                assertThat(cert_hci_event_stream).emits(connection_complete)
                assertThat(
                    cert_hci.get_event_stream()).emits(connection_complete)
                cert_handle = connection_complete.get().GetConnectionHandle()

                self.enqueue_acl_data(
+67 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from google.protobuf import empty_pb2 as empty_proto
from cert.event_stream import EventStream
from captures import ReadBdAddrCompleteCapture
from bluetooth_packets_python3 import hci_packets
from cert.truth import assertThat


class PyHci(object):

    def __init__(self, device):
        self.device = device
        self.event_stream = self.device.hci.new_event_stream()
        self.acl_stream = EventStream(
            self.device.hci.FetchAclPackets(empty_proto.Empty()))

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.clean_up()
        return traceback is None

    def __del__(self):
        self.clean_up()

    def clean_up(self):
        self.event_stream.shutdown()
        self.acl_stream.shutdown()

    def get_event_stream(self):
        return self.event_stream

    def get_acl_stream(self):
        return self.acl_stream

    def send_command_with_complete(self, command):
        self.device.hci.send_command_with_complete(command)

    def send_command_with_status(self, command):
        self.device.hci.send_command_with_status(command)

    def enable_inquiry_and_page_scan(self):
        self.send_command_with_complete(
            hci_packets.WriteScanEnableBuilder(
                hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

    def read_own_address(self):
        self.send_command_with_complete(hci_packets.ReadBdAddrBuilder())
        read_bd_addr = ReadBdAddrCompleteCapture()
        assertThat(self.event_stream).emits(read_bd_addr)
        return read_bd_addr.get().GetBdAddr()