Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 40232190 authored by Zach Johnson's avatar Zach Johnson
Browse files

Flesh out PyAclManager a little more

Test: cert/run --host --test_filter=AclManagerTest
Change-Id: Ifad59da2bbac45b897547711d638d7bbca397372
parent 76a6c90b
Loading
Loading
Loading
Loading
+14 −45
Original line number Diff line number Diff line
@@ -47,29 +47,16 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            cert_hci.enable_inquiry_and_page_scan()
            cert_address = cert_hci.read_own_address()

            with EventStream(
                    self.dut.hci_acl_manager.CreateConnection(
                        acl_manager_facade.ConnectionMsg(
                            address_type=int(
                                hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS),
                            address=bytes(cert_address,
                                          'utf8')))) as connection_event_stream:

            with dut_acl_manager.initiate_connection(cert_address) as dut_acl:
                cert_acl = cert_hci.accept_connection()
                cert_acl.send_first(
                    b'\x26\x00\x07\x00This is just SomeAclData from the Cert')

                # DUT gets a connection complete event and sends and receives
                connection_complete = ConnectionCompleteCapture()
                connection_event_stream.assert_event_occurs(connection_complete)
                dut_handle = connection_complete.get().GetConnectionHandle()
                dut_acl.wait_for_connection_complete()

                self.dut.hci_acl_manager.SendAclData(
                    acl_manager_facade.AclData(
                        handle=dut_handle,
                        payload=bytes(
                dut_acl.send(
                    b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT'
                        )))
                )

                assertThat(cert_hci.get_acl_stream()).emits(
                    lambda packet: b'SomeMoreAclData' in packet.data)
@@ -78,8 +65,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):

    def test_cert_connects(self):
        with PyHci(self.cert) as cert_hci, \
            EventStream(self.dut.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
            PyAclManager(self.dut) as dut_acl_manager:

            # DUT Enables scans and gets its address
            dut_address = self.dut.hci_controller.GetMacAddressSimple()
@@ -87,19 +73,13 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            self.dut.neighbor.EnablePageScan(
                neighbor_facade.EnableMsg(enabled=True))

            dut_acl_manager.listen_for_incoming_connections()

            cert_hci.initiate_connection(dut_address)
            dut_acl = dut_acl_manager.accept_connection()

            # DUT gets a connection request
            connection_complete = ConnectionCompleteCapture()
            assertThat(incoming_connection_stream).emits(connection_complete)
            dut_handle = connection_complete.get().GetConnectionHandle()

            self.dut.hci_acl_manager.SendAclData(
                acl_manager_facade.AclData(
                    handle=dut_handle,
                    payload=bytes(
                        b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT'
                    )))
            dut_acl.send(
                b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT')

            cert_acl = cert_hci.complete_connection()
            cert_acl.send_first(
@@ -107,34 +87,23 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):

            assertThat(cert_hci.get_acl_stream()).emits(
                lambda packet: b'SomeMoreAclData' in packet.data)
            assertThat(acl_data_stream).emits(
            assertThat(dut_acl_manager.get_acl_stream()).emits(
                lambda packet: b'SomeAclData' in packet.payload)

    def test_recombination_l2cap_packet(self):
        with PyHci(self.cert) as cert_hci, \
            PyAclManager(self.dut) as dut_acl_manager:

            # CERT Enables scans and gets its address
            cert_hci.enable_inquiry_and_page_scan()
            cert_address = cert_hci.read_own_address()

            with EventStream(
                    self.dut.hci_acl_manager.CreateConnection(
                        acl_manager_facade.ConnectionMsg(
                            address_type=int(
                                hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS),
                            address=bytes(cert_address,
                                          'utf8')))) as connection_event_stream:

            with dut_acl_manager.initiate_connection(cert_address) as dut_acl:
                cert_acl = cert_hci.accept_connection()
                cert_acl.send_first(b'\x06\x00\x07\x00Hello')
                cert_acl.send_continuing(b'!')
                cert_acl.send_first(b'\xe8\x03\x07\x00' + b'Hello' * 200)

                # DUT gets a connection complete event and sends and receives
                connection_complete = ConnectionCompleteCapture()
                connection_event_stream.assert_event_occurs(connection_complete)
                dut_handle = connection_complete.get().GetConnectionHandle()
                dut_acl.wait_for_connection_complete()

                assertThat(dut_acl_manager.get_acl_stream()).emits(
                    lambda packet: b'Hello!' in packet.payload,
+58 −0
Original line number Diff line number Diff line
@@ -22,6 +22,47 @@ from captures import ConnectionRequestCapture
from bluetooth_packets_python3 import hci_packets
from cert.truth import assertThat
from hci.facade import facade_pb2 as hci_facade
from hci.facade import acl_manager_facade_pb2 as acl_manager_facade


class PyAclManagerAclConnection(object):

    def __init__(self, device, remote_addr, handle):
        self.device = device
        self.handle = handle

        if remote_addr:
            self.connection_event_stream = EventStream(
                self.device.hci_acl_manager.CreateConnection(
                    acl_manager_facade.ConnectionMsg(
                        address_type=int(
                            hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS),
                        address=bytes(remote_addr, 'utf8'))))
        else:
            self.connection_event_stream = None

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.clean_up()
        return traceback is None

    def __del__(self):
        self.clean_up()

    def clean_up(self):
        if self.connection_event_stream is not None:
            self.connection_event_stream.shutdown()

    def wait_for_connection_complete(self):
        connection_complete = ConnectionCompleteCapture()
        assertThat(self.connection_event_stream).emits(connection_complete)
        self.handle = connection_complete.get().GetConnectionHandle()

    def send(self, data):
        self.device.hci_acl_manager.SendAclData(
            acl_manager_facade.AclData(handle=self.handle, payload=bytes(data)))


class PyAclManager(object):
@@ -31,6 +72,7 @@ class PyAclManager(object):

        self.acl_stream = EventStream(
            self.device.hci_acl_manager.FetchAclData(empty_proto.Empty()))
        self.incoming_connection_stream = None

    def __enter__(self):
        return self
@@ -44,6 +86,22 @@ class PyAclManager(object):

    def clean_up(self):
        self.acl_stream.shutdown()
        if self.incoming_connection_stream is not None:
            self.incoming_connection_stream.shutdown()

    def listen_for_incoming_connections(self):
        self.incoming_connection_stream = EventStream(
            self.device.hci_acl_manager.FetchIncomingConnection(
                empty_proto.Empty()))

    def get_acl_stream(self):
        return self.acl_stream

    def initiate_connection(self, remote_addr):
        return PyAclManagerAclConnection(self.device, remote_addr, None)

    def accept_connection(self):
        connection_complete = ConnectionCompleteCapture()
        assertThat(self.incoming_connection_stream).emits(connection_complete)
        handle = connection_complete.get().GetConnectionHandle()
        return PyAclManagerAclConnection(self.device, None, handle)