Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d67e03e8 authored by Automerger Merge Worker's avatar Automerger Merge Worker
Browse files

Merge "Add function on hci to send commands (with status and with complete)"...

Merge "Add function on hci to send commands (with status and with complete)" am: 35cf46c2 am: 27ce12f7 am: 6baa0ba6 am: b2687993

Change-Id: Ia4ffcd2b28f532b499bfda577e0c6d05dc202d80
parents 74dad0ac b2687993
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -87,6 +87,8 @@ class GdDevice(GdDeviceBase):
        self.hci = hci_facade_pb2_grpc.HciLayerFacadeStub(self.grpc_channel)
        self.hci.register_for_events = self.__register_for_hci_events
        self.hci.new_event_stream = lambda: EventStream(self.hci.FetchEvents(empty_proto.Empty()))
        self.hci.send_command_with_complete = self.__send_hci_command_with_complete
        self.hci.send_command_with_status = self.__send_hci_command_with_status
        self.l2cap = l2cap_facade_pb2_grpc.L2capClassicModuleFacadeStub(
            self.grpc_channel)
        self.hci_acl_manager = acl_manager_facade_pb2_grpc.AclManagerFacadeStub(
@@ -110,3 +112,13 @@ class GdDevice(GdDeviceBase):
        for event_code in event_codes:
            msg = hci_facade.EventCodeMsg(code=int(event_code))
            self.hci.RegisterEventHandler(msg)

    def __send_hci_command_with_complete(self, command):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        self.hci.EnqueueCommandWithComplete(cmd)

    def __send_hci_command_with_status(self, command):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        self.hci.EnqueueCommandWithStatus(cmd)
+15 −25
Original line number Diff line number Diff line
@@ -38,14 +38,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
    def setup_class(self):
        super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI')

    def enqueue_hci_command(self, command, expect_complete):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        if (expect_complete):
            self.cert.hci.EnqueueCommandWithComplete(cmd)
        else:
            self.cert.hci.EnqueueCommandWithStatus(cmd)

    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
        acl_msg = hci_facade.AclMsg(
            handle=int(handle),
@@ -65,11 +57,12 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            # CERT Enables scans and gets its address
            self.enqueue_hci_command(
            self.cert.hci.send_command_with_complete(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

            self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True)
            self.cert.hci.send_command_with_complete(
                hci_packets.ReadBdAddrBuilder())

            read_bd_addr = ReadBdAddrCompleteCapture()
            assertThat(cert_hci_event_stream).emits(read_bd_addr)
@@ -87,11 +80,10 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                connection_request = ConnectionRequestCapture()
                assertThat(cert_hci_event_stream).emits(connection_request)

                self.enqueue_hci_command(
                self.cert.hci.send_command_with_status(
                    hci_packets.AcceptConnectionRequestBuilder(
                        connection_request.get().GetBdAddr(),
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE),
                    False)
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))

                # Cert gets ConnectionComplete with a handle and sends ACL data
                connection_complete = ConnectionCompleteCapture()
@@ -135,22 +127,20 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            # DUT Enables scans and gets its address
            dut_address = self.dut.hci_controller.GetMacAddress(
                empty_proto.Empty()).address
            dut_address = self.dut.hci_controller.GetMacAddressSimple()

            self.dut.neighbor.EnablePageScan(
                neighbor_facade.EnableMsg(enabled=True))

            # Cert connects
            self.enqueue_hci_command(
            self.cert.hci.send_command_with_status(
                hci_packets.CreateConnectionBuilder(
                    dut_address.decode('utf-8'),
                    0xcc18,  # Packet Type
                    hci_packets.PageScanRepetitionMode.R1,
                    0x0,
                    hci_packets.ClockOffsetValid.INVALID,
                    hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH),
                False)
                    hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))

            # DUT gets a connection request
            connection_complete = ConnectionCompleteCapture()
@@ -191,11 +181,12 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            # CERT Enables scans and gets its address
            self.enqueue_hci_command(
            self.cert.hci.send_command_with_complete(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

            self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True)
            self.cert.hci.send_command_with_complete(
                hci_packets.ReadBdAddrBuilder())

            read_bd_addr = ReadBdAddrCompleteCapture()
            assertThat(cert_hci_event_stream).emits(read_bd_addr)
@@ -212,11 +203,10 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                # Cert Accepts
                connection_request = ConnectionRequestCapture()
                assertThat(cert_hci_event_stream).emits(connection_request)
                self.enqueue_hci_command(
                self.cert.hci.send_command_with_status(
                    hci_packets.AcceptConnectionRequestBuilder(
                        connection_request.get().GetBdAddr(),
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE),
                    False)
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))

                # Cert gets ConnectionComplete with a handle and sends ACL data
                connection_complete = ConnectionCompleteCapture()