Loading system/gd/cert/py_hci.py +11 −15 Original line number Diff line number Diff line Loading @@ -83,7 +83,7 @@ class PyHciAdvertisement(object): data = GapData() data.data_type = GapDataType.COMPLETE_LOCAL_NAME data.data = list(bytes(complete_name)) self.py_hci.send_command_with_complete( self.py_hci.send_command( LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) Loading @@ -91,7 +91,7 @@ class PyHciAdvertisement(object): data = GapData() data.data_type = GapDataType.SHORTENED_LOCAL_NAME data.data = list(bytes(shortened_name)) self.py_hci.send_command_with_complete( self.py_hci.send_command( LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) Loading @@ -100,7 +100,7 @@ class PyHciAdvertisement(object): enabled_set.advertising_handle = self.handle enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.py_hci.send_command_with_complete(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set])) self.py_hci.send_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set])) assertThat(self.py_hci.get_event_stream()).emits( HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)) Loading Loading @@ -150,24 +150,20 @@ class PyHci(Closable): for event_code in event_codes: self.device.hci.RequestLeSubevent(hci_facade.EventRequest(code=int(event_code))) def send_command_with_complete(self, command): self.device.hci.SendCommandWithComplete(common.Data(payload=bytes(command.Serialize()))) def send_command_with_status(self, command): self.device.hci.SendCommandWithStatus(common.Data(payload=bytes(command.Serialize()))) def send_command(self, command): self.device.hci.SendCommand(common.Data(payload=bytes(command.Serialize()))) def enable_inquiry_and_page_scan(self): self.send_command_with_complete( hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) self.send_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) def read_own_address(self): self.send_command_with_complete(hci_packets.ReadBdAddrBuilder()) self.send_command(hci_packets.ReadBdAddrBuilder()) read_bd_addr = HciCaptures.ReadBdAddrCompleteCapture() assertThat(self.event_stream).emits(read_bd_addr) return read_bd_addr.get().GetBdAddr() def initiate_connection(self, remote_addr): self.send_command_with_status( self.send_command( hci_packets.CreateConnectionBuilder( remote_addr if isinstance(remote_addr, str) else remote_addr.decode('utf-8'), 0xcc18, # Packet Type Loading @@ -180,7 +176,7 @@ class PyHci(Closable): connection_request = HciCaptures.ConnectionRequestCapture() assertThat(self.event_stream).emits(connection_request) self.send_command_with_status( self.send_command( hci_packets.AcceptConnectionRequestBuilder(connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_PERIPHERAL)) return self.complete_connection() Loading Loading @@ -209,10 +205,10 @@ class PyHci(Closable): sid=1, scan_request_notification=Enable.DISABLED): self.send_command_with_complete( self.send_command( LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map, own_address_type, peer_address_type, peer_address, filter_policy, tx_power, sid, scan_request_notification)) self.send_command_with_complete(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address)) self.send_command(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address)) return PyHciAdvertisement(handle, self) system/gd/hci/cert/direct_hci_test.py +11 −14 Original line number Diff line number Diff line Loading @@ -102,9 +102,9 @@ class DirectHciTest(GdBaseTestClass): def test_local_hci_cmd_and_event(self): # Loopback mode responds with ACL and SCO connection complete self.dut_hci.register_for_events(EventCode.LOOPBACK_COMMAND) self.dut_hci.send_command_with_complete(WriteLoopbackModeBuilder(LoopbackMode.ENABLE_LOCAL)) self.dut_hci.send_command(WriteLoopbackModeBuilder(LoopbackMode.ENABLE_LOCAL)) self.dut_hci.send_command_with_complete(ReadLocalNameBuilder()) self.dut_hci.send_command(ReadLocalNameBuilder()) assertThat(self.dut_hci.get_event_stream()).emits(HciMatchers.LoopbackOf(ReadLocalNameBuilder())) def test_inquiry_from_dut(self): Loading @@ -113,24 +113,23 @@ class DirectHciTest(GdBaseTestClass): self.cert_hal.enable_inquiry_and_page_scan() lap = Lap() lap.lap = 0x33 self.dut_hci.send_command_with_status(InquiryBuilder(lap, 0x30, 0xff)) self.dut_hci.send_command(InquiryBuilder(lap, 0x30, 0xff)) assertThat(self.dut_hci.get_event_stream()).emits(HciMatchers.EventWithCode(EventCode.INQUIRY_RESULT)) def test_le_ad_scan_cert_advertises(self): self.dut_hci.register_for_le_events(SubeventCode.EXTENDED_ADVERTISING_REPORT, SubeventCode.ADVERTISING_REPORT) # DUT Scans self.dut_hci.send_command_with_complete(LeSetRandomAddressBuilder('0D:05:04:03:02:01')) self.dut_hci.send_command(LeSetRandomAddressBuilder('0D:05:04:03:02:01')) phy_scan_params = PhyScanParameters() phy_scan_params.le_scan_interval = 6553 phy_scan_params.le_scan_window = 6553 phy_scan_params.le_scan_type = LeScanType.ACTIVE self.dut_hci.send_command_with_complete( self.dut_hci.send_command( LeSetExtendedScanParametersBuilder(OwnAddressType.RANDOM_DEVICE_ADDRESS, LeScanningFilterPolicy.ACCEPT_ALL, 1, [phy_scan_params])) self.dut_hci.send_command_with_complete( LeSetExtendedScanEnableBuilder(Enable.ENABLED, FilterDuplicates.DISABLED, 0, 0)) self.dut_hci.send_command(LeSetExtendedScanEnableBuilder(Enable.ENABLED, FilterDuplicates.DISABLED, 0, 0)) # CERT Advertises advertising_handle = 0 Loading Loading @@ -177,8 +176,7 @@ class DirectHciTest(GdBaseTestClass): assertThat(self.dut_hci.get_le_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload) self.cert_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.DISABLED, [enabled_set])) self.dut_hci.send_command_with_complete( LeSetExtendedScanEnableBuilder(Enable.DISABLED, FilterDuplicates.DISABLED, 0, 0)) self.dut_hci.send_command(LeSetExtendedScanEnableBuilder(Enable.DISABLED, FilterDuplicates.DISABLED, 0, 0)) def _verify_le_connection_complete(self): cert_conn_complete_capture = HalCaptures.LeConnectionCompleteCapture() Loading Loading @@ -235,11 +233,10 @@ class DirectHciTest(GdBaseTestClass): def test_le_connect_list_connection_cert_advertises(self): self.dut_hci.register_for_le_events(SubeventCode.CONNECTION_COMPLETE, SubeventCode.ENHANCED_CONNECTION_COMPLETE) # DUT Connects self.dut_hci.send_command_with_complete(LeSetRandomAddressBuilder('0D:05:04:03:02:01')) self.dut_hci.send_command_with_complete( LeAddDeviceToConnectListBuilder(ConnectListAddressType.RANDOM, '0C:05:04:03:02:01')) self.dut_hci.send_command(LeSetRandomAddressBuilder('0D:05:04:03:02:01')) self.dut_hci.send_command(LeAddDeviceToConnectListBuilder(ConnectListAddressType.RANDOM, '0C:05:04:03:02:01')) phy_scan_params = DirectHciTest._create_phy_scan_params() self.dut_hci.send_command_with_status( self.dut_hci.send_command( LeExtendedCreateConnectionBuilder(InitiatorFilterPolicy.USE_CONNECT_LIST, OwnAddressType.RANDOM_DEVICE_ADDRESS, AddressType.RANDOM_DEVICE_ADDRESS, 'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params])) Loading @@ -259,7 +256,7 @@ class DirectHciTest(GdBaseTestClass): self._verify_le_connection_complete() def test_connection_dut_connects(self): self.dut_hci.send_command_with_complete(WritePageTimeoutBuilder(0x4000)) self.dut_hci.send_command(WritePageTimeoutBuilder(0x4000)) self.cert_hal.enable_inquiry_and_page_scan() address = self.cert_hal.read_own_address() Loading system/gd/hci/cert/le_acl_manager_test.py +9 −13 Original line number Diff line number Diff line Loading @@ -63,13 +63,10 @@ class LeAclManagerTest(GdBaseTestClass): msg = hci_facade.EventRequest(code=int(event_code)) self.cert.hci.RequestLeSubevent(msg) def enqueue_hci_command(self, command, expect_complete): def enqueue_hci_command(self, command): cmd_bytes = bytes(command.Serialize()) cmd = common.Data(payload=cmd_bytes) if (expect_complete): self.cert.hci.SendCommandWithComplete(cmd) else: self.cert.hci.SendCommandWithStatus(cmd) self.cert.hci.SendCommand(cmd) def enqueue_acl_data(self, handle, pb_flag, b_flag, data): acl = hci_packets.AclBuilder(handle, pb_flag, b_flag, RawBuilder(data)) Loading @@ -95,11 +92,10 @@ class LeAclManagerTest(GdBaseTestClass): 0xF8, 1, #SID hci_packets.Enable.DISABLED # Scan request notification ), True) )) self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'), True) hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01')) gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME Loading @@ -108,7 +104,7 @@ class LeAclManagerTest(GdBaseTestClass): self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingDataBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]), True) hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name])) gap_short_name = hci_packets.GapData() gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME Loading @@ -117,14 +113,14 @@ class LeAclManagerTest(GdBaseTestClass): self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingScanResponseBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]), True) hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name])) enabled_set = hci_packets.EnabledSet() enabled_set.advertising_handle = advertising_handle enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]), True) hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set])) self.dut_le_acl = self.dut_le_acl_manager.connect_to_remote( remote_addr=common.BluetoothAddressWithType( Loading Loading @@ -235,7 +231,7 @@ class LeAclManagerTest(GdBaseTestClass): self.dut.hci_le_advertising_manager.CreateAdvertiser(request) # Cert Connects self.enqueue_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'), True) self.enqueue_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01')) phy_scan_params = hci_packets.LeCreateConnPhyScanParameters() phy_scan_params.scan_interval = 0x60 phy_scan_params.scan_window = 0x30 Loading @@ -249,7 +245,7 @@ class LeAclManagerTest(GdBaseTestClass): hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, self.dut_address.decode(), 1, [phy_scan_params]), False) self.dut_address.decode(), 1, [phy_scan_params])) # Cert gets ConnectionComplete with a handle and sends ACL data handle = 0xfff Loading system/gd/hci/cert/le_advertising_manager_test.py +6 −9 Original line number Diff line number Diff line Loading @@ -42,13 +42,10 @@ class LeAdvertisingManagerTest(GdBaseTestClass): msg = hci_facade.EventRequest(code=int(event_code)) self.cert.hci.RequestLeSubevent(msg) def enqueue_hci_command(self, command, expect_complete): def enqueue_hci_command(self, command): cmd_bytes = bytes(command.Serialize()) cmd = common.Data(payload=cmd_bytes) if (expect_complete): self.cert.hci.SendCommandWithComplete(cmd) else: self.cert.hci.SendCommandWithStatus(cmd) self.cert.hci.SendCommand(cmd) def test_le_ad_scan_dut_advertises(self): self.register_for_le_event(hci_packets.SubeventCode.ADVERTISING_REPORT) Loading @@ -56,7 +53,7 @@ class LeAdvertisingManagerTest(GdBaseTestClass): with EventStream(self.cert.hci.StreamLeSubevents(empty_proto.Empty())) as hci_le_event_stream: # CERT Scans self.enqueue_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'), True) self.enqueue_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01')) scan_parameters = hci_packets.PhyScanParameters() scan_parameters.le_scan_type = hci_packets.LeScanType.ACTIVE scan_parameters.le_scan_interval = 40 Loading @@ -64,10 +61,10 @@ class LeAdvertisingManagerTest(GdBaseTestClass): self.enqueue_hci_command( hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1, [scan_parameters]), True) [scan_parameters])) self.enqueue_hci_command( hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED, hci_packets.FilterDuplicates.DISABLED, 0, 0), True) hci_packets.FilterDuplicates.DISABLED, 0, 0)) # DUT Advertises gap_name = hci_packets.GapData() Loading @@ -91,4 +88,4 @@ class LeAdvertisingManagerTest(GdBaseTestClass): remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id) self.dut.hci_le_advertising_manager.RemoveAdvertiser(remove_request) self.enqueue_hci_command( hci_packets.LeSetScanEnableBuilder(hci_packets.Enable.DISABLED, hci_packets.Enable.DISABLED), True) hci_packets.LeSetScanEnableBuilder(hci_packets.Enable.DISABLED, hci_packets.Enable.DISABLED)) system/gd/hci/facade/facade.cc +9 −14 Original line number Diff line number Diff line Loading @@ -66,23 +66,18 @@ class HciFacadeService : public HciFacade::Service { std::vector<uint8_t> bytes_; }; ::grpc::Status SendCommandWithComplete( ::grpc::Status SendCommand( ::grpc::ServerContext* context, const ::bluetooth::facade::Data* command, ::google::protobuf::Empty* response) override { auto packet = std::make_unique<TestCommandBuilder>( std::vector<uint8_t>(command->payload().begin(), command->payload().end())); auto payload = std::vector<uint8_t>(command->payload().begin(), command->payload().end()); auto packet = std::make_unique<TestCommandBuilder>(payload); auto opcode = static_cast<const bluetooth::hci::OpCode>(payload.at(1) << 8 | payload.at(0)); if (Checker::IsCommandStatusOpcode(opcode)) { hci_layer_->EnqueueCommand(std::move(packet), facade_handler_->BindOnceOn(this, &HciFacadeService::on_status)); } else { hci_layer_->EnqueueCommand(std::move(packet), facade_handler_->BindOnceOn(this, &HciFacadeService::on_complete)); return ::grpc::Status::OK; } ::grpc::Status SendCommandWithStatus( ::grpc::ServerContext* context, const ::bluetooth::facade::Data* command, ::google::protobuf::Empty* response) override { auto packet = std::make_unique<TestCommandBuilder>( std::vector<uint8_t>(command->payload().begin(), command->payload().end())); hci_layer_->EnqueueCommand(std::move(packet), facade_handler_->BindOnceOn(this, &HciFacadeService::on_status)); return ::grpc::Status::OK; } Loading Loading
system/gd/cert/py_hci.py +11 −15 Original line number Diff line number Diff line Loading @@ -83,7 +83,7 @@ class PyHciAdvertisement(object): data = GapData() data.data_type = GapDataType.COMPLETE_LOCAL_NAME data.data = list(bytes(complete_name)) self.py_hci.send_command_with_complete( self.py_hci.send_command( LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) Loading @@ -91,7 +91,7 @@ class PyHciAdvertisement(object): data = GapData() data.data_type = GapDataType.SHORTENED_LOCAL_NAME data.data = list(bytes(shortened_name)) self.py_hci.send_command_with_complete( self.py_hci.send_command( LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) Loading @@ -100,7 +100,7 @@ class PyHciAdvertisement(object): enabled_set.advertising_handle = self.handle enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.py_hci.send_command_with_complete(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set])) self.py_hci.send_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set])) assertThat(self.py_hci.get_event_stream()).emits( HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)) Loading Loading @@ -150,24 +150,20 @@ class PyHci(Closable): for event_code in event_codes: self.device.hci.RequestLeSubevent(hci_facade.EventRequest(code=int(event_code))) def send_command_with_complete(self, command): self.device.hci.SendCommandWithComplete(common.Data(payload=bytes(command.Serialize()))) def send_command_with_status(self, command): self.device.hci.SendCommandWithStatus(common.Data(payload=bytes(command.Serialize()))) def send_command(self, command): self.device.hci.SendCommand(common.Data(payload=bytes(command.Serialize()))) def enable_inquiry_and_page_scan(self): self.send_command_with_complete( hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) self.send_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) def read_own_address(self): self.send_command_with_complete(hci_packets.ReadBdAddrBuilder()) self.send_command(hci_packets.ReadBdAddrBuilder()) read_bd_addr = HciCaptures.ReadBdAddrCompleteCapture() assertThat(self.event_stream).emits(read_bd_addr) return read_bd_addr.get().GetBdAddr() def initiate_connection(self, remote_addr): self.send_command_with_status( self.send_command( hci_packets.CreateConnectionBuilder( remote_addr if isinstance(remote_addr, str) else remote_addr.decode('utf-8'), 0xcc18, # Packet Type Loading @@ -180,7 +176,7 @@ class PyHci(Closable): connection_request = HciCaptures.ConnectionRequestCapture() assertThat(self.event_stream).emits(connection_request) self.send_command_with_status( self.send_command( hci_packets.AcceptConnectionRequestBuilder(connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_PERIPHERAL)) return self.complete_connection() Loading Loading @@ -209,10 +205,10 @@ class PyHci(Closable): sid=1, scan_request_notification=Enable.DISABLED): self.send_command_with_complete( self.send_command( LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map, own_address_type, peer_address_type, peer_address, filter_policy, tx_power, sid, scan_request_notification)) self.send_command_with_complete(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address)) self.send_command(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address)) return PyHciAdvertisement(handle, self)
system/gd/hci/cert/direct_hci_test.py +11 −14 Original line number Diff line number Diff line Loading @@ -102,9 +102,9 @@ class DirectHciTest(GdBaseTestClass): def test_local_hci_cmd_and_event(self): # Loopback mode responds with ACL and SCO connection complete self.dut_hci.register_for_events(EventCode.LOOPBACK_COMMAND) self.dut_hci.send_command_with_complete(WriteLoopbackModeBuilder(LoopbackMode.ENABLE_LOCAL)) self.dut_hci.send_command(WriteLoopbackModeBuilder(LoopbackMode.ENABLE_LOCAL)) self.dut_hci.send_command_with_complete(ReadLocalNameBuilder()) self.dut_hci.send_command(ReadLocalNameBuilder()) assertThat(self.dut_hci.get_event_stream()).emits(HciMatchers.LoopbackOf(ReadLocalNameBuilder())) def test_inquiry_from_dut(self): Loading @@ -113,24 +113,23 @@ class DirectHciTest(GdBaseTestClass): self.cert_hal.enable_inquiry_and_page_scan() lap = Lap() lap.lap = 0x33 self.dut_hci.send_command_with_status(InquiryBuilder(lap, 0x30, 0xff)) self.dut_hci.send_command(InquiryBuilder(lap, 0x30, 0xff)) assertThat(self.dut_hci.get_event_stream()).emits(HciMatchers.EventWithCode(EventCode.INQUIRY_RESULT)) def test_le_ad_scan_cert_advertises(self): self.dut_hci.register_for_le_events(SubeventCode.EXTENDED_ADVERTISING_REPORT, SubeventCode.ADVERTISING_REPORT) # DUT Scans self.dut_hci.send_command_with_complete(LeSetRandomAddressBuilder('0D:05:04:03:02:01')) self.dut_hci.send_command(LeSetRandomAddressBuilder('0D:05:04:03:02:01')) phy_scan_params = PhyScanParameters() phy_scan_params.le_scan_interval = 6553 phy_scan_params.le_scan_window = 6553 phy_scan_params.le_scan_type = LeScanType.ACTIVE self.dut_hci.send_command_with_complete( self.dut_hci.send_command( LeSetExtendedScanParametersBuilder(OwnAddressType.RANDOM_DEVICE_ADDRESS, LeScanningFilterPolicy.ACCEPT_ALL, 1, [phy_scan_params])) self.dut_hci.send_command_with_complete( LeSetExtendedScanEnableBuilder(Enable.ENABLED, FilterDuplicates.DISABLED, 0, 0)) self.dut_hci.send_command(LeSetExtendedScanEnableBuilder(Enable.ENABLED, FilterDuplicates.DISABLED, 0, 0)) # CERT Advertises advertising_handle = 0 Loading Loading @@ -177,8 +176,7 @@ class DirectHciTest(GdBaseTestClass): assertThat(self.dut_hci.get_le_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload) self.cert_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.DISABLED, [enabled_set])) self.dut_hci.send_command_with_complete( LeSetExtendedScanEnableBuilder(Enable.DISABLED, FilterDuplicates.DISABLED, 0, 0)) self.dut_hci.send_command(LeSetExtendedScanEnableBuilder(Enable.DISABLED, FilterDuplicates.DISABLED, 0, 0)) def _verify_le_connection_complete(self): cert_conn_complete_capture = HalCaptures.LeConnectionCompleteCapture() Loading Loading @@ -235,11 +233,10 @@ class DirectHciTest(GdBaseTestClass): def test_le_connect_list_connection_cert_advertises(self): self.dut_hci.register_for_le_events(SubeventCode.CONNECTION_COMPLETE, SubeventCode.ENHANCED_CONNECTION_COMPLETE) # DUT Connects self.dut_hci.send_command_with_complete(LeSetRandomAddressBuilder('0D:05:04:03:02:01')) self.dut_hci.send_command_with_complete( LeAddDeviceToConnectListBuilder(ConnectListAddressType.RANDOM, '0C:05:04:03:02:01')) self.dut_hci.send_command(LeSetRandomAddressBuilder('0D:05:04:03:02:01')) self.dut_hci.send_command(LeAddDeviceToConnectListBuilder(ConnectListAddressType.RANDOM, '0C:05:04:03:02:01')) phy_scan_params = DirectHciTest._create_phy_scan_params() self.dut_hci.send_command_with_status( self.dut_hci.send_command( LeExtendedCreateConnectionBuilder(InitiatorFilterPolicy.USE_CONNECT_LIST, OwnAddressType.RANDOM_DEVICE_ADDRESS, AddressType.RANDOM_DEVICE_ADDRESS, 'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params])) Loading @@ -259,7 +256,7 @@ class DirectHciTest(GdBaseTestClass): self._verify_le_connection_complete() def test_connection_dut_connects(self): self.dut_hci.send_command_with_complete(WritePageTimeoutBuilder(0x4000)) self.dut_hci.send_command(WritePageTimeoutBuilder(0x4000)) self.cert_hal.enable_inquiry_and_page_scan() address = self.cert_hal.read_own_address() Loading
system/gd/hci/cert/le_acl_manager_test.py +9 −13 Original line number Diff line number Diff line Loading @@ -63,13 +63,10 @@ class LeAclManagerTest(GdBaseTestClass): msg = hci_facade.EventRequest(code=int(event_code)) self.cert.hci.RequestLeSubevent(msg) def enqueue_hci_command(self, command, expect_complete): def enqueue_hci_command(self, command): cmd_bytes = bytes(command.Serialize()) cmd = common.Data(payload=cmd_bytes) if (expect_complete): self.cert.hci.SendCommandWithComplete(cmd) else: self.cert.hci.SendCommandWithStatus(cmd) self.cert.hci.SendCommand(cmd) def enqueue_acl_data(self, handle, pb_flag, b_flag, data): acl = hci_packets.AclBuilder(handle, pb_flag, b_flag, RawBuilder(data)) Loading @@ -95,11 +92,10 @@ class LeAclManagerTest(GdBaseTestClass): 0xF8, 1, #SID hci_packets.Enable.DISABLED # Scan request notification ), True) )) self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'), True) hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01')) gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME Loading @@ -108,7 +104,7 @@ class LeAclManagerTest(GdBaseTestClass): self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingDataBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]), True) hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name])) gap_short_name = hci_packets.GapData() gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME Loading @@ -117,14 +113,14 @@ class LeAclManagerTest(GdBaseTestClass): self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingScanResponseBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]), True) hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name])) enabled_set = hci_packets.EnabledSet() enabled_set.advertising_handle = advertising_handle enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]), True) hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set])) self.dut_le_acl = self.dut_le_acl_manager.connect_to_remote( remote_addr=common.BluetoothAddressWithType( Loading Loading @@ -235,7 +231,7 @@ class LeAclManagerTest(GdBaseTestClass): self.dut.hci_le_advertising_manager.CreateAdvertiser(request) # Cert Connects self.enqueue_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'), True) self.enqueue_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01')) phy_scan_params = hci_packets.LeCreateConnPhyScanParameters() phy_scan_params.scan_interval = 0x60 phy_scan_params.scan_window = 0x30 Loading @@ -249,7 +245,7 @@ class LeAclManagerTest(GdBaseTestClass): hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, self.dut_address.decode(), 1, [phy_scan_params]), False) self.dut_address.decode(), 1, [phy_scan_params])) # Cert gets ConnectionComplete with a handle and sends ACL data handle = 0xfff Loading
system/gd/hci/cert/le_advertising_manager_test.py +6 −9 Original line number Diff line number Diff line Loading @@ -42,13 +42,10 @@ class LeAdvertisingManagerTest(GdBaseTestClass): msg = hci_facade.EventRequest(code=int(event_code)) self.cert.hci.RequestLeSubevent(msg) def enqueue_hci_command(self, command, expect_complete): def enqueue_hci_command(self, command): cmd_bytes = bytes(command.Serialize()) cmd = common.Data(payload=cmd_bytes) if (expect_complete): self.cert.hci.SendCommandWithComplete(cmd) else: self.cert.hci.SendCommandWithStatus(cmd) self.cert.hci.SendCommand(cmd) def test_le_ad_scan_dut_advertises(self): self.register_for_le_event(hci_packets.SubeventCode.ADVERTISING_REPORT) Loading @@ -56,7 +53,7 @@ class LeAdvertisingManagerTest(GdBaseTestClass): with EventStream(self.cert.hci.StreamLeSubevents(empty_proto.Empty())) as hci_le_event_stream: # CERT Scans self.enqueue_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'), True) self.enqueue_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01')) scan_parameters = hci_packets.PhyScanParameters() scan_parameters.le_scan_type = hci_packets.LeScanType.ACTIVE scan_parameters.le_scan_interval = 40 Loading @@ -64,10 +61,10 @@ class LeAdvertisingManagerTest(GdBaseTestClass): self.enqueue_hci_command( hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1, [scan_parameters]), True) [scan_parameters])) self.enqueue_hci_command( hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED, hci_packets.FilterDuplicates.DISABLED, 0, 0), True) hci_packets.FilterDuplicates.DISABLED, 0, 0)) # DUT Advertises gap_name = hci_packets.GapData() Loading @@ -91,4 +88,4 @@ class LeAdvertisingManagerTest(GdBaseTestClass): remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id) self.dut.hci_le_advertising_manager.RemoveAdvertiser(remove_request) self.enqueue_hci_command( hci_packets.LeSetScanEnableBuilder(hci_packets.Enable.DISABLED, hci_packets.Enable.DISABLED), True) hci_packets.LeSetScanEnableBuilder(hci_packets.Enable.DISABLED, hci_packets.Enable.DISABLED))
system/gd/hci/facade/facade.cc +9 −14 Original line number Diff line number Diff line Loading @@ -66,23 +66,18 @@ class HciFacadeService : public HciFacade::Service { std::vector<uint8_t> bytes_; }; ::grpc::Status SendCommandWithComplete( ::grpc::Status SendCommand( ::grpc::ServerContext* context, const ::bluetooth::facade::Data* command, ::google::protobuf::Empty* response) override { auto packet = std::make_unique<TestCommandBuilder>( std::vector<uint8_t>(command->payload().begin(), command->payload().end())); auto payload = std::vector<uint8_t>(command->payload().begin(), command->payload().end()); auto packet = std::make_unique<TestCommandBuilder>(payload); auto opcode = static_cast<const bluetooth::hci::OpCode>(payload.at(1) << 8 | payload.at(0)); if (Checker::IsCommandStatusOpcode(opcode)) { hci_layer_->EnqueueCommand(std::move(packet), facade_handler_->BindOnceOn(this, &HciFacadeService::on_status)); } else { hci_layer_->EnqueueCommand(std::move(packet), facade_handler_->BindOnceOn(this, &HciFacadeService::on_complete)); return ::grpc::Status::OK; } ::grpc::Status SendCommandWithStatus( ::grpc::ServerContext* context, const ::bluetooth::facade::Data* command, ::google::protobuf::Empty* response) override { auto packet = std::make_unique<TestCommandBuilder>( std::vector<uint8_t>(command->payload().begin(), command->payload().end())); hci_layer_->EnqueueCommand(std::move(packet), facade_handler_->BindOnceOn(this, &HciFacadeService::on_status)); return ::grpc::Status::OK; } Loading