Loading system/gd/cert/matchers.py +13 −0 Original line number Diff line number Diff line Loading @@ -208,6 +208,19 @@ class HciMatchers(object): return lambda event: data == event.payload class AdvertisingMatchers(object): @staticmethod def CallbackMsg(type, advertiser_id=None, status=None, data=None): return lambda event: True if event.message_type == type and (advertiser_id == None or advertiser_id == event.advertiser_id) \ and (status == None or status == event.status) and (data == None or data == event.data) else False @staticmethod def AddressMsg(type, advertiser_id=None, address=None): return lambda event: True if event.message_type == type and (advertiser_id == None or advertiser_id == event.advertiser_id) \ and (address == None or address == event.address) else False class NeighborMatchers(object): @staticmethod Loading system/gd/hci/cert/le_advertising_manager_test_lib.py +180 −17 Original line number Diff line number Diff line Loading @@ -18,38 +18,74 @@ import os import sys import logging from bluetooth_packets_python3 import hci_packets from cert.event_stream import EventStream from google.protobuf import empty_pb2 as empty_proto from cert.closable import Closable from cert.closable import safeClose from cert.matchers import AdvertisingMatchers from cert.py_hci import PyHci from cert.truth import assertThat from facade import common_pb2 as common from facade import rootservice_pb2 as facade_rootservice from google.protobuf import empty_pb2 as empty_proto from hci.facade import hci_facade_pb2 as hci_facade from hci.facade import \ le_advertising_manager_facade_pb2 as le_advertising_facade from hci.facade import le_initiator_address_facade_pb2 as le_initiator_address_facade from bluetooth_packets_python3 import hci_packets from facade import common_pb2 as common from cert.py_hci import PyHci from cert.truth import assertThat from hci.facade.le_advertising_manager_facade_pb2 import AdvertisingStatus from hci.facade.le_advertising_manager_facade_pb2 import CallbackMsgType class LeAdvertisingManagerTestBase(): def setup_test(self, cert): self.cert_hci = PyHci(cert, acl_streaming=True) self.dut.callback_event_stream = EventStream( self.dut.hci_le_advertising_manager.FetchCallbackEvents(empty_proto.Empty())) self.dut.address_event_stream = EventStream( self.dut.hci_le_advertising_manager.FetchAddressEvents(empty_proto.Empty())) def teardown_test(self): self.cert_hci.close() if self.dut.callback_event_stream is not None: safeClose(self.dut.callback_event_stream) else: logging.info("DUT: Callback Event Stream is None!") if self.dut.address_event_stream is not None: safeClose(self.dut.address_event_stream) else: logging.info("DUT: address Event Stream is None!") def test_le_ad_scan_dut_advertises(self): def set_address_policy_with_static_address(self): privacy_policy = le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, address_with_type=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes(b'D0:05:04:03:02:01')), address=common.BluetoothAddress(address=bytes(b'd0:05:04:03:02:01')), type=common.RANDOM_DEVICE_ADDRESS), rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', minimum_rotation_time=0, maximum_rotation_time=0) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) def create_advertiser(self): gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_The_DUT')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) config = le_advertising_facade.AdvertisingConfig( advertisement=[gap_data], interval_min=512, interval_max=768, advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, channel_map=7, filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) request = le_advertising_facade.CreateAdvertiserRequest(config=config) create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(request) return create_response def test_le_ad_scan_dut_advertises(self): self.set_address_policy_with_static_address() self.cert_hci.register_for_le_events(hci_packets.SubeventCode.ADVERTISING_REPORT, hci_packets.SubeventCode.EXTENDED_ADVERTISING_REPORT) Loading @@ -67,26 +103,153 @@ class LeAdvertisingManagerTestBase(): hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED, hci_packets.FilterDuplicates.DISABLED, 0, 0)) # DUT Advertises create_response = self.create_advertiser() assertThat(self.cert_hci.get_le_event_stream()).emits(lambda packet: b'Im_The_DUT' in packet.payload) remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id) self.dut.hci_le_advertising_manager.RemoveAdvertiser(remove_request) self.cert_hci.send_command( hci_packets.LeSetScanEnableBuilder(hci_packets.Enable.DISABLED, hci_packets.Enable.DISABLED)) def test_advertising_set_started_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.ADVERTISING_SET_STARTED, create_response.advertiser_id, AdvertisingStatus.SUCCESS, 0x00)) def test_enable_advertiser_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() enable_advertiser_request = le_advertising_facade.EnableAdvertiserRequest( advertiser_id=create_response.advertiser_id, enable=True) self.dut.hci_le_advertising_manager.EnableAdvertiser(enable_advertiser_request) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.ADVERTISING_ENABLED, create_response.advertiser_id, AdvertisingStatus.SUCCESS, 0x01)) def test_disable_advertiser_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() disable_advertiser_request = le_advertising_facade.EnableAdvertiserRequest( advertiser_id=create_response.advertiser_id, enable=False) self.dut.hci_le_advertising_manager.EnableAdvertiser(disable_advertiser_request) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.ADVERTISING_ENABLED, create_response.advertiser_id, AdvertisingStatus.SUCCESS, 0x00)) def test_set_advertising_data_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_The_DUT')) gap_name.data = list(bytes(b'Im_The_DUT2')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) set_data_request = le_advertising_facade.SetDataRequest( advertiser_id=create_response.advertiser_id, set_scan_rsp=False, data=[gap_data]) self.dut.hci_le_advertising_manager.SetData(set_data_request) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.ADVERTISING_DATA_SET, create_response.advertiser_id, AdvertisingStatus.SUCCESS)) def test_set_scan_response_data_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_The_DUT2')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) set_data_request = le_advertising_facade.SetDataRequest( advertiser_id=create_response.advertiser_id, set_scan_rsp=True, data=[gap_data]) self.dut.hci_le_advertising_manager.SetData(set_data_request) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.SCAN_RESPONSE_DATA_SET, create_response.advertiser_id, AdvertisingStatus.SUCCESS)) def test_set_parameters_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() # The Host shall not issue set parameters command when advertising is enabled disable_advertiser_request = le_advertising_facade.EnableAdvertiserRequest( advertiser_id=create_response.advertiser_id, enable=False) self.dut.hci_le_advertising_manager.EnableAdvertiser(disable_advertiser_request) config = le_advertising_facade.AdvertisingConfig( advertisement=[gap_data], interval_min=512, interval_max=768, advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, channel_map=7, filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) request = le_advertising_facade.CreateAdvertiserRequest(config=config) create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(request) set_parameters_request = le_advertising_facade.SetParametersRequest( advertiser_id=create_response.advertiser_id, config=config) self.dut.hci_le_advertising_manager.SetParameters(set_parameters_request) assertThat(self.cert_hci.get_le_event_stream()).emits(lambda packet: b'Im_The_DUT' in packet.payload) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.ADVERTISING_PARAMETERS_UPDATED, create_response.advertiser_id, AdvertisingStatus.SUCCESS)) remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id) self.dut.hci_le_advertising_manager.RemoveAdvertiser(remove_request) self.cert_hci.send_command( hci_packets.LeSetScanEnableBuilder(hci_packets.Enable.DISABLED, hci_packets.Enable.DISABLED)) def test_set_periodic_parameters_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() config = le_advertising_facade.PeriodicAdvertisingParameters( min_interval=512, max_interval=768, advertising_property=le_advertising_facade.AdvertisingProperty.INCLUDE_TX_POWER) set_periodic_parameters_request = le_advertising_facade.SetPeriodicParametersRequest( advertiser_id=create_response.advertiser_id, config=config) self.dut.hci_le_advertising_manager.SetPeriodicParameters(set_periodic_parameters_request) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.PERIODIC_ADVERTISING_PARAMETERS_UPDATED, create_response.advertiser_id)) def test_set_periodic_data_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_The_DUT2')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) set_periodic_data_request = le_advertising_facade.SetPeriodicDataRequest( advertiser_id=create_response.advertiser_id, data=[gap_data]) self.dut.hci_le_advertising_manager.SetPeriodicData(set_periodic_data_request) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.PERIODIC_ADVERTISING_DATA_SET, create_response.advertiser_id)) def test_enable_periodic_advertising_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() enable_periodic_advertising_request = le_advertising_facade.EnablePeriodicAdvertisingRequest( advertiser_id=create_response.advertiser_id, enable=True) self.dut.hci_le_advertising_manager.EnablePeriodicAdvertising(enable_periodic_advertising_request) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.PERIODIC_ADVERTISING_ENABLED, create_response.advertiser_id)) def test_get_own_address(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() get_own_address_request = le_advertising_facade.GetOwnAddressRequest( advertiser_id=create_response.advertiser_id) self.dut.hci_le_advertising_manager.GetOwnAddress(get_own_address_request) address_with_type = common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes(b'd0:05:04:03:02:01')), type=common.RANDOM_DEVICE_ADDRESS) assertThat(self.dut.address_event_stream).emits( AdvertisingMatchers.AddressMsg(CallbackMsgType.OWN_ADDRESS_READ, create_response.advertiser_id, address_with_type)) system/gd/hci/facade/le_advertising_manager_facade.cc +188 −2 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ #include "common/bidi_queue.h" #include "common/bind.h" #include "grpc/grpc_event_queue.h" #include "hci/address.h" #include "hci/address_with_type.h" #include "hci/facade/le_advertising_manager_facade.grpc.pb.h" Loading Loading @@ -96,6 +97,23 @@ bool AdvertisingConfigFromProto(const AdvertisingConfig& config_proto, hci::Adve return true; } bool PeriodicAdvertisingParametersFromProto( const PeriodicAdvertisingParameters& config_proto, hci::PeriodicAdvertisingParameters* config) { if (config_proto.min_interval() > UINT16_MAX || config_proto.min_interval() < 0) { LOG_WARN("Bad interval_min: %d", config_proto.min_interval()); return false; } config->min_interval = static_cast<uint16_t>(config_proto.min_interval()); if (config_proto.max_interval() > UINT16_MAX || config_proto.max_interval() < 0) { LOG_WARN("Bad interval_max: %d", config_proto.max_interval()); return false; } config->max_interval = static_cast<uint16_t>(config_proto.max_interval()); config->properties = static_cast<hci::PeriodicAdvertisingParameters::AdvertisingProperty>(config_proto.advertising_property()); return true; } class LeAdvertiser { public: LeAdvertiser(hci::AdvertisingConfig config) : config_(std::move(config)) {} Loading @@ -117,7 +135,7 @@ class LeAdvertiser { hci::AdvertisingConfig config_; }; class LeAdvertisingManagerFacadeService : public LeAdvertisingManagerFacade::Service { class LeAdvertisingManagerFacadeService : public LeAdvertisingManagerFacade::Service, AdvertisingCallback { public: LeAdvertisingManagerFacadeService(LeAdvertisingManager* le_advertising_manager, os::Handler* facade_handler) : le_advertising_manager_(le_advertising_manager), facade_handler_(facade_handler) { Loading @@ -135,7 +153,7 @@ class LeAdvertisingManagerFacadeService : public LeAdvertisingManagerFacade::Ser } LeAdvertiser le_advertiser(config); auto advertiser_id = le_advertising_manager_->ExtendedCreateAdvertiser( -1, 0, config, common::Bind(&LeAdvertiser::ScanCallback, common::Unretained(&le_advertiser)), common::Bind(&LeAdvertiser::TerminatedCallback, common::Unretained(&le_advertiser)), Loading @@ -160,6 +178,79 @@ class LeAdvertisingManagerFacadeService : public LeAdvertisingManagerFacade::Ser return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "ExtendedCreateAdvertiser is not implemented"); } ::grpc::Status EnableAdvertiser( ::grpc::ServerContext* context, const EnableAdvertiserRequest* request, ::google::protobuf::Empty* response) override { le_advertising_manager_->EnableAdvertiser(request->advertiser_id(), request->enable(), 0, 0); return ::grpc::Status::OK; } ::grpc::Status SetData( ::grpc::ServerContext* context, const SetDataRequest* request, ::google::protobuf::Empty* response) override { std::vector<GapData> advertising_data = {}; for (const auto& elem : request->data()) { advertising_data.push_back(GapDataFromProto(elem)); } le_advertising_manager_->SetData(request->advertiser_id(), request->set_scan_rsp(), advertising_data); return ::grpc::Status::OK; } ::grpc::Status SetParameters( ::grpc::ServerContext* context, const SetParametersRequest* request, ::google::protobuf::Empty* response) override { hci::ExtendedAdvertisingConfig config = {}; if (!AdvertisingConfigFromProto(request->config(), &config)) { LOG_WARN("Error parsing advertising config %s", request->SerializeAsString().c_str()); return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "Error while parsing advertising config"); } le_advertising_manager_->SetParameters(request->advertiser_id(), config); return ::grpc::Status::OK; } ::grpc::Status SetPeriodicParameters( ::grpc::ServerContext* context, const SetPeriodicParametersRequest* request, ::google::protobuf::Empty* response) override { hci::PeriodicAdvertisingParameters config = {}; if (!PeriodicAdvertisingParametersFromProto(request->config(), &config)) { LOG_WARN("Error parsing periodic advertising parameters %s", request->SerializeAsString().c_str()); return ::grpc::Status( ::grpc::StatusCode::INVALID_ARGUMENT, "Error while parsing periodic advertising parameters"); } le_advertising_manager_->SetPeriodicParameters(request->advertiser_id(), config); return ::grpc::Status::OK; } ::grpc::Status SetPeriodicData( ::grpc::ServerContext* context, const SetPeriodicDataRequest* request, ::google::protobuf::Empty* response) override { std::vector<GapData> advertising_data = {}; for (const auto& elem : request->data()) { advertising_data.push_back(GapDataFromProto(elem)); } le_advertising_manager_->SetPeriodicData(request->advertiser_id(), advertising_data); return ::grpc::Status::OK; } ::grpc::Status EnablePeriodicAdvertising( ::grpc::ServerContext* context, const EnablePeriodicAdvertisingRequest* request, ::google::protobuf::Empty* response) override { le_advertising_manager_->EnablePeriodicAdvertising(request->advertiser_id(), request->enable()); return ::grpc::Status::OK; } ::grpc::Status GetOwnAddress( ::grpc::ServerContext* context, const GetOwnAddressRequest* request, ::google::protobuf::Empty* response) override { le_advertising_manager_->GetOwnAddress(request->advertiser_id()); return ::grpc::Status::OK; } ::grpc::Status GetNumberOfAdvertisingInstances(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, GetNumberOfAdvertisingInstancesResponse* response) override { Loading @@ -184,9 +275,104 @@ class LeAdvertisingManagerFacadeService : public LeAdvertisingManagerFacade::Ser return ::grpc::Status::OK; } ::grpc::Status FetchCallbackEvents( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<CallbackMsg>* writer) override { le_advertising_manager_->RegisterAdvertisingCallback(this); return callback_events_.RunLoop(context, writer); } ::grpc::Status FetchAddressEvents( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<AddressMsg>* writer) override { return address_events_.RunLoop(context, writer); } void OnAdvertisingSetStarted(int reg_id, uint8_t advertiser_id, int8_t tx_power, AdvertisingStatus status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::ADVERTISING_SET_STARTED); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); msg.set_data(reg_id); callback_events_.OnIncomingEvent(msg); }; void OnAdvertisingEnabled(uint8_t advertiser_id, bool enable, uint8_t status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::ADVERTISING_ENABLED); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); msg.set_data(enable ? 1 : 0); callback_events_.OnIncomingEvent(msg); }; void OnAdvertisingDataSet(uint8_t advertiser_id, uint8_t status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::ADVERTISING_DATA_SET); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); callback_events_.OnIncomingEvent(msg); }; void OnScanResponseDataSet(uint8_t advertiser_id, uint8_t status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::SCAN_RESPONSE_DATA_SET); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); callback_events_.OnIncomingEvent(msg); }; void OnAdvertisingParametersUpdated(uint8_t advertiser_id, int8_t tx_power, uint8_t status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::ADVERTISING_PARAMETERS_UPDATED); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); callback_events_.OnIncomingEvent(msg); }; void OnPeriodicAdvertisingParametersUpdated(uint8_t advertiser_id, uint8_t status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::PERIODIC_ADVERTISING_PARAMETERS_UPDATED); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); callback_events_.OnIncomingEvent(msg); }; void OnPeriodicAdvertisingDataSet(uint8_t advertiser_id, uint8_t status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::PERIODIC_ADVERTISING_DATA_SET); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); callback_events_.OnIncomingEvent(msg); }; void OnPeriodicAdvertisingEnabled(uint8_t advertiser_id, bool enable, uint8_t status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::PERIODIC_ADVERTISING_ENABLED); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); callback_events_.OnIncomingEvent(msg); }; void OnOwnAddressRead(uint8_t advertiser_id, uint8_t address_type, Address address) { LOG_INFO("OnOwnAddressRead Address:%s, address_type:%d", address.ToString().c_str(), address_type); AddressMsg msg; msg.set_message_type(CallbackMsgType::OWN_ADDRESS_READ); msg.set_advertiser_id(advertiser_id); bluetooth::facade::BluetoothAddressWithType facade_address; facade_address.mutable_address()->set_address(address.ToString()); facade_address.set_type(static_cast<facade::BluetoothAddressTypeEnum>(address_type)); *msg.mutable_address() = facade_address; address_events_.OnIncomingEvent(msg); }; std::vector<LeAdvertiser> le_advertisers_; LeAdvertisingManager* le_advertising_manager_; os::Handler* facade_handler_; ::bluetooth::grpc::GrpcEventQueue<CallbackMsg> callback_events_{"callback events"}; ::bluetooth::grpc::GrpcEventQueue<AddressMsg> address_events_{"address events"}; }; void LeAdvertisingManagerFacadeModule::ListDependencies(ModuleList* list) { Loading system/gd/hci/facade/le_advertising_manager_facade.proto +89 −0 File changed.Preview size limit exceeded, changes collapsed. Show changes Loading
system/gd/cert/matchers.py +13 −0 Original line number Diff line number Diff line Loading @@ -208,6 +208,19 @@ class HciMatchers(object): return lambda event: data == event.payload class AdvertisingMatchers(object): @staticmethod def CallbackMsg(type, advertiser_id=None, status=None, data=None): return lambda event: True if event.message_type == type and (advertiser_id == None or advertiser_id == event.advertiser_id) \ and (status == None or status == event.status) and (data == None or data == event.data) else False @staticmethod def AddressMsg(type, advertiser_id=None, address=None): return lambda event: True if event.message_type == type and (advertiser_id == None or advertiser_id == event.advertiser_id) \ and (address == None or address == event.address) else False class NeighborMatchers(object): @staticmethod Loading
system/gd/hci/cert/le_advertising_manager_test_lib.py +180 −17 Original line number Diff line number Diff line Loading @@ -18,38 +18,74 @@ import os import sys import logging from bluetooth_packets_python3 import hci_packets from cert.event_stream import EventStream from google.protobuf import empty_pb2 as empty_proto from cert.closable import Closable from cert.closable import safeClose from cert.matchers import AdvertisingMatchers from cert.py_hci import PyHci from cert.truth import assertThat from facade import common_pb2 as common from facade import rootservice_pb2 as facade_rootservice from google.protobuf import empty_pb2 as empty_proto from hci.facade import hci_facade_pb2 as hci_facade from hci.facade import \ le_advertising_manager_facade_pb2 as le_advertising_facade from hci.facade import le_initiator_address_facade_pb2 as le_initiator_address_facade from bluetooth_packets_python3 import hci_packets from facade import common_pb2 as common from cert.py_hci import PyHci from cert.truth import assertThat from hci.facade.le_advertising_manager_facade_pb2 import AdvertisingStatus from hci.facade.le_advertising_manager_facade_pb2 import CallbackMsgType class LeAdvertisingManagerTestBase(): def setup_test(self, cert): self.cert_hci = PyHci(cert, acl_streaming=True) self.dut.callback_event_stream = EventStream( self.dut.hci_le_advertising_manager.FetchCallbackEvents(empty_proto.Empty())) self.dut.address_event_stream = EventStream( self.dut.hci_le_advertising_manager.FetchAddressEvents(empty_proto.Empty())) def teardown_test(self): self.cert_hci.close() if self.dut.callback_event_stream is not None: safeClose(self.dut.callback_event_stream) else: logging.info("DUT: Callback Event Stream is None!") if self.dut.address_event_stream is not None: safeClose(self.dut.address_event_stream) else: logging.info("DUT: address Event Stream is None!") def test_le_ad_scan_dut_advertises(self): def set_address_policy_with_static_address(self): privacy_policy = le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, address_with_type=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes(b'D0:05:04:03:02:01')), address=common.BluetoothAddress(address=bytes(b'd0:05:04:03:02:01')), type=common.RANDOM_DEVICE_ADDRESS), rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', minimum_rotation_time=0, maximum_rotation_time=0) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) def create_advertiser(self): gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_The_DUT')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) config = le_advertising_facade.AdvertisingConfig( advertisement=[gap_data], interval_min=512, interval_max=768, advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, channel_map=7, filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) request = le_advertising_facade.CreateAdvertiserRequest(config=config) create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(request) return create_response def test_le_ad_scan_dut_advertises(self): self.set_address_policy_with_static_address() self.cert_hci.register_for_le_events(hci_packets.SubeventCode.ADVERTISING_REPORT, hci_packets.SubeventCode.EXTENDED_ADVERTISING_REPORT) Loading @@ -67,26 +103,153 @@ class LeAdvertisingManagerTestBase(): hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED, hci_packets.FilterDuplicates.DISABLED, 0, 0)) # DUT Advertises create_response = self.create_advertiser() assertThat(self.cert_hci.get_le_event_stream()).emits(lambda packet: b'Im_The_DUT' in packet.payload) remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id) self.dut.hci_le_advertising_manager.RemoveAdvertiser(remove_request) self.cert_hci.send_command( hci_packets.LeSetScanEnableBuilder(hci_packets.Enable.DISABLED, hci_packets.Enable.DISABLED)) def test_advertising_set_started_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.ADVERTISING_SET_STARTED, create_response.advertiser_id, AdvertisingStatus.SUCCESS, 0x00)) def test_enable_advertiser_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() enable_advertiser_request = le_advertising_facade.EnableAdvertiserRequest( advertiser_id=create_response.advertiser_id, enable=True) self.dut.hci_le_advertising_manager.EnableAdvertiser(enable_advertiser_request) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.ADVERTISING_ENABLED, create_response.advertiser_id, AdvertisingStatus.SUCCESS, 0x01)) def test_disable_advertiser_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() disable_advertiser_request = le_advertising_facade.EnableAdvertiserRequest( advertiser_id=create_response.advertiser_id, enable=False) self.dut.hci_le_advertising_manager.EnableAdvertiser(disable_advertiser_request) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.ADVERTISING_ENABLED, create_response.advertiser_id, AdvertisingStatus.SUCCESS, 0x00)) def test_set_advertising_data_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_The_DUT')) gap_name.data = list(bytes(b'Im_The_DUT2')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) set_data_request = le_advertising_facade.SetDataRequest( advertiser_id=create_response.advertiser_id, set_scan_rsp=False, data=[gap_data]) self.dut.hci_le_advertising_manager.SetData(set_data_request) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.ADVERTISING_DATA_SET, create_response.advertiser_id, AdvertisingStatus.SUCCESS)) def test_set_scan_response_data_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_The_DUT2')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) set_data_request = le_advertising_facade.SetDataRequest( advertiser_id=create_response.advertiser_id, set_scan_rsp=True, data=[gap_data]) self.dut.hci_le_advertising_manager.SetData(set_data_request) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.SCAN_RESPONSE_DATA_SET, create_response.advertiser_id, AdvertisingStatus.SUCCESS)) def test_set_parameters_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() # The Host shall not issue set parameters command when advertising is enabled disable_advertiser_request = le_advertising_facade.EnableAdvertiserRequest( advertiser_id=create_response.advertiser_id, enable=False) self.dut.hci_le_advertising_manager.EnableAdvertiser(disable_advertiser_request) config = le_advertising_facade.AdvertisingConfig( advertisement=[gap_data], interval_min=512, interval_max=768, advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, channel_map=7, filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) request = le_advertising_facade.CreateAdvertiserRequest(config=config) create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(request) set_parameters_request = le_advertising_facade.SetParametersRequest( advertiser_id=create_response.advertiser_id, config=config) self.dut.hci_le_advertising_manager.SetParameters(set_parameters_request) assertThat(self.cert_hci.get_le_event_stream()).emits(lambda packet: b'Im_The_DUT' in packet.payload) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.ADVERTISING_PARAMETERS_UPDATED, create_response.advertiser_id, AdvertisingStatus.SUCCESS)) remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id) self.dut.hci_le_advertising_manager.RemoveAdvertiser(remove_request) self.cert_hci.send_command( hci_packets.LeSetScanEnableBuilder(hci_packets.Enable.DISABLED, hci_packets.Enable.DISABLED)) def test_set_periodic_parameters_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() config = le_advertising_facade.PeriodicAdvertisingParameters( min_interval=512, max_interval=768, advertising_property=le_advertising_facade.AdvertisingProperty.INCLUDE_TX_POWER) set_periodic_parameters_request = le_advertising_facade.SetPeriodicParametersRequest( advertiser_id=create_response.advertiser_id, config=config) self.dut.hci_le_advertising_manager.SetPeriodicParameters(set_periodic_parameters_request) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.PERIODIC_ADVERTISING_PARAMETERS_UPDATED, create_response.advertiser_id)) def test_set_periodic_data_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_The_DUT2')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) set_periodic_data_request = le_advertising_facade.SetPeriodicDataRequest( advertiser_id=create_response.advertiser_id, data=[gap_data]) self.dut.hci_le_advertising_manager.SetPeriodicData(set_periodic_data_request) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.PERIODIC_ADVERTISING_DATA_SET, create_response.advertiser_id)) def test_enable_periodic_advertising_callback(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() enable_periodic_advertising_request = le_advertising_facade.EnablePeriodicAdvertisingRequest( advertiser_id=create_response.advertiser_id, enable=True) self.dut.hci_le_advertising_manager.EnablePeriodicAdvertising(enable_periodic_advertising_request) assertThat(self.dut.callback_event_stream).emits( AdvertisingMatchers.CallbackMsg(CallbackMsgType.PERIODIC_ADVERTISING_ENABLED, create_response.advertiser_id)) def test_get_own_address(self): self.set_address_policy_with_static_address() create_response = self.create_advertiser() get_own_address_request = le_advertising_facade.GetOwnAddressRequest( advertiser_id=create_response.advertiser_id) self.dut.hci_le_advertising_manager.GetOwnAddress(get_own_address_request) address_with_type = common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes(b'd0:05:04:03:02:01')), type=common.RANDOM_DEVICE_ADDRESS) assertThat(self.dut.address_event_stream).emits( AdvertisingMatchers.AddressMsg(CallbackMsgType.OWN_ADDRESS_READ, create_response.advertiser_id, address_with_type))
system/gd/hci/facade/le_advertising_manager_facade.cc +188 −2 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ #include "common/bidi_queue.h" #include "common/bind.h" #include "grpc/grpc_event_queue.h" #include "hci/address.h" #include "hci/address_with_type.h" #include "hci/facade/le_advertising_manager_facade.grpc.pb.h" Loading Loading @@ -96,6 +97,23 @@ bool AdvertisingConfigFromProto(const AdvertisingConfig& config_proto, hci::Adve return true; } bool PeriodicAdvertisingParametersFromProto( const PeriodicAdvertisingParameters& config_proto, hci::PeriodicAdvertisingParameters* config) { if (config_proto.min_interval() > UINT16_MAX || config_proto.min_interval() < 0) { LOG_WARN("Bad interval_min: %d", config_proto.min_interval()); return false; } config->min_interval = static_cast<uint16_t>(config_proto.min_interval()); if (config_proto.max_interval() > UINT16_MAX || config_proto.max_interval() < 0) { LOG_WARN("Bad interval_max: %d", config_proto.max_interval()); return false; } config->max_interval = static_cast<uint16_t>(config_proto.max_interval()); config->properties = static_cast<hci::PeriodicAdvertisingParameters::AdvertisingProperty>(config_proto.advertising_property()); return true; } class LeAdvertiser { public: LeAdvertiser(hci::AdvertisingConfig config) : config_(std::move(config)) {} Loading @@ -117,7 +135,7 @@ class LeAdvertiser { hci::AdvertisingConfig config_; }; class LeAdvertisingManagerFacadeService : public LeAdvertisingManagerFacade::Service { class LeAdvertisingManagerFacadeService : public LeAdvertisingManagerFacade::Service, AdvertisingCallback { public: LeAdvertisingManagerFacadeService(LeAdvertisingManager* le_advertising_manager, os::Handler* facade_handler) : le_advertising_manager_(le_advertising_manager), facade_handler_(facade_handler) { Loading @@ -135,7 +153,7 @@ class LeAdvertisingManagerFacadeService : public LeAdvertisingManagerFacade::Ser } LeAdvertiser le_advertiser(config); auto advertiser_id = le_advertising_manager_->ExtendedCreateAdvertiser( -1, 0, config, common::Bind(&LeAdvertiser::ScanCallback, common::Unretained(&le_advertiser)), common::Bind(&LeAdvertiser::TerminatedCallback, common::Unretained(&le_advertiser)), Loading @@ -160,6 +178,79 @@ class LeAdvertisingManagerFacadeService : public LeAdvertisingManagerFacade::Ser return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "ExtendedCreateAdvertiser is not implemented"); } ::grpc::Status EnableAdvertiser( ::grpc::ServerContext* context, const EnableAdvertiserRequest* request, ::google::protobuf::Empty* response) override { le_advertising_manager_->EnableAdvertiser(request->advertiser_id(), request->enable(), 0, 0); return ::grpc::Status::OK; } ::grpc::Status SetData( ::grpc::ServerContext* context, const SetDataRequest* request, ::google::protobuf::Empty* response) override { std::vector<GapData> advertising_data = {}; for (const auto& elem : request->data()) { advertising_data.push_back(GapDataFromProto(elem)); } le_advertising_manager_->SetData(request->advertiser_id(), request->set_scan_rsp(), advertising_data); return ::grpc::Status::OK; } ::grpc::Status SetParameters( ::grpc::ServerContext* context, const SetParametersRequest* request, ::google::protobuf::Empty* response) override { hci::ExtendedAdvertisingConfig config = {}; if (!AdvertisingConfigFromProto(request->config(), &config)) { LOG_WARN("Error parsing advertising config %s", request->SerializeAsString().c_str()); return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "Error while parsing advertising config"); } le_advertising_manager_->SetParameters(request->advertiser_id(), config); return ::grpc::Status::OK; } ::grpc::Status SetPeriodicParameters( ::grpc::ServerContext* context, const SetPeriodicParametersRequest* request, ::google::protobuf::Empty* response) override { hci::PeriodicAdvertisingParameters config = {}; if (!PeriodicAdvertisingParametersFromProto(request->config(), &config)) { LOG_WARN("Error parsing periodic advertising parameters %s", request->SerializeAsString().c_str()); return ::grpc::Status( ::grpc::StatusCode::INVALID_ARGUMENT, "Error while parsing periodic advertising parameters"); } le_advertising_manager_->SetPeriodicParameters(request->advertiser_id(), config); return ::grpc::Status::OK; } ::grpc::Status SetPeriodicData( ::grpc::ServerContext* context, const SetPeriodicDataRequest* request, ::google::protobuf::Empty* response) override { std::vector<GapData> advertising_data = {}; for (const auto& elem : request->data()) { advertising_data.push_back(GapDataFromProto(elem)); } le_advertising_manager_->SetPeriodicData(request->advertiser_id(), advertising_data); return ::grpc::Status::OK; } ::grpc::Status EnablePeriodicAdvertising( ::grpc::ServerContext* context, const EnablePeriodicAdvertisingRequest* request, ::google::protobuf::Empty* response) override { le_advertising_manager_->EnablePeriodicAdvertising(request->advertiser_id(), request->enable()); return ::grpc::Status::OK; } ::grpc::Status GetOwnAddress( ::grpc::ServerContext* context, const GetOwnAddressRequest* request, ::google::protobuf::Empty* response) override { le_advertising_manager_->GetOwnAddress(request->advertiser_id()); return ::grpc::Status::OK; } ::grpc::Status GetNumberOfAdvertisingInstances(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, GetNumberOfAdvertisingInstancesResponse* response) override { Loading @@ -184,9 +275,104 @@ class LeAdvertisingManagerFacadeService : public LeAdvertisingManagerFacade::Ser return ::grpc::Status::OK; } ::grpc::Status FetchCallbackEvents( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<CallbackMsg>* writer) override { le_advertising_manager_->RegisterAdvertisingCallback(this); return callback_events_.RunLoop(context, writer); } ::grpc::Status FetchAddressEvents( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<AddressMsg>* writer) override { return address_events_.RunLoop(context, writer); } void OnAdvertisingSetStarted(int reg_id, uint8_t advertiser_id, int8_t tx_power, AdvertisingStatus status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::ADVERTISING_SET_STARTED); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); msg.set_data(reg_id); callback_events_.OnIncomingEvent(msg); }; void OnAdvertisingEnabled(uint8_t advertiser_id, bool enable, uint8_t status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::ADVERTISING_ENABLED); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); msg.set_data(enable ? 1 : 0); callback_events_.OnIncomingEvent(msg); }; void OnAdvertisingDataSet(uint8_t advertiser_id, uint8_t status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::ADVERTISING_DATA_SET); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); callback_events_.OnIncomingEvent(msg); }; void OnScanResponseDataSet(uint8_t advertiser_id, uint8_t status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::SCAN_RESPONSE_DATA_SET); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); callback_events_.OnIncomingEvent(msg); }; void OnAdvertisingParametersUpdated(uint8_t advertiser_id, int8_t tx_power, uint8_t status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::ADVERTISING_PARAMETERS_UPDATED); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); callback_events_.OnIncomingEvent(msg); }; void OnPeriodicAdvertisingParametersUpdated(uint8_t advertiser_id, uint8_t status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::PERIODIC_ADVERTISING_PARAMETERS_UPDATED); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); callback_events_.OnIncomingEvent(msg); }; void OnPeriodicAdvertisingDataSet(uint8_t advertiser_id, uint8_t status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::PERIODIC_ADVERTISING_DATA_SET); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); callback_events_.OnIncomingEvent(msg); }; void OnPeriodicAdvertisingEnabled(uint8_t advertiser_id, bool enable, uint8_t status) { CallbackMsg msg; msg.set_message_type(CallbackMsgType::PERIODIC_ADVERTISING_ENABLED); msg.set_advertiser_id(advertiser_id); msg.set_status(static_cast<facade::AdvertisingStatus>(status)); callback_events_.OnIncomingEvent(msg); }; void OnOwnAddressRead(uint8_t advertiser_id, uint8_t address_type, Address address) { LOG_INFO("OnOwnAddressRead Address:%s, address_type:%d", address.ToString().c_str(), address_type); AddressMsg msg; msg.set_message_type(CallbackMsgType::OWN_ADDRESS_READ); msg.set_advertiser_id(advertiser_id); bluetooth::facade::BluetoothAddressWithType facade_address; facade_address.mutable_address()->set_address(address.ToString()); facade_address.set_type(static_cast<facade::BluetoothAddressTypeEnum>(address_type)); *msg.mutable_address() = facade_address; address_events_.OnIncomingEvent(msg); }; std::vector<LeAdvertiser> le_advertisers_; LeAdvertisingManager* le_advertising_manager_; os::Handler* facade_handler_; ::bluetooth::grpc::GrpcEventQueue<CallbackMsg> callback_events_{"callback events"}; ::bluetooth::grpc::GrpcEventQueue<AddressMsg> address_events_{"address events"}; }; void LeAdvertisingManagerFacadeModule::ListDependencies(ModuleList* list) { Loading
system/gd/hci/facade/le_advertising_manager_facade.proto +89 −0 File changed.Preview size limit exceeded, changes collapsed. Show changes