Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 080617c5 authored by Henri Chataing's avatar Henri Chataing
Browse files

gdcert: Deprecate l2cap dut and cert implementations

Bug: 329663555
Test: cert/run LeIsoTest
Flag: EXEMPT, test change
Change-Id: I8f6b68e150d3b9b89f7c5cdde8ebd016f7c8ebc5
parent 4e8d9cf2
Loading
Loading
Loading
Loading
+0 −264
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from google.protobuf import empty_pb2 as empty_proto

from blueberry.facade.l2cap.classic import facade_pb2 as l2cap_facade_pb2
from blueberry.facade.l2cap.classic.facade_pb2 import LinkSecurityInterfaceCallbackEventType
from blueberry.facade.l2cap.le import facade_pb2 as l2cap_le_facade_pb2
from blueberry.facade.l2cap.le.facade_pb2 import SecurityLevel
from bluetooth_packets_python3 import l2cap_packets
from blueberry.tests.gd.cert.event_stream import FilteringEventStream
from blueberry.tests.gd.cert.event_stream import EventStream, IEventStream
from blueberry.tests.gd.cert.closable import Closable, safeClose
from blueberry.tests.gd.cert.py_hci import PyHci
from blueberry.tests.gd.cert.matchers import HciMatchers
from blueberry.tests.gd.cert.matchers import L2capMatchers
from blueberry.tests.gd.cert.truth import assertThat
import hci_packets as hci


class PyL2capChannel(IEventStream):

    def __init__(self, device, psm, l2cap_stream):
        self._device = device
        self._psm = psm
        self._le_l2cap_stream = l2cap_stream
        self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream,
                                                       L2capMatchers.PacketPayloadWithMatchingPsm(self._psm))

    def get_event_queue(self):
        return self._our_le_l2cap_view.get_event_queue()

    def send(self, payload):
        self._device.l2cap.SendDynamicChannelPacket(
            l2cap_facade_pb2.DynamicChannelPacket(psm=self._psm, payload=payload))

    def close_channel(self):
        self._device.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=self._psm))

    def set_traffic_paused(self, paused):
        self._device.l2cap.SetTrafficPaused(l2cap_facade_pb2.SetTrafficPausedRequest(psm=self._psm, paused=paused))


class _ClassicConnectionResponseFutureWrapper(object):
    """
    The future object returned when we send a connection request from DUT. Can be used to get connection status and
    create the corresponding PyL2capDynamicChannel object later
    """

    def __init__(self, grpc_response_future, device, psm, l2cap_stream):
        self._grpc_response_future = grpc_response_future
        self._device = device
        self._psm = psm
        self._l2cap_stream = l2cap_stream

    def get_channel(self):
        return PyL2capChannel(self._device, self._psm, self._l2cap_stream)


