Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 140a2c22 authored by Myles Watson's avatar Myles Watson
Browse files

gd ACL: Drop broadcasts

Bug: 169327567
Test: gd/cert/run --host AclManagerTest
Tag: #security
Change-Id: I2985440f4e38c4a24fb56dcaac72a857e8b4d4e8
parent ca8f8d83
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -75,6 +75,11 @@ struct assembler {
  void on_incoming_packet(AclPacketView packet) {
    PacketView<kLittleEndian> payload = packet.GetPayload();
    auto payload_size = payload.size();
    auto broadcast_flag = packet.GetBroadcastFlag();
    if (broadcast_flag == BroadcastFlag::ACTIVE_PERIPHERAL_BROADCAST) {
      LOG_WARN("Dropping broadcast from remote");
      return;
    }
    auto packet_boundary_flag = packet.GetPacketBoundaryFlag();
    if (packet_boundary_flag == PacketBoundaryFlag::FIRST_NON_AUTOMATICALLY_FLUSHABLE) {
      LOG_ERROR("Controller is not allowed to send FIRST_NON_AUTOMATICALLY_FLUSHABLE to host except loopback mode");
+1 −1
Original line number Diff line number Diff line
@@ -64,7 +64,7 @@ std::unique_ptr<BasePacketBuilder> NextPayload(uint16_t handle) {

std::unique_ptr<AclPacketBuilder> NextAclPacket(uint16_t handle) {
  PacketBoundaryFlag packet_boundary_flag = PacketBoundaryFlag::FIRST_AUTOMATICALLY_FLUSHABLE;
  BroadcastFlag broadcast_flag = BroadcastFlag::ACTIVE_PERIPHERAL_BROADCAST;
  BroadcastFlag broadcast_flag = BroadcastFlag::POINT_TO_POINT;
  return AclPacketBuilder::Create(handle, packet_boundary_flag, broadcast_flag, NextPayload(handle));
}

+19 −0
Original line number Diff line number Diff line
@@ -66,6 +66,25 @@ class AclManagerTest(GdBaseTestClass):
            assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.data)
            assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload)

    def test_reject_broadcast(self):
        dut_address = self.dut.hci_controller.GetMacAddressSimple()
        self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))

        self.dut_acl_manager.listen_for_an_incoming_connection()
        self.cert_hci.initiate_connection(dut_address)
        with self.dut_acl_manager.complete_incoming_connection() as dut_acl:
            cert_acl = self.cert_hci.complete_connection()

            cert_acl.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
                          hci_packets.BroadcastFlag.ACTIVE_PERIPHERAL_BROADCAST,
                          b'\x26\x00\x07\x00This is a Broadcast from the Cert')
            assertThat(dut_acl).emitsNone()

            cert_acl.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
                          hci_packets.BroadcastFlag.POINT_TO_POINT,
                          b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
            assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload)

    def test_cert_connects_disconnects(self):
        dut_address = self.dut.hci_controller.GetMacAddressSimple()
        self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))