Loading system/gd/hci/acl_manager/assembler.h +5 −0 Original line number Diff line number Diff line Loading @@ -75,6 +75,11 @@ struct assembler { void on_incoming_packet(AclPacketView packet) { PacketView<kLittleEndian> payload = packet.GetPayload(); auto payload_size = payload.size(); auto broadcast_flag = packet.GetBroadcastFlag(); if (broadcast_flag == BroadcastFlag::ACTIVE_PERIPHERAL_BROADCAST) { LOG_WARN("Dropping broadcast from remote"); return; } auto packet_boundary_flag = packet.GetPacketBoundaryFlag(); if (packet_boundary_flag == PacketBoundaryFlag::FIRST_NON_AUTOMATICALLY_FLUSHABLE) { LOG_ERROR("Controller is not allowed to send FIRST_NON_AUTOMATICALLY_FLUSHABLE to host except loopback mode"); Loading system/gd/hci/acl_manager_test.cc +1 −1 Original line number Diff line number Diff line Loading @@ -64,7 +64,7 @@ std::unique_ptr<BasePacketBuilder> NextPayload(uint16_t handle) { std::unique_ptr<AclPacketBuilder> NextAclPacket(uint16_t handle) { PacketBoundaryFlag packet_boundary_flag = PacketBoundaryFlag::FIRST_AUTOMATICALLY_FLUSHABLE; BroadcastFlag broadcast_flag = BroadcastFlag::ACTIVE_PERIPHERAL_BROADCAST; BroadcastFlag broadcast_flag = BroadcastFlag::POINT_TO_POINT; return AclPacketBuilder::Create(handle, packet_boundary_flag, broadcast_flag, NextPayload(handle)); } Loading system/gd/hci/cert/acl_manager_test.py +19 −0 Original line number Diff line number Diff line Loading @@ -66,6 +66,25 @@ class AclManagerTest(GdBaseTestClass): assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.data) assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload) def test_reject_broadcast(self): dut_address = self.dut.hci_controller.GetMacAddressSimple() self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True)) self.dut_acl_manager.listen_for_an_incoming_connection() self.cert_hci.initiate_connection(dut_address) with self.dut_acl_manager.complete_incoming_connection() as dut_acl: cert_acl = self.cert_hci.complete_connection() cert_acl.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.ACTIVE_PERIPHERAL_BROADCAST, b'\x26\x00\x07\x00This is a Broadcast from the Cert') assertThat(dut_acl).emitsNone() cert_acl.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, b'\x26\x00\x07\x00This is just SomeAclData from the Cert') assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload) def test_cert_connects_disconnects(self): dut_address = self.dut.hci_controller.GetMacAddressSimple() self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True)) Loading Loading
system/gd/hci/acl_manager/assembler.h +5 −0 Original line number Diff line number Diff line Loading @@ -75,6 +75,11 @@ struct assembler { void on_incoming_packet(AclPacketView packet) { PacketView<kLittleEndian> payload = packet.GetPayload(); auto payload_size = payload.size(); auto broadcast_flag = packet.GetBroadcastFlag(); if (broadcast_flag == BroadcastFlag::ACTIVE_PERIPHERAL_BROADCAST) { LOG_WARN("Dropping broadcast from remote"); return; } auto packet_boundary_flag = packet.GetPacketBoundaryFlag(); if (packet_boundary_flag == PacketBoundaryFlag::FIRST_NON_AUTOMATICALLY_FLUSHABLE) { LOG_ERROR("Controller is not allowed to send FIRST_NON_AUTOMATICALLY_FLUSHABLE to host except loopback mode"); Loading
system/gd/hci/acl_manager_test.cc +1 −1 Original line number Diff line number Diff line Loading @@ -64,7 +64,7 @@ std::unique_ptr<BasePacketBuilder> NextPayload(uint16_t handle) { std::unique_ptr<AclPacketBuilder> NextAclPacket(uint16_t handle) { PacketBoundaryFlag packet_boundary_flag = PacketBoundaryFlag::FIRST_AUTOMATICALLY_FLUSHABLE; BroadcastFlag broadcast_flag = BroadcastFlag::ACTIVE_PERIPHERAL_BROADCAST; BroadcastFlag broadcast_flag = BroadcastFlag::POINT_TO_POINT; return AclPacketBuilder::Create(handle, packet_boundary_flag, broadcast_flag, NextPayload(handle)); } Loading
system/gd/hci/cert/acl_manager_test.py +19 −0 Original line number Diff line number Diff line Loading @@ -66,6 +66,25 @@ class AclManagerTest(GdBaseTestClass): assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.data) assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload) def test_reject_broadcast(self): dut_address = self.dut.hci_controller.GetMacAddressSimple() self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True)) self.dut_acl_manager.listen_for_an_incoming_connection() self.cert_hci.initiate_connection(dut_address) with self.dut_acl_manager.complete_incoming_connection() as dut_acl: cert_acl = self.cert_hci.complete_connection() cert_acl.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.ACTIVE_PERIPHERAL_BROADCAST, b'\x26\x00\x07\x00This is a Broadcast from the Cert') assertThat(dut_acl).emitsNone() cert_acl.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, b'\x26\x00\x07\x00This is just SomeAclData from the Cert') assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload) def test_cert_connects_disconnects(self): dut_address = self.dut.hci_controller.GetMacAddressSimple() self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True)) Loading