Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit af6f9d61 authored by Hansong Zhang's avatar Hansong Zhang Committed by Automerger Merge Worker
Browse files

DualL2capTest am: d7ee66ba

Change-Id: I66b417fdabaffb17d94a5e3e794b24039dca59f3
parents c917c134 d7ee66ba
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -11,3 +11,4 @@ LeAclManagerTest
StackTest
L2capTest
LeL2capTest
DualL2capTest
+8 −6
Original line number Diff line number Diff line
@@ -179,17 +179,19 @@ class CertL2cap(Closable):
                                self._get_acl_stream(), self._acl,
                                self.control_channel, self.fcs_enabled)

    def verify_and_respond_open_channel_from_remote(self, psm=0x33):
    def verify_and_respond_open_channel_from_remote(self, psm=0x33, scid=None):
        request = L2capCaptures.ConnectionRequest(psm)
        assertThat(self.control_channel).emits(request)

        sid = request.get().GetIdentifier()
        cid = request.get().GetSourceCid()
        dcid = request.get().GetSourceCid()
        if scid is None or scid in self.scid_to_dcid:
            scid = dcid

        self.scid_to_dcid[cid] = cid
        self.scid_to_dcid[scid] = dcid

        connection_response = l2cap_packets.ConnectionResponseBuilder(
            sid, cid, cid, l2cap_packets.ConnectionResponseResult.SUCCESS,
            sid, scid, dcid, l2cap_packets.ConnectionResponseResult.SUCCESS,
            l2cap_packets.ConnectionResponseStatus.
            NO_FURTHER_INFORMATION_AVAILABLE)
        self.control_channel.send(connection_response)
@@ -203,10 +205,10 @@ class CertL2cap(Closable):
            config_options.append(self.fcs_option)

        config_request = l2cap_packets.ConfigurationRequestBuilder(
            sid + 1, cid, l2cap_packets.Continuation.END, config_options)
            sid + 1, dcid, l2cap_packets.Continuation.END, config_options)
        self.control_channel.send(config_request)

        channel = CertL2capChannel(self._device, cid, cid,
        channel = CertL2capChannel(self._device, scid, dcid,
                                   self._get_acl_stream(), self._acl,
                                   self.control_channel, self.fcs_enabled)
        return channel
+6 −7
Original line number Diff line number Diff line
@@ -48,13 +48,12 @@ class L2capTest(GdBaseTestClass):
    def setup_test(self):
        super().setup_test()

        self.dut.address = self.dut.hci_controller.GetMacAddressSimple()
        self.cert.address = self.cert.controller_read_only_property.ReadLocalAddress(
            empty_proto.Empty()).address
        self.cert_address = common_pb2.BluetoothAddress(
            address=self.cert.address)
        self.dut_address = self.dut.hci_controller.GetMacAddressSimple()
        cert_address = common_pb2.BluetoothAddress(
            address=self.cert.controller_read_only_property.ReadLocalAddress(
                empty_proto.Empty()).address)

        self.dut_l2cap = PyL2cap(self.dut, self.cert_address)
        self.dut_l2cap = PyL2cap(self.dut, cert_address)
        self.cert_l2cap = CertL2cap(self.cert)

    def teardown_test(self):
@@ -65,7 +64,7 @@ class L2capTest(GdBaseTestClass):
    def _setup_link_from_cert(self):
        self.dut.neighbor.EnablePageScan(
            neighbor_facade.EnableMsg(enabled=True))
        self.cert_l2cap.connect_acl(self.dut.address)
        self.cert_l2cap.connect_acl(self.dut_address)

    def _open_unvalidated_channel(self,
                                  signal_id=1,
+12 −7
Original line number Diff line number Diff line
@@ -159,12 +159,14 @@ class CertLeL2cap(Closable):
        return channel

    def verify_and_respond_open_channel_from_remote(
            self, psm=0x33,
            result=LeCreditBasedConnectionResponseResult.SUCCESS):
            self,
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.SUCCESS,
            our_scid=None):
        request = L2capCaptures.CreditBasedConnectionRequest(psm)
        assertThat(self.control_channel).emits(request)
        (scid, dcid) = self._respond_connection_request_default(
            request.get(), result)
            request.get(), result, our_scid)
        channel = CertLeL2capChannel(self._device, scid, dcid,
                                     self._get_acl_stream(), self._le_acl,
                                     self.control_channel,
@@ -184,14 +186,17 @@ class CertLeL2cap(Closable):
            L2capMatchers.LeFlowControlCredit(channel._dcid))

    def _respond_connection_request_default(
            self, request,
            result=LeCreditBasedConnectionResponseResult.SUCCESS):
            self,
            request,
            result=LeCreditBasedConnectionResponseResult.SUCCESS,
            our_scid=None):
        sid = request.GetIdentifier()
        their_scid = request.GetSourceCid()
        mtu = request.GetMtu()
        mps = request.GetMps()
        initial_credits = request.GetInitialCredits()
        # Here we use the same value - their scid as their scid
        # If our_scid is not specified, we use the same value - their scid as their scid
        if our_scid is None:
            our_scid = their_scid
        our_dcid = their_scid
        response = l2cap_packets.LeCreditBasedConnectionResponseBuilder(
+197 −0
Original line number Diff line number Diff line
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from cert.gd_base_test import GdBaseTestClass
from cert.truth import assertThat
from cert.py_l2cap import PyLeL2cap, PyL2cap
from cert.matchers import L2capMatchers
from cert.metadata import metadata
from facade import common_pb2 as common
from google.protobuf import empty_pb2 as empty_proto
from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade
import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import hci_packets, l2cap_packets
from l2cap.classic.cert.cert_l2cap import CertL2cap
from l2cap.le.cert.cert_le_l2cap import CertLeL2cap
from neighbor.facade import facade_pb2 as neighbor_facade

# Assemble a sample packet.
SAMPLE_PACKET = bt_packets.RawBuilder([0x19, 0x26, 0x08, 0x17])


class DualL2capTest(GdBaseTestClass):

    def setup_class(self):
        super().setup_class(dut_module='L2CAP', cert_module='HCI_INTERFACES')

    def setup_test(self):
        super().setup_test()

        self.dut_address = self.dut.hci_controller.GetMacAddressSimple()
        cert_address = common.BluetoothAddress(
            address=self.cert.controller_read_only_property.ReadLocalAddress(
                empty_proto.Empty()).address)

        self.dut_l2cap = PyL2cap(self.dut, cert_address)
        self.cert_l2cap = CertL2cap(self.cert)
        self.dut_le_l2cap = PyLeL2cap(self.dut)
        self.cert_le_l2cap = CertLeL2cap(self.cert)

    def teardown_test(self):
        self.cert_le_l2cap.close()
        self.dut_le_l2cap.close()
        self.cert_l2cap.close()
        self.dut_l2cap.close()
        super().teardown_test()

    def _setup_acl_link_from_cert(self):
        self.dut.neighbor.EnablePageScan(
            neighbor_facade.EnableMsg(enabled=True))
        self.cert_l2cap.connect_acl(self.dut_address)

    def _setup_le_link_from_cert(self):
        # DUT Advertises
        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_The_DUT'))
        gap_data = le_advertising_facade.GapDataMsg(
            data=bytes(gap_name.Serialize()))
        config = le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            random_address=common.BluetoothAddress(
                address=bytes(b'0D:05:04:03:02:01')),
            interval_min=512,
            interval_max=768,
            event_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            address_type=common.RANDOM_DEVICE_ADDRESS,
            peer_address_type=common.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
            peer_address=common.BluetoothAddress(
                address=bytes(b'A6:A5:A4:A3:A2:A1')),
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.
            ALL_DEVICES)
        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
        create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(
            request)
        self.cert_le_l2cap.connect_le_acl(bytes(b'0D:05:04:03:02:01'))

    def _open_le_coc_from_dut(self, psm=0x33, our_scid=None):
        response_future = self.dut_le_l2cap.connect_coc_to_cert(psm)
        cert_channel = self.cert_le_l2cap.verify_and_respond_open_channel_from_remote(
            psm=psm, our_scid=our_scid)
        dut_channel = response_future.get_channel()
        return (dut_channel, cert_channel)

    def _open_channel_from_dut(self, psm=0x33, our_scid=None):
        dut_channel_future = self.dut_l2cap.connect_dynamic_channel_to_cert(psm)
        cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=psm, scid=our_scid)
        dut_channel = dut_channel_future.get_channel()

        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        return (dut_channel, cert_channel)

    def _open_unvalidated_channel(self, signal_id=1, scid=0x0101, psm=0x33):

        dut_channel = self.dut_l2cap.register_dynamic_channel(psm)
        cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid)

        return (dut_channel, cert_channel)

    def _open_channel_from_cert(self, signal_id=1, scid=0x0101, psm=0x33):
        result = self._open_unvalidated_channel(signal_id, scid, psm)

        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        return result

    def _open_le_coc_from_cert(self,
                               signal_id=1,
                               scid=0x0101,
                               psm=0x35,
                               mtu=1000,
                               mps=100,
                               initial_credit=6):

        dut_channel = self.dut_le_l2cap.register_coc(psm)
        cert_channel = self.cert_le_l2cap.open_channel(signal_id, psm, scid,
                                                       mtu, mps, initial_credit)

        return (dut_channel, cert_channel)

    @metadata(
        pts_test_id="L2CAP/LE/CID/BV-01-C",
        pts_test_name="Receiving DCID over BR/EDR and LE")
    def test_receiving_dcid_over_bredr_and_le(self):
        """
        Test that the L2CAP entity can receive the same DCID in L2CAP connect responses on both the
        BR/EDR and LE links.
        """
        self._setup_acl_link_from_cert()
        # TODO: We should let LE use public address, same as classic link.
        # TODO: Update AclManager::impl::create_le_connection
        self._setup_le_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_dut(0x33, 0x70)
        (le_dut_channel, le_cert_channel) = self._open_le_coc_from_dut(
            0x35, 0x70)

        dut_channel.send(b'abc')
        assertThat(cert_channel).emits(L2capMatchers.Data(b'abc'))

        le_dut_channel.send(b'hello')
        assertThat(le_cert_channel).emits(
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))

        le_cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET)
        assertThat(le_dut_channel).emits(
            L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'))

        cert_channel.disconnect_and_verify()
        le_cert_channel.disconnect_and_verify()

    @metadata(
        pts_test_id="L2CAP/LE/CID/BV-02-C",
        pts_test_name="Receiving SCID over BR/EDR and LE")
    def test_receiving_scid_over_bredr_and_le(self):
        """
        Test that the L2CAP entity can receive the same SCID in L2CAP connect requests on both the
        BR/EDR and LE links.
        """
        self._setup_acl_link_from_cert()
        # TODO: We should let LE use public address, same as classic link.
        # TODO: Update AclManager::impl::create_le_connection
        self._setup_le_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_cert(0x33, 0x70)
        (le_dut_channel, le_cert_channel) = self._open_le_coc_from_cert(
            0x35, 0x70)

        dut_channel.send(b'abc')
        assertThat(cert_channel).emits(L2capMatchers.Data(b'abc'))

        le_dut_channel.send(b'hello')
        assertThat(le_cert_channel).emits(
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))

        le_cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET)
        assertThat(le_dut_channel).emits(
            L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'))

        cert_channel.disconnect_and_verify()
        le_cert_channel.disconnect_and_verify()
Loading