Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0f35d5cd authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "L2CAP: Use new event stream model"

parents 8e26cacd a081bada
Loading
Loading
Loading
Loading
+0 −2
Original line number Diff line number Diff line
@@ -79,5 +79,3 @@ class GdCertDevice(GdDeviceBase):
        self.hci.disconnection_stream = EventStream(self.hci.FetchDisconnection)
        self.hci.connection_failed_stream = EventStream(self.hci.FetchConnectionFailed)
        self.hci.acl_stream = EventStream(self.hci.FetchAclData)
        self.l2cap.packet_stream = EventStream(self.l2cap.FetchL2capData)
        self.l2cap.connection_complete_stream = EventStream(self.l2cap.FetchConnectionComplete)
+109 −18
Original line number Diff line number Diff line
@@ -7,15 +7,23 @@ import "facade/common.proto";

service L2capModuleCert {
  rpc SendL2capPacket(L2capPacket) returns (google.protobuf.Empty) {}
  rpc FetchL2capData(facade.EventStreamRequest) returns (stream L2capPacket) {}
  rpc FetchConnectionComplete(facade.EventStreamRequest) returns (stream ConnectionCompleteEvent) {}
  rpc SetOnIncomingConnectionRequest(SetOnIncomingConnectionRequestRequest)
      returns (SetOnIncomingConnectionRequestResponse) {}

  rpc SetupLink(SetupLinkRequest) returns (SetupLinkResponse) {}
  rpc DisconnectLink(DisconnectLinkRequest) returns (google.protobuf.Empty) {}

  rpc SendConnectionRequest(ConnectionRequest) returns (google.protobuf.Empty) {}
  rpc SendConnectionResponse(ConnectionResponse) returns (SendConnectionResponseResult) {}

  rpc SendConfigurationRequest(ConfigurationRequest) returns (SendConfigurationRequestResult) {}
  rpc SendConfigurationResponse(ConfigurationResponse) returns (SendConfigurationResponseResult) {}

  rpc SendDisconnectionRequest(DisconnectionRequest) returns (google.protobuf.Empty) {}
  rpc FetchOpenedChannels(FetchOpenedChannelsRequest) returns (FetchOpenedChannelsResponse) {}
  rpc SendDisconnectionResponse(DisconnectionResponse) returns (SendDisconnectionResponseResult) {}

  rpc SendInformationRequest(InformationRequest) returns (SendInformationRequestResult) {}
  rpc SendInformationResponse(InformationResponse) returns (SendInformationResponseResult) {}

  rpc FetchL2capLog(FetchL2capLogRequest) returns (stream FetchL2capLogResponse) {}
}

