Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6baa0ba6 authored by Automerger Merge Worker's avatar Automerger Merge Worker
Browse files

Merge "Add function on hci to send commands (with status and with complete)"...

Merge "Add function on hci to send commands (with status and with complete)" am: 35cf46c2 am: 27ce12f7

Change-Id: Idf3bf49b89f970fcd1118c4ff6e7402915edbb57
parents e7d98904 27ce12f7
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -87,6 +87,8 @@ class GdDevice(GdDeviceBase):
        self.hci = hci_facade_pb2_grpc.HciLayerFacadeStub(self.grpc_channel)
        self.hci.register_for_events = self.__register_for_hci_events
        self.hci.new_event_stream = lambda: EventStream(self.hci.FetchEvents(empty_proto.Empty()))
        self.hci.send_command_with_complete = self.__send_hci_command_with_complete
        self.hci.send_command_with_status = self.__send_hci_command_with_status
        self.l2cap = l2cap_facade_pb2_grpc.L2capClassicModuleFacadeStub(
            self.grpc_channel)
        self.hci_acl_manager = acl_manager_facade_pb2_grpc.AclManagerFacadeStub(
@@ -110,3 +112,13 @@ class GdDevice(GdDeviceBase):
        for event_code in event_codes:
            msg = hci_facade.EventCodeMsg(code=int(event_code))
            self.hci.RegisterEventHandler(msg)

    def __send_hci_command_with_complete(self, command):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        self.hci.EnqueueCommandWithComplete(cmd)

    def __send_hci_command_with_status(self, command):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        self.hci.EnqueueCommandWithStatus(cmd)
+15 −25
Original line number Diff line number Diff line
@@ -38,14 +38,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
    def setup_class(self):
        super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI')

    def enqueue_hci_command(self, command, expect_complete):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        if (expect_complete):
            self.cert.hci.EnqueueCommandWithComplete(cmd)
        else:
            self.cert.hci.EnqueueCommandWithStatus(cmd)

    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
        acl_msg = hci_facade.AclMsg(
            handle=int(handle),
@@ -65,11 +57,12 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            # CERT Enables scans and gets its address
            self.enqueue_hci_command(
            self.cert.hci.send_command_with_complete(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

            self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True)
            self.cert.hci.send_command_with_complete(
                hci_packets.ReadBdAddrBuilder())

            read_bd_addr = ReadBdAddrCompleteCapture()
            assertThat(cert_hci_event_stream).emits(read_bd_addr)
@@ -87,11 +80,10 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                connection_request = ConnectionRequestCapture()
                assertThat(cert_hci_event_stream).emits(connection_request)

                self.enqueue_hci_command(
                self.cert.hci.send_command_with_status(
                    hci_packets.AcceptConnectionRequestBuilder(
                        connection_request.get().GetBdAddr(),
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE),
                    False)
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))

                # Cert gets ConnectionComplete with a handle and sends ACL data
                connection_complete = ConnectionCompleteCapture()
@@ -135,22 +127,20 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            # DUT Enables scans and gets its address
            dut_address = self.dut.hci_controller.GetMacAddress(
                empty_proto.Empty()).address
            dut_address = self.dut.hci_controller.GetMacAddressSimple()

            self.dut.neighbor.EnablePageScan(
                neighbor_facade.EnableMsg(enabled=True))

            # Cert connects
            self.enqueue_hci_command(
            self.cert.hci.send_command_with_status(
                hci_packets.CreateConnectionBuilder(
                    dut_address.decode('utf-8'),
                    0xcc18,  # Packet Type
                    hci_packets.PageScanRepetitionMode.R1,
                    0x0,
                    hci_packets.ClockOffsetValid.INVALID,
                    hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH),
                False)
                    hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))

            # DUT gets a connection request
            connection_complete = ConnectionCompleteCapture()
@@ -191,11 +181,12 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            # CERT Enables scans and gets its address
            self.enqueue_hci_command(
            self.cert.hci.send_command_with_complete(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

            self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True)
            self.cert.hci.send_command_with_complete(
                hci_packets.ReadBdAddrBuilder())

            read_bd_addr = ReadBdAddrCompleteCapture()
            assertThat(cert_hci_event_stream).emits(read_bd_addr)
@@ -212,11 +203,10 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                # Cert Accepts
                connection_request = ConnectionRequestCapture()
                assertThat(cert_hci_event_stream).emits(connection_request)
                self.enqueue_hci_command(
                self.cert.hci.send_command_with_status(
                    hci_packets.AcceptConnectionRequestBuilder(
                        connection_request.get().GetBdAddr(),
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE),
                    False)
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))

                # Cert gets ConnectionComplete with a handle and sends ACL data
                connection_complete = ConnectionCompleteCapture()