Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2519d527 authored by Chienyuan's avatar Chienyuan Committed by Hansong Zhang
Browse files

Cert: add common helper for event stream on the py side

* add common helper for event stream on the py side, implement
  assert_event_occurs in the helper
* add event stream helpers for hci event, sco and acl in GdDevice
* add take_for() function in BlockingQueue for take data with timeout

Test: run gd/cert/run_cert.sh
Change-Id: Ia8168159ee47441ec332046627124e0ed3811d6b
parent c8032598
Loading
Loading
Loading
Loading
+66 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from acts import asserts

from facade import common_pb2
from datetime import datetime
from datetime import timedelta
from grpc import RpcError
from grpc import StatusCode

class EventStream(object):
  def __init__(self, stream_stub_fn):
    self.stream_stub_fn = stream_stub_fn

  def subscribe(self):
    return self.stream_stub_fn(
        common_pb2.EventStreamRequest(
          subscription_mode=common_pb2.SUBSCRIBE,
          fetch_mode=common_pb2.NONE
        )
    )

  def unsubscribe(self):
    return self.stream_stub_fn(
        common_pb2.EventStreamRequest(
            subscription_mode=common_pb2.UNSUBSCRIBE,
            fetch_mode=common_pb2.NONE
        )
    )

  def assert_event_occurs(self, match_fn, timeout=timedelta(seconds=3)):
    expiration_time = datetime.now() + timeout
    while (True):
      if datetime.now() > expiration_time:
        asserts.fail("timeout of %s exceeded" % str(timeout))

      response = self.stream_stub_fn(
          common_pb2.EventStreamRequest(
              subscription_mode=common_pb2.NONE,
              fetch_mode=common_pb2.AT_LEAST_ONE,
              timeout_ms = int((expiration_time - datetime.now()).total_seconds() * 1000)
          )
      )

      try:
        for event in response:
          if (match_fn(event)):
            return
      except RpcError:
        if response.code() == StatusCode.DEADLINE_EXCEEDED:
          continue
        raise
+5 −0
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ from acts.libs.proc import job
import grpc

from hal import facade_pb2_grpc as hal_facade_pb2_grpc
from cert.event_stream import EventStream

ANDROID_BUILD_TOP = os.environ.get('ANDROID_BUILD_TOP')
ANDROID_HOST_OUT = os.environ.get('ANDROID_HOST_OUT')
@@ -114,6 +115,10 @@ class GdDevice:

        self.grpc_channel = grpc.insecure_channel("localhost:" + grpc_port)
        self.hal = hal_facade_pb2_grpc.HciHalFacadeStub(self.grpc_channel)
        self.hal.hci_event_stream = EventStream(self.hal.FetchHciEvent)
        self.hal.hci_acl_stream = EventStream(self.hal.FetchHciAcl)
        self.hal.hci_sco_stream = EventStream(self.hal.FetchHciSco)


    def clean_up(self):
        self.grpc_channel.close()
+14 −0
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@

#pragma once

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
@@ -44,6 +45,19 @@ class BlockingQueue {
    return data;
  };

  bool take_for(std::chrono::milliseconds time, T& data) {
    std::unique_lock<std::mutex> lock(mutex_);
    while (queue_.empty()) {
      if (not_empty_.wait_for(lock, time) == std::cv_status::timeout) {
        return false;
      }
    }
    data = queue_.front();
    queue_.pop();

    return true;
  }

  bool empty() const {
    std::unique_lock<std::mutex> lock(mutex_);
    return queue_.empty();
+1 −0
Original line number Diff line number Diff line
@@ -17,4 +17,5 @@ enum EventFetchMode {
message EventStreamRequest {
  EventSubscriptionMode subscription_mode = 1;
  EventFetchMode fetch_mode = 2;
  uint32 timeout_ms = 3;
}
+11 −1
Original line number Diff line number Diff line
@@ -18,6 +18,8 @@

#include <grpc++/grpc++.h>

#include <chrono>

#include "common/blocking_queue.h"
#include "facade/common.pb.h"
#include "os/log.h"
@@ -49,15 +51,23 @@ class GrpcEventStream {
                               ::grpc::ServerWriter<RES>* writer) {
    ::bluetooth::facade::EventSubscriptionMode subscription_mode = request->subscription_mode();
    ::bluetooth::facade::EventFetchMode fetch_mode = request->fetch_mode();
    uint32_t timeout_ms = request->timeout_ms();
    if (timeout_ms == 0) {
      timeout_ms = 3000;
    }

    if (subscription_mode == ::bluetooth::facade::SUBSCRIBE) {
      event_queue_.clear();
      callback_->OnSubscribe();
      subscribed_ = true;
    }

    if (fetch_mode == ::bluetooth::facade::AT_LEAST_ONE) {
      RES response;
      EVENT event = event_queue_.take();
      EVENT event;
      if (!event_queue_.take_for(std::chrono::milliseconds(timeout_ms), event)) {
        return ::grpc::Status(::grpc::StatusCode::DEADLINE_EXCEEDED, "timeout exceeded");
      }
      callback_->OnWriteResponse(&response, event);
      writer->Write(response);
    }
Loading