message L2capPacket {
@@ -24,41 +32,124 @@ message L2capPacket {
  bytes payload = 3;
}

message ConnectionCompleteEvent {
message DisconnectLinkRequest {
  facade.BluetoothAddress remote = 1;
}

message SetOnIncomingConnectionRequestRequest {
  bool accept = 1;
message SetupLinkRequest {
  facade.BluetoothAddress remote = 1;
}

message SetOnIncomingConnectionRequestResponse {}
message SetupLinkResponse {}

message DisconnectLinkRequest {
message ConnectionRequest {
  facade.BluetoothAddress remote = 1;
  uint32 psm = 2;
  uint32 scid = 3;
  uint32 signal_id = 4;
}

message ConnectionRequest {
message ConnectionResponse {
  facade.BluetoothAddress remote = 1;
  uint32 psm = 2;
  uint32 dcid = 2;
  uint32 scid = 3;
  uint32 signal_id = 4;
}

message SendConnectionResponseResult {}

message ConfigurationRequest {
  uint32 scid = 1;
  uint32 dcid = 1;
  uint32 signal_id = 2;
  repeated string configuration = 3;
}

message SendConfigurationRequestResult {}

message ConfigurationResponse {
  uint32 scid = 1;
  uint32 signal_id = 2;
  repeated string configuration = 3;
}

message SendConfigurationResponseResult {}

message DisconnectionRequest {
  facade.BluetoothAddress remote = 1;
  uint32 dcid = 2;
  uint32 scid = 3;
  uint32 signal_id = 4;
}

message FetchOpenedChannelsRequest {}
message DisconnectionResponse {
  facade.BluetoothAddress remote = 1;
  uint32 dcid = 2;
  uint32 scid = 3;
  uint32 signal_id = 4;
}

message SendDisconnectionResponseResult {}

enum InformationRequestType {
  CONNECTIONLESS_MTU = 0;
  EXTENDED_FEATURES = 1;
  FIXED_CHANNELS = 2;
}

message InformationRequest {
  InformationRequestType type = 1;
  uint32 signal_id = 4;
}

message FetchOpenedChannelsResponse {
  repeated uint32 scid = 1;
  repeated uint32 dcid = 2;
message SendInformationRequestResult {}

message InformationResponse {
  InformationRequestType type = 1;
  uint32 data = 2;
  uint32 signal_id = 3;
}

message SendInformationResponseResult {}

message FetchL2capLogRequest {}

message CommandReject {
  uint32 signal_id = 1;
  uint32 reason = 2;
}

message EchoRequest {
  uint32 signal_id = 1;
  string data = 2;
}
message EchoResponse {
  uint32 signal_id = 1;
  string data = 2;
}

message LinkUp {
  facade.BluetoothAddress remote = 1;
}

message LinkDown {
  facade.BluetoothAddress remote = 1;
}

message FetchL2capLogResponse {
  oneof response {
    L2capPacket data_packet = 1;
    CommandReject command_reject = 2;
    ConnectionRequest connection_request = 3;
    ConnectionResponse connection_response = 4;
    ConfigurationRequest configuration_request = 5;
    ConfigurationResponse configuration_response = 6;
    DisconnectionRequest disconnection_request = 7;
    DisconnectionResponse disconnection_response = 8;
    EchoRequest echo_request = 9;
    EchoResponse echo_response = 10;
    InformationRequest information_request = 11;
    InformationResponse information_response = 12;
    LinkUp link_up = 20;
    LinkDown link_down = 21;
  }
}
+231 −96

File changed.

Preview size limit exceeded, changes collapsed.

+220 −116
Original line number Diff line number Diff line
@@ -31,6 +31,49 @@ import time

ASYNC_OP_TIME_SECONDS = 1  # TODO: Use events to synchronize events instead

class EventHandler:
    def __init__(self):
        self._handler_map = {}

    def on(self, matcher, func):
        self._handler_map[matcher] = func

    def execute(self, grpc_stream):
        for result in grpc_stream:
            for matcher, func in self._handler_map.items():
                if matcher(result):
                    func(result)

def is_connection_request(log):
    return log.HasField("connection_request")

def is_connection_response(log):
    return log.HasField("connection_response")

def is_configuration_request(log):
    return log.HasField("configuration_request")

def is_configuration_response(log):
    return log.HasField("configuration_response")

def is_disconnection_request(log):
    return log.HasField("disconnection_request")

def is_disconnection_response(log):
    return log.HasField("disconnection_response")

def is_echo_response(log):
    return log.HasField("echo_response")

def is_information_request(log):
    return log.HasField("information_request")

def is_information_response(log):
    return log.HasField("information_response")

def is_command_reject(log):
    return log.HasField("command_reject")

class SimpleL2capTest(GdBaseTestClass):
    def setup_test(self):
        self.device_under_test = self.gd_devices[0]
@@ -59,6 +102,53 @@ class SimpleL2capTest(GdBaseTestClass):
        self.cert_address = common_pb2.BluetoothAddress(
            address=self.cert_device.address)

        log_event_handler = EventHandler()
        self.next_scid = 0x40
        self.scid_dcid_map = {}
        def handle_connection_request(log):
            log = log.connection_request
            self.cert_device.l2cap.SendConnectionResponse(l2cap_cert_pb2.ConnectionResponse(dcid=self.next_scid,scid=log.scid,
                                                                                            signal_id=log.signal_id))
            self.scid_dcid_map[self.next_scid] = log.scid
            self.next_scid += 1
            self.cert_device.l2cap.SendConfigurationRequest(l2cap_cert_pb2.ConfigurationRequest(dcid=log.scid,
                                                                                            signal_id=log.signal_id+1))
        log_event_handler.on(is_connection_request, handle_connection_request)

        def handle_connection_response(log):
            log = log.connection_response
            self.scid_dcid_map[log.scid] = log.dcid
            self.cert_device.l2cap.SendConfigurationRequest(l2cap_cert_pb2.ConfigurationRequest(dcid=log.dcid,
                                                                                              signal_id=log.signal_id+1))
        log_event_handler.on(is_connection_response, handle_connection_response)

        def handle_configuration_request(log):
            log = log.configuration_request
            if log.dcid not in self.scid_dcid_map:
                return
            dcid = self.scid_dcid_map[log.dcid]
            self.cert_device.l2cap.SendConfigurationResponse(l2cap_cert_pb2.ConfigurationResponse(scid=dcid,
                                                                                            signal_id=log.signal_id))
        log_event_handler.on(is_configuration_request, handle_configuration_request)

        def handle_disconnection_request(log):
            log = log.disconnection_request
            self.cert_device.l2cap.SendDisconnectionResponse(l2cap_cert_pb2.DisconnectionResponse(dcid=log.dcid,scid=log.scid,
                                                                                            signal_id=log.signal_id))
        log_event_handler.on(is_disconnection_request, handle_disconnection_request)

        def handle_information_request(log):
            log = log.information_request
            self.cert_device.l2cap.SendInformationResponse(l2cap_cert_pb2.InformationResponse(type=log.type,
                                                                                      signal_id=log.signal_id))
        log_event_handler.on(is_information_request, handle_information_request)

        self.event_dump = []
        def dump_log(log):
            self.event_dump.append(log)
        log_event_handler.on(lambda _: True, dump_log)
        self.event_handler = log_event_handler

    def teardown_test(self):
        self.device_under_test.rootservice.StopStack(
            facade_rootservice_pb2.StopStackRequest()
@@ -67,98 +157,103 @@ class SimpleL2capTest(GdBaseTestClass):
            cert_rootservice_pb2.StopStackRequest()
        )

    def test_connect_and_send_data(self):
        self.device_under_test.l2cap.RegisterChannel(l2cap_facade_pb2.RegisterChannelRequest(channel=2))
        self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=0x01))
        dut_packet_stream = self.device_under_test.l2cap.packet_stream
        cert_packet_stream = self.cert_device.l2cap.packet_stream
        cert_connection_stream = self.cert_device.l2cap.connection_complete_stream
        dut_connection_stream = self.device_under_test.l2cap.connection_complete_stream
        cert_connection_stream.subscribe()
        dut_connection_stream.subscribe()
        self.device_under_test.l2cap.Connect(self.cert_address)
        cert_connection_stream.assert_event_occurs(
            lambda device: device.remote == self.dut_address
        )
        cert_connection_stream.unsubscribe()
        dut_connection_stream.assert_event_occurs(
            lambda device: device.remote == self.cert_address
        )
        dut_connection_stream.unsubscribe()
    def _setup_link(self):
        self.cert_device.l2cap.SetupLink(l2cap_cert_pb2.SetupLinkRequest(remote=self.dut_address))
        link_up_handled = []
        def handle_link_up(log):
            log = log.link_up
            link_up_handled.append(log.remote)
        self.event_handler.on(lambda log : log.HasField("link_up"), handle_link_up)
        logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest())
        self.event_handler.execute(logs)
        assert self.dut_address in link_up_handled

        self.cert_device.l2cap.SendConnectionRequest(l2cap_cert_pb2.ConnectionRequest(scid=0x101, psm=1))
        time.sleep(ASYNC_OP_TIME_SECONDS)
        open_channels = self.cert_device.l2cap.FetchOpenedChannels(l2cap_cert_pb2.FetchOpenedChannelsRequest())
        cid = open_channels.dcid[0]
        self.cert_device.l2cap.SendConfigurationRequest(l2cap_cert_pb2.ConfigurationRequest(scid=cid))
        time.sleep(ASYNC_OP_TIME_SECONDS)
    def _open_channel(self, scid=0x0101, psm=0x01):
        self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm))

        dut_packet_stream.subscribe()
        cert_packet_stream.subscribe()
        configuration_response_handled = []
        def handle_configuration_response(log):
            log = log.configuration_response
            configuration_response_handled.append(log.scid)
        self.event_handler.on(is_configuration_response, handle_configuration_response)
        self.cert_device.l2cap.SendConnectionRequest(l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm))
        logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest())
        self.event_handler.execute(logs)
        assert scid in configuration_response_handled

        self.cert_device.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=2, payload=b"abc"))
        dut_packet_stream.assert_event_occurs(
            lambda packet: b"abc" in packet.payload
        )
    def test_connect(self):
        self._setup_link()
        self._open_channel(scid=0x0101)

    def test_connect_and_send_data(self):
        self.device_under_test.l2cap.RegisterChannel(l2cap_facade_pb2.RegisterChannelRequest(channel=2))
        self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=0x01))
        self._setup_link()
        scid = 0x0101
        self._open_channel(scid=scid)
        self.device_under_test.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123"))
        cert_packet_stream.assert_event_occurs(
            lambda packet: b"123" in packet.payload
        )

        self.cert_device.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=cid, payload=b"123"))
        dut_packet_stream.assert_event_occurs(
            lambda packet: b"123" in packet.payload
        )
        data_received = []
        event_handler = EventHandler()
        def on_data_received(log):
            log = log.data_packet
            data_received.append((log.channel, log.payload))
        event_handler.on(lambda log : log.HasField("data_packet"), on_data_received)
        logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest())
        event_handler.execute(logs)
        assert (2, b"123") in data_received

        self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=1, payload=b'abc'))
        cert_packet_stream.assert_event_occurs(
            lambda packet: b"abc" in packet.payload
        )

        self.cert_device.l2cap.SendDisconnectionRequest(l2cap_cert_pb2.DisconnectionRequest(dcid=0x40, scid=101))
        time.sleep(ASYNC_OP_TIME_SECONDS)
        dut_packet_stream.unsubscribe()
        cert_packet_stream.unsubscribe()
        logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest())
        event_handler.execute(logs)
        assert (scid, b"abc") in data_received

    def test_open_two_channels(self):
        cert_connection_stream = self.cert_device.l2cap.connection_complete_stream
        cert_connection_stream.subscribe()
        self.device_under_test.l2cap.OpenChannel(l2cap_facade_pb2.OpenChannelRequest(remote=self.cert_address, psm=0x01))
        self.device_under_test.l2cap.OpenChannel(l2cap_facade_pb2.OpenChannelRequest(remote=self.cert_address, psm=0x03))
        cert_connection_stream.assert_event_occurs(
            lambda device: device.remote == self.dut_address
        )
        cert_connection_stream.unsubscribe()
        time.sleep(ASYNC_OP_TIME_SECONDS)
        open_channels = self.cert_device.l2cap.FetchOpenedChannels(l2cap_cert_pb2.FetchOpenedChannelsRequest())
        assert len(open_channels.dcid) == 2
        self._setup_link()
        self._open_channel(scid=0x0101, psm=0x1)
        self._open_channel(scid=0x0102, psm=0x3)

    def test_accept_disconnect(self):
        """
        L2CAP/COS/CED/BV-07-C
        """
        self.device_under_test.l2cap.OpenChannel(l2cap_facade_pb2.OpenChannelRequest(remote=self.cert_address, psm=0x01))
        cert_connection_stream = self.cert_device.l2cap.connection_complete_stream
        cert_connection_stream.subscribe()
        self.device_under_test.l2cap.Connect(self.cert_address)
        cert_connection_stream.assert_event_occurs(
            lambda device: device.remote == self.dut_address
        )
        cert_connection_stream.unsubscribe()
        time.sleep(ASYNC_OP_TIME_SECONDS)
        cert_packet_stream = self.cert_device.l2cap.packet_stream
        cert_packet_stream.subscribe()
        open_channels = self.cert_device.l2cap.FetchOpenedChannels(l2cap_cert_pb2.FetchOpenedChannelsRequest())
        cid = open_channels.dcid[0]
        disconnection_request_packet = b"\x06\x01\x04\x00\x40\x00\x40\x01"
        disconnection_response_packet = b"\x07\x01\x04\x00\x40\x00\x40\x01"
        #TODO(b/143374372): Instead of hardcoding this, use packet builder
        self.cert_device.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=1, payload=disconnection_request_packet))
        cert_packet_stream.assert_event_occurs(
            lambda packet: disconnection_response_packet in packet.payload
        )
        cert_packet_stream.unsubscribe()
        time.sleep(ASYNC_OP_TIME_SECONDS)  # TODO(b/144186649): Remove this line
        self._setup_link()
        scid=0x0101
        self._open_channel(scid=scid, psm=0x1)
        dcid = self.scid_dcid_map[scid]
        disconnection_response_handled = []
        def handle_disconnection_response(log):
            log = log.disconnection_response
            disconnection_response_handled.append((log.scid, log.dcid))
        self.event_handler.on(is_disconnection_response, handle_disconnection_response)
        self.cert_device.l2cap.SendDisconnectionRequest(l2cap_cert_pb2.DisconnectionRequest(scid=scid, dcid=dcid, signal_id=2))
        logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest())
        self.event_handler.execute(logs)
        assert (scid, dcid) in disconnection_response_handled

    def test_disconnect_on_timeout(self):
        """
        L2CAP/COS/CED/BV-08-C
        """
        self._setup_link()
        scid = 0x0101
        psm = 1
        self._open_channel(scid=0x0101, psm=0x1)

        self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm))

        # Don't send configuration response back
        self.event_handler.on(is_configuration_request, lambda _: True)
        self.cert_device.l2cap.SendConnectionRequest(l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm))
        logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest())
        self.event_handler.execute(logs)
        time.sleep(3)
        def handle_configuration_response(log):
            # DUT should not send configuration response due to timeout
            assert False
        self.event_handler.on(is_configuration_response, handle_configuration_response)
        logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest())
        self.event_handler.execute(logs)

    def test_basic_operation_request_connection(self):
        """
@@ -166,58 +261,67 @@ class SimpleL2capTest(GdBaseTestClass):
        Verify that the IUT is able to request the connection establishment for an L2CAP data channel and
        initiate the configuration procedure.
        """
        cert_connection_stream = self.cert_device.l2cap.connection_complete_stream
        cert_connection_stream.subscribe()
        self.device_under_test.l2cap.OpenChannel(l2cap_facade_pb2.OpenChannelRequest(remote=self.cert_address, psm=0x01))
        cert_connection_stream.assert_event_occurs(
            lambda device: device.remote == self.dut_address
        )
        cert_connection_stream.unsubscribe()
        psm = 1
        self.device_under_test.l2cap.OpenChannel(l2cap_facade_pb2.OpenChannelRequest(remote=self.cert_address, psm=psm))
        connection_request = []
        def handle_connection_request(log):
            log = log.connection_request
            connection_request.append(log.psm)
        self.event_handler.on(is_connection_request, handle_connection_request)
        logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest())
        self.event_handler.execute(logs)
        assert psm in connection_request

    def test_respond_to_echo_request(self):
        """
        L2CAP/COS/ECH/BV-01-C [Respond to Echo Request]
        Verify that the IUT responds to an echo request.
        """
        self.device_under_test.l2cap.RegisterChannel(l2cap_facade_pb2.RegisterChannelRequest(channel=2))
        cert_connection_stream = self.cert_device.l2cap.connection_complete_stream
        cert_connection_stream.subscribe()
        self.device_under_test.l2cap.Connect(self.cert_address)
        cert_connection_stream.assert_event_occurs(
            lambda device: device.remote == self.dut_address
        )
        cert_connection_stream.unsubscribe()
        cert_packet_stream = self.cert_device.l2cap.packet_stream
        cert_packet_stream.subscribe()
        self._setup_link()
        # TODO: Replace with constructed packets when PDL is available
        echo_request_packet = b"\x08\x01\x00\x00"
        echo_response_packet = b"\x09\x01\x00\x00"
        self.cert_device.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=1, payload=echo_request_packet))
        cert_packet_stream.assert_event_occurs(
            lambda packet: echo_response_packet in packet.payload
        )
        cert_packet_stream.unsubscribe()
        time.sleep(ASYNC_OP_TIME_SECONDS)  # TODO(b/144186649): Remove this line
        echo_response = []
        def handle_echo_response(log):
            log = log.echo_response
            echo_response.append(log.signal_id)
        self.event_handler.on(is_echo_response, handle_echo_response)
        logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest())
        self.event_handler.execute(logs)
        assert 0x01 in echo_response

    def test_reject_unknown_command(self):
        """
        L2CAP/COS/CED/BI-01-C
        """
        cert_connection_stream = self.cert_device.l2cap.connection_complete_stream
        cert_connection_stream.subscribe()
        self.device_under_test.l2cap.RegisterChannel(l2cap_facade_pb2.RegisterChannelRequest(channel=2))
        self.device_under_test.l2cap.Connect(self.cert_address)
        cert_connection_stream.assert_event_occurs(
            lambda device: device.remote == self.dut_address
        )
        cert_connection_stream.unsubscribe()
        cert_packet_stream = self.cert_device.l2cap.packet_stream
        cert_packet_stream.subscribe()
        self._setup_link()
        # TODO: Replace with constructed packets when PDL is available
        invalid_command_packet = b"\xff\x01\x00\x00"
        self.cert_device.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=1, payload=invalid_command_packet))
        command_reject_packet = b"\x01\x01\x02\x00\x00\x00"
        cert_packet_stream.assert_event_occurs(
            lambda packet: command_reject_packet in packet.payload
        )
        cert_packet_stream.unsubscribe()
        command_reject = []
        def handle_command_reject(log):
            log = log.command_reject
            command_reject.append(log.signal_id)
        self.event_handler.on(is_command_reject, handle_command_reject)
        logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest())
        self.event_handler.execute(logs)
        assert 0x01 in command_reject

        time.sleep(ASYNC_OP_TIME_SECONDS)  # TODO(b/144186649): Remove this line
    def test_query_for_1_2_features(self):
        """
        L2CAP/COS/IEX/BV-01-C [Query for 1.2 Features]
        """
        self._setup_link()
        signal_id = 3
        self.cert_device.l2cap.SendInformationRequest(
            l2cap_cert_pb2.InformationRequest(
                type=l2cap_cert_pb2.InformationRequestType.FIXED_CHANNELS, signal_id=signal_id))
        info_response = []
        def handle_info_response(log):
            log = log.information_response
            info_response.append((log.signal_id, log.type))
        self.event_handler.on(is_information_response, handle_info_response)
        logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest())
        self.event_handler.execute(logs)
        assert (signal_id, l2cap_cert_pb2.InformationRequestType.FIXED_CHANNELS) in info_response
+16 −1
Original line number Diff line number Diff line
@@ -454,7 +454,22 @@ void ClassicSignallingManager::send_connection_response(SignalId signal_id, Cid

void ClassicSignallingManager::on_command_timeout() {
  LOG_WARN("Response time out");
  link_->OnAclDisconnected(hci::ErrorCode::SUCCESS);
  if (pending_commands_.empty()) {
    LOG_ERROR("No pending command");
    return;
  }

  auto last_sent_command = std::move(pending_commands_.front());
  pending_commands_.pop();
  switch (last_sent_command.command_code_) {
    case CommandCode::CONFIGURATION_REQUEST: {
      SendDisconnectionRequest(last_sent_command.source_cid_, last_sent_command.destination_cid_);
      break;
    }
    default:
      break;
  }
  handle_send_next_command();
}

void ClassicSignallingManager::handle_send_next_command() {