Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0c82c5d9 authored by Myles Watson's avatar Myles Watson
Browse files

Neighbor: Test remote name

Bug: 145638034
Test: ./cert/run_cert_facade_only.sh
Change-Id: I3f56057573d2646ce44ed2a57019fef97a1e392b
parent 6b7be1df
Loading
Loading
Loading
Loading
+66 −9
Original line number Diff line number Diff line
@@ -24,6 +24,7 @@ from cert.event_asserts import EventAsserts
from google.protobuf import empty_pb2 as empty_proto
from facade import rootservice_pb2 as facade_rootservice
from hci.facade import facade_pb2 as hci_facade
from hci.facade import controller_facade_pb2 as controller_facade
from neighbor.facade import facade_pb2 as neighbor_facade
from bluetooth_packets_python3 import hci_packets
import bluetooth_packets_python3 as bt_packets
@@ -53,6 +54,10 @@ class NeighborTest(GdFacadeOnlyBaseTestClass):
        self.cert_device.rootservice.StopStack(
            facade_rootservice.StopStackRequest())

    def register_for_dut_event(self, event_code):
        msg = hci_facade.EventCodeMsg(code=int(event_code))
        self.device_under_test.hci.RegisterEventHandler(msg)

    def register_for_event(self, event_code):
        msg = hci_facade.EventCodeMsg(code=int(event_code))
        self.cert_device.hci.RegisterEventHandler(msg)
