Loading system/gd/cert/matchers.py +13 −0 Original line number Diff line number Diff line Loading @@ -83,6 +83,10 @@ class L2capMatchers(object): def LeDisconnectionResponse(scid, dcid): return lambda packet: L2capMatchers._is_matching_le_disconnection_response(packet, scid, dcid) @staticmethod def LeFlowControlCredit(cid): return lambda packet: L2capMatchers._is_matching_le_flow_control_credit(packet, cid) @staticmethod def SFrame(req_seq=None, f=None, s=None, p=None): return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p) Loading Loading @@ -263,6 +267,15 @@ class L2capMatchers(object): return request.GetSourceCid() == scid and request.GetDestinationCid( ) == dcid @staticmethod def _is_matching_le_flow_control_credit(packet, cid): frame = L2capMatchers.le_control_frame_with_code( packet, LeCommandCode.LE_FLOW_CONTROL_CREDIT) if frame is None: return False request = l2cap_packets.LeFlowControlCreditView(frame) return request.GetCid() == cid @staticmethod def _information_response_with_type(packet, info_type): frame = L2capMatchers.control_frame_with_code( Loading system/gd/cert/py_l2cap.py +8 −0 Original line number Diff line number Diff line Loading @@ -74,6 +74,14 @@ class PyLeL2capChannel(IEventStream): self._device.l2cap_le.SendDynamicChannelPacket( l2cap_le_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload)) def close_channel(self): self._device.l2cap_le.CloseDynamicChannel( l2cap_le_facade_pb2.CloseDynamicChannelRequest( remote=common.BluetoothAddressWithType( address=common.BluetoothAddress( address=b"22:33:ff:ff:11:00")), psm=self._psm)) class CreditBasedConnectionResponseFutureWrapper(object): """ Loading system/gd/l2cap/le/cert/cert_le_l2cap.py +46 −15 Original line number Diff line number Diff line Loading @@ -26,11 +26,19 @@ from cert.event_stream import FilteringEventStream from cert.event_stream import IEventStream from cert.matchers import L2capMatchers from cert.captures import L2capCaptures from mobly import asserts class CertLeL2capChannel(IEventStream): def __init__(self, device, scid, dcid, acl_stream, acl, control_channel): def __init__(self, device, scid, dcid, acl_stream, acl, control_channel, initial_credits=0): self._device = device self._scid = scid self._dcid = dcid Loading @@ -39,6 +47,7 @@ class CertLeL2capChannel(IEventStream): self._control_channel = control_channel self._our_acl_view = FilteringEventStream( acl_stream, L2capMatchers.ExtractBasicFrame(scid)) self._credits_left = initial_credits def get_event_queue(self): return self._our_acl_view.get_event_queue() Loading @@ -46,16 +55,18 @@ class CertLeL2capChannel(IEventStream): def send(self, packet): frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet) self._acl.send(frame.Serialize()) self._credits_left -= 1 def send_first_le_i_frame(self, sdu_size, packet): frame = l2cap_packets.FirstLeInformationFrameBuilder( self._dcid, sdu_size, packet) self._acl.send(frame.Serialize()) self._credits_left -= 1 def disconnect_and_verify(self): assertThat(self._scid).isNotEqualTo(1) self._control_channel.send( l2cap_packets.DisconnectionRequestBuilder(1, self._dcid, l2cap_packets.LeDisconnectionRequestBuilder(1, self._dcid, self._scid)) assertThat(self._control_channel).emits( Loading @@ -70,6 +81,9 @@ class CertLeL2capChannel(IEventStream): l2cap_packets.LeFlowControlCreditBuilder(2, self._scid, num_credits)) def credits_left(self): return self._credits_left class CertLeL2cap(Closable): Loading @@ -83,9 +97,11 @@ class CertLeL2cap(Closable): self._on_disconnection_request_default, LeCommandCode.DISCONNECTION_RESPONSE: self._on_disconnection_response_default, LeCommandCode.LE_FLOW_CONTROL_CREDIT: self._on_credit, } self.scid_to_dcid = {} self._cid_to_cert_channels = {} def close(self): self._le_acl_manager.close() Loading Loading @@ -128,10 +144,13 @@ class CertLeL2cap(Closable): response = L2capCaptures.CreditBasedConnectionResponse(scid) assertThat(self.control_channel).emits(response) return CertLeL2capChannel(self._device, scid, channel = CertLeL2capChannel(self._device, scid, response.get().GetDestinationCid(), self._get_acl_stream(), self._le_acl, self.control_channel) self.control_channel, response.get().GetInitialCredits()) self._cid_to_cert_channels[scid] = channel return channel def verify_and_respond_open_channel_from_remote( self, psm=0x33, Loading @@ -140,9 +159,12 @@ class CertLeL2cap(Closable): assertThat(self.control_channel).emits(request) (scid, dcid) = self._respond_connection_request_default( request.get(), result) return CertLeL2capChannel(self._device, scid, dcid, channel = CertLeL2capChannel(self._device, scid, dcid, self._get_acl_stream(), self._le_acl, self.control_channel) self.control_channel, request.get().GetInitialCredits()) self._cid_to_cert_channels[scid] = channel return channel def verify_and_reject_open_channel_from_remote(self, psm=0x33): request = L2capCaptures.CreditBasedConnectionRequest(psm) Loading @@ -151,6 +173,10 @@ class CertLeL2cap(Closable): reject = l2cap_packets.LeCommandRejectNotUnderstoodBuilder(sid) self.control_channel.send(reject) def verify_le_flow_control_credit(self, channel): assertThat(self.control_channel).emits( L2capMatchers.LeFlowControlCredit(channel._dcid)) def _respond_connection_request_default( self, request, result=LeCreditBasedConnectionResponseResult.SUCCESS): Loading @@ -167,10 +193,6 @@ class CertLeL2cap(Closable): self.control_channel.send(response) return (our_scid, our_dcid) # prefer to use channel abstraction instead, if at all possible def send_acl(self, packet): self._acl.send(packet.Serialize()) def get_control_channel(self): return self.control_channel Loading @@ -190,6 +212,15 @@ class CertLeL2cap(Closable): disconnection_response = l2cap_packets.LeDisconnectionResponseView( request) def _on_credit(self, l2cap_le_control_view): credit_view = l2cap_packets.LeFlowControlCreditView( l2cap_le_control_view) cid = credit_view.GetCid() if cid not in self._cid_to_cert_channels: return self._cid_to_cert_channels[ cid]._credits_left += credit_view.GetCredits() def _handle_control_packet(self, l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( Loading system/gd/l2cap/le/cert/le_l2cap_test.py +101 −29 Original line number Diff line number Diff line Loading @@ -305,6 +305,58 @@ class LeL2capTest(GdBaseTestClass): assertThat(self.cert_l2cap.get_control_channel()).emits( L2capMatchers.LeCommandReject()) def test_command_reject_reserved_pdu_codes(self): """ L2CAP/LE/REJ/BI-02-C """ self._setup_link_from_cert() self.cert_l2cap.get_control_channel().send( l2cap_packets.MoveChannelRequestBuilder(2, 0, 0)) assertThat(self.cert_l2cap.get_control_channel()).emits( L2capMatchers.LeCommandReject()) def test_le_credit_based_connection_request_legacy_peer(self): """ L2CAP/LE/CFC/BV-01-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_reject_open_channel_from_remote(psm=0x33) assertThat(response_future.get_status()).isNotEqualTo( LeCreditBasedConnectionResponseResult.SUCCESS) def test_le_credit_based_connection_request_on_supported_le_psm(self): """ L2CAP/LE/CFC/BV-02-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_dut() cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) def test_credit_based_connection_response_on_supported_le_psm(self): """ L2CAP/LE/CFC/BV-03-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() dut_channel.send(b'hello') assertThat(cert_channel).emits( L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) def test_credit_based_connection_request_on_an_unsupported_le_psm(self): """ L2CAP/LE/CFC/BV-04-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_respond_open_channel_from_remote( psm=0x33, result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED) assertThat(response_future.get_status()).isEqualTo( LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED) def test_credit_based_connection_request_unsupported_le_psm(self): """ L2CAP/LE/CFC/BV-05-C Loading Loading @@ -339,47 +391,75 @@ class LeL2capTest(GdBaseTestClass): L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5), L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) def test_le_credit_based_connection_request_legacy_peer(self): def test_credit_exchange_sending_credits(self): """ L2CAP/LE/CFC/BV-01-C L2CAP/LE/CFC/BV-07-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_reject_open_channel_from_remote(psm=0x33) assertThat(response_future.get_status()).isNotEqualTo( LeCreditBasedConnectionResponseResult.SUCCESS) (dut_channel, cert_channel) = self._open_channel_from_cert() credits = cert_channel.credits_left() # Note: DUT only needs to send credit when ALL credits are consumed. # Here we enforce that DUT sends credit after receiving 3 packets, to # test without sending too many packets (may take too long). # This behavior is not expected for all Bluetooth stacks. for _ in range(min(credits + 1, 3)): cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) self.cert_l2cap.verify_le_flow_control_credit(cert_channel) def test_le_credit_based_connection_request_on_supported_le_psm(self): def test_disconnection_request(self): """ L2CAP/LE/CFC/BV-02-C L2CAP/LE/CFC/BV-08-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_dut() cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) (dut_channel, cert_channel) = self._open_channel_from_cert() dut_channel.close_channel() cert_channel.verify_disconnect_request() def test_credit_based_connection_response_on_supported_le_psm(self): def test_disconnection_response(self): """ L2CAP/LE/CFC/BV-03-C L2CAP/LE/CFC/BV-09-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() dut_channel.send(b'hello') assertThat(cert_channel).emits( L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) cert_channel.disconnect_and_verify() def test_credit_based_connection_request_on_an_unsupported_le_psm(self): def test_security_insufficient_authentication_initiator(self): """ L2CAP/LE/CFC/BV-04-C L2CAP/LE/CFC/BV-10-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_respond_open_channel_from_remote( psm=0x33, result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED) result=LeCreditBasedConnectionResponseResult. INSUFFICIENT_AUTHENTICATION) assertThat(response_future.get_status()).isEqualTo( LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED) LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHENTICATION) def test_security_insufficient_authorization_initiator(self): """ L2CAP/LE/CFC/BV-12-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_respond_open_channel_from_remote( psm=0x33, result=LeCreditBasedConnectionResponseResult. INSUFFICIENT_AUTHORIZATION) assertThat(response_future.get_status()).isEqualTo( LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHORIZATION) def test_request_refused_due_to_invalid_source_cid_initiator(self): """ L2CAP/LE/CFC/BV-18-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_respond_open_channel_from_remote( psm=0x33, result=LeCreditBasedConnectionResponseResult.INVALID_SOURCE_CID) assertThat(response_future.get_status()).isEqualTo( LeCreditBasedConnectionResponseResult.INVALID_SOURCE_CID) def test_request_refused_due_to_unacceptable_parameters_initiator(self): """ Loading @@ -402,11 +482,3 @@ class LeL2capTest(GdBaseTestClass): (dut_channel, cert_channel) = self._open_channel_from_cert() cert_channel.send_credits(65535) cert_channel.verify_disconnect_request() def test_disconnection_response(self): """ L2CAP/LE/CFC/BV-09-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() cert_channel.disconnect_and_verify() system/gd/l2cap/le/facade.cc +6 −0 Original line number Diff line number Diff line Loading @@ -67,6 +67,12 @@ class L2capLeModuleFacadeService : public L2capLeModuleFacade::Service { if (service_helper->second->channel_ == nullptr) { return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Channel not open"); } auto address = service_helper->second->channel_->GetDevice().GetAddress(); hci::Address peer_address; ASSERT(hci::Address::FromString(request->remote().address().address(), peer_address)); if (address != peer_address) { return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Remote address doesn't match"); } service_helper->second->channel_->Close(); return ::grpc::Status::OK; } Loading Loading
system/gd/cert/matchers.py +13 −0 Original line number Diff line number Diff line Loading @@ -83,6 +83,10 @@ class L2capMatchers(object): def LeDisconnectionResponse(scid, dcid): return lambda packet: L2capMatchers._is_matching_le_disconnection_response(packet, scid, dcid) @staticmethod def LeFlowControlCredit(cid): return lambda packet: L2capMatchers._is_matching_le_flow_control_credit(packet, cid) @staticmethod def SFrame(req_seq=None, f=None, s=None, p=None): return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p) Loading Loading @@ -263,6 +267,15 @@ class L2capMatchers(object): return request.GetSourceCid() == scid and request.GetDestinationCid( ) == dcid @staticmethod def _is_matching_le_flow_control_credit(packet, cid): frame = L2capMatchers.le_control_frame_with_code( packet, LeCommandCode.LE_FLOW_CONTROL_CREDIT) if frame is None: return False request = l2cap_packets.LeFlowControlCreditView(frame) return request.GetCid() == cid @staticmethod def _information_response_with_type(packet, info_type): frame = L2capMatchers.control_frame_with_code( Loading
system/gd/cert/py_l2cap.py +8 −0 Original line number Diff line number Diff line Loading @@ -74,6 +74,14 @@ class PyLeL2capChannel(IEventStream): self._device.l2cap_le.SendDynamicChannelPacket( l2cap_le_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload)) def close_channel(self): self._device.l2cap_le.CloseDynamicChannel( l2cap_le_facade_pb2.CloseDynamicChannelRequest( remote=common.BluetoothAddressWithType( address=common.BluetoothAddress( address=b"22:33:ff:ff:11:00")), psm=self._psm)) class CreditBasedConnectionResponseFutureWrapper(object): """ Loading
system/gd/l2cap/le/cert/cert_le_l2cap.py +46 −15 Original line number Diff line number Diff line Loading @@ -26,11 +26,19 @@ from cert.event_stream import FilteringEventStream from cert.event_stream import IEventStream from cert.matchers import L2capMatchers from cert.captures import L2capCaptures from mobly import asserts class CertLeL2capChannel(IEventStream): def __init__(self, device, scid, dcid, acl_stream, acl, control_channel): def __init__(self, device, scid, dcid, acl_stream, acl, control_channel, initial_credits=0): self._device = device self._scid = scid self._dcid = dcid Loading @@ -39,6 +47,7 @@ class CertLeL2capChannel(IEventStream): self._control_channel = control_channel self._our_acl_view = FilteringEventStream( acl_stream, L2capMatchers.ExtractBasicFrame(scid)) self._credits_left = initial_credits def get_event_queue(self): return self._our_acl_view.get_event_queue() Loading @@ -46,16 +55,18 @@ class CertLeL2capChannel(IEventStream): def send(self, packet): frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet) self._acl.send(frame.Serialize()) self._credits_left -= 1 def send_first_le_i_frame(self, sdu_size, packet): frame = l2cap_packets.FirstLeInformationFrameBuilder( self._dcid, sdu_size, packet) self._acl.send(frame.Serialize()) self._credits_left -= 1 def disconnect_and_verify(self): assertThat(self._scid).isNotEqualTo(1) self._control_channel.send( l2cap_packets.DisconnectionRequestBuilder(1, self._dcid, l2cap_packets.LeDisconnectionRequestBuilder(1, self._dcid, self._scid)) assertThat(self._control_channel).emits( Loading @@ -70,6 +81,9 @@ class CertLeL2capChannel(IEventStream): l2cap_packets.LeFlowControlCreditBuilder(2, self._scid, num_credits)) def credits_left(self): return self._credits_left class CertLeL2cap(Closable): Loading @@ -83,9 +97,11 @@ class CertLeL2cap(Closable): self._on_disconnection_request_default, LeCommandCode.DISCONNECTION_RESPONSE: self._on_disconnection_response_default, LeCommandCode.LE_FLOW_CONTROL_CREDIT: self._on_credit, } self.scid_to_dcid = {} self._cid_to_cert_channels = {} def close(self): self._le_acl_manager.close() Loading Loading @@ -128,10 +144,13 @@ class CertLeL2cap(Closable): response = L2capCaptures.CreditBasedConnectionResponse(scid) assertThat(self.control_channel).emits(response) return CertLeL2capChannel(self._device, scid, channel = CertLeL2capChannel(self._device, scid, response.get().GetDestinationCid(), self._get_acl_stream(), self._le_acl, self.control_channel) self.control_channel, response.get().GetInitialCredits()) self._cid_to_cert_channels[scid] = channel return channel def verify_and_respond_open_channel_from_remote( self, psm=0x33, Loading @@ -140,9 +159,12 @@ class CertLeL2cap(Closable): assertThat(self.control_channel).emits(request) (scid, dcid) = self._respond_connection_request_default( request.get(), result) return CertLeL2capChannel(self._device, scid, dcid, channel = CertLeL2capChannel(self._device, scid, dcid, self._get_acl_stream(), self._le_acl, self.control_channel) self.control_channel, request.get().GetInitialCredits()) self._cid_to_cert_channels[scid] = channel return channel def verify_and_reject_open_channel_from_remote(self, psm=0x33): request = L2capCaptures.CreditBasedConnectionRequest(psm) Loading @@ -151,6 +173,10 @@ class CertLeL2cap(Closable): reject = l2cap_packets.LeCommandRejectNotUnderstoodBuilder(sid) self.control_channel.send(reject) def verify_le_flow_control_credit(self, channel): assertThat(self.control_channel).emits( L2capMatchers.LeFlowControlCredit(channel._dcid)) def _respond_connection_request_default( self, request, result=LeCreditBasedConnectionResponseResult.SUCCESS): Loading @@ -167,10 +193,6 @@ class CertLeL2cap(Closable): self.control_channel.send(response) return (our_scid, our_dcid) # prefer to use channel abstraction instead, if at all possible def send_acl(self, packet): self._acl.send(packet.Serialize()) def get_control_channel(self): return self.control_channel Loading @@ -190,6 +212,15 @@ class CertLeL2cap(Closable): disconnection_response = l2cap_packets.LeDisconnectionResponseView( request) def _on_credit(self, l2cap_le_control_view): credit_view = l2cap_packets.LeFlowControlCreditView( l2cap_le_control_view) cid = credit_view.GetCid() if cid not in self._cid_to_cert_channels: return self._cid_to_cert_channels[ cid]._credits_left += credit_view.GetCredits() def _handle_control_packet(self, l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( Loading
system/gd/l2cap/le/cert/le_l2cap_test.py +101 −29 Original line number Diff line number Diff line Loading @@ -305,6 +305,58 @@ class LeL2capTest(GdBaseTestClass): assertThat(self.cert_l2cap.get_control_channel()).emits( L2capMatchers.LeCommandReject()) def test_command_reject_reserved_pdu_codes(self): """ L2CAP/LE/REJ/BI-02-C """ self._setup_link_from_cert() self.cert_l2cap.get_control_channel().send( l2cap_packets.MoveChannelRequestBuilder(2, 0, 0)) assertThat(self.cert_l2cap.get_control_channel()).emits( L2capMatchers.LeCommandReject()) def test_le_credit_based_connection_request_legacy_peer(self): """ L2CAP/LE/CFC/BV-01-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_reject_open_channel_from_remote(psm=0x33) assertThat(response_future.get_status()).isNotEqualTo( LeCreditBasedConnectionResponseResult.SUCCESS) def test_le_credit_based_connection_request_on_supported_le_psm(self): """ L2CAP/LE/CFC/BV-02-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_dut() cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) def test_credit_based_connection_response_on_supported_le_psm(self): """ L2CAP/LE/CFC/BV-03-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() dut_channel.send(b'hello') assertThat(cert_channel).emits( L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) def test_credit_based_connection_request_on_an_unsupported_le_psm(self): """ L2CAP/LE/CFC/BV-04-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_respond_open_channel_from_remote( psm=0x33, result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED) assertThat(response_future.get_status()).isEqualTo( LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED) def test_credit_based_connection_request_unsupported_le_psm(self): """ L2CAP/LE/CFC/BV-05-C Loading Loading @@ -339,47 +391,75 @@ class LeL2capTest(GdBaseTestClass): L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5), L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) def test_le_credit_based_connection_request_legacy_peer(self): def test_credit_exchange_sending_credits(self): """ L2CAP/LE/CFC/BV-01-C L2CAP/LE/CFC/BV-07-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_reject_open_channel_from_remote(psm=0x33) assertThat(response_future.get_status()).isNotEqualTo( LeCreditBasedConnectionResponseResult.SUCCESS) (dut_channel, cert_channel) = self._open_channel_from_cert() credits = cert_channel.credits_left() # Note: DUT only needs to send credit when ALL credits are consumed. # Here we enforce that DUT sends credit after receiving 3 packets, to # test without sending too many packets (may take too long). # This behavior is not expected for all Bluetooth stacks. for _ in range(min(credits + 1, 3)): cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) self.cert_l2cap.verify_le_flow_control_credit(cert_channel) def test_le_credit_based_connection_request_on_supported_le_psm(self): def test_disconnection_request(self): """ L2CAP/LE/CFC/BV-02-C L2CAP/LE/CFC/BV-08-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_dut() cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) (dut_channel, cert_channel) = self._open_channel_from_cert() dut_channel.close_channel() cert_channel.verify_disconnect_request() def test_credit_based_connection_response_on_supported_le_psm(self): def test_disconnection_response(self): """ L2CAP/LE/CFC/BV-03-C L2CAP/LE/CFC/BV-09-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() dut_channel.send(b'hello') assertThat(cert_channel).emits( L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) cert_channel.disconnect_and_verify() def test_credit_based_connection_request_on_an_unsupported_le_psm(self): def test_security_insufficient_authentication_initiator(self): """ L2CAP/LE/CFC/BV-04-C L2CAP/LE/CFC/BV-10-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_respond_open_channel_from_remote( psm=0x33, result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED) result=LeCreditBasedConnectionResponseResult. INSUFFICIENT_AUTHENTICATION) assertThat(response_future.get_status()).isEqualTo( LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED) LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHENTICATION) def test_security_insufficient_authorization_initiator(self): """ L2CAP/LE/CFC/BV-12-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_respond_open_channel_from_remote( psm=0x33, result=LeCreditBasedConnectionResponseResult. INSUFFICIENT_AUTHORIZATION) assertThat(response_future.get_status()).isEqualTo( LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHORIZATION) def test_request_refused_due_to_invalid_source_cid_initiator(self): """ L2CAP/LE/CFC/BV-18-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_respond_open_channel_from_remote( psm=0x33, result=LeCreditBasedConnectionResponseResult.INVALID_SOURCE_CID) assertThat(response_future.get_status()).isEqualTo( LeCreditBasedConnectionResponseResult.INVALID_SOURCE_CID) def test_request_refused_due_to_unacceptable_parameters_initiator(self): """ Loading @@ -402,11 +482,3 @@ class LeL2capTest(GdBaseTestClass): (dut_channel, cert_channel) = self._open_channel_from_cert() cert_channel.send_credits(65535) cert_channel.verify_disconnect_request() def test_disconnection_response(self): """ L2CAP/LE/CFC/BV-09-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() cert_channel.disconnect_and_verify()
system/gd/l2cap/le/facade.cc +6 −0 Original line number Diff line number Diff line Loading @@ -67,6 +67,12 @@ class L2capLeModuleFacadeService : public L2capLeModuleFacade::Service { if (service_helper->second->channel_ == nullptr) { return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Channel not open"); } auto address = service_helper->second->channel_->GetDevice().GetAddress(); hci::Address peer_address; ASSERT(hci::Address::FromString(request->remote().address().address(), peer_address)); if (address != peer_address) { return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Remote address doesn't match"); } service_helper->second->channel_->Close(); return ::grpc::Status::OK; } Loading