Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2e7a938e authored by android-build-team Robot's avatar android-build-team Robot
Browse files

Snap for 6984833 from fa379f3f to sc-release

Change-Id: I96d317f2e829d0f217d125d66b2e5a53c311f988
parents ef078305 fa379f3f
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -291,6 +291,10 @@ static void event_shut_down_stack(UNUSED_ATTR void* context) {

  future_await(local_hack_future);

  main_thread_shut_down();

  module_clean_up(get_module(BTE_LOGMSG_MODULE));

  gatt_free();
  l2c_free();
  sdp_free();
@@ -305,10 +309,6 @@ static void event_shut_down_stack(UNUSED_ATTR void* context) {
    module_shut_down(get_module(BTSNOOP_MODULE));
  }

  main_thread_shut_down();

  module_clean_up(get_module(BTE_LOGMSG_MODULE));

  module_shut_down(get_module(CONTROLLER_MODULE));  // Doesn't do any work, just
                                                    // puts it in a restartable
                                                    // state
+8 −8
Original line number Diff line number Diff line
@@ -67,53 +67,53 @@ class HciCaptures(object):
    def ReadLocalOobDataCompleteCapture():
        return Capture(
            HciMatchers.CommandComplete(hci_packets.OpCode.READ_LOCAL_OOB_DATA),
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.event, hci_packets.OpCode.READ_LOCAL_OOB_DATA)
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_LOCAL_OOB_DATA)
        )

    @staticmethod
    def ReadLocalOobExtendedDataCompleteCapture():
        return Capture(
            HciMatchers.CommandComplete(hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA),
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.event, hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA)
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA)
        )

    @staticmethod
    def ReadBdAddrCompleteCapture():
        return Capture(
            HciMatchers.CommandComplete(hci_packets.OpCode.READ_BD_ADDR),
            lambda packet: hci_packets.ReadBdAddrCompleteView(HciMatchers.ExtractMatchingCommandComplete(packet.event, hci_packets.OpCode.READ_BD_ADDR)))
            lambda packet: hci_packets.ReadBdAddrCompleteView(HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_BD_ADDR)))

    @staticmethod
    def ConnectionRequestCapture():
        return Capture(
            HciMatchers.EventWithCode(hci_packets.EventCode.CONNECTION_REQUEST),
            lambda packet: hci_packets.ConnectionRequestView(
                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.CONNECTION_REQUEST)))
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.CONNECTION_REQUEST)))

    @staticmethod
    def ConnectionCompleteCapture():
        return Capture(
            HciMatchers.EventWithCode(hci_packets.EventCode.CONNECTION_COMPLETE),
            lambda packet: hci_packets.ConnectionCompleteView(
                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.CONNECTION_COMPLETE)))
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.CONNECTION_COMPLETE)))

    @staticmethod
    def DisconnectionCompleteCapture():
        return Capture(
            HciMatchers.EventWithCode(hci_packets.EventCode.DISCONNECTION_COMPLETE),
            lambda packet: hci_packets.DisconnectionCompleteView(
                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.DISCONNECTION_COMPLETE)))
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.DISCONNECTION_COMPLETE)))

    @staticmethod
    def LeConnectionCompleteCapture():
        return Capture(HciMatchers.LeConnectionComplete(),
                       lambda packet: HciMatchers.ExtractLeConnectionComplete(packet.event))
                       lambda packet: HciMatchers.ExtractLeConnectionComplete(packet.payload))

    @staticmethod
    def SimplePairingCompleteCapture():
        return Capture(HciMatchers.EventWithCode(hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE),
            lambda packet: hci_packets.SimplePairingCompleteView(
                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE)))
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE)))


class L2capCaptures(object):
+53 −13
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging
import bluetooth_packets_python3 as bt_packets
import logging

@@ -31,7 +32,7 @@ class HciMatchers(object):

    @staticmethod
    def CommandComplete(opcode=None):
        return lambda msg: HciMatchers._is_matching_command_complete(msg.event, opcode)
        return lambda msg: HciMatchers._is_matching_command_complete(msg.payload, opcode)

    @staticmethod
    def ExtractMatchingCommandComplete(packet_bytes, opcode=None):
@@ -55,9 +56,35 @@ class HciMatchers(object):
            else:
                return complete

    @staticmethod
    def CommandStatus(opcode=None):
        return lambda msg: HciMatchers._is_matching_command_status(msg.payload, opcode)

    @staticmethod
    def ExtractMatchingCommandStatus(packet_bytes, opcode=None):
        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode)

    @staticmethod
    def _is_matching_command_status(packet_bytes, opcode=None):
        return HciMatchers._extract_matching_command_status(packet_bytes, opcode) is not None

    @staticmethod
    def _extract_matching_command_status(packet_bytes, opcode=None):
        event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_STATUS)
        if event is None:
            return None
        complete = hci_packets.CommandStatusView(event)
        if opcode is None or complete is None:
            return complete
        else:
            if complete.GetCommandOpCode() != opcode:
                return None
            else:
                return complete

    @staticmethod
    def EventWithCode(event_code):
        return lambda msg: HciMatchers._is_matching_event(msg.event, event_code)
        return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code)

    @staticmethod
    def ExtractEventWithCode(packet_bytes, event_code):
@@ -78,7 +105,7 @@ class HciMatchers(object):

    @staticmethod
    def LeEventWithCode(subevent_code):
        return lambda msg: HciMatchers._extract_matching_le_event(msg.event, subevent_code) is not None
        return lambda msg: HciMatchers._extract_matching_le_event(msg.payload, subevent_code) is not None

    @staticmethod
    def ExtractLeEventWithCode(packet_bytes, subevent_code):
@@ -86,17 +113,17 @@ class HciMatchers(object):

    @staticmethod
    def _extract_matching_le_event(packet_bytes, subevent_code):
        event = hci_packets.LeMetaEventView(
            HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT))
        if event is None:
        inner_event = HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT)
        if inner_event is None:
            return None
        event = hci_packets.LeMetaEventView(inner_event)
        if event.GetSubeventCode() != subevent_code:
            return None
        return event

    @staticmethod
    def LeConnectionComplete():
        return lambda msg: HciMatchers._extract_le_connection_complete(msg.event) is not None
        return lambda msg: HciMatchers._extract_le_connection_complete(msg.payload) is not None

    @staticmethod
    def ExtractLeConnectionComplete(packet_bytes):
@@ -104,12 +131,16 @@ class HciMatchers(object):

    @staticmethod
    def _extract_le_connection_complete(packet_bytes):
        event = hci_packets.LeConnectionCompleteView(
            HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE))
        if event is not None:
            return event
        return hci_packets.LeEnhancedConnectionCompleteView(
            HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE))
        inner_event = HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE)
        if inner_event is not None:
            return hci_packets.LeConnectionCompleteView(inner_event)

        inner_event = HciMatchers._extract_matching_le_event(packet_bytes,
                                                             hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
        if inner_event is not None:
            return hci_packets.LeEnhancedConnectionCompleteView(inner_event)

        return None

    @staticmethod
    def LogEventCode():
@@ -163,6 +194,15 @@ class HciMatchers(object):
    def RemoteOobDataRequest():
        return lambda event: HciMatchers.EventWithCode(EventCode.REMOTE_OOB_DATA_REQUEST)

    @staticmethod
    def LoopbackOf(packet):
        return HciMatchers.Exactly(hci_packets.LoopbackCommandBuilder(packet))

    @staticmethod
    def Exactly(packet):
        data = bytes(packet.Serialize())
        return lambda event: data == event.payload


class NeighborMatchers(object):

+238 −3
Original line number Diff line number Diff line
@@ -16,9 +16,99 @@

from google.protobuf import empty_pb2 as empty_proto
from cert.event_stream import EventStream
from cert.event_stream import FilteringEventStream
from cert.event_stream import IEventStream
from cert.closable import Closable
from cert.closable import safeClose
from cert.captures import HciCaptures
from cert.truth import assertThat
from hal import facade_pb2 as hal_facade
from bluetooth_packets_python3.hci_packets import WriteScanEnableBuilder
from bluetooth_packets_python3.hci_packets import ScanEnable
from bluetooth_packets_python3.hci_packets import AclPacketBuilder
from bluetooth_packets_python3 import RawBuilder
from bluetooth_packets_python3.hci_packets import BroadcastFlag
from bluetooth_packets_python3.hci_packets import PacketBoundaryFlag
from bluetooth_packets_python3 import hci_packets
from cert.matchers import HciMatchers
from bluetooth_packets_python3.hci_packets import FilterDuplicates
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingLegacyParametersBuilder
from bluetooth_packets_python3.hci_packets import LegacyAdvertisingProperties
from bluetooth_packets_python3.hci_packets import PeerAddressType
from bluetooth_packets_python3.hci_packets import AdvertisingFilterPolicy
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingRandomAddressBuilder
from bluetooth_packets_python3.hci_packets import GapData
from bluetooth_packets_python3.hci_packets import GapDataType
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingDataBuilder
from bluetooth_packets_python3.hci_packets import Operation
from bluetooth_packets_python3.hci_packets import OwnAddressType
from bluetooth_packets_python3.hci_packets import LeScanningFilterPolicy
from bluetooth_packets_python3.hci_packets import Enable
from bluetooth_packets_python3.hci_packets import FragmentPreference
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingScanResponseBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedScanEnableBuilder
from bluetooth_packets_python3.hci_packets import EnabledSet
from bluetooth_packets_python3.hci_packets import OpCode


class PyHalAclConnection(IEventStream):

    def __init__(self, handle, acl_stream, device):
        self.handle = int(handle)
        self.device = device
        self.our_acl_stream = FilteringEventStream(acl_stream, None)

    def send(self, pb_flag, b_flag, data):
        acl = AclPacketBuilder(self.handle, pb_flag, b_flag, RawBuilder(data))
        self.device.hal.SendAcl(hal_facade.AclPacket(payload=bytes(acl.Serialize())))

    def send_first(self, data):
        self.send(PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, bytes(data))

    def get_event_queue(self):
        return self.our_acl_stream.get_event_queue()


class PyHalAdvertisement(object):

    def __init__(self, handle, py_hal):
        self.handle = handle
        self.py_hal = py_hal

    def set_data(self, complete_name):
        data = GapData()
        data.data_type = GapDataType.COMPLETE_LOCAL_NAME
        data.data = list(bytes(complete_name))
        self.py_hal.send_hci_command(
            LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_DATA)

    def set_scan_response(self, shortened_name):
        data = GapData()
        data.data_type = GapDataType.SHORTENED_LOCAL_NAME
        data.data = list(bytes(shortened_name))
        self.py_hal.send_hci_command(
            LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_SCAN_RESPONSE)

    def start(self):
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = self.handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)

    def stop(self):
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = self.handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.DISABLED, [enabled_set]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)


class PyHal(Closable):
@@ -38,11 +128,156 @@ class PyHal(Closable):
    def get_hci_event_stream(self):
        return self.hci_event_stream

    def wait_for_complete(self, opcode):
        assertThat(self.hci_event_stream).emits(HciMatchers.CommandComplete(opcode))

    def wait_for_status(self, opcode):
        assertThat(self.hci_event_stream).emits(HciMatchers.CommandStatus(opcode))

    def get_acl_stream(self):
        return self.acl_stream

    def send_hci_command(self, command):
        self.device.hal.SendCommand(hal_facade.Command(payload=bytes(command)))
        self.device.hal.SendCommand(hal_facade.Command(payload=bytes(command.Serialize())))

    def send_acl(self, handle, pb_flag, b_flag, data):
        acl = AclPacketBuilder(handle, pb_flag, b_flag, RawBuilder(data))
        self.device.hal.SendAcl(hal_facade.AclPacket(payload=bytes(acl.Serialize())))

    def send_acl_first(self, handle, data):
        self.send_acl(handle, PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, data)

    def read_own_address(self):
        self.send_hci_command(hci_packets.ReadBdAddrBuilder())
        read_bd_addr = HciCaptures.ReadBdAddrCompleteCapture()
        assertThat(self.hci_event_stream).emits(read_bd_addr)
        return read_bd_addr.get().GetBdAddr()

    def set_random_le_address(self, addr):
        self.send_hci_command(hci_packets.LeSetRandomAddressBuilder(addr))
        self.wait_for_complete(OpCode.LE_SET_RANDOM_ADDRESS)

    def set_scan_parameters(self):
        phy_scan_params = hci_packets.PhyScanParameters()
        phy_scan_params.le_scan_interval = 6553
        phy_scan_params.le_scan_window = 6553
        phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE

        self.send_hci_command(
            hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                                           hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1,
                                                           [phy_scan_params]))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_PARAMETERS)

    def start_scanning(self):
        self.send_hci_command(
            hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
                                                       hci_packets.FilterDuplicates.DISABLED, 0, 0))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE)

    def stop_scanning(self):
        self.send_hci_command(
            hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.DISABLED,
                                                       hci_packets.FilterDuplicates.DISABLED, 0, 0))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE)

    def reset(self):
        self.send_hci_command(hci_packets.ResetBuilder())
        self.wait_for_complete(OpCode.RESET)

    def enable_inquiry_and_page_scan(self):
        self.send_hci_command(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN))

    def initiate_connection(self, remote_addr):
        self.send_hci_command(
            hci_packets.CreateConnectionBuilder(
                remote_addr if isinstance(remote_addr, str) else remote_addr.decode('utf-8'),
                0xcc18,  # Packet Type
                hci_packets.PageScanRepetitionMode.R1,
                0x0,
                hci_packets.ClockOffsetValid.INVALID,
                hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))

    def accept_connection(self):
        connection_request = HciCaptures.ConnectionRequestCapture()
        assertThat(self.hci_event_stream).emits(connection_request)

        self.send_hci_command(
            hci_packets.AcceptConnectionRequestBuilder(connection_request.get().GetBdAddr(),
                                                       hci_packets.AcceptConnectionRequestRole.REMAIN_PERIPHERAL))
        return self.complete_connection()

    def complete_connection(self):
        connection_complete = HciCaptures.ConnectionCompleteCapture()
        assertThat(self.hci_event_stream).emits(connection_complete)

        handle = connection_complete.get().GetConnectionHandle()
        return PyHalAclConnection(handle, self.acl_stream, self.device)

    def initiate_le_connection(self, remote_addr):
        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
        phy_scan_params.scan_interval = 0x60
        phy_scan_params.scan_window = 0x30
        phy_scan_params.conn_interval_min = 0x18
        phy_scan_params.conn_interval_max = 0x28
        phy_scan_params.conn_latency = 0
        phy_scan_params.supervision_timeout = 0x1f4
        phy_scan_params.min_ce_length = 0
        phy_scan_params.max_ce_length = 0
        self.send_hci_command(
            hci_packets.LeExtendedCreateConnectionBuilder(
                hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params]))
        self.wait_for_status(OpCode.LE_EXTENDED_CREATE_CONNECTION)

    def add_to_connect_list(self, remote_addr):
        self.send_hci_command(
            hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, remote_addr))

    def initiate_le_connection_by_connect_list(self, remote_addr):
        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
        phy_scan_params.scan_interval = 0x60
        phy_scan_params.scan_window = 0x30
        phy_scan_params.conn_interval_min = 0x18
        phy_scan_params.conn_interval_max = 0x28
        phy_scan_params.conn_latency = 0
        phy_scan_params.supervision_timeout = 0x1f4
        phy_scan_params.min_ce_length = 0
        phy_scan_params.max_ce_length = 0
        self.send_hci_command(
            hci_packets.LeExtendedCreateConnectionBuilder(
                hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params]))

    def complete_le_connection(self):
        connection_complete = HciCaptures.LeConnectionCompleteCapture()
        assertThat(self.hci_event_stream).emits(connection_complete)

        handle = connection_complete.get().GetConnectionHandle()
        return PyHalAclConnection(handle, self.acl_stream, self.device)

    def create_advertisement(self,
                             handle,
                             own_address,
                             properties=LegacyAdvertisingProperties.ADV_IND,
                             min_interval=400,
                             max_interval=450,
                             channel_map=7,
                             own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS,
                             peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                             peer_address='00:00:00:00:00:00',
                             filter_policy=AdvertisingFilterPolicy.ALL_DEVICES,
                             tx_power=0xF8,
                             sid=1,
                             scan_request_notification=Enable.DISABLED):

        self.send_hci_command(
            LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map,
                                                            own_address_type, peer_address_type, peer_address,
                                                            filter_policy, tx_power, sid, scan_request_notification))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_PARAMETERS)

        self.send_hci_command(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_RANDOM_ADDRESS)

    def send_acl(self, acl):
        self.device.hal.SendAcl(hal_facade.AclPacket(payload=bytes(acl)))
        return PyHalAdvertisement(handle, self)
+85 −25

File changed.

Preview size limit exceeded, changes collapsed.

Loading