Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 717697eb authored by Martin Brabham's avatar Martin Brabham
Browse files

PyHci: Modify to allow acl_streaming flag.

Bug: 145638034
Test: ./cert/run --host
Change-Id: I3687592c7a45c784cf7a249daa41c48154ddab5a
parent d8170d3d
Loading
Loading
Loading
Loading
+31 −8
Original line number Diff line number Diff line
@@ -56,19 +56,36 @@ class PyHciAclConnection(IEventStream):

class PyHci(Closable):

    def __init__(self, device):
    event_stream = None
    le_event_stream = None
    acl_stream = None

    def __init__(self, device, acl_streaming=False):
        """
            If you are planning on personally using the ACL data stream
            coming from HCI, specify acl_streaming=True. You probably only
            want this if you are testing HCI itself.
        """
        self.device = device

        self._setup_event_stream()
        self._setup_le_event_stream()
        if acl_streaming:
            self.register_for_events(
                hci_packets.EventCode.ROLE_CHANGE,
                hci_packets.EventCode.CONNECTION_REQUEST,
                hci_packets.EventCode.CONNECTION_COMPLETE,
                hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
            self._setup_acl_stream()

    def _setup_event_stream(self):
        self.event_stream = EventStream(
            self.device.hci.FetchEvents(empty_proto.Empty()))

    def _setup_le_event_stream(self):
        self.le_event_stream = EventStream(
            self.device.hci.FetchLeSubevents(empty_proto.Empty()))

    def _setup_acl_stream(self):
        self.acl_stream = EventStream(
            self.device.hci.FetchAclPackets(empty_proto.Empty()))

@@ -84,6 +101,9 @@ class PyHci(Closable):
        return self.le_event_stream

    def get_raw_acl_stream(self):
        if self.acl_stream is None:
            raise Exception("Please construct '%s' with acl_streaming=True!" %
                            self.__class__.__name__)
        return self.acl_stream

    def register_for_events(self, *event_codes):
@@ -142,4 +162,7 @@ class PyHci(Closable):
        assertThat(self.event_stream).emits(connection_complete)

        handle = connection_complete.get().GetConnectionHandle()
        if self.acl_stream is None:
            raise Exception("Please construct '%s' with acl_streaming=True!" %
                            self.__class__.__name__)
        return PyHciAclConnection(handle, self.acl_stream, self.device)
+1 −1
Original line number Diff line number Diff line
@@ -40,7 +40,7 @@ class AclManagerTest(GdBaseTestClass):
    # todo: move into GdBaseTestClass, based on modules inited
    def setup_test(self):
        super().setup_test()
        self.cert_hci = PyHci(self.cert)
        self.cert_hci = PyHci(self.cert, acl_streaming=True)
        self.dut_acl_manager = PyAclManager(self.dut)

    def teardown_test(self):
+1 −1
Original line number Diff line number Diff line
@@ -34,7 +34,7 @@ class DirectHciTest(GdBaseTestClass):

    def setup_test(self):
        super().setup_test()
        self.dut_hci = PyHci(self.dut)
        self.dut_hci = PyHci(self.dut, acl_streaming=True)
        self.cert_hal = PyHal(self.cert)
        self.cert_hal.send_hci_command(hci_packets.ResetBuilder().Serialize())

+1 −1
Original line number Diff line number Diff line
@@ -33,7 +33,7 @@ class NeighborTest(GdBaseTestClass):

    def setup_test(self):
        super().setup_test()
        self.cert_hci = PyHci(self.cert)
        self.cert_hci = PyHci(self.cert, acl_streaming=True)
        self.cert_hci.send_command_with_complete(
            hci_packets.WriteScanEnableBuilder(
                hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))