class PyL2cap(Closable):

    def __init__(self, device, cert_address, has_security=False):
        self._device = device
        self._cert_address = cert_address
        self._hci = PyHci(device)
        self._l2cap_stream = EventStream(self._device.l2cap.FetchL2capData(empty_proto.Empty()))
        self._security_connection_event_stream = EventStream(
            self._device.l2cap.FetchSecurityConnectionEvents(empty_proto.Empty()))
        if has_security == False:
            self._hci.register_for_events(hci.EventCode.LINK_KEY_REQUEST)

    def close(self):
        safeClose(self._l2cap_stream)
        safeClose(self._security_connection_event_stream)
        safeClose(self._hci)

    def register_dynamic_channel(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
        self._device.l2cap.SetDynamicChannel(
            l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, retransmission_mode=mode))
        return PyL2capChannel(self._device, psm, self._l2cap_stream)

    def connect_dynamic_channel_to_cert(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
        """
        Send open Dynamic channel request to CERT.
        Get a future for connection result, to be used after CERT accepts request
        """
        self.register_dynamic_channel(psm, mode)
        response_future = self._device.l2cap.OpenChannel.future(
            l2cap_facade_pb2.OpenChannelRequest(psm=psm, remote=self._cert_address, mode=mode))

        return _ClassicConnectionResponseFutureWrapper(response_future, self._device, psm, self._l2cap_stream)

    def get_channel_queue_buffer_size(self):
        return self._device.l2cap.GetChannelQueueDepth(empty_proto.Empty()).size

    def initiate_connection_for_security(self):
        """
        Establish an ACL for the specific purpose of pairing devices
        """
        self._device.l2cap.InitiateConnectionForSecurity(self._cert_address)

    def get_security_connection_event_stream(self):
        """
        Stream of Link related events.  Events are returned with an address.
        Events map to the LinkSecurityInterfaceListener callbacks
        """
        return self._security_connection_event_stream

    def security_link_hold(self):
        """
        Holds open the ACL indefinitely allowing for the security handshake
        to take place
        """
        self._device.l2cap.SecurityLinkHold(self._cert_address)

    def security_link_ensure_authenticated(self):
        """
        Triggers authentication process by sending HCI event AUTHENTICATION_REQUESTED
        """
        self._device.l2cap.SecurityLinkEnsureAuthenticated(self._cert_address)

    def security_link_release(self):
        """
        Releases a Held open ACL allowing for the ACL to time out after the default time
        """
        self._device.l2cap.SecurityLinkRelease(self._cert_address)

    def security_link_disconnect(self):
        """
        Immediately release and disconnect ACL
        """
        self._device.l2cap.SecurityLinkDisconnect(self._cert_address)

    def verify_security_connection(self):
        """
        Verify that we get a connection and a link key request
        """
        assertThat(self.get_security_connection_event_stream()).emits(
            lambda event: event.event_type == LinkSecurityInterfaceCallbackEventType.ON_CONNECTED)
        assertThat(self._hci.get_event_stream()).emits(HciMatchers.LinkKeyRequest())


class PyLeL2capFixedChannel(IEventStream):

    def __init__(self, device, cid, l2cap_stream):
        self._device = device
        self._cid = cid
        self._le_l2cap_stream = l2cap_stream
        self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream,
                                                       L2capMatchers.PacketPayloadWithMatchingCid(self._cid))

    def get_event_queue(self):
        return self._our_le_l2cap_view.get_event_queue()

    def send(self, payload):
        self._device.l2cap_le.SendFixedChannelPacket(
            l2cap_le_facade_pb2.FixedChannelPacket(cid=self._cid, payload=payload))

    def close_channel(self):
        self._device.l2cap_le.SetFixedChannel(
            l2cap_le_facade_pb2.SetEnableFixedChannelRequest(cid=self._cid, enable=False))


class PyLeL2capDynamicChannel(IEventStream):

    def __init__(self, device, cert_address, psm, l2cap_stream):
        self._device = device
        self._cert_address = cert_address
        self._psm = psm
        self._le_l2cap_stream = l2cap_stream
        self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream,
                                                       L2capMatchers.PacketPayloadWithMatchingPsm(self._psm))

    def get_event_queue(self):
        return self._our_le_l2cap_view.get_event_queue()

    def send(self, payload):
        self._device.l2cap_le.SendDynamicChannelPacket(
            l2cap_le_facade_pb2.DynamicChannelPacket(psm=self._psm, payload=payload))

    def close_channel(self):
        self._device.l2cap_le.CloseDynamicChannel(
            l2cap_le_facade_pb2.CloseDynamicChannelRequest(remote=self._cert_address, psm=self._psm))


class _CreditBasedConnectionResponseFutureWrapper(object):
    """
    The future object returned when we send a connection request from DUT. Can be used to get connection status and
    create the corresponding PyLeL2capDynamicChannel object later
    """

    def __init__(self, grpc_response_future, device, cert_address, psm, le_l2cap_stream):
        self._grpc_response_future = grpc_response_future
        self._device = device
        self._cert_address = cert_address
        self._psm = psm
        self._le_l2cap_stream = le_l2cap_stream

    def get_status(self):
        return l2cap_packets.LeCreditBasedConnectionResponseResult(self._grpc_response_future.result().status)

    def get_channel(self):
        assertThat(self.get_status()).isEqualTo(l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS)
        return PyLeL2capDynamicChannel(self._device, self._cert_address, self._psm, self._le_l2cap_stream)


class PyLeL2cap(Closable):

    def __init__(self, device):
        self._device = device
        self._le_l2cap_stream = EventStream(self._device.l2cap_le.FetchL2capData(empty_proto.Empty()))

    def close(self):
        safeClose(self._le_l2cap_stream)

    def enable_fixed_channel(self, cid=4):
        self._device.l2cap_le.SetFixedChannel(l2cap_le_facade_pb2.SetEnableFixedChannelRequest(cid=cid, enable=True))

    def get_fixed_channel(self, cid=4):
        return PyLeL2capFixedChannel(self._device, cid, self._le_l2cap_stream)

    def register_coc(self, cert_address, psm=0x33, security_level=SecurityLevel.NO_SECURITY):
        self._device.l2cap_le.SetDynamicChannel(
            l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, enable=True, security_level=security_level))
        return PyLeL2capDynamicChannel(self._device, cert_address, psm, self._le_l2cap_stream)

    def connect_coc_to_cert(self, cert_address, psm=0x33):
        """
        Send open LE COC request to CERT. Get a future for connection result, to be used after CERT accepts request
        """
        self.register_coc(cert_address, psm)
        response_future = self._device.l2cap_le.OpenDynamicChannel.future(
            l2cap_le_facade_pb2.OpenDynamicChannelRequest(psm=psm, remote=cert_address))

        return _CreditBasedConnectionResponseFutureWrapper(response_future, self._device, cert_address, psm,
                                                           self._le_l2cap_stream)

    def update_connection_parameter(self,
                                    conn_interval_min=0x10,
                                    conn_interval_max=0x10,
                                    conn_latency=0x0a,
                                    supervision_timeout=0x64,
                                    min_ce_length=12,
                                    max_ce_length=12):
        self._device.l2cap_le.SendConnectionParameterUpdate(
            l2cap_le_facade_pb2.ConnectionParameter(conn_interval_min=conn_interval_min,
                                                    conn_interval_max=conn_interval_max,
                                                    conn_latency=conn_latency,
                                                    supervision_timeout=supervision_timeout,
                                                    min_ce_length=min_ce_length,
                                                    max_ce_length=max_ce_length))
+0 −200
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from blueberry.tests.gd.cert.captures import L2capCaptures
from blueberry.tests.gd.cert.closable import Closable
from blueberry.tests.gd.cert.closable import safeClose
from blueberry.tests.gd.cert.event_stream import FilteringEventStream
from blueberry.tests.gd.cert.event_stream import IEventStream
from blueberry.tests.gd.cert.matchers import L2capMatchers
from blueberry.tests.gd.cert.py_le_acl_manager import PyLeAclManager
from blueberry.tests.gd.cert.truth import assertThat
import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import l2cap_packets
from bluetooth_packets_python3.l2cap_packets import LeCommandCode
from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult


class CertLeL2capChannel(IEventStream):

    def __init__(self, device, scid, dcid, acl_stream, acl, control_channel, initial_credits=0):
        self._device = device
        self._scid = scid
        self._dcid = dcid
        self._acl_stream = acl_stream
        self._acl = acl
        self._control_channel = control_channel
        self._our_acl_view = FilteringEventStream(acl_stream, L2capMatchers.ExtractBasicFrame(scid))
        self._credits_left = initial_credits

    def get_event_queue(self):
        return self._our_acl_view.get_event_queue()

    def send(self, packet):
        frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet)
        self._acl.send(frame.Serialize())
        self._credits_left -= 1

    def send_first_le_i_frame(self, sdu_size, packet):
        frame = l2cap_packets.FirstLeInformationFrameBuilder(self._dcid, sdu_size, packet)
        self._acl.send(frame.Serialize())
        self._credits_left -= 1

    def disconnect_and_verify(self):
        assertThat(self._scid).isNotEqualTo(1)
        self._control_channel.send(l2cap_packets.LeDisconnectionRequestBuilder(1, self._dcid, self._scid))

        assertThat(self._control_channel).emits(L2capMatchers.LeDisconnectionResponse(self._scid, self._dcid))

    def verify_disconnect_request(self):
        assertThat(self._control_channel).emits(L2capMatchers.LeDisconnectionRequest(self._dcid, self._scid))

    def send_credits(self, num_credits):
        self._control_channel.send(l2cap_packets.LeFlowControlCreditBuilder(2, self._scid, num_credits))

    def credits_left(self):
        return self._credits_left


class CertLeL2cap(Closable):

    def __init__(self, device):
        self._device = device
        self._le_acl_manager = PyLeAclManager(device)
        self._le_acl = None

        self.control_table = {
            LeCommandCode.DISCONNECTION_REQUEST: self._on_disconnection_request_default,
            LeCommandCode.DISCONNECTION_RESPONSE: self._on_disconnection_response_default,
            LeCommandCode.LE_FLOW_CONTROL_CREDIT: self._on_credit,
        }

        self._cid_to_cert_channels = {}

    def close(self):
        self._le_acl_manager.close()
        safeClose(self._le_acl)

    def connect_le_acl(self, remote_addr):
        self._le_acl = self._le_acl_manager.connect_to_remote(remote_addr)
        self.control_channel = CertLeL2capChannel(
            self._device, 5, 5, self._get_acl_stream(), self._le_acl, control_channel=None)
        self._get_acl_stream().register_callback(self._handle_control_packet)

    def wait_for_connection(self):
        self._le_acl = self._le_acl_manager.wait_for_connection()
        self.control_channel = CertLeL2capChannel(
            self._device, 5, 5, self._get_acl_stream(), self._le_acl, control_channel=None)
        self._get_acl_stream().register_callback(self._handle_control_packet)

    def open_fixed_channel(self, cid=4):
        channel = CertLeL2capChannel(self._device, cid, cid, self._get_acl_stream(), self._le_acl, None, 0)
        return channel

    def open_channel(self, signal_id, psm, scid, mtu=1000, mps=100, initial_credit=6):
        self.control_channel.send(
            l2cap_packets.LeCreditBasedConnectionRequestBuilder(signal_id, psm, scid, mtu, mps, initial_credit))

        response = L2capCaptures.CreditBasedConnectionResponse()
        assertThat(self.control_channel).emits(response)
        channel = CertLeL2capChannel(self._device, scid,
                                     response.get().GetDestinationCid(), self._get_acl_stream(), self._le_acl,
                                     self.control_channel,
                                     response.get().GetInitialCredits())
        self._cid_to_cert_channels[scid] = channel
        return channel

    def open_channel_with_expected_result(self, psm=0x33, result=LeCreditBasedConnectionResponseResult.SUCCESS):
        self.control_channel.send(l2cap_packets.LeCreditBasedConnectionRequestBuilder(1, psm, 0x40, 1000, 100, 6))

        response = L2capMatchers.CreditBasedConnectionResponse(result)
        assertThat(self.control_channel).emits(response)

    def verify_and_respond_open_channel_from_remote(self,
                                                    psm=0x33,
                                                    result=LeCreditBasedConnectionResponseResult.SUCCESS,
                                                    our_scid=None):
        request = L2capCaptures.CreditBasedConnectionRequest(psm)
        assertThat(self.control_channel).emits(request)
        (scid, dcid) = self._respond_connection_request_default(request.get(), result, our_scid)
        channel = CertLeL2capChannel(self._device, scid, dcid, self._get_acl_stream(), self._le_acl,
                                     self.control_channel,
                                     request.get().GetInitialCredits())
        self._cid_to_cert_channels[scid] = channel
        return channel

    def verify_and_reject_open_channel_from_remote(self, psm=0x33):
        request = L2capCaptures.CreditBasedConnectionRequest(psm)
        assertThat(self.control_channel).emits(request)
        sid = request.get().GetIdentifier()
        reject = l2cap_packets.LeCommandRejectNotUnderstoodBuilder(sid)
        self.control_channel.send(reject)

    def verify_le_flow_control_credit(self, channel):
        assertThat(self.control_channel).emits(L2capMatchers.LeFlowControlCredit(channel._dcid))

    def _respond_connection_request_default(self,
                                            request,
                                            result=LeCreditBasedConnectionResponseResult.SUCCESS,
                                            our_scid=None):
        sid = request.GetIdentifier()
        their_scid = request.GetSourceCid()
        mtu = request.GetMtu()
        mps = request.GetMps()
        initial_credits = request.GetInitialCredits()
        # If our_scid is not specified, we use the same value - their scid as their scid
        if our_scid is None:
            our_scid = their_scid
        our_dcid = their_scid
        response = l2cap_packets.LeCreditBasedConnectionResponseBuilder(sid, our_scid, mtu, mps, initial_credits,
                                                                        result)
        self.control_channel.send(response)
        return (our_scid, our_dcid)

    def get_control_channel(self):
        return self.control_channel

    def _get_acl_stream(self):
        return self._le_acl.acl_stream

    def _on_disconnection_request_default(self, request):
        disconnection_request = l2cap_packets.LeDisconnectionRequestView(request)
        sid = disconnection_request.GetIdentifier()
        scid = disconnection_request.GetSourceCid()
        dcid = disconnection_request.GetDestinationCid()
        response = l2cap_packets.LeDisconnectionResponseBuilder(sid, dcid, scid)
        self.control_channel.send(response)

    def _on_disconnection_response_default(self, request):
        disconnection_response = l2cap_packets.LeDisconnectionResponseView(request)

    def _on_credit(self, l2cap_le_control_view):
        credit_view = l2cap_packets.LeFlowControlCreditView(l2cap_le_control_view)
        cid = credit_view.GetCid()
        if cid not in self._cid_to_cert_channels:
            return
        self._cid_to_cert_channels[cid]._credits_left += credit_view.GetCredits()

    def _handle_control_packet(self, l2cap_packet):
        packet_bytes = l2cap_packet.payload
        l2cap_view = l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))
        if l2cap_view.GetChannelId() != 5:
            return
        request = l2cap_packets.LeControlView(l2cap_view.GetPayload())
        fn = self.control_table.get(request.GetCode())
        if fn is not None:
            fn(request)
        return