Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit abd92c1f authored by Yu Shan's avatar Yu Shan
Browse files

Use AAOS side timestamp for VehiclePropValue.

Update the timestamp set by the host-side VHAL proxy server with
the Android-side timestamp at AAOS VHAL side. This makes sure that
we always expose VehiclePropValue that is perfectly synced with
Android elapsedRealtimeNano. This CL also adds logic to deal with
cases when a property update event or a get value result is outdated.

This CL updates the unit test to cover more cases and remove the flaky
test case that requires starting a local GRPC server.

Flag: EXEMPT HAL change
Test: atest GRPCVehicleHardwareUnitTest
Bug: 349678711
Change-Id: I5e2c07e77869f7286a438cb2a04d1b6c130c3c36
parent 4f76feda
Loading
Loading
Loading
Loading
+3 −0
Original line number Original line Diff line number Diff line
@@ -44,6 +44,9 @@
    {
    {
      "name": "FakeVehicleHardwareTest"
      "name": "FakeVehicleHardwareTest"
    },
    },
    {
      "name": "GRPCVehicleHardwareUnitTest"
    },
    {
    {
      "name": "CarServiceUnitTest",
      "name": "CarServiceUnitTest",
      "options" : [
      "options" : [
+160 −54
Original line number Original line Diff line number Diff line
@@ -20,6 +20,7 @@


#include <android-base/logging.h>
#include <android-base/logging.h>
#include <grpc++/grpc++.h>
#include <grpc++/grpc++.h>
#include <utils/SystemClock.h>


#include <cstdlib>
#include <cstdlib>
#include <mutex>
#include <mutex>
@@ -28,11 +29,16 @@


namespace android::hardware::automotive::vehicle::virtualization {
namespace android::hardware::automotive::vehicle::virtualization {


static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
namespace {
    // TODO(chenhaosjtuacm): get secured credentials here

constexpr size_t MAX_RETRY_COUNT = 5;

std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
    return ::grpc::InsecureChannelCredentials();
    return ::grpc::InsecureChannelCredentials();
}
}


}  // namespace

GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr)
GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr)
    : mServiceAddr(std::move(service_addr)),
    : mServiceAddr(std::move(service_addr)),
      mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())),
      mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())),
@@ -40,11 +46,13 @@ GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr)
      mValuePollingThread([this] { ValuePollingLoop(); }) {}
      mValuePollingThread([this] { ValuePollingLoop(); }) {}


// Only used for unit testing.
// Only used for unit testing.
GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub)
GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub,
    : mServiceAddr(""),
                                         bool startValuePollingLoop)
      mGrpcChannel(nullptr),
    : mServiceAddr(""), mGrpcChannel(nullptr), mGrpcStub(std::move(stub)) {
      mGrpcStub(std::move(stub)),
    if (startValuePollingLoop) {
      mValuePollingThread([] {}) {}
        mValuePollingThread = std::thread([this] { ValuePollingLoop(); });
    }
}


GRPCVehicleHardware::~GRPCVehicleHardware() {
GRPCVehicleHardware::~GRPCVehicleHardware() {
    {
    {
@@ -52,8 +60,10 @@ GRPCVehicleHardware::~GRPCVehicleHardware() {
        mShuttingDownFlag.store(true);
        mShuttingDownFlag.store(true);
    }
    }
    mShutdownCV.notify_all();
    mShutdownCV.notify_all();
    if (mValuePollingThread.joinable()) {
        mValuePollingThread.join();
        mValuePollingThread.join();
    }
    }
}


std::vector<aidlvhal::VehiclePropConfig> GRPCVehicleHardware::getAllPropertyConfigs() const {
std::vector<aidlvhal::VehiclePropConfig> GRPCVehicleHardware::getAllPropertyConfigs() const {
    std::vector<aidlvhal::VehiclePropConfig> configs;
    std::vector<aidlvhal::VehiclePropConfig> configs;
@@ -109,36 +119,117 @@ aidlvhal::StatusCode GRPCVehicleHardware::setValues(
aidlvhal::StatusCode GRPCVehicleHardware::getValues(
aidlvhal::StatusCode GRPCVehicleHardware::getValues(
        std::shared_ptr<const GetValuesCallback> callback,
        std::shared_ptr<const GetValuesCallback> callback,
        const std::vector<aidlvhal::GetValueRequest>& requests) const {
        const std::vector<aidlvhal::GetValueRequest>& requests) const {
    ::grpc::ClientContext context;
    std::vector<aidlvhal::GetValueResult> results;
    auto status = getValuesWithRetry(requests, &results, /*retryCount=*/0);
    if (status != aidlvhal::StatusCode::OK) {
        return status;
    }
    if (!results.empty()) {
        (*callback)(std::move(results));
    }
    return status;
}

aidlvhal::StatusCode GRPCVehicleHardware::getValuesWithRetry(
        const std::vector<aidlvhal::GetValueRequest>& requests,
        std::vector<aidlvhal::GetValueResult>* results, size_t retryCount) const {
    if (retryCount == MAX_RETRY_COUNT) {
        LOG(ERROR) << __func__ << ": GRPC GetValues Failed, failed to get the latest value after "
                   << retryCount << " retries";
        return aidlvhal::StatusCode::TRY_AGAIN;
    }

    proto::VehiclePropValueRequests protoRequests;
    proto::VehiclePropValueRequests protoRequests;
    proto::GetValueResults protoResults;
    std::unordered_map<int64_t, const aidlvhal::GetValueRequest*> requestById;
    for (const auto& request : requests) {
    for (const auto& request : requests) {
        auto& protoRequest = *protoRequests.add_requests();
        auto& protoRequest = *protoRequests.add_requests();
        protoRequest.set_request_id(request.requestId);
        protoRequest.set_request_id(request.requestId);
        proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value());
        proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value());
        requestById[request.requestId] = &request;
    }
    }

    // TODO(chenhaosjtuacm): Make it Async.
    // TODO(chenhaosjtuacm): Make it Async.
    ::grpc::ClientContext context;
    proto::GetValueResults protoResults;
    auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults);
    auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults);
    if (!grpc_status.ok()) {
    if (!grpc_status.ok()) {
        LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message();
        LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message();
        return aidlvhal::StatusCode::INTERNAL_ERROR;
        return aidlvhal::StatusCode::INTERNAL_ERROR;
    }
    }
    std::vector<aidlvhal::GetValueResult> results;

    std::vector<aidlvhal::GetValueRequest> retryRequests;
    for (const auto& protoResult : protoResults.results()) {
    for (const auto& protoResult : protoResults.results()) {
        auto& result = results.emplace_back();
        int64_t requestId = protoResult.request_id();
        result.requestId = protoResult.request_id();
        auto it = requestById.find(requestId);
        if (it == requestById.end()) {
            LOG(ERROR) << __func__
                       << "Invalid getValue request with unknown request ID: " << requestId
                       << ", ignore";
            continue;
        }

        if (!protoResult.has_value()) {
            auto& result = results->emplace_back();
            result.requestId = requestId;
            result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
            result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
        if (protoResult.has_value()) {
            continue;
        }

        aidlvhal::VehiclePropValue value;
        aidlvhal::VehiclePropValue value;
        proto_msg_converter::protoToAidl(protoResult.value(), &value);
        proto_msg_converter::protoToAidl(protoResult.value(), &value);

        // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to reset
        // the timestamp.
        // TODO(b/350822044): Remove this once we use timestamp from proxy server.
        if (!setAndroidTimestamp(&value)) {
            // This is a rare case when we receive a property update event reflecting a new value
            // for the property before we receive the get value result. This means that the result
            // is already outdated, hence we should retry getting the latest value again.
            LOG(WARNING) << __func__ << "getValue result for propId: " << value.prop
                         << " areaId: " << value.areaId << " is oudated, retry";
            retryRequests.push_back(*(it->second));
            continue;
        }

        auto& result = results->emplace_back();
        result.requestId = requestId;
        result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
        result.prop = std::move(value);
        result.prop = std::move(value);
    }
    }

    if (retryRequests.size() != 0) {
        return getValuesWithRetry(retryRequests, results, retryCount++);
    }
    }
    (*callback)(std::move(results));


    return aidlvhal::StatusCode::OK;
    return aidlvhal::StatusCode::OK;
}
}


bool GRPCVehicleHardware::setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const {
    PropIdAreaId propIdAreaId = {
            .propId = propValue->prop,
            .areaId = propValue->areaId,
    };
    int64_t now = elapsedRealtimeNano();
    int64_t externalTimestamp = propValue->timestamp;

    {
        std::lock_guard lck(mLatestUpdateTimestampsMutex);
        auto it = mLatestUpdateTimestamps.find(propIdAreaId);
        if (it == mLatestUpdateTimestamps.end() || externalTimestamp > (it->second).first) {
            mLatestUpdateTimestamps[propIdAreaId].first = externalTimestamp;
            mLatestUpdateTimestamps[propIdAreaId].second = now;
            propValue->timestamp = now;
            return true;
        }
        if (externalTimestamp == (it->second).first) {
            propValue->timestamp = (it->second).second;
            return true;
        }
    }
    // externalTimestamp < (it->second).first, the value is outdated.
    return false;
}

void GRPCVehicleHardware::registerOnPropertyChangeEvent(
void GRPCVehicleHardware::registerOnPropertyChangeEvent(
        std::unique_ptr<const PropertyChangeCallback> callback) {
        std::unique_ptr<const PropertyChangeCallback> callback) {
    std::lock_guard lck(mCallbackMutex);
    std::lock_guard lck(mCallbackMutex);
@@ -248,26 +339,44 @@ bool GRPCVehicleHardware::waitForConnected(std::chrono::milliseconds waitTime) {


void GRPCVehicleHardware::ValuePollingLoop() {
void GRPCVehicleHardware::ValuePollingLoop() {
    while (!mShuttingDownFlag.load()) {
    while (!mShuttingDownFlag.load()) {
        pollValue();
        // try to reconnect
    }
}

void GRPCVehicleHardware::pollValue() {
    ::grpc::ClientContext context;
    ::grpc::ClientContext context;


    bool rpc_stopped{false};
    bool rpc_stopped{false};
    std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() {
    std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() {
        std::unique_lock<std::mutex> lck(mShutdownMutex);
        std::unique_lock<std::mutex> lck(mShutdownMutex);
            mShutdownCV.wait(lck, [this, &rpc_stopped]() {
        mShutdownCV.wait(
                return rpc_stopped || mShuttingDownFlag.load();
                lck, [this, &rpc_stopped]() { return rpc_stopped || mShuttingDownFlag.load(); });
            });
        context.TryCancel();
        context.TryCancel();
    });
    });


        auto value_stream =
    auto value_stream = mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
                mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
    LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
    LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
    proto::VehiclePropValues protoValues;
    proto::VehiclePropValues protoValues;
    while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) {
    while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) {
        std::vector<aidlvhal::VehiclePropValue> values;
        std::vector<aidlvhal::VehiclePropValue> values;
        for (const auto protoValue : protoValues.values()) {
        for (const auto protoValue : protoValues.values()) {
                values.push_back(aidlvhal::VehiclePropValue());
            aidlvhal::VehiclePropValue aidlValue = {};
                proto_msg_converter::protoToAidl(protoValue, &values.back());
            proto_msg_converter::protoToAidl(protoValue, &aidlValue);

            // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to
            // reset the timestamp.
            // TODO(b/350822044): Remove this once we use timestamp from proxy server.
            if (!setAndroidTimestamp(&aidlValue)) {
                LOG(WARNING) << __func__ << ": property event for propId: " << aidlValue.prop
                             << " areaId: " << aidlValue.areaId << " is outdated, ignore";
                continue;
            }

            values.push_back(std::move(aidlValue));
        }
        if (values.empty()) {
            continue;
        }
        }
        std::shared_lock lck(mCallbackMutex);
        std::shared_lock lck(mCallbackMutex);
        if (mOnPropChange) {
        if (mOnPropChange) {
@@ -285,9 +394,6 @@ void GRPCVehicleHardware::ValuePollingLoop() {
    auto grpc_status = value_stream->Finish();
    auto grpc_status = value_stream->Finish();
    // never reach here until connection lost
    // never reach here until connection lost
    LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message();
    LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message();

        // try to reconnect
    }
}
}


}  // namespace android::hardware::automotive::vehicle::virtualization
}  // namespace android::hardware::automotive::vehicle::virtualization
+28 −4
Original line number Original line Diff line number Diff line
@@ -20,6 +20,7 @@
#include <VehicleHalTypes.h>
#include <VehicleHalTypes.h>
#include <VehicleUtils.h>
#include <VehicleUtils.h>
#include <android-base/result.h>
#include <android-base/result.h>
#include <android-base/thread_annotations.h>


#include "VehicleServer.grpc.pb.h"
#include "VehicleServer.grpc.pb.h"
#include "VehicleServer.pb.h"
#include "VehicleServer.pb.h"
@@ -33,6 +34,7 @@
#include <shared_mutex>
#include <shared_mutex>
#include <string>
#include <string>
#include <thread>
#include <thread>
#include <unordered_map>
#include <vector>
#include <vector>


namespace android::hardware::automotive::vehicle::virtualization {
namespace android::hardware::automotive::vehicle::virtualization {
@@ -43,9 +45,6 @@ class GRPCVehicleHardware : public IVehicleHardware {
  public:
  public:
    explicit GRPCVehicleHardware(std::string service_addr);
    explicit GRPCVehicleHardware(std::string service_addr);


    // Only used for unit testing.
    explicit GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub);

    ~GRPCVehicleHardware();
    ~GRPCVehicleHardware();


    // Get all the property configs.
    // Get all the property configs.
@@ -94,7 +93,7 @@ class GRPCVehicleHardware : public IVehicleHardware {
    std::unique_ptr<const PropertyChangeCallback> mOnPropChange;
    std::unique_ptr<const PropertyChangeCallback> mOnPropChange;


  private:
  private:
    void ValuePollingLoop();
    friend class GRPCVehicleHardwareUnitTest;


    std::string mServiceAddr;
    std::string mServiceAddr;
    std::shared_ptr<::grpc::Channel> mGrpcChannel;
    std::shared_ptr<::grpc::Channel> mGrpcChannel;
@@ -106,6 +105,31 @@ class GRPCVehicleHardware : public IVehicleHardware {
    std::mutex mShutdownMutex;
    std::mutex mShutdownMutex;
    std::condition_variable mShutdownCV;
    std::condition_variable mShutdownCV;
    std::atomic<bool> mShuttingDownFlag{false};
    std::atomic<bool> mShuttingDownFlag{false};

    mutable std::mutex mLatestUpdateTimestampsMutex;

    // A map from [propId, areaId] to the latest timestamp this property is updated.
    // The key is a tuple, the first element is the external timestamp (timestamp set by VHAL
    // server), the second element is the Android timestamp (elapsedRealtimeNano).
    mutable std::unordered_map<PropIdAreaId, std::pair<int64_t, int64_t>,
                               PropIdAreaIdHash> mLatestUpdateTimestamps
            GUARDED_BY(mLatestUpdateTimestampsMutex);

    // Only used for unit testing.
    GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub,
                        bool startValuePollingLoop);

    void ValuePollingLoop();
    void pollValue();

    aidlvhal::StatusCode getValuesWithRetry(const std::vector<aidlvhal::GetValueRequest>& requests,
                                            std::vector<aidlvhal::GetValueResult>* results,
                                            size_t retryCount) const;

    // Check the external timestamp of propValue against the latest updated external timestamp, if
    // this is an outdated value, return false. Otherwise, update the external timestamp to the
    // Android timestamp and return true.
    bool setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const;
};
};


}  // namespace android::hardware::automotive::vehicle::virtualization
}  // namespace android::hardware::automotive::vehicle::virtualization
+288 −76

File changed.

Preview size limit exceeded, changes collapsed.