Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 35cf46c2 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "Add function on hci to send commands (with status and with complete)"

parents 3b1f2c52 d3fc2f3c
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -87,6 +87,8 @@ class GdDevice(GdDeviceBase):
        self.hci = hci_facade_pb2_grpc.HciLayerFacadeStub(self.grpc_channel)
        self.hci.register_for_events = self.__register_for_hci_events
        self.hci.new_event_stream = lambda: EventStream(self.hci.FetchEvents(empty_proto.Empty()))
        self.hci.send_command_with_complete = self.__send_hci_command_with_complete
        self.hci.send_command_with_status = self.__send_hci_command_with_status
        self.l2cap = l2cap_facade_pb2_grpc.L2capClassicModuleFacadeStub(
            self.grpc_channel)
        self.hci_acl_manager = acl_manager_facade_pb2_grpc.AclManagerFacadeStub(
@@ -110,3 +112,13 @@ class GdDevice(GdDeviceBase):
        for event_code in event_codes:
            msg = hci_facade.EventCodeMsg(code=int(event_code))
            self.hci.RegisterEventHandler(msg)

    def __send_hci_command_with_complete(self, command):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        self.hci.EnqueueCommandWithComplete(cmd)

    def __send_hci_command_with_status(self, command):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        self.hci.EnqueueCommandWithStatus(cmd)
+15 −25
Original line number Diff line number Diff line
@@ -38,14 +38,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
    def setup_class(self):
        super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI')

    def enqueue_hci_command(self, command, expect_complete):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        if (expect_complete):
            self.cert.hci.EnqueueCommandWithComplete(cmd)
        else:
            self.cert.hci.EnqueueCommandWithStatus(cmd)

    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
        acl_msg = hci_facade.AclMsg(
            handle=int(handle),
@@ -65,11 +57,12 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            # CERT Enables scans and gets its address
            self.enqueue_hci_command(
            self.cert.hci.send_command_with_complete(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

            self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True)
            self.cert.hci.send_command_with_complete(
                hci_packets.ReadBdAddrBuilder())

            read_bd_addr = ReadBdAddrCompleteCapture()
            assertThat(cert_hci_event_stream).emits(read_bd_addr)
@@ -87,11 +80,10 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                connection_request = ConnectionRequestCapture()
                assertThat(cert_hci_event_stream).emits(connection_request)

                self.enqueue_hci_command(
                self.cert.hci.send_command_with_status(
                    hci_packets.AcceptConnectionRequestBuilder(
                        connection_request.get().GetBdAddr(),
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE),
                    False)
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))

                # Cert gets ConnectionComplete with a handle and sends ACL data
                connection_complete = ConnectionCompleteCapture()
@@ -135,22 +127,20 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            # DUT Enables scans and gets its address
            dut_address = self.dut.hci_controller.GetMacAddress(
                empty_proto.Empty()).address
            dut_address = self.dut.hci_controller.GetMacAddressSimple()

            self.dut.neighbor.EnablePageScan(
                neighbor_facade.EnableMsg(enabled=True))

            # Cert connects
            self.enqueue_hci_command(
            self.cert.hci.send_command_with_status(
                hci_packets.CreateConnectionBuilder(
                    dut_address.decode('utf-8'),
                    0xcc18,  # Packet Type
                    hci_packets.PageScanRepetitionMode.R1,
                    0x0,
                    hci_packets.ClockOffsetValid.INVALID,
                    hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH),
                False)
                    hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))

            # DUT gets a connection request
            connection_complete = ConnectionCompleteCapture()
@@ -191,11 +181,12 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
            EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            # CERT Enables scans and gets its address
            self.enqueue_hci_command(
            self.cert.hci.send_command_with_complete(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

            self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True)
            self.cert.hci.send_command_with_complete(
                hci_packets.ReadBdAddrBuilder())

            read_bd_addr = ReadBdAddrCompleteCapture()
            assertThat(cert_hci_event_stream).emits(read_bd_addr)
@@ -212,11 +203,10 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                # Cert Accepts
                connection_request = ConnectionRequestCapture()
                assertThat(cert_hci_event_stream).emits(connection_request)
                self.enqueue_hci_command(
                self.cert.hci.send_command_with_status(
                    hci_packets.AcceptConnectionRequestBuilder(
                        connection_request.get().GetBdAddr(),
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE),
                    False)
                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))

                # Cert gets ConnectionComplete with a handle and sends ACL data
                connection_complete = ConnectionCompleteCapture()