Loading system/gd/cert/captures.py +74 −35 Original line number Diff line number Diff line Loading @@ -22,6 +22,47 @@ from cert.capture import Capture from cert.matchers import L2capMatchers class HalCaptures(object): @staticmethod def ReadBdAddrCompleteCapture(): return Capture(lambda packet: b'\x0e\x0a\x01\x09\x10' in packet.payload, lambda packet: hci_packets.ReadBdAddrCompleteView( hci_packets.CommandCompleteView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.payload)))))) @staticmethod def ConnectionRequestCapture(): return Capture(lambda packet: b'\x04\x0a' in packet.payload, lambda packet: hci_packets.ConnectionRequestView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.payload))))) @staticmethod def ConnectionCompleteCapture(): return Capture(lambda packet: b'\x03\x0b\x00' in packet.payload, lambda packet: hci_packets.ConnectionCompleteView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.payload))))) @staticmethod def LeConnectionCompleteCapture(): return Capture(lambda packet: packet.payload[0] == 0x3e and (packet.payload[2] == 0x01 or packet.payload[2] == 0x0a), lambda packet: hci_packets.LeConnectionCompleteView( hci_packets.LeMetaEventView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.payload)))))) class HciCaptures(object): @staticmethod def ReadBdAddrCompleteCapture(): return Capture(lambda packet: b'\x0e\x0a\x01\x09\x10' in packet.event, lambda packet: hci_packets.ReadBdAddrCompleteView( Loading @@ -30,7 +71,7 @@ def ReadBdAddrCompleteCapture(): bt_packets.PacketViewLittleEndian( list(packet.event)))))) @staticmethod def ConnectionRequestCapture(): return Capture(lambda packet: b'\x04\x0a' in packet.event, lambda packet: hci_packets.ConnectionRequestView( Loading @@ -38,7 +79,7 @@ def ConnectionRequestCapture(): bt_packets.PacketViewLittleEndian( list(packet.event))))) @staticmethod def ConnectionCompleteCapture(): return Capture(lambda packet: b'\x03\x0b\x00' in packet.event, lambda packet: hci_packets.ConnectionCompleteView( Loading @@ -46,7 +87,7 @@ def ConnectionCompleteCapture(): bt_packets.PacketViewLittleEndian( list(packet.event))))) @staticmethod def LeConnectionCompleteCapture(): return Capture(lambda packet: packet.event[0] == 0x3e and (packet.event[2] == 0x01 or packet.event[2] == 0x0a), Loading @@ -54,9 +95,7 @@ def LeConnectionCompleteCapture(): hci_packets.LeMetaEventView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.event) ) )))) list(packet.event)))))) class L2capCaptures(object): Loading system/gd/cert/gd_device.py +0 −18 Original line number Diff line number Diff line Loading @@ -88,9 +88,6 @@ class GdDevice(GdDeviceBase): self.controller_read_only_property = facade_rootservice_pb2_grpc.ReadOnlyPropertyStub( self.grpc_channel) self.hci = hci_facade_pb2_grpc.HciLayerFacadeStub(self.grpc_channel) self.hci.register_for_events = self.__register_for_hci_events self.hci.send_command_with_complete = self.__send_hci_command_with_complete self.hci.send_command_with_status = self.__send_hci_command_with_status self.l2cap = l2cap_facade_pb2_grpc.L2capClassicModuleFacadeStub( self.grpc_channel) self.l2cap_le = l2cap_le_facade_pb2_grpc.L2capLeModuleFacadeStub( Loading @@ -111,18 +108,3 @@ class GdDevice(GdDeviceBase): self.grpc_channel) self.security = security_facade_pb2_grpc.SecurityModuleFacadeStub( self.grpc_channel) def __register_for_hci_events(self, *event_codes): for event_code in event_codes: msg = hci_facade.EventCodeMsg(code=int(event_code)) self.hci.RegisterEventHandler(msg) def __send_hci_command_with_complete(self, command): cmd_bytes = bytes(command.Serialize()) cmd = hci_facade.CommandMsg(command=cmd_bytes) self.hci.EnqueueCommandWithComplete(cmd) def __send_hci_command_with_status(self, command): cmd_bytes = bytes(command.Serialize()) cmd = hci_facade.CommandMsg(command=cmd_bytes) self.hci.EnqueueCommandWithStatus(cmd) system/gd/cert/matchers.py +4 −0 Original line number Diff line number Diff line Loading @@ -41,6 +41,10 @@ class HciMatchers(object): return (opcode is None or frame.GetCommandOpCode() == opcode) and\ frame.GetNumHciCommandPackets() == num_complete @staticmethod def EventWithCode(event_code): return lambda msg: HciMatchers.extract_hci_event_with_code(msg.event, event_code) @staticmethod def extract_hci_event_with_code(packet_bytes, event_code=None): hci_event = hci_packets.EventPacketView( Loading system/gd/cert/py_acl_manager.py +3 −3 Original line number Diff line number Diff line Loading @@ -17,7 +17,7 @@ from google.protobuf import empty_pb2 as empty_proto from cert.event_stream import EventStream from cert.event_stream import IEventStream from cert.captures import ConnectionCompleteCapture from cert.captures import HciCaptures from cert.closable import Closable from cert.closable import safeClose from bluetooth_packets_python3 import hci_packets Loading Loading @@ -51,7 +51,7 @@ class PyAclManagerAclConnection(IEventStream, Closable): safeClose(self.connection_event_stream) def wait_for_connection_complete(self): connection_complete = ConnectionCompleteCapture() connection_complete = HciCaptures.ConnectionCompleteCapture() assertThat(self.connection_event_stream).emits(connection_complete) self.handle = connection_complete.get().GetConnectionHandle() Loading Loading @@ -90,7 +90,7 @@ class PyAclManager(Closable): remote_addr, None) def accept_connection(self): connection_complete = ConnectionCompleteCapture() connection_complete = HciCaptures.ConnectionCompleteCapture() assertThat(self.incoming_connection_stream).emits(connection_complete) handle = connection_complete.get().GetConnectionHandle() return PyAclManagerAclConnection(self.device, self.acl_stream, None, Loading system/gd/cert/py_hal.py 0 → 100644 +51 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2020 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from google.protobuf import empty_pb2 as empty_proto from cert.event_stream import EventStream from cert.closable import Closable from cert.closable import safeClose from hal import facade_pb2 as hal_facade class PyHal(Closable): def __init__(self, device): self.device = device self.hci_event_stream = EventStream( self.device.hal.FetchHciEvent(empty_proto.Empty())) self.acl_stream = EventStream( self.device.hal.FetchHciAcl(empty_proto.Empty())) # We don't deal with SCO for now def close(self): safeClose(self.hci_event_stream) safeClose(self.acl_stream) def get_hci_event_stream(self): return self.hci_event_stream def get_acl_stream(self): return self.acl_stream def send_hci_command(self, command): self.device.hal.SendHciCommand( hal_facade.HciCommandPacket(payload=bytes(command))) def send_acl(self, acl): self.device.hal.SendHciAcl(hal_facade.HciAclPacket(payload=bytes(acl))) Loading
system/gd/cert/captures.py +74 −35 Original line number Diff line number Diff line Loading @@ -22,6 +22,47 @@ from cert.capture import Capture from cert.matchers import L2capMatchers class HalCaptures(object): @staticmethod def ReadBdAddrCompleteCapture(): return Capture(lambda packet: b'\x0e\x0a\x01\x09\x10' in packet.payload, lambda packet: hci_packets.ReadBdAddrCompleteView( hci_packets.CommandCompleteView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.payload)))))) @staticmethod def ConnectionRequestCapture(): return Capture(lambda packet: b'\x04\x0a' in packet.payload, lambda packet: hci_packets.ConnectionRequestView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.payload))))) @staticmethod def ConnectionCompleteCapture(): return Capture(lambda packet: b'\x03\x0b\x00' in packet.payload, lambda packet: hci_packets.ConnectionCompleteView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.payload))))) @staticmethod def LeConnectionCompleteCapture(): return Capture(lambda packet: packet.payload[0] == 0x3e and (packet.payload[2] == 0x01 or packet.payload[2] == 0x0a), lambda packet: hci_packets.LeConnectionCompleteView( hci_packets.LeMetaEventView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.payload)))))) class HciCaptures(object): @staticmethod def ReadBdAddrCompleteCapture(): return Capture(lambda packet: b'\x0e\x0a\x01\x09\x10' in packet.event, lambda packet: hci_packets.ReadBdAddrCompleteView( Loading @@ -30,7 +71,7 @@ def ReadBdAddrCompleteCapture(): bt_packets.PacketViewLittleEndian( list(packet.event)))))) @staticmethod def ConnectionRequestCapture(): return Capture(lambda packet: b'\x04\x0a' in packet.event, lambda packet: hci_packets.ConnectionRequestView( Loading @@ -38,7 +79,7 @@ def ConnectionRequestCapture(): bt_packets.PacketViewLittleEndian( list(packet.event))))) @staticmethod def ConnectionCompleteCapture(): return Capture(lambda packet: b'\x03\x0b\x00' in packet.event, lambda packet: hci_packets.ConnectionCompleteView( Loading @@ -46,7 +87,7 @@ def ConnectionCompleteCapture(): bt_packets.PacketViewLittleEndian( list(packet.event))))) @staticmethod def LeConnectionCompleteCapture(): return Capture(lambda packet: packet.event[0] == 0x3e and (packet.event[2] == 0x01 or packet.event[2] == 0x0a), Loading @@ -54,9 +95,7 @@ def LeConnectionCompleteCapture(): hci_packets.LeMetaEventView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.event) ) )))) list(packet.event)))))) class L2capCaptures(object): Loading
system/gd/cert/gd_device.py +0 −18 Original line number Diff line number Diff line Loading @@ -88,9 +88,6 @@ class GdDevice(GdDeviceBase): self.controller_read_only_property = facade_rootservice_pb2_grpc.ReadOnlyPropertyStub( self.grpc_channel) self.hci = hci_facade_pb2_grpc.HciLayerFacadeStub(self.grpc_channel) self.hci.register_for_events = self.__register_for_hci_events self.hci.send_command_with_complete = self.__send_hci_command_with_complete self.hci.send_command_with_status = self.__send_hci_command_with_status self.l2cap = l2cap_facade_pb2_grpc.L2capClassicModuleFacadeStub( self.grpc_channel) self.l2cap_le = l2cap_le_facade_pb2_grpc.L2capLeModuleFacadeStub( Loading @@ -111,18 +108,3 @@ class GdDevice(GdDeviceBase): self.grpc_channel) self.security = security_facade_pb2_grpc.SecurityModuleFacadeStub( self.grpc_channel) def __register_for_hci_events(self, *event_codes): for event_code in event_codes: msg = hci_facade.EventCodeMsg(code=int(event_code)) self.hci.RegisterEventHandler(msg) def __send_hci_command_with_complete(self, command): cmd_bytes = bytes(command.Serialize()) cmd = hci_facade.CommandMsg(command=cmd_bytes) self.hci.EnqueueCommandWithComplete(cmd) def __send_hci_command_with_status(self, command): cmd_bytes = bytes(command.Serialize()) cmd = hci_facade.CommandMsg(command=cmd_bytes) self.hci.EnqueueCommandWithStatus(cmd)
system/gd/cert/matchers.py +4 −0 Original line number Diff line number Diff line Loading @@ -41,6 +41,10 @@ class HciMatchers(object): return (opcode is None or frame.GetCommandOpCode() == opcode) and\ frame.GetNumHciCommandPackets() == num_complete @staticmethod def EventWithCode(event_code): return lambda msg: HciMatchers.extract_hci_event_with_code(msg.event, event_code) @staticmethod def extract_hci_event_with_code(packet_bytes, event_code=None): hci_event = hci_packets.EventPacketView( Loading
system/gd/cert/py_acl_manager.py +3 −3 Original line number Diff line number Diff line Loading @@ -17,7 +17,7 @@ from google.protobuf import empty_pb2 as empty_proto from cert.event_stream import EventStream from cert.event_stream import IEventStream from cert.captures import ConnectionCompleteCapture from cert.captures import HciCaptures from cert.closable import Closable from cert.closable import safeClose from bluetooth_packets_python3 import hci_packets Loading Loading @@ -51,7 +51,7 @@ class PyAclManagerAclConnection(IEventStream, Closable): safeClose(self.connection_event_stream) def wait_for_connection_complete(self): connection_complete = ConnectionCompleteCapture() connection_complete = HciCaptures.ConnectionCompleteCapture() assertThat(self.connection_event_stream).emits(connection_complete) self.handle = connection_complete.get().GetConnectionHandle() Loading Loading @@ -90,7 +90,7 @@ class PyAclManager(Closable): remote_addr, None) def accept_connection(self): connection_complete = ConnectionCompleteCapture() connection_complete = HciCaptures.ConnectionCompleteCapture() assertThat(self.incoming_connection_stream).emits(connection_complete) handle = connection_complete.get().GetConnectionHandle() return PyAclManagerAclConnection(self.device, self.acl_stream, None, Loading
system/gd/cert/py_hal.py 0 → 100644 +51 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2020 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from google.protobuf import empty_pb2 as empty_proto from cert.event_stream import EventStream from cert.closable import Closable from cert.closable import safeClose from hal import facade_pb2 as hal_facade class PyHal(Closable): def __init__(self, device): self.device = device self.hci_event_stream = EventStream( self.device.hal.FetchHciEvent(empty_proto.Empty())) self.acl_stream = EventStream( self.device.hal.FetchHciAcl(empty_proto.Empty())) # We don't deal with SCO for now def close(self): safeClose(self.hci_event_stream) safeClose(self.acl_stream) def get_hci_event_stream(self): return self.hci_event_stream def get_acl_stream(self): return self.acl_stream def send_hci_command(self, command): self.device.hal.SendHciCommand( hal_facade.HciCommandPacket(payload=bytes(command))) def send_acl(self, acl): self.device.hal.SendHciAcl(hal_facade.HciAclPacket(payload=bytes(acl)))