Loading system/gd/hci/cert/acl_manager_test.py +22 −37 Original line number Diff line number Diff line Loading @@ -31,6 +31,7 @@ from bluetooth_packets_python3 import hci_packets from captures import ReadBdAddrCompleteCapture from captures import ConnectionCompleteCapture from captures import ConnectionRequestCapture from py_hci import PyHci class AclManagerTest(GdFacadeOnlyBaseTestClass): Loading @@ -52,21 +53,11 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with self.cert.hci.new_event_stream() as cert_hci_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: # CERT Enables scans and gets its address self.cert.hci.send_command_with_complete( hci_packets.WriteScanEnableBuilder( hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) self.cert.hci.send_command_with_complete( hci_packets.ReadBdAddrBuilder()) read_bd_addr = ReadBdAddrCompleteCapture() assertThat(cert_hci_event_stream).emits(read_bd_addr) cert_address = read_bd_addr.get().GetBdAddr() cert_hci.enable_inquiry_and_page_scan() cert_address = cert_hci.read_own_address() with EventStream( self.dut.hci_acl_manager.CreateConnection( Loading @@ -78,16 +69,18 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): # Cert Accepts connection_request = ConnectionRequestCapture() assertThat(cert_hci_event_stream).emits(connection_request) assertThat( cert_hci.get_event_stream()).emits(connection_request) self.cert.hci.send_command_with_status( cert_hci.send_command_with_status( hci_packets.AcceptConnectionRequestBuilder( connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) # Cert gets ConnectionComplete with a handle and sends ACL data connection_complete = ConnectionCompleteCapture() assertThat(cert_hci_event_stream).emits(connection_complete) assertThat( cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( Loading @@ -110,7 +103,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT' ))) assertThat(cert_acl_data_stream).emits( assertThat(cert_hci.get_acl_stream()).emits( lambda packet: b'SomeMoreAclData' in packet.data) assertThat(acl_data_stream).emits( lambda packet: b'SomeAclData' in packet.payload) Loading @@ -121,8 +114,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with self.cert.hci.new_event_stream() as cert_hci_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: Loading @@ -133,7 +125,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): neighbor_facade.EnableMsg(enabled=True)) # Cert connects self.cert.hci.send_command_with_status( cert_hci.send_command_with_status( hci_packets.CreateConnectionBuilder( dut_address.decode('utf-8'), 0xcc18, # Packet Type Loading @@ -155,7 +147,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): ))) connection_complete = ConnectionCompleteCapture() assertThat(cert_hci_event_stream).emits(connection_complete) assertThat(cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( Loading @@ -165,7 +157,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): bytes( b'\x26\x00\x07\x00This is just SomeAclData from the Cert')) assertThat(cert_acl_data_stream).emits( assertThat(cert_hci.get_acl_stream()).emits( lambda packet: b'SomeMoreAclData' in packet.data) assertThat(acl_data_stream).emits( lambda packet: b'SomeAclData' in packet.payload) Loading @@ -176,21 +168,12 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with self.cert.hci.new_event_stream() as cert_hci_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: # CERT Enables scans and gets its address self.cert.hci.send_command_with_complete( hci_packets.WriteScanEnableBuilder( hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) self.cert.hci.send_command_with_complete( hci_packets.ReadBdAddrBuilder()) read_bd_addr = ReadBdAddrCompleteCapture() assertThat(cert_hci_event_stream).emits(read_bd_addr) cert_address = read_bd_addr.get().GetBdAddr() cert_hci.enable_inquiry_and_page_scan() cert_address = cert_hci.read_own_address() with EventStream( self.dut.hci_acl_manager.CreateConnection( Loading @@ -202,15 +185,17 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): # Cert Accepts connection_request = ConnectionRequestCapture() assertThat(cert_hci_event_stream).emits(connection_request) self.cert.hci.send_command_with_status( assertThat( cert_hci.get_event_stream()).emits(connection_request) cert_hci.send_command_with_status( hci_packets.AcceptConnectionRequestBuilder( connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) # Cert gets ConnectionComplete with a handle and sends ACL data connection_complete = ConnectionCompleteCapture() assertThat(cert_hci_event_stream).emits(connection_complete) assertThat( cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( Loading system/gd/hci/cert/py_hci.py 0 → 100644 +67 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2020 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from google.protobuf import empty_pb2 as empty_proto from cert.event_stream import EventStream from captures import ReadBdAddrCompleteCapture from bluetooth_packets_python3 import hci_packets from cert.truth import assertThat class PyHci(object): def __init__(self, device): self.device = device self.event_stream = self.device.hci.new_event_stream() self.acl_stream = EventStream( self.device.hci.FetchAclPackets(empty_proto.Empty())) def __enter__(self): return self def __exit__(self, type, value, traceback): self.clean_up() return traceback is None def __del__(self): self.clean_up() def clean_up(self): self.event_stream.shutdown() self.acl_stream.shutdown() def get_event_stream(self): return self.event_stream def get_acl_stream(self): return self.acl_stream def send_command_with_complete(self, command): self.device.hci.send_command_with_complete(command) def send_command_with_status(self, command): self.device.hci.send_command_with_status(command) def enable_inquiry_and_page_scan(self): self.send_command_with_complete( hci_packets.WriteScanEnableBuilder( hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) def read_own_address(self): self.send_command_with_complete(hci_packets.ReadBdAddrBuilder()) read_bd_addr = ReadBdAddrCompleteCapture() assertThat(self.event_stream).emits(read_bd_addr) return read_bd_addr.get().GetBdAddr() Loading
system/gd/hci/cert/acl_manager_test.py +22 −37 Original line number Diff line number Diff line Loading @@ -31,6 +31,7 @@ from bluetooth_packets_python3 import hci_packets from captures import ReadBdAddrCompleteCapture from captures import ConnectionCompleteCapture from captures import ConnectionRequestCapture from py_hci import PyHci class AclManagerTest(GdFacadeOnlyBaseTestClass): Loading @@ -52,21 +53,11 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with self.cert.hci.new_event_stream() as cert_hci_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: # CERT Enables scans and gets its address self.cert.hci.send_command_with_complete( hci_packets.WriteScanEnableBuilder( hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) self.cert.hci.send_command_with_complete( hci_packets.ReadBdAddrBuilder()) read_bd_addr = ReadBdAddrCompleteCapture() assertThat(cert_hci_event_stream).emits(read_bd_addr) cert_address = read_bd_addr.get().GetBdAddr() cert_hci.enable_inquiry_and_page_scan() cert_address = cert_hci.read_own_address() with EventStream( self.dut.hci_acl_manager.CreateConnection( Loading @@ -78,16 +69,18 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): # Cert Accepts connection_request = ConnectionRequestCapture() assertThat(cert_hci_event_stream).emits(connection_request) assertThat( cert_hci.get_event_stream()).emits(connection_request) self.cert.hci.send_command_with_status( cert_hci.send_command_with_status( hci_packets.AcceptConnectionRequestBuilder( connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) # Cert gets ConnectionComplete with a handle and sends ACL data connection_complete = ConnectionCompleteCapture() assertThat(cert_hci_event_stream).emits(connection_complete) assertThat( cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( Loading @@ -110,7 +103,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT' ))) assertThat(cert_acl_data_stream).emits( assertThat(cert_hci.get_acl_stream()).emits( lambda packet: b'SomeMoreAclData' in packet.data) assertThat(acl_data_stream).emits( lambda packet: b'SomeAclData' in packet.payload) Loading @@ -121,8 +114,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with self.cert.hci.new_event_stream() as cert_hci_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: Loading @@ -133,7 +125,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): neighbor_facade.EnableMsg(enabled=True)) # Cert connects self.cert.hci.send_command_with_status( cert_hci.send_command_with_status( hci_packets.CreateConnectionBuilder( dut_address.decode('utf-8'), 0xcc18, # Packet Type Loading @@ -155,7 +147,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): ))) connection_complete = ConnectionCompleteCapture() assertThat(cert_hci_event_stream).emits(connection_complete) assertThat(cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( Loading @@ -165,7 +157,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): bytes( b'\x26\x00\x07\x00This is just SomeAclData from the Cert')) assertThat(cert_acl_data_stream).emits( assertThat(cert_hci.get_acl_stream()).emits( lambda packet: b'SomeMoreAclData' in packet.data) assertThat(acl_data_stream).emits( lambda packet: b'SomeAclData' in packet.payload) Loading @@ -176,21 +168,12 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with self.cert.hci.new_event_stream() as cert_hci_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: # CERT Enables scans and gets its address self.cert.hci.send_command_with_complete( hci_packets.WriteScanEnableBuilder( hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) self.cert.hci.send_command_with_complete( hci_packets.ReadBdAddrBuilder()) read_bd_addr = ReadBdAddrCompleteCapture() assertThat(cert_hci_event_stream).emits(read_bd_addr) cert_address = read_bd_addr.get().GetBdAddr() cert_hci.enable_inquiry_and_page_scan() cert_address = cert_hci.read_own_address() with EventStream( self.dut.hci_acl_manager.CreateConnection( Loading @@ -202,15 +185,17 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): # Cert Accepts connection_request = ConnectionRequestCapture() assertThat(cert_hci_event_stream).emits(connection_request) self.cert.hci.send_command_with_status( assertThat( cert_hci.get_event_stream()).emits(connection_request) cert_hci.send_command_with_status( hci_packets.AcceptConnectionRequestBuilder( connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) # Cert gets ConnectionComplete with a handle and sends ACL data connection_complete = ConnectionCompleteCapture() assertThat(cert_hci_event_stream).emits(connection_complete) assertThat( cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( Loading
system/gd/hci/cert/py_hci.py 0 → 100644 +67 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2020 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from google.protobuf import empty_pb2 as empty_proto from cert.event_stream import EventStream from captures import ReadBdAddrCompleteCapture from bluetooth_packets_python3 import hci_packets from cert.truth import assertThat class PyHci(object): def __init__(self, device): self.device = device self.event_stream = self.device.hci.new_event_stream() self.acl_stream = EventStream( self.device.hci.FetchAclPackets(empty_proto.Empty())) def __enter__(self): return self def __exit__(self, type, value, traceback): self.clean_up() return traceback is None def __del__(self): self.clean_up() def clean_up(self): self.event_stream.shutdown() self.acl_stream.shutdown() def get_event_stream(self): return self.event_stream def get_acl_stream(self): return self.acl_stream def send_command_with_complete(self, command): self.device.hci.send_command_with_complete(command) def send_command_with_status(self, command): self.device.hci.send_command_with_status(command) def enable_inquiry_and_page_scan(self): self.send_command_with_complete( hci_packets.WriteScanEnableBuilder( hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) def read_own_address(self): self.send_command_with_complete(hci_packets.ReadBdAddrBuilder()) read_bd_addr = ReadBdAddrCompleteCapture() assertThat(self.event_stream).emits(read_bd_addr) return read_bd_addr.get().GetBdAddr()