Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d10b45c4 authored by Zach Johnson's avatar Zach Johnson
Browse files

Improve hci facade proto

reduce redundant or extra words

Bug: 171749953
Tag: #gd-refactor
Test: gd/cert/run --host
Change-Id: I3994dcf0cb41784a660e6c28987b9be306f42d87
parent ac365b20
Loading
Loading
Loading
Loading
+8 −8
Original line number Diff line number Diff line
@@ -67,53 +67,53 @@ class HciCaptures(object):
    def ReadLocalOobDataCompleteCapture():
        return Capture(
            HciMatchers.CommandComplete(hci_packets.OpCode.READ_LOCAL_OOB_DATA),
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.event, hci_packets.OpCode.READ_LOCAL_OOB_DATA)
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_LOCAL_OOB_DATA)
        )

    @staticmethod
    def ReadLocalOobExtendedDataCompleteCapture():
        return Capture(
            HciMatchers.CommandComplete(hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA),
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.event, hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA)
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA)
        )

    @staticmethod
    def ReadBdAddrCompleteCapture():
        return Capture(
            HciMatchers.CommandComplete(hci_packets.OpCode.READ_BD_ADDR),
            lambda packet: hci_packets.ReadBdAddrCompleteView(HciMatchers.ExtractMatchingCommandComplete(packet.event, hci_packets.OpCode.READ_BD_ADDR)))
            lambda packet: hci_packets.ReadBdAddrCompleteView(HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_BD_ADDR)))

    @staticmethod
    def ConnectionRequestCapture():
        return Capture(
            HciMatchers.EventWithCode(hci_packets.EventCode.CONNECTION_REQUEST),
            lambda packet: hci_packets.ConnectionRequestView(
                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.CONNECTION_REQUEST)))
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.CONNECTION_REQUEST)))

    @staticmethod
    def ConnectionCompleteCapture():
        return Capture(
            HciMatchers.EventWithCode(hci_packets.EventCode.CONNECTION_COMPLETE),
            lambda packet: hci_packets.ConnectionCompleteView(
                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.CONNECTION_COMPLETE)))
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.CONNECTION_COMPLETE)))

    @staticmethod
    def DisconnectionCompleteCapture():
        return Capture(
            HciMatchers.EventWithCode(hci_packets.EventCode.DISCONNECTION_COMPLETE),
            lambda packet: hci_packets.DisconnectionCompleteView(
                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.DISCONNECTION_COMPLETE)))
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.DISCONNECTION_COMPLETE)))

    @staticmethod
    def LeConnectionCompleteCapture():
        return Capture(HciMatchers.LeConnectionComplete(),
                       lambda packet: HciMatchers.ExtractLeConnectionComplete(packet.event))
                       lambda packet: HciMatchers.ExtractLeConnectionComplete(packet.payload))

    @staticmethod
    def SimplePairingCompleteCapture():
        return Capture(HciMatchers.EventWithCode(hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE),
            lambda packet: hci_packets.SimplePairingCompleteView(
                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE)))
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE)))


class L2capCaptures(object):
+4 −4
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@ class HciMatchers(object):

    @staticmethod
    def CommandComplete(opcode=None):
        return lambda msg: HciMatchers._is_matching_command_complete(msg.event, opcode)
        return lambda msg: HciMatchers._is_matching_command_complete(msg.payload, opcode)

    @staticmethod
    def ExtractMatchingCommandComplete(packet_bytes, opcode=None):
@@ -57,7 +57,7 @@ class HciMatchers(object):

    @staticmethod
    def EventWithCode(event_code):
        return lambda msg: HciMatchers._is_matching_event(msg.event, event_code)
        return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code)

    @staticmethod
    def ExtractEventWithCode(packet_bytes, event_code):
@@ -78,7 +78,7 @@ class HciMatchers(object):

    @staticmethod
    def LeEventWithCode(subevent_code):
        return lambda msg: HciMatchers._extract_matching_le_event(msg.event, subevent_code) is not None
        return lambda msg: HciMatchers._extract_matching_le_event(msg.payload, subevent_code) is not None

    @staticmethod
    def ExtractLeEventWithCode(packet_bytes, subevent_code):
@@ -96,7 +96,7 @@ class HciMatchers(object):

    @staticmethod
    def LeConnectionComplete():
        return lambda msg: HciMatchers._extract_le_connection_complete(msg.event) is not None
        return lambda msg: HciMatchers._extract_le_connection_complete(msg.payload) is not None

    @staticmethod
    def ExtractLeConnectionComplete(packet_bytes):
+9 −24
Original line number Diff line number Diff line
@@ -35,9 +35,9 @@ class PyHciAclConnection(IEventStream):
        self.our_acl_stream = FilteringEventStream(acl_stream, None)

    def send(self, pb_flag, b_flag, data):
        acl_msg = hci_facade.AclMsg(
        acl_msg = hci_facade.AclPacket(
            handle=self.handle, packet_boundary_flag=int(pb_flag), broadcast_flag=int(b_flag), data=data)
        self.device.hci.SendAclData(acl_msg)
        self.device.hci.SendAcl(acl_msg)

    def send_first(self, data):
        self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
@@ -64,22 +64,13 @@ class PyHci(Closable):
            want this if you are testing HCI itself.
        """
        self.device = device
        self._setup_event_stream()
        self._setup_le_event_stream()
        self.event_stream = EventStream(self.device.hci.StreamEvents(empty_proto.Empty()))
        self.le_event_stream = EventStream(self.device.hci.StreamLeSubevents(empty_proto.Empty()))
        if acl_streaming:
            self.register_for_events(hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_REQUEST,
                                     hci_packets.EventCode.CONNECTION_COMPLETE,
                                     hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
            self._setup_acl_stream()

    def _setup_event_stream(self):
        self.event_stream = EventStream(self.device.hci.FetchEvents(empty_proto.Empty()))

    def _setup_le_event_stream(self):
        self.le_event_stream = EventStream(self.device.hci.FetchLeSubevents(empty_proto.Empty()))

    def _setup_acl_stream(self):
        self.acl_stream = EventStream(self.device.hci.FetchAclPackets(empty_proto.Empty()))
            self.acl_stream = EventStream(self.device.hci.StreamAcl(empty_proto.Empty()))

    def close(self):
        safeClose(self.event_stream)
@@ -99,23 +90,17 @@ class PyHci(Closable):

    def register_for_events(self, *event_codes):
        for event_code in event_codes:
            msg = hci_facade.EventCodeMsg(code=int(event_code))
            self.device.hci.RegisterEventHandler(msg)
            self.device.hci.RequestEvent(hci_facade.EventRequest(code=int(event_code)))

    def register_for_le_events(self, *event_codes):
        for event_code in event_codes:
            msg = hci_facade.EventCodeMsg(code=int(event_code))
            self.device.hci.RegisterLeEventHandler(msg)
            self.device.hci.RequestLeSubevent(hci_facade.EventRequest(code=int(event_code)))

    def send_command_with_complete(self, command):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        self.device.hci.EnqueueCommandWithComplete(cmd)
        self.device.hci.SendCommandWithComplete(hci_facade.Command(payload=bytes(command.Serialize())))

    def send_command_with_status(self, command):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        self.device.hci.EnqueueCommandWithStatus(cmd)
        self.device.hci.SendCommandWithStatus(hci_facade.Command(payload=bytes(command.Serialize())))

    def enable_inquiry_and_page_scan(self):
        self.send_command_with_complete(
+6 −4
Original line number Diff line number Diff line
@@ -6,13 +6,15 @@ import "google/protobuf/empty.proto";

service HciHalFacade {
  rpc SendCommand(Command) returns (google.protobuf.Empty) {}
  rpc SendAcl(AclPacket) returns (google.protobuf.Empty) {}
  rpc SendSco(ScoPacket) returns (google.protobuf.Empty) {}
  rpc SendIso(IsoPacket) returns (google.protobuf.Empty) {}

  rpc StreamEvents(google.protobuf.Empty) returns (stream Event) {}

  rpc SendAcl(AclPacket) returns (google.protobuf.Empty) {}
  rpc StreamAcl(google.protobuf.Empty) returns (stream AclPacket) {}

  rpc SendSco(ScoPacket) returns (google.protobuf.Empty) {}
  rpc StreamSco(google.protobuf.Empty) returns (stream ScoPacket) {}

  rpc SendIso(IsoPacket) returns (google.protobuf.Empty) {}
  rpc StreamIso(google.protobuf.Empty) returns (stream IsoPacket) {}
}

+4 −4
Original line number Diff line number Diff line
@@ -47,9 +47,9 @@ class DirectHciTest(GdBaseTestClass):
        self.cert_hal.send_hci_command(command)

    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
        acl_msg = hci_facade.AclMsg(
        acl_msg = hci_facade.AclPacket(
            handle=int(handle), packet_boundary_flag=int(pb_flag), broadcast_flag=int(b_flag), data=acl)
        self.dut.hci.SendAclData(acl_msg)
        self.dut.hci.SendAcl(acl_msg)

    def send_hal_acl_data(self, handle, pb_flag, b_flag, acl):
        lower = handle & 0xff
@@ -72,7 +72,7 @@ class DirectHciTest(GdBaseTestClass):
        self.dut_hci.send_command_with_complete(cmd2loop)

        looped_bytes = bytes(cmd2loop.Serialize())
        assertThat(self.dut_hci.get_event_stream()).emits(lambda packet: looped_bytes in packet.event)
        assertThat(self.dut_hci.get_event_stream()).emits(lambda packet: looped_bytes in packet.payload)

    def test_inquiry_from_dut(self):
        self.dut_hci.register_for_events(hci_packets.EventCode.INQUIRY_RESULT)
@@ -148,7 +148,7 @@ class DirectHciTest(GdBaseTestClass):
        self.send_hal_hci_command(
            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))

        assertThat(self.dut_hci.get_le_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.event)
        assertThat(self.dut_hci.get_le_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload)

        self.send_hal_hci_command(
            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.DISABLED, [enabled_set]))
Loading