Loading system/gd/cert/captures.py +12 −0 Original line number Diff line number Diff line Loading @@ -73,6 +73,18 @@ class L2capCaptures(object): packet, CommandCode.CONNECTION_RESPONSE) return l2cap_packets.ConnectionResponseView(frame) @staticmethod def CreditBasedConnectionRequest(psm): return Capture( L2capMatchers.CreditBasedConnectionRequest(psm), L2capCaptures._extract_credit_based_connection_request) @staticmethod def _extract_credit_based_connection_request(packet): frame = L2capMatchers.le_control_frame_with_code( packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST) return l2cap_packets.LeCreditBasedConnectionRequestView(frame) @staticmethod def CreditBasedConnectionResponse(scid): return Capture( Loading system/gd/cert/matchers.py +16 −2 Original line number Diff line number Diff line Loading @@ -58,8 +58,8 @@ class L2capMatchers(object): return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT) @staticmethod def CreditBasedConnectionRequest(): return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST) def CreditBasedConnectionRequest(psm): return lambda packet: L2capMatchers._is_matching_credit_based_connection_request(packet, psm) @staticmethod def CreditBasedConnectionResponse( Loading Loading @@ -100,6 +100,11 @@ class L2capMatchers(object): def PacketPayloadRawData(payload): return lambda packet: payload in packet.payload # this is a hack - should be removed @staticmethod def PacketPayloadWithMatchingPsm(psm): return lambda packet: None if psm != packet.psm else packet @staticmethod def ExtractBasicFrame(scid): return lambda packet: L2capMatchers._basic_frame_for(packet, scid) Loading Loading @@ -281,6 +286,15 @@ class L2capMatchers(object): return False return True @staticmethod def _is_matching_credit_based_connection_request(packet, psm): frame = L2capMatchers.le_control_frame_with_code( packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST) if frame is None: return False request = l2cap_packets.LeCreditBasedConnectionRequestView(frame) return request.GetLePsm() == psm @staticmethod def _is_matching_credit_based_connection_response(packet, scid, result): frame = L2capMatchers.le_control_frame_with_code( Loading system/gd/cert/py_l2cap.py +59 −12 Original line number Diff line number Diff line Loading @@ -17,8 +17,13 @@ from l2cap.classic import facade_pb2 as l2cap_facade_pb2 from l2cap.le import facade_pb2 as l2cap_le_facade_pb2 from bluetooth_packets_python3 import l2cap_packets from cert.event_stream import EventStream from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult from cert.event_stream import FilteringEventStream from cert.event_stream import EventStream, IEventStream from cert.closable import Closable, safeClose from cert.truth import assertThat from cert.matchers import L2capMatchers from facade import common_pb2 as common from google.protobuf import empty_pb2 as empty_proto Loading Loading @@ -52,32 +57,74 @@ class PyL2cap(Closable): return PyL2capChannel(self._device, psm) class PyLeL2capChannel(object): class PyLeL2capChannel(IEventStream): def __init__(self, device, psm): def __init__(self, device, psm, l2cap_stream): self._device = device self._psm = psm self._le_l2cap_stream = l2cap_stream self._our_le_l2cap_view = FilteringEventStream( self._le_l2cap_stream, L2capMatchers.PacketPayloadWithMatchingPsm(self._psm)) def get_event_queue(self): return self._our_le_l2cap_view.get_event_queue() def send(self, payload): self._device.l2cap_le.SendDynamicChannelPacket( l2cap_le_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload)) class CreditBasedConnectionResponseFutureWrapper(object): """ The future object returned when we send a connection request from DUT. Can be used to get connection status and create the corresponding PyLeL2capChannel object later """ def __init__(self, grpc_response_future, device, psm, le_l2cap_stream): self._grpc_response_future = grpc_response_future self._device = device self._psm = psm self._le_l2cap_stream = le_l2cap_stream def get_status(self): return l2cap_packets.LeCreditBasedConnectionResponseResult( self._grpc_response_future.result().status) def get_channel(self): assertThat(self.get_status()).isEqualTo( l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS) return PyLeL2capChannel(self._device, self._psm, self._le_l2cap_stream) class PyLeL2cap(Closable): def __init__(self, device): self._device = device self.le_l2cap_stream = EventStream( self._le_l2cap_stream = EventStream( self._device.l2cap_le.FetchL2capData(empty_proto.Empty())) def close(self): safeClose(self.le_l2cap_stream) safeClose(self._le_l2cap_stream) def get_le_l2cap_stream(self): return self.le_l2cap_stream def open_credit_based_flow_control_channel(self, psm=0x33): # todo, I don't understand what SetDynamicChannel means? def register_coc(self, psm=0x33): self._device.l2cap_le.SetDynamicChannel( l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(psm=psm)) return PyLeL2capChannel(self._device, psm) l2cap_le_facade_pb2.SetEnableDynamicChannelRequest( psm=psm, enable=True)) return PyLeL2capChannel(self._device, psm, self._le_l2cap_stream) def connect_coc_to_cert(self, psm=0x33): """ Send open LE COC request to CERT. Get a future for connection result, to be used after CERT accepts request """ self.register_coc(psm) # TODO: Update CERT device random address in ACL manager response_future = self._device.l2cap_le.OpenDynamicChannel.future( l2cap_le_facade_pb2.OpenDynamicChannelRequest( psm=psm, remote=common.BluetoothAddressWithType( address=common.BluetoothAddress( address=b"22:33:ff:ff:11:00")))) return CreditBasedConnectionResponseFutureWrapper( response_future, self._device, psm, self._le_l2cap_stream) system/gd/l2cap/le/cert/cert_le_l2cap.py +49 −15 Original line number Diff line number Diff line Loading @@ -20,7 +20,8 @@ from cert.py_le_acl_manager import PyLeAclManager from cert.truth import assertThat import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode from bluetooth_packets_python3.l2cap_packets import LeCommandCode from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult from cert.event_stream import FilteringEventStream from cert.event_stream import IEventStream from cert.matchers import L2capMatchers Loading Loading @@ -78,9 +79,9 @@ class CertLeL2cap(Closable): self._le_acl = None self.control_table = { CommandCode.DISCONNECTION_REQUEST: LeCommandCode.DISCONNECTION_REQUEST: self._on_disconnection_request_default, CommandCode.DISCONNECTION_RESPONSE: LeCommandCode.DISCONNECTION_RESPONSE: self._on_disconnection_response_default, } Loading Loading @@ -120,6 +121,40 @@ class CertLeL2cap(Closable): self._get_acl_stream(), self._le_acl, self.control_channel) def verify_and_respond_open_channel_from_remote( self, psm=0x33, result=LeCreditBasedConnectionResponseResult.SUCCESS): request = L2capCaptures.CreditBasedConnectionRequest(psm) assertThat(self.control_channel).emits(request) (scid, dcid) = self._respond_connection_request_default( request.get(), result) return CertLeL2capChannel(self._device, scid, dcid, self._get_acl_stream(), self._le_acl, self.control_channel) def verify_and_reject_open_channel_from_remote(self, psm=0x33): request = L2capCaptures.CreditBasedConnectionRequest(psm) assertThat(self.control_channel).emits(request) sid = request.get().GetIdentifier() reject = l2cap_packets.LeCommandRejectNotUnderstoodBuilder(sid) self.control_channel.send(reject) def _respond_connection_request_default( self, request, result=LeCreditBasedConnectionResponseResult.SUCCESS): sid = request.GetIdentifier() their_scid = request.GetSourceCid() mtu = request.GetMtu() mps = request.GetMps() initial_credits = request.GetInitialCredits() # Here we use the same value - their scid as their scid our_scid = their_scid our_dcid = their_scid response = l2cap_packets.LeCreditBasedConnectionResponseBuilder( sid, our_scid, mtu, mps, initial_credits, result) self.control_channel.send(response) return (our_scid, our_dcid) # prefer to use channel abstraction instead, if at all possible def send_acl(self, packet): self._acl.send(packet.Serialize()) Loading @@ -130,19 +165,18 @@ class CertLeL2cap(Closable): def _get_acl_stream(self): return self._le_acl_manager.get_le_acl_stream() def _on_disconnection_request_default(self, l2cap_control_view): disconnection_request = l2cap_packets.DisconnectionRequestView( l2cap_control_view) def _on_disconnection_request_default(self, request): disconnection_request = l2cap_packets.LeDisconnectionRequestView( request) sid = disconnection_request.GetIdentifier() scid = disconnection_request.GetSourceCid() dcid = disconnection_request.GetDestinationCid() disconnection_response = l2cap_packets.DisconnectionResponseBuilder( sid, dcid, scid) self.control_channel.send(disconnection_response) response = l2cap_packets.LeDisconnectionResponseBuilder(sid, dcid, scid) self.control_channel.send(response) def _on_disconnection_response_default(self, l2cap_control_view): disconnection_response = l2cap_packets.DisconnectionResponseView( l2cap_control_view) def _on_disconnection_response_default(self, request): disconnection_response = l2cap_packets.LeDisconnectionResponseView( request) def _handle_control_packet(self, l2cap_packet): packet_bytes = l2cap_packet.payload Loading @@ -150,8 +184,8 @@ class CertLeL2cap(Closable): bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 5: return l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload()) fn = self.control_table.get(l2cap_control_view.GetCode()) request = l2cap_packets.LeControlView(l2cap_view.GetPayload()) fn = self.control_table.get(request.GetCode()) if fn is not None: fn(l2cap_control_view) fn(request) return system/gd/l2cap/le/cert/le_l2cap_test.py +95 −28 Original line number Diff line number Diff line Loading @@ -27,7 +27,7 @@ from cert.matchers import L2capMatchers from facade import common_pb2 as common from facade import rootservice_pb2 as facade_rootservice from google.protobuf import empty_pb2 as empty_proto from l2cap.classic import facade_pb2 as l2cap_facade_pb2 from l2cap.le import facade_pb2 as l2cap_facade_pb2 from neighbor.facade import facade_pb2 as neighbor_facade from hci.facade import acl_manager_facade_pb2 as acl_manager_facade from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade Loading Loading @@ -87,7 +87,7 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): request) self.cert_l2cap.connect_le_acl(bytes(b'0D:05:04:03:02:01')) def _open_unvalidated_channel(self, def _open_channel_from_cert(self, signal_id=1, scid=0x0101, psm=0x33, Loading @@ -95,18 +95,25 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): mps=100, initial_credit=6): dut_channel = self.dut_l2cap.open_credit_based_flow_control_channel(psm) dut_channel = self.dut_l2cap.register_coc(psm) cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid, mtu, mps, initial_credit) return (dut_channel, cert_channel) def _open_channel_from_dut(self, psm=0x33): response_future = self.dut_l2cap.connect_coc_to_cert(psm) cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote( psm) dut_channel = response_future.get_channel() return (dut_channel, cert_channel) def test_segmentation(self): """ L2CAP/COS/CFC/BV-01-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel( (dut_channel, cert_channel) = self._open_channel_from_cert( mtu=1000, mps=102) dut_channel.send(b'hello' * 20 + b'world') # The first LeInformation packet contains 2 bytes of SDU size. Loading @@ -121,7 +128,7 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): L2CAP/COS/CFC/BV-02-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel( (dut_channel, cert_channel) = self._open_channel_from_cert( mtu=1000, mps=202) dut_channel.send(b'hello' * 40) assertThat(cert_channel).emits( Loading @@ -132,12 +139,12 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): L2CAP/COS/CFC/BV-03-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel() (dut_channel, cert_channel) = self._open_channel_from_cert() sdu_size_for_two_sample_packet = 12 cert_channel.send_first_le_i_frame(sdu_size_for_two_sample_packet, SAMPLE_PACKET) cert_channel.send(SAMPLE_PACKET) assertThat(self.dut_l2cap.le_l2cap_stream).emits( assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00' * 2)) def test_data_receiving(self): Loading @@ -145,9 +152,37 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): L2CAP/COS/CFC/BV-04-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel() (dut_channel, cert_channel) = self._open_channel_from_cert() cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) assertThat(self.dut_l2cap.le_l2cap_stream).emits( assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) def test_multiple_channels_with_interleaved_data_streams(self): """ L2CAP/COS/CFC/BV-05-C """ self._setup_link_from_cert() (dut_channel_x, cert_channel_x) = self._open_channel_from_cert( signal_id=1, scid=0x0103, psm=0x33) (dut_channel_y, cert_channel_y) = self._open_channel_from_cert( signal_id=2, scid=0x0105, psm=0x35) (dut_channel_z, cert_channel_z) = self._open_channel_from_cert( signal_id=3, scid=0x0107, psm=0x37) cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET) # TODO: We should assert two events in order, but it got stuck assertThat(dut_channel_y).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'), at_least_times=3) assertThat(dut_channel_z).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'), L2capMatchers.PacketPayloadRawData( b'\x01\x01\x02\x00\x00\x00')).inOrder() cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET) assertThat(dut_channel_z).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) def test_reject_unknown_command_in_le_sigling_channel(self): Loading @@ -162,16 +197,6 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): assertThat(self.cert_l2cap.get_control_channel()).emits( L2capMatchers.LeCommandReject()) def test_credit_based_connection_response_on_supported_le_psm(self): """ L2CAP/LE/CFC/BV-03-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel() dut_channel.send(b'hello') assertThat(cert_channel).emits( L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) def test_credit_based_connection_request_unsupported_le_psm(self): """ L2CAP/LE/CFC/BV-05-C Loading @@ -192,7 +217,7 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel(initial_credit=0) cert_channel) = self._open_channel_from_cert(initial_credit=0) for _ in range(4): dut_channel.send(b'hello') cert_channel.send_credits(1) Loading @@ -206,12 +231,54 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5), L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) def test_le_credit_based_connection_request_legacy_peer(self): """ L2CAP/LE/CFC/BV-01-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_reject_open_channel_from_remote(psm=0x33) assertThat(response_future.get_status()).isNotEqualTo( LeCreditBasedConnectionResponseResult.SUCCESS) def test_le_credit_based_connection_request_on_supported_le_psm(self): """ L2CAP/LE/CFC/BV-02-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_dut() cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) def test_credit_based_connection_response_on_supported_le_psm(self): """ L2CAP/LE/CFC/BV-03-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() dut_channel.send(b'hello') assertThat(cert_channel).emits( L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) def test_credit_based_connection_request_on_an_unsupported_le_psm(self): """ L2CAP/LE/CFC/BV-04-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_respond_open_channel_from_remote( psm=0x33, result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED) assertThat(response_future.get_status()).isEqualTo( LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED) def test_credit_exchange_exceed_initial_credits(self): """ L2CAP/LE/CFC/BI-01-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel() (dut_channel, cert_channel) = self._open_channel_from_cert() cert_channel.send_credits(65535) cert_channel.verify_disconnect_request() Loading @@ -220,5 +287,5 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): L2CAP/LE/CFC/BV-09-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel() (dut_channel, cert_channel) = self._open_channel_from_cert() cert_channel.disconnect_and_verify() Loading
system/gd/cert/captures.py +12 −0 Original line number Diff line number Diff line Loading @@ -73,6 +73,18 @@ class L2capCaptures(object): packet, CommandCode.CONNECTION_RESPONSE) return l2cap_packets.ConnectionResponseView(frame) @staticmethod def CreditBasedConnectionRequest(psm): return Capture( L2capMatchers.CreditBasedConnectionRequest(psm), L2capCaptures._extract_credit_based_connection_request) @staticmethod def _extract_credit_based_connection_request(packet): frame = L2capMatchers.le_control_frame_with_code( packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST) return l2cap_packets.LeCreditBasedConnectionRequestView(frame) @staticmethod def CreditBasedConnectionResponse(scid): return Capture( Loading
system/gd/cert/matchers.py +16 −2 Original line number Diff line number Diff line Loading @@ -58,8 +58,8 @@ class L2capMatchers(object): return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT) @staticmethod def CreditBasedConnectionRequest(): return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST) def CreditBasedConnectionRequest(psm): return lambda packet: L2capMatchers._is_matching_credit_based_connection_request(packet, psm) @staticmethod def CreditBasedConnectionResponse( Loading Loading @@ -100,6 +100,11 @@ class L2capMatchers(object): def PacketPayloadRawData(payload): return lambda packet: payload in packet.payload # this is a hack - should be removed @staticmethod def PacketPayloadWithMatchingPsm(psm): return lambda packet: None if psm != packet.psm else packet @staticmethod def ExtractBasicFrame(scid): return lambda packet: L2capMatchers._basic_frame_for(packet, scid) Loading Loading @@ -281,6 +286,15 @@ class L2capMatchers(object): return False return True @staticmethod def _is_matching_credit_based_connection_request(packet, psm): frame = L2capMatchers.le_control_frame_with_code( packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST) if frame is None: return False request = l2cap_packets.LeCreditBasedConnectionRequestView(frame) return request.GetLePsm() == psm @staticmethod def _is_matching_credit_based_connection_response(packet, scid, result): frame = L2capMatchers.le_control_frame_with_code( Loading
system/gd/cert/py_l2cap.py +59 −12 Original line number Diff line number Diff line Loading @@ -17,8 +17,13 @@ from l2cap.classic import facade_pb2 as l2cap_facade_pb2 from l2cap.le import facade_pb2 as l2cap_le_facade_pb2 from bluetooth_packets_python3 import l2cap_packets from cert.event_stream import EventStream from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult from cert.event_stream import FilteringEventStream from cert.event_stream import EventStream, IEventStream from cert.closable import Closable, safeClose from cert.truth import assertThat from cert.matchers import L2capMatchers from facade import common_pb2 as common from google.protobuf import empty_pb2 as empty_proto Loading Loading @@ -52,32 +57,74 @@ class PyL2cap(Closable): return PyL2capChannel(self._device, psm) class PyLeL2capChannel(object): class PyLeL2capChannel(IEventStream): def __init__(self, device, psm): def __init__(self, device, psm, l2cap_stream): self._device = device self._psm = psm self._le_l2cap_stream = l2cap_stream self._our_le_l2cap_view = FilteringEventStream( self._le_l2cap_stream, L2capMatchers.PacketPayloadWithMatchingPsm(self._psm)) def get_event_queue(self): return self._our_le_l2cap_view.get_event_queue() def send(self, payload): self._device.l2cap_le.SendDynamicChannelPacket( l2cap_le_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload)) class CreditBasedConnectionResponseFutureWrapper(object): """ The future object returned when we send a connection request from DUT. Can be used to get connection status and create the corresponding PyLeL2capChannel object later """ def __init__(self, grpc_response_future, device, psm, le_l2cap_stream): self._grpc_response_future = grpc_response_future self._device = device self._psm = psm self._le_l2cap_stream = le_l2cap_stream def get_status(self): return l2cap_packets.LeCreditBasedConnectionResponseResult( self._grpc_response_future.result().status) def get_channel(self): assertThat(self.get_status()).isEqualTo( l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS) return PyLeL2capChannel(self._device, self._psm, self._le_l2cap_stream) class PyLeL2cap(Closable): def __init__(self, device): self._device = device self.le_l2cap_stream = EventStream( self._le_l2cap_stream = EventStream( self._device.l2cap_le.FetchL2capData(empty_proto.Empty())) def close(self): safeClose(self.le_l2cap_stream) safeClose(self._le_l2cap_stream) def get_le_l2cap_stream(self): return self.le_l2cap_stream def open_credit_based_flow_control_channel(self, psm=0x33): # todo, I don't understand what SetDynamicChannel means? def register_coc(self, psm=0x33): self._device.l2cap_le.SetDynamicChannel( l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(psm=psm)) return PyLeL2capChannel(self._device, psm) l2cap_le_facade_pb2.SetEnableDynamicChannelRequest( psm=psm, enable=True)) return PyLeL2capChannel(self._device, psm, self._le_l2cap_stream) def connect_coc_to_cert(self, psm=0x33): """ Send open LE COC request to CERT. Get a future for connection result, to be used after CERT accepts request """ self.register_coc(psm) # TODO: Update CERT device random address in ACL manager response_future = self._device.l2cap_le.OpenDynamicChannel.future( l2cap_le_facade_pb2.OpenDynamicChannelRequest( psm=psm, remote=common.BluetoothAddressWithType( address=common.BluetoothAddress( address=b"22:33:ff:ff:11:00")))) return CreditBasedConnectionResponseFutureWrapper( response_future, self._device, psm, self._le_l2cap_stream)
system/gd/l2cap/le/cert/cert_le_l2cap.py +49 −15 Original line number Diff line number Diff line Loading @@ -20,7 +20,8 @@ from cert.py_le_acl_manager import PyLeAclManager from cert.truth import assertThat import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode from bluetooth_packets_python3.l2cap_packets import LeCommandCode from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult from cert.event_stream import FilteringEventStream from cert.event_stream import IEventStream from cert.matchers import L2capMatchers Loading Loading @@ -78,9 +79,9 @@ class CertLeL2cap(Closable): self._le_acl = None self.control_table = { CommandCode.DISCONNECTION_REQUEST: LeCommandCode.DISCONNECTION_REQUEST: self._on_disconnection_request_default, CommandCode.DISCONNECTION_RESPONSE: LeCommandCode.DISCONNECTION_RESPONSE: self._on_disconnection_response_default, } Loading Loading @@ -120,6 +121,40 @@ class CertLeL2cap(Closable): self._get_acl_stream(), self._le_acl, self.control_channel) def verify_and_respond_open_channel_from_remote( self, psm=0x33, result=LeCreditBasedConnectionResponseResult.SUCCESS): request = L2capCaptures.CreditBasedConnectionRequest(psm) assertThat(self.control_channel).emits(request) (scid, dcid) = self._respond_connection_request_default( request.get(), result) return CertLeL2capChannel(self._device, scid, dcid, self._get_acl_stream(), self._le_acl, self.control_channel) def verify_and_reject_open_channel_from_remote(self, psm=0x33): request = L2capCaptures.CreditBasedConnectionRequest(psm) assertThat(self.control_channel).emits(request) sid = request.get().GetIdentifier() reject = l2cap_packets.LeCommandRejectNotUnderstoodBuilder(sid) self.control_channel.send(reject) def _respond_connection_request_default( self, request, result=LeCreditBasedConnectionResponseResult.SUCCESS): sid = request.GetIdentifier() their_scid = request.GetSourceCid() mtu = request.GetMtu() mps = request.GetMps() initial_credits = request.GetInitialCredits() # Here we use the same value - their scid as their scid our_scid = their_scid our_dcid = their_scid response = l2cap_packets.LeCreditBasedConnectionResponseBuilder( sid, our_scid, mtu, mps, initial_credits, result) self.control_channel.send(response) return (our_scid, our_dcid) # prefer to use channel abstraction instead, if at all possible def send_acl(self, packet): self._acl.send(packet.Serialize()) Loading @@ -130,19 +165,18 @@ class CertLeL2cap(Closable): def _get_acl_stream(self): return self._le_acl_manager.get_le_acl_stream() def _on_disconnection_request_default(self, l2cap_control_view): disconnection_request = l2cap_packets.DisconnectionRequestView( l2cap_control_view) def _on_disconnection_request_default(self, request): disconnection_request = l2cap_packets.LeDisconnectionRequestView( request) sid = disconnection_request.GetIdentifier() scid = disconnection_request.GetSourceCid() dcid = disconnection_request.GetDestinationCid() disconnection_response = l2cap_packets.DisconnectionResponseBuilder( sid, dcid, scid) self.control_channel.send(disconnection_response) response = l2cap_packets.LeDisconnectionResponseBuilder(sid, dcid, scid) self.control_channel.send(response) def _on_disconnection_response_default(self, l2cap_control_view): disconnection_response = l2cap_packets.DisconnectionResponseView( l2cap_control_view) def _on_disconnection_response_default(self, request): disconnection_response = l2cap_packets.LeDisconnectionResponseView( request) def _handle_control_packet(self, l2cap_packet): packet_bytes = l2cap_packet.payload Loading @@ -150,8 +184,8 @@ class CertLeL2cap(Closable): bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 5: return l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload()) fn = self.control_table.get(l2cap_control_view.GetCode()) request = l2cap_packets.LeControlView(l2cap_view.GetPayload()) fn = self.control_table.get(request.GetCode()) if fn is not None: fn(l2cap_control_view) fn(request) return
system/gd/l2cap/le/cert/le_l2cap_test.py +95 −28 Original line number Diff line number Diff line Loading @@ -27,7 +27,7 @@ from cert.matchers import L2capMatchers from facade import common_pb2 as common from facade import rootservice_pb2 as facade_rootservice from google.protobuf import empty_pb2 as empty_proto from l2cap.classic import facade_pb2 as l2cap_facade_pb2 from l2cap.le import facade_pb2 as l2cap_facade_pb2 from neighbor.facade import facade_pb2 as neighbor_facade from hci.facade import acl_manager_facade_pb2 as acl_manager_facade from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade Loading Loading @@ -87,7 +87,7 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): request) self.cert_l2cap.connect_le_acl(bytes(b'0D:05:04:03:02:01')) def _open_unvalidated_channel(self, def _open_channel_from_cert(self, signal_id=1, scid=0x0101, psm=0x33, Loading @@ -95,18 +95,25 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): mps=100, initial_credit=6): dut_channel = self.dut_l2cap.open_credit_based_flow_control_channel(psm) dut_channel = self.dut_l2cap.register_coc(psm) cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid, mtu, mps, initial_credit) return (dut_channel, cert_channel) def _open_channel_from_dut(self, psm=0x33): response_future = self.dut_l2cap.connect_coc_to_cert(psm) cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote( psm) dut_channel = response_future.get_channel() return (dut_channel, cert_channel) def test_segmentation(self): """ L2CAP/COS/CFC/BV-01-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel( (dut_channel, cert_channel) = self._open_channel_from_cert( mtu=1000, mps=102) dut_channel.send(b'hello' * 20 + b'world') # The first LeInformation packet contains 2 bytes of SDU size. Loading @@ -121,7 +128,7 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): L2CAP/COS/CFC/BV-02-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel( (dut_channel, cert_channel) = self._open_channel_from_cert( mtu=1000, mps=202) dut_channel.send(b'hello' * 40) assertThat(cert_channel).emits( Loading @@ -132,12 +139,12 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): L2CAP/COS/CFC/BV-03-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel() (dut_channel, cert_channel) = self._open_channel_from_cert() sdu_size_for_two_sample_packet = 12 cert_channel.send_first_le_i_frame(sdu_size_for_two_sample_packet, SAMPLE_PACKET) cert_channel.send(SAMPLE_PACKET) assertThat(self.dut_l2cap.le_l2cap_stream).emits( assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00' * 2)) def test_data_receiving(self): Loading @@ -145,9 +152,37 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): L2CAP/COS/CFC/BV-04-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel() (dut_channel, cert_channel) = self._open_channel_from_cert() cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) assertThat(self.dut_l2cap.le_l2cap_stream).emits( assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) def test_multiple_channels_with_interleaved_data_streams(self): """ L2CAP/COS/CFC/BV-05-C """ self._setup_link_from_cert() (dut_channel_x, cert_channel_x) = self._open_channel_from_cert( signal_id=1, scid=0x0103, psm=0x33) (dut_channel_y, cert_channel_y) = self._open_channel_from_cert( signal_id=2, scid=0x0105, psm=0x35) (dut_channel_z, cert_channel_z) = self._open_channel_from_cert( signal_id=3, scid=0x0107, psm=0x37) cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET) # TODO: We should assert two events in order, but it got stuck assertThat(dut_channel_y).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'), at_least_times=3) assertThat(dut_channel_z).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'), L2capMatchers.PacketPayloadRawData( b'\x01\x01\x02\x00\x00\x00')).inOrder() cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET) assertThat(dut_channel_z).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) def test_reject_unknown_command_in_le_sigling_channel(self): Loading @@ -162,16 +197,6 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): assertThat(self.cert_l2cap.get_control_channel()).emits( L2capMatchers.LeCommandReject()) def test_credit_based_connection_response_on_supported_le_psm(self): """ L2CAP/LE/CFC/BV-03-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel() dut_channel.send(b'hello') assertThat(cert_channel).emits( L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) def test_credit_based_connection_request_unsupported_le_psm(self): """ L2CAP/LE/CFC/BV-05-C Loading @@ -192,7 +217,7 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel(initial_credit=0) cert_channel) = self._open_channel_from_cert(initial_credit=0) for _ in range(4): dut_channel.send(b'hello') cert_channel.send_credits(1) Loading @@ -206,12 +231,54 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5), L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) def test_le_credit_based_connection_request_legacy_peer(self): """ L2CAP/LE/CFC/BV-01-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_reject_open_channel_from_remote(psm=0x33) assertThat(response_future.get_status()).isNotEqualTo( LeCreditBasedConnectionResponseResult.SUCCESS) def test_le_credit_based_connection_request_on_supported_le_psm(self): """ L2CAP/LE/CFC/BV-02-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_dut() cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) def test_credit_based_connection_response_on_supported_le_psm(self): """ L2CAP/LE/CFC/BV-03-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() dut_channel.send(b'hello') assertThat(cert_channel).emits( L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) def test_credit_based_connection_request_on_an_unsupported_le_psm(self): """ L2CAP/LE/CFC/BV-04-C """ self._setup_link_from_cert() response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33) self.cert_l2cap.verify_and_respond_open_channel_from_remote( psm=0x33, result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED) assertThat(response_future.get_status()).isEqualTo( LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED) def test_credit_exchange_exceed_initial_credits(self): """ L2CAP/LE/CFC/BI-01-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel() (dut_channel, cert_channel) = self._open_channel_from_cert() cert_channel.send_credits(65535) cert_channel.verify_disconnect_request() Loading @@ -220,5 +287,5 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass): L2CAP/LE/CFC/BV-09-C """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_unvalidated_channel() (dut_channel, cert_channel) = self._open_channel_from_cert() cert_channel.disconnect_and_verify()