Loading system/gd/cert/event_stream.py 0 → 100644 +66 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2019 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from acts import asserts from facade import common_pb2 from datetime import datetime from datetime import timedelta from grpc import RpcError from grpc import StatusCode class EventStream(object): def __init__(self, stream_stub_fn): self.stream_stub_fn = stream_stub_fn def subscribe(self): return self.stream_stub_fn( common_pb2.EventStreamRequest( subscription_mode=common_pb2.SUBSCRIBE, fetch_mode=common_pb2.NONE ) ) def unsubscribe(self): return self.stream_stub_fn( common_pb2.EventStreamRequest( subscription_mode=common_pb2.UNSUBSCRIBE, fetch_mode=common_pb2.NONE ) ) def assert_event_occurs(self, match_fn, timeout=timedelta(seconds=3)): expiration_time = datetime.now() + timeout while (True): if datetime.now() > expiration_time: asserts.fail("timeout of %s exceeded" % str(timeout)) response = self.stream_stub_fn( common_pb2.EventStreamRequest( subscription_mode=common_pb2.NONE, fetch_mode=common_pb2.AT_LEAST_ONE, timeout_ms = int((expiration_time - datetime.now()).total_seconds() * 1000) ) ) try: for event in response: if (match_fn(event)): return except RpcError: if response.code() == StatusCode.DEADLINE_EXCEEDED: continue raise system/gd/cert/gd_device.py +5 −0 Original line number Diff line number Diff line Loading @@ -36,6 +36,7 @@ from acts.libs.proc import job import grpc from hal import facade_pb2_grpc as hal_facade_pb2_grpc from cert.event_stream import EventStream ANDROID_BUILD_TOP = os.environ.get('ANDROID_BUILD_TOP') ANDROID_HOST_OUT = os.environ.get('ANDROID_HOST_OUT') Loading Loading @@ -114,6 +115,10 @@ class GdDevice: self.grpc_channel = grpc.insecure_channel("localhost:" + grpc_port) self.hal = hal_facade_pb2_grpc.HciHalFacadeStub(self.grpc_channel) self.hal.hci_event_stream = EventStream(self.hal.FetchHciEvent) self.hal.hci_acl_stream = EventStream(self.hal.FetchHciAcl) self.hal.hci_sco_stream = EventStream(self.hal.FetchHciSco) def clean_up(self): self.grpc_channel.close() Loading system/gd/common/blocking_queue.h +14 −0 Original line number Diff line number Diff line Loading @@ -16,6 +16,7 @@ #pragma once #include <chrono> #include <condition_variable> #include <mutex> #include <queue> Loading Loading @@ -44,6 +45,19 @@ class BlockingQueue { return data; }; bool take_for(std::chrono::milliseconds time, T& data) { std::unique_lock<std::mutex> lock(mutex_); while (queue_.empty()) { if (not_empty_.wait_for(lock, time) == std::cv_status::timeout) { return false; } } data = queue_.front(); queue_.pop(); return true; } bool empty() const { std::unique_lock<std::mutex> lock(mutex_); return queue_.empty(); Loading system/gd/facade/common.proto +1 −0 Original line number Diff line number Diff line Loading @@ -17,4 +17,5 @@ enum EventFetchMode { message EventStreamRequest { EventSubscriptionMode subscription_mode = 1; EventFetchMode fetch_mode = 2; uint32 timeout_ms = 3; } system/gd/grpc/grpc_event_stream.h +11 −1 Original line number Diff line number Diff line Loading @@ -18,6 +18,8 @@ #include <grpc++/grpc++.h> #include <chrono> #include "common/blocking_queue.h" #include "facade/common.pb.h" #include "os/log.h" Loading Loading @@ -49,15 +51,23 @@ class GrpcEventStream { ::grpc::ServerWriter<RES>* writer) { ::bluetooth::facade::EventSubscriptionMode subscription_mode = request->subscription_mode(); ::bluetooth::facade::EventFetchMode fetch_mode = request->fetch_mode(); uint32_t timeout_ms = request->timeout_ms(); if (timeout_ms == 0) { timeout_ms = 3000; } if (subscription_mode == ::bluetooth::facade::SUBSCRIBE) { event_queue_.clear(); callback_->OnSubscribe(); subscribed_ = true; } if (fetch_mode == ::bluetooth::facade::AT_LEAST_ONE) { RES response; EVENT event = event_queue_.take(); EVENT event; if (!event_queue_.take_for(std::chrono::milliseconds(timeout_ms), event)) { return ::grpc::Status(::grpc::StatusCode::DEADLINE_EXCEEDED, "timeout exceeded"); } callback_->OnWriteResponse(&response, event); writer->Write(response); } Loading Loading
system/gd/cert/event_stream.py 0 → 100644 +66 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2019 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from acts import asserts from facade import common_pb2 from datetime import datetime from datetime import timedelta from grpc import RpcError from grpc import StatusCode class EventStream(object): def __init__(self, stream_stub_fn): self.stream_stub_fn = stream_stub_fn def subscribe(self): return self.stream_stub_fn( common_pb2.EventStreamRequest( subscription_mode=common_pb2.SUBSCRIBE, fetch_mode=common_pb2.NONE ) ) def unsubscribe(self): return self.stream_stub_fn( common_pb2.EventStreamRequest( subscription_mode=common_pb2.UNSUBSCRIBE, fetch_mode=common_pb2.NONE ) ) def assert_event_occurs(self, match_fn, timeout=timedelta(seconds=3)): expiration_time = datetime.now() + timeout while (True): if datetime.now() > expiration_time: asserts.fail("timeout of %s exceeded" % str(timeout)) response = self.stream_stub_fn( common_pb2.EventStreamRequest( subscription_mode=common_pb2.NONE, fetch_mode=common_pb2.AT_LEAST_ONE, timeout_ms = int((expiration_time - datetime.now()).total_seconds() * 1000) ) ) try: for event in response: if (match_fn(event)): return except RpcError: if response.code() == StatusCode.DEADLINE_EXCEEDED: continue raise
system/gd/cert/gd_device.py +5 −0 Original line number Diff line number Diff line Loading @@ -36,6 +36,7 @@ from acts.libs.proc import job import grpc from hal import facade_pb2_grpc as hal_facade_pb2_grpc from cert.event_stream import EventStream ANDROID_BUILD_TOP = os.environ.get('ANDROID_BUILD_TOP') ANDROID_HOST_OUT = os.environ.get('ANDROID_HOST_OUT') Loading Loading @@ -114,6 +115,10 @@ class GdDevice: self.grpc_channel = grpc.insecure_channel("localhost:" + grpc_port) self.hal = hal_facade_pb2_grpc.HciHalFacadeStub(self.grpc_channel) self.hal.hci_event_stream = EventStream(self.hal.FetchHciEvent) self.hal.hci_acl_stream = EventStream(self.hal.FetchHciAcl) self.hal.hci_sco_stream = EventStream(self.hal.FetchHciSco) def clean_up(self): self.grpc_channel.close() Loading
system/gd/common/blocking_queue.h +14 −0 Original line number Diff line number Diff line Loading @@ -16,6 +16,7 @@ #pragma once #include <chrono> #include <condition_variable> #include <mutex> #include <queue> Loading Loading @@ -44,6 +45,19 @@ class BlockingQueue { return data; }; bool take_for(std::chrono::milliseconds time, T& data) { std::unique_lock<std::mutex> lock(mutex_); while (queue_.empty()) { if (not_empty_.wait_for(lock, time) == std::cv_status::timeout) { return false; } } data = queue_.front(); queue_.pop(); return true; } bool empty() const { std::unique_lock<std::mutex> lock(mutex_); return queue_.empty(); Loading
system/gd/facade/common.proto +1 −0 Original line number Diff line number Diff line Loading @@ -17,4 +17,5 @@ enum EventFetchMode { message EventStreamRequest { EventSubscriptionMode subscription_mode = 1; EventFetchMode fetch_mode = 2; uint32 timeout_ms = 3; }
system/gd/grpc/grpc_event_stream.h +11 −1 Original line number Diff line number Diff line Loading @@ -18,6 +18,8 @@ #include <grpc++/grpc++.h> #include <chrono> #include "common/blocking_queue.h" #include "facade/common.pb.h" #include "os/log.h" Loading Loading @@ -49,15 +51,23 @@ class GrpcEventStream { ::grpc::ServerWriter<RES>* writer) { ::bluetooth::facade::EventSubscriptionMode subscription_mode = request->subscription_mode(); ::bluetooth::facade::EventFetchMode fetch_mode = request->fetch_mode(); uint32_t timeout_ms = request->timeout_ms(); if (timeout_ms == 0) { timeout_ms = 3000; } if (subscription_mode == ::bluetooth::facade::SUBSCRIBE) { event_queue_.clear(); callback_->OnSubscribe(); subscribed_ = true; } if (fetch_mode == ::bluetooth::facade::AT_LEAST_ONE) { RES response; EVENT event = event_queue_.take(); EVENT event; if (!event_queue_.take_for(std::chrono::milliseconds(timeout_ms), event)) { return ::grpc::Status(::grpc::StatusCode::DEADLINE_EXCEEDED, "timeout exceeded"); } callback_->OnWriteResponse(&response, event); writer->Write(response); } Loading