@@ -70,9 +75,6 @@ class NeighborTest(GdFacadeOnlyBaseTestClass):
            self.cert_device.hci.EnqueueCommandWithStatus(cmd)

    def test_inquiry_from_dut(self):
        self.enqueue_hci_command(
            hci_packets.WriteScanEnableBuilder(
                hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
        inquiry_msg = neighbor_facade.InquiryMsg(
            inquiry_mode=neighbor_facade.DiscoverabilityMode.GENERAL,
            result_mode=neighbor_facade.ResultMode.STANDARD,
@@ -82,15 +84,15 @@ class NeighborTest(GdFacadeOnlyBaseTestClass):
                self.device_under_test.neighbor.SetInquiryMode(
                    inquiry_msg)) as inquiry_event_stream:
            hci_event_asserts = EventAsserts(inquiry_event_stream)
            self.enqueue_hci_command(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
            hci_event_asserts.assert_event_occurs(
                lambda msg: b'\x02\x0f' in msg.packet
                # Expecting an HCI Event (code 0x02, length 0x0f)
            )

    def test_inquiry_rssi_from_dut(self):
        self.enqueue_hci_command(
            hci_packets.WriteScanEnableBuilder(
                hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
        inquiry_msg = neighbor_facade.InquiryMsg(
            inquiry_mode=neighbor_facade.DiscoverabilityMode.GENERAL,
            result_mode=neighbor_facade.ResultMode.RSSI,
@@ -100,6 +102,9 @@ class NeighborTest(GdFacadeOnlyBaseTestClass):
                self.device_under_test.neighbor.SetInquiryMode(
                    inquiry_msg)) as inquiry_event_stream:
            hci_event_asserts = EventAsserts(inquiry_event_stream)
            self.enqueue_hci_command(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
            hci_event_asserts.assert_event_occurs(
                lambda msg: b'\x22\x0f' in msg.packet
                # Expecting an HCI Event (code 0x22, length 0x0f)
@@ -115,9 +120,6 @@ class NeighborTest(GdFacadeOnlyBaseTestClass):
        self.enqueue_hci_command(
            hci_packets.WriteExtendedInquiryResponseBuilder(
                hci_packets.FecRequired.NOT_REQUIRED, gap_data), True)
        self.enqueue_hci_command(
            hci_packets.WriteScanEnableBuilder(
                hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
        inquiry_msg = neighbor_facade.InquiryMsg(
            inquiry_mode=neighbor_facade.DiscoverabilityMode.GENERAL,
            result_mode=neighbor_facade.ResultMode.EXTENDED,
@@ -127,5 +129,60 @@ class NeighborTest(GdFacadeOnlyBaseTestClass):
                self.device_under_test.neighbor.SetInquiryMode(
                    inquiry_msg)) as inquiry_event_stream:
            hci_event_asserts = EventAsserts(inquiry_event_stream)
            self.enqueue_hci_command(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
            hci_event_asserts.assert_event_occurs(
                lambda msg: name_string in msg.packet)

    def test_remote_name(self):
        self.register_for_dut_event(
            hci_packets.EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION)

        with EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as hci_event_stream, \
            EventCallbackStream(self.device_under_test.neighbor.GetRemoteNameEvents(empty_proto.Empty())) as name_event_stream:
            name_event_asserts = EventAsserts(name_event_stream)
            hci_event_asserts = EventAsserts(hci_event_stream)

            cert_name = b'Im_A_Cert'
            padded_name = cert_name
            while len(padded_name) < 248:
                padded_name = padded_name + b'\0'
            self.enqueue_hci_command(
                hci_packets.WriteLocalNameBuilder(padded_name), True)

            hci_event_asserts.assert_event_occurs(
                lambda msg: b'\x0e\x04\x01\x13\x0c' in msg.event)

            address = hci_packets.Address()

            def get_address_from_complete(packet):
                packet_bytes = packet.event
                if b'\x0e\x0a\x01\x09\x10' in packet_bytes:
                    nonlocal address
                    addr_view = hci_packets.ReadBdAddrCompleteView(
                        hci_packets.CommandCompleteView(
                            hci_packets.EventPacketView(
                                bt_packets.PacketViewLittleEndian(
                                    list(packet_bytes)))))
                    address = addr_view.GetBdAddr()
                    return True
                return False

            # DUT Enables scans and gets its address
            self.enqueue_hci_command(
                hci_packets.WriteScanEnableBuilder(
                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
            self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True)

            hci_event_asserts.assert_event_occurs(get_address_from_complete)

            cert_address = address.encode('utf8')

            self.device_under_test.neighbor.ReadRemoteName(
                neighbor_facade.RemoteNameRequestMsg(
                    address=cert_address,
                    page_scan_repetition_mode=1,
                    clock_offset=0x6855))
            name_event_asserts.assert_event_occurs(
                lambda msg: cert_name in msg.name)
+14 −8
Original line number Diff line number Diff line
@@ -103,7 +103,7 @@ class NeighborFacadeService : public NeighborFacade::Service {

  ::grpc::Status ReadRemoteName(::grpc::ServerContext* context,
                                const ::bluetooth::neighbor::RemoteNameRequestMsg* request,
                                ::bluetooth::neighbor::RemoteNameResponseMsg* response) override {
                                ::google::protobuf::Empty* response) override {
    hci::Address remote;
    ASSERT(hci::Address::FromString(request->address(), remote));
    hci::PageScanRepetitionMode mode;
@@ -123,11 +123,15 @@ class NeighborFacadeService : public NeighborFacade::Service {
    name_module_->ReadRemoteNameRequest(
        remote, mode, request->clock_offset(),
        (request->clock_offset() != 0 ? hci::ClockOffsetValid::VALID : hci::ClockOffsetValid::INVALID),
        common::Bind(&NeighborFacadeService::on_remote_name, common::Unretained(this), common::Unretained(response)),
        facade_handler_);
        common::Bind(&NeighborFacadeService::on_remote_name, common::Unretained(this)), facade_handler_);
    return ::grpc::Status::OK;
  }

  ::grpc::Status GetRemoteNameEvents(::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
                                     ::grpc::ServerWriter<RemoteNameResponseMsg>* writer) override {
    return pending_remote_names_.RunLoop(context, writer);
  }

  ::grpc::Status EnableInquiryScan(::grpc::ServerContext* context, const ::bluetooth::neighbor::EnableMsg* request,
                                   ::google::protobuf::Empty* response) override {
    if (request->enabled()) {
@@ -167,11 +171,12 @@ class NeighborFacadeService : public NeighborFacade::Service {
      .extended_result = [this](hci::ExtendedInquiryResultView view) { on_incoming_inquiry_result(view); },
      .complete = [this](hci::ErrorCode status) { on_incoming_inquiry_complete(status); }};

  void on_remote_name(::bluetooth::neighbor::RemoteNameResponseMsg* response, hci::ErrorCode status,
                      hci::Address address, std::array<uint8_t, 248> name) {
    response->set_status(static_cast<int>(status));
    response->set_address(address.ToString());
    response->set_name(name.begin(), name.size());
  void on_remote_name(hci::ErrorCode status, hci::Address address, std::array<uint8_t, 248> name) {
    RemoteNameResponseMsg response;
    response.set_status(static_cast<int>(status));
    response.set_address(address.ToString());
    response.set_name(name.begin(), name.size());
    pending_remote_names_.OnIncomingEvent(response);
  }

  ConnectabilityModule* connectability_module_;
@@ -181,6 +186,7 @@ class NeighborFacadeService : public NeighborFacade::Service {
  ScanModule* scan_module_;
  ::bluetooth::os::Handler* facade_handler_;
  ::bluetooth::grpc::GrpcEventQueue<InquiryResultMsg> pending_events_{"InquiryResponses"};
  ::bluetooth::grpc::GrpcEventQueue<RemoteNameResponseMsg> pending_remote_names_{"RemoteNameResponses"};
};

void NeighborFacadeModule::ListDependencies(ModuleList* list) {
+2 −1
Original line number Diff line number Diff line
@@ -8,7 +8,8 @@ service NeighborFacade {
  rpc SetConnectability(EnableMsg) returns (google.protobuf.Empty) {}
  rpc SetDiscoverability(DiscoverabilitiyMsg) returns (google.protobuf.Empty) {}
  rpc SetInquiryMode(InquiryMsg) returns (stream InquiryResultMsg) {}
  rpc ReadRemoteName(RemoteNameRequestMsg) returns (RemoteNameResponseMsg) {}
  rpc ReadRemoteName(RemoteNameRequestMsg) returns (google.protobuf.Empty) {}
  rpc GetRemoteNameEvents(google.protobuf.Empty) returns (stream RemoteNameResponseMsg) {}
  rpc EnableInquiryScan(EnableMsg) returns (google.protobuf.Empty) {}
  rpc EnablePageScan(EnableMsg) returns (google.protobuf.Empty) {}
}