Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b7ccae33 authored by Zach Johnson's avatar Zach Johnson Committed by Gerrit Code Review
Browse files

Merge changes I333524dc,Ia460804d,I90cfe69e,I848e344b,Ie9108602, ...

* changes:
  Add read_own_addres to PyHal
  Simplify ACL connection components of DirectHciTest
  Add send_acl_first to PyHal, to simplify redundancy
  Fold ACL sending into PyHal
  Simplify ACL sending in SimpleHalTest
  Add shortcut for enabling inquiry in PyHal
  Simplify DirectHciTest send_hal_acl_data
  Add HciMatchers.LoopbackOf to avoid imprecise matching
  Improve signal to noise in DirectHciTest
  DirectHciTest, remove send_hal_hci_command
  Improve hci facade proto
parents a6216543 3f918a19
Loading
Loading
Loading
Loading
+8 −8
Original line number Diff line number Diff line
@@ -67,53 +67,53 @@ class HciCaptures(object):
    def ReadLocalOobDataCompleteCapture():
        return Capture(
            HciMatchers.CommandComplete(hci_packets.OpCode.READ_LOCAL_OOB_DATA),
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.event, hci_packets.OpCode.READ_LOCAL_OOB_DATA)
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_LOCAL_OOB_DATA)
        )

    @staticmethod
    def ReadLocalOobExtendedDataCompleteCapture():
        return Capture(
            HciMatchers.CommandComplete(hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA),
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.event, hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA)
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA)
        )

    @staticmethod
    def ReadBdAddrCompleteCapture():
        return Capture(
            HciMatchers.CommandComplete(hci_packets.OpCode.READ_BD_ADDR),
            lambda packet: hci_packets.ReadBdAddrCompleteView(HciMatchers.ExtractMatchingCommandComplete(packet.event, hci_packets.OpCode.READ_BD_ADDR)))
            lambda packet: hci_packets.ReadBdAddrCompleteView(HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_BD_ADDR)))

    @staticmethod
    def ConnectionRequestCapture():
        return Capture(
            HciMatchers.EventWithCode(hci_packets.EventCode.CONNECTION_REQUEST),
            lambda packet: hci_packets.ConnectionRequestView(
                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.CONNECTION_REQUEST)))
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.CONNECTION_REQUEST)))

    @staticmethod
    def ConnectionCompleteCapture():
        return Capture(
            HciMatchers.EventWithCode(hci_packets.EventCode.CONNECTION_COMPLETE),
            lambda packet: hci_packets.ConnectionCompleteView(
                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.CONNECTION_COMPLETE)))
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.CONNECTION_COMPLETE)))

    @staticmethod
    def DisconnectionCompleteCapture():
        return Capture(
            HciMatchers.EventWithCode(hci_packets.EventCode.DISCONNECTION_COMPLETE),
            lambda packet: hci_packets.DisconnectionCompleteView(
                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.DISCONNECTION_COMPLETE)))
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.DISCONNECTION_COMPLETE)))

    @staticmethod
    def LeConnectionCompleteCapture():
        return Capture(HciMatchers.LeConnectionComplete(),
                       lambda packet: HciMatchers.ExtractLeConnectionComplete(packet.event))
                       lambda packet: HciMatchers.ExtractLeConnectionComplete(packet.payload))

    @staticmethod
    def SimplePairingCompleteCapture():
        return Capture(HciMatchers.EventWithCode(hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE),
            lambda packet: hci_packets.SimplePairingCompleteView(
                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE)))
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE)))


class L2capCaptures(object):
+9 −4
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@ class HciMatchers(object):

    @staticmethod
    def CommandComplete(opcode=None):
        return lambda msg: HciMatchers._is_matching_command_complete(msg.event, opcode)
        return lambda msg: HciMatchers._is_matching_command_complete(msg.payload, opcode)

    @staticmethod
    def ExtractMatchingCommandComplete(packet_bytes, opcode=None):
@@ -57,7 +57,7 @@ class HciMatchers(object):

    @staticmethod
    def EventWithCode(event_code):
        return lambda msg: HciMatchers._is_matching_event(msg.event, event_code)
        return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code)

    @staticmethod
    def ExtractEventWithCode(packet_bytes, event_code):
@@ -78,7 +78,7 @@ class HciMatchers(object):

    @staticmethod
    def LeEventWithCode(subevent_code):
        return lambda msg: HciMatchers._extract_matching_le_event(msg.event, subevent_code) is not None
        return lambda msg: HciMatchers._extract_matching_le_event(msg.payload, subevent_code) is not None

    @staticmethod
    def ExtractLeEventWithCode(packet_bytes, subevent_code):
@@ -96,7 +96,7 @@ class HciMatchers(object):

    @staticmethod
    def LeConnectionComplete():
        return lambda msg: HciMatchers._extract_le_connection_complete(msg.event) is not None
        return lambda msg: HciMatchers._extract_le_connection_complete(msg.payload) is not None

    @staticmethod
    def ExtractLeConnectionComplete(packet_bytes):
@@ -163,6 +163,11 @@ class HciMatchers(object):
    def RemoteOobDataRequest():
        return lambda event: HciMatchers.EventWithCode(EventCode.REMOTE_OOB_DATA_REQUEST)

    @staticmethod
    def LoopbackOf(packet):
        data = bytes(hci_packets.LoopbackCommandBuilder(packet).Serialize())
        return lambda event: data == event.payload


class NeighborMatchers(object):

+70 −2
Original line number Diff line number Diff line
@@ -16,9 +16,38 @@

from google.protobuf import empty_pb2 as empty_proto
from cert.event_stream import EventStream
from cert.event_stream import FilteringEventStream
from cert.event_stream import IEventStream
from cert.closable import Closable
from cert.closable import safeClose
from cert.captures import HciCaptures
from cert.truth import assertThat
from hal import facade_pb2 as hal_facade
from bluetooth_packets_python3.hci_packets import WriteScanEnableBuilder
from bluetooth_packets_python3.hci_packets import ScanEnable
from bluetooth_packets_python3.hci_packets import AclPacketBuilder
from bluetooth_packets_python3 import RawBuilder
from bluetooth_packets_python3.hci_packets import BroadcastFlag
from bluetooth_packets_python3.hci_packets import PacketBoundaryFlag
from bluetooth_packets_python3 import hci_packets


class PyHalAclConnection(IEventStream):

    def __init__(self, handle, acl_stream, device):
        self.handle = int(handle)
        self.device = device
        self.our_acl_stream = FilteringEventStream(acl_stream, None)

    def send(self, pb_flag, b_flag, data):
        acl = AclPacketBuilder(self.handle, pb_flag, b_flag, RawBuilder(data))
        self.device.hal.SendAcl(hal_facade.AclPacket(payload=bytes(acl.Serialize())))

    def send_first(self, data):
        self.send(PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, bytes(data))

    def get_event_queue(self):
        return self.our_acl_stream.get_event_queue()


class PyHal(Closable):
@@ -44,5 +73,44 @@ class PyHal(Closable):
    def send_hci_command(self, command):
        self.device.hal.SendCommand(hal_facade.Command(payload=bytes(command.Serialize())))

    def send_acl(self, acl):
        self.device.hal.SendAcl(hal_facade.AclPacket(payload=bytes(acl)))
    def send_acl(self, handle, pb_flag, b_flag, data):
        acl = AclPacketBuilder(handle, pb_flag, b_flag, RawBuilder(data))
        self.device.hal.SendAcl(hal_facade.AclPacket(payload=bytes(acl.Serialize())))

    def send_acl_first(self, handle, data):
        self.send_acl(handle, PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, data)

    def read_own_address(self):
        self.send_hci_command(hci_packets.ReadBdAddrBuilder())
        read_bd_addr = HciCaptures.ReadBdAddrCompleteCapture()
        assertThat(self.hci_event_stream).emits(read_bd_addr)
        return read_bd_addr.get().GetBdAddr()

    def enable_inquiry_and_page_scan(self):
        self.send_hci_command(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN))

    def initiate_connection(self, remote_addr):
        self.send_hci_command(
            hci_packets.CreateConnectionBuilder(
                remote_addr if isinstance(remote_addr, str) else remote_addr.decode('utf-8'),
                0xcc18,  # Packet Type
                hci_packets.PageScanRepetitionMode.R1,
                0x0,
                hci_packets.ClockOffsetValid.INVALID,
                hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))

    def accept_connection(self):
        connection_request = HciCaptures.ConnectionRequestCapture()
        assertThat(self.hci_event_stream).emits(connection_request)

        self.send_hci_command(
            hci_packets.AcceptConnectionRequestBuilder(connection_request.get().GetBdAddr(),
                                                       hci_packets.AcceptConnectionRequestRole.REMAIN_PERIPHERAL))
        return self.complete_connection()

    def complete_connection(self):
        connection_complete = HciCaptures.ConnectionCompleteCapture()
        assertThat(self.hci_event_stream).emits(connection_complete)

        handle = connection_complete.get().GetConnectionHandle()
        return PyHalAclConnection(handle, self.acl_stream, self.device)
+10 −25
Original line number Diff line number Diff line
@@ -35,9 +35,9 @@ class PyHciAclConnection(IEventStream):
        self.our_acl_stream = FilteringEventStream(acl_stream, None)

    def send(self, pb_flag, b_flag, data):
        acl_msg = hci_facade.AclMsg(
        acl_msg = hci_facade.AclPacket(
            handle=self.handle, packet_boundary_flag=int(pb_flag), broadcast_flag=int(b_flag), data=data)
        self.device.hci.SendAclData(acl_msg)
        self.device.hci.SendAcl(acl_msg)

    def send_first(self, data):
        self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
@@ -64,22 +64,13 @@ class PyHci(Closable):
            want this if you are testing HCI itself.
        """
        self.device = device
        self._setup_event_stream()
        self._setup_le_event_stream()
        self.event_stream = EventStream(self.device.hci.StreamEvents(empty_proto.Empty()))
        self.le_event_stream = EventStream(self.device.hci.StreamLeSubevents(empty_proto.Empty()))
        if acl_streaming:
            self.register_for_events(hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_REQUEST,
                                     hci_packets.EventCode.CONNECTION_COMPLETE,
                                     hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
            self._setup_acl_stream()

    def _setup_event_stream(self):
        self.event_stream = EventStream(self.device.hci.FetchEvents(empty_proto.Empty()))

    def _setup_le_event_stream(self):
        self.le_event_stream = EventStream(self.device.hci.FetchLeSubevents(empty_proto.Empty()))

    def _setup_acl_stream(self):
        self.acl_stream = EventStream(self.device.hci.FetchAclPackets(empty_proto.Empty()))
            self.acl_stream = EventStream(self.device.hci.StreamAcl(empty_proto.Empty()))

    def close(self):
        safeClose(self.event_stream)
@@ -99,23 +90,17 @@ class PyHci(Closable):

    def register_for_events(self, *event_codes):
        for event_code in event_codes:
            msg = hci_facade.EventCodeMsg(code=int(event_code))
            self.device.hci.RegisterEventHandler(msg)
            self.device.hci.RequestEvent(hci_facade.EventRequest(code=int(event_code)))

    def register_for_le_events(self, *event_codes):
        for event_code in event_codes:
            msg = hci_facade.EventCodeMsg(code=int(event_code))
            self.device.hci.RegisterLeEventHandler(msg)
            self.device.hci.RequestLeSubevent(hci_facade.EventRequest(code=int(event_code)))

    def send_command_with_complete(self, command):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        self.device.hci.EnqueueCommandWithComplete(cmd)
        self.device.hci.SendCommandWithComplete(hci_facade.Command(payload=bytes(command.Serialize())))

    def send_command_with_status(self, command):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        self.device.hci.EnqueueCommandWithStatus(cmd)
        self.device.hci.SendCommandWithStatus(hci_facade.Command(payload=bytes(command.Serialize())))

    def enable_inquiry_and_page_scan(self):
        self.send_command_with_complete(
@@ -130,7 +115,7 @@ class PyHci(Closable):
    def initiate_connection(self, remote_addr):
        self.send_command_with_status(
            hci_packets.CreateConnectionBuilder(
                remote_addr.decode('utf-8'),
                remote_addr if isinstance(remote_addr, str) else remote_addr.decode('utf-8'),
                0xcc18,  # Packet Type
                hci_packets.PageScanRepetitionMode.R1,
                0x0,
+4 −24
Original line number Diff line number Diff line
@@ -25,6 +25,8 @@ from facade import rootservice_pb2 as facade_rootservice_pb2
from hal import facade_pb2 as hal_facade_pb2
from bluetooth_packets_python3 import hci_packets
import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3.hci_packets import AclPacketBuilder
from bluetooth_packets_python3 import RawBuilder

_GRPC_TIMEOUT = 10

@@ -48,26 +50,6 @@ class SimpleHalTest(GdBaseTestClass):
        self.cert_hal.close()
        super().teardown_test()

    def send_cert_acl_data(self, handle, pb_flag, b_flag, acl):
        lower = handle & 0xff
        upper = (handle >> 8) & 0xf
        upper = upper | int(pb_flag) & 0x3
        upper = upper | ((int(b_flag) & 0x3) << 2)
        lower_length = len(acl) & 0xff
        upper_length = (len(acl) & 0xff00) >> 8
        concatenated = bytes([lower, upper, lower_length, upper_length] + list(acl))
        self.cert_hal.send_acl(concatenated)

    def send_dut_acl_data(self, handle, pb_flag, b_flag, acl):
        lower = handle & 0xff
        upper = (handle >> 8) & 0xf
        upper = upper | int(pb_flag) & 0x3
        upper = upper | ((int(b_flag) & 0x3) << 2)
        lower_length = len(acl) & 0xff
        upper_length = (len(acl) & 0xff00) >> 8
        concatenated = bytes([lower, upper, lower_length, upper_length] + list(acl))
        self.dut_hal.send_acl(concatenated)

    def test_fetch_hci_event(self):
        self.dut_hal.send_hci_command(
            hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01'))
@@ -242,10 +224,8 @@ class SimpleHalTest(GdBaseTestClass):
        dut_handle = conn_handle

        # Send ACL Data
        self.send_dut_acl_data(dut_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                               hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeAclData'))
        self.send_cert_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                                hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeMoreAclData'))
        self.dut_hal.send_acl_first(dut_handle, bytes(b'Just SomeAclData'))
        self.cert_hal.send_acl_first(cert_handle, bytes(b'Just SomeMoreAclData'))

        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
        assertThat(self.dut_hal.get_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
Loading