Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d49f7fec authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "Add PyHciAclConnection"

parents 7d23cee7 644e1949
Loading
Loading
Loading
Loading
+11 −94
Original line number Diff line number Diff line
@@ -39,20 +39,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
    def setup_class(self):
        super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI')

    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
        acl_msg = hci_facade.AclMsg(
            handle=int(handle),
            packet_boundary_flag=int(pb_flag),
            broadcast_flag=int(b_flag),
            data=acl)
        self.cert.hci.SendAclData(acl_msg)

    def test_dut_connects(self):
        self.cert.hci.register_for_events(
            hci_packets.EventCode.CONNECTION_REQUEST,
            hci_packets.EventCode.CONNECTION_COMPLETE,
            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)

        with PyHci(self.cert) as cert_hci, \
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

@@ -67,29 +54,9 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                            address=bytes(cert_address,
                                          'utf8')))) as connection_event_stream:

                # Cert Accepts
                connection_request = ConnectionRequestCapture()
                assertThat(
                    cert_hci.get_event_stream()).emits(connection_request)

                cert_hci.send_command_with_status(
                    hci_packets.AcceptConnectionRequestBuilder(
                        connection_request.get().GetBdAddr(),
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))

                # Cert gets ConnectionComplete with a handle and sends ACL data
                connection_complete = ConnectionCompleteCapture()
                assertThat(
                    cert_hci.get_event_stream()).emits(connection_complete)
                cert_handle = connection_complete.get().GetConnectionHandle()

                self.enqueue_acl_data(
                    cert_handle, hci_packets.PacketBoundaryFlag.
                    FIRST_AUTOMATICALLY_FLUSHABLE,
                    hci_packets.BroadcastFlag.POINT_TO_POINT,
                    bytes(
                        b'\x26\x00\x07\x00This is just SomeAclData from the Cert'
                    ))
                cert_acl = cert_hci.accept_connection()
                cert_acl.send_first(
                    b'\x26\x00\x07\x00This is just SomeAclData from the Cert')

                # DUT gets a connection complete event and sends and receives
                connection_complete = ConnectionCompleteCapture()
@@ -109,11 +76,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                    lambda packet: b'SomeAclData' in packet.payload)

    def test_cert_connects(self):
        self.cert.hci.register_for_events(
            hci_packets.EventCode.ROLE_CHANGE,
            hci_packets.EventCode.CONNECTION_COMPLETE,
            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)

        with PyHci(self.cert) as cert_hci, \
            EventStream(self.dut.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
@@ -124,15 +86,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            self.dut.neighbor.EnablePageScan(
                neighbor_facade.EnableMsg(enabled=True))

            # Cert connects
            cert_hci.send_command_with_status(
                hci_packets.CreateConnectionBuilder(
                    dut_address.decode('utf-8'),
                    0xcc18,  # Packet Type
                    hci_packets.PageScanRepetitionMode.R1,
                    0x0,
                    hci_packets.ClockOffsetValid.INVALID,
                    hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))
            cert_hci.initiate_connection(dut_address)

            # DUT gets a connection request
            connection_complete = ConnectionCompleteCapture()
@@ -146,16 +100,9 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                        b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT'
                    )))

            connection_complete = ConnectionCompleteCapture()
            assertThat(cert_hci.get_event_stream()).emits(connection_complete)
            cert_handle = connection_complete.get().GetConnectionHandle()

            self.enqueue_acl_data(
                cert_handle,
                hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
                hci_packets.BroadcastFlag.POINT_TO_POINT,
                bytes(
                    b'\x26\x00\x07\x00This is just SomeAclData from the Cert'))
            cert_acl = cert_hci.complete_connection()
            cert_acl.send_first(
                b'\x26\x00\x07\x00This is just SomeAclData from the Cert')

            assertThat(cert_hci.get_acl_stream()).emits(
                lambda packet: b'SomeMoreAclData' in packet.data)
@@ -163,11 +110,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                lambda packet: b'SomeAclData' in packet.payload)

    def test_recombination_l2cap_packet(self):
        self.cert.hci.register_for_events(
            hci_packets.EventCode.CONNECTION_REQUEST,
            hci_packets.EventCode.CONNECTION_COMPLETE,
            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)

        with PyHci(self.cert) as cert_hci, \
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

@@ -183,35 +125,10 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                            address=bytes(cert_address,
                                          'utf8')))) as connection_event_stream:

                # Cert Accepts
                connection_request = ConnectionRequestCapture()
                assertThat(
                    cert_hci.get_event_stream()).emits(connection_request)
                cert_hci.send_command_with_status(
                    hci_packets.AcceptConnectionRequestBuilder(
                        connection_request.get().GetBdAddr(),
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))

                # Cert gets ConnectionComplete with a handle and sends ACL data
                connection_complete = ConnectionCompleteCapture()
                assertThat(
                    cert_hci.get_event_stream()).emits(connection_complete)
                cert_handle = connection_complete.get().GetConnectionHandle()

                self.enqueue_acl_data(
                    cert_handle, hci_packets.PacketBoundaryFlag.
                    FIRST_AUTOMATICALLY_FLUSHABLE,
                    hci_packets.BroadcastFlag.POINT_TO_POINT,
                    bytes(b'\x06\x00\x07\x00Hello'))
                self.enqueue_acl_data(
                    cert_handle,
                    hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT,
                    hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!'))
                self.enqueue_acl_data(
                    cert_handle, hci_packets.PacketBoundaryFlag.
                    FIRST_AUTOMATICALLY_FLUSHABLE,
                    hci_packets.BroadcastFlag.POINT_TO_POINT,
                    bytes(b'\xe8\x03\x07\x00' + b'Hello' * 200))
                cert_acl = cert_hci.accept_connection()
                cert_acl.send_first(b'\x06\x00\x07\x00Hello')
                cert_acl.send_continuing(b'!')
                cert_acl.send_first(b'\xe8\x03\x07\x00' + b'Hello' * 200)

                # DUT gets a connection complete event and sends and receives
                connection_complete = ConnectionCompleteCapture()
+61 −0
Original line number Diff line number Diff line
@@ -17,14 +17,48 @@
from google.protobuf import empty_pb2 as empty_proto
from cert.event_stream import EventStream
from captures import ReadBdAddrCompleteCapture
from captures import ConnectionCompleteCapture
from captures import ConnectionRequestCapture
from bluetooth_packets_python3 import hci_packets
from cert.truth import assertThat
from hci.facade import facade_pb2 as hci_facade


class PyHciAclConnection(object):

    def __init__(self, handle, acl_stream, device):
        self.handle = handle
        self.acl_stream = acl_stream
        self.device = device

    def send(self, pb_flag, b_flag, data):
        acl_msg = hci_facade.AclMsg(
            handle=int(self.handle),
            packet_boundary_flag=int(pb_flag),
            broadcast_flag=int(b_flag),
            data=data)
        self.device.hci.SendAclData(acl_msg)

    def send_first(self, data):
        self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
                  hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data))

    def send_continuing(self, data):
        self.send(hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT,
                  hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data))


class PyHci(object):

    def __init__(self, device):
        self.device = device

        self.device.hci.register_for_events(
            hci_packets.EventCode.ROLE_CHANGE,
            hci_packets.EventCode.CONNECTION_REQUEST,
            hci_packets.EventCode.CONNECTION_COMPLETE,
            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)

        self.event_stream = self.device.hci.new_event_stream()
        self.acl_stream = EventStream(
            self.device.hci.FetchAclPackets(empty_proto.Empty()))
@@ -65,3 +99,30 @@ class PyHci(object):
        read_bd_addr = ReadBdAddrCompleteCapture()
        assertThat(self.event_stream).emits(read_bd_addr)
        return read_bd_addr.get().GetBdAddr()

    def initiate_connection(self, remote_addr):
        self.send_command_with_status(
            hci_packets.CreateConnectionBuilder(
                remote_addr.decode('utf-8'),
                0xcc18,  # Packet Type
                hci_packets.PageScanRepetitionMode.R1,
                0x0,
                hci_packets.ClockOffsetValid.INVALID,
                hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))

    def accept_connection(self):
        connection_request = ConnectionRequestCapture()
        assertThat(self.event_stream).emits(connection_request)

        self.send_command_with_status(
            hci_packets.AcceptConnectionRequestBuilder(
                connection_request.get().GetBdAddr(),
                hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))
        return self.complete_connection()

    def complete_connection(self):
        connection_complete = ConnectionCompleteCapture()
        assertThat(self.event_stream).emits(connection_complete)

        handle = connection_complete.get().GetConnectionHandle()
        return PyHciAclConnection(handle, self.acl_stream, self.device)