Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a75f34c2 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "Use AAOS side timestamp for VehiclePropValue." into main

parents 891c9a20 abd92c1f
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -44,6 +44,9 @@
    {
      "name": "FakeVehicleHardwareTest"
    },
    {
      "name": "GRPCVehicleHardwareUnitTest"
    },
    {
      "name": "CarServiceUnitTest",
      "options" : [
+160 −54
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@

#include <android-base/logging.h>
#include <grpc++/grpc++.h>
#include <utils/SystemClock.h>

#include <cstdlib>
#include <mutex>
@@ -28,11 +29,16 @@

namespace android::hardware::automotive::vehicle::virtualization {

static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
    // TODO(chenhaosjtuacm): get secured credentials here
namespace {

constexpr size_t MAX_RETRY_COUNT = 5;

std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
    return ::grpc::InsecureChannelCredentials();
}

}  // namespace

GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr)
    : mServiceAddr(std::move(service_addr)),
      mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())),
@@ -40,11 +46,13 @@ GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr)
      mValuePollingThread([this] { ValuePollingLoop(); }) {}

// Only used for unit testing.
GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub)
    : mServiceAddr(""),
      mGrpcChannel(nullptr),
      mGrpcStub(std::move(stub)),
      mValuePollingThread([] {}) {}
GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub,
                                         bool startValuePollingLoop)
    : mServiceAddr(""), mGrpcChannel(nullptr), mGrpcStub(std::move(stub)) {
    if (startValuePollingLoop) {
        mValuePollingThread = std::thread([this] { ValuePollingLoop(); });
    }
}

GRPCVehicleHardware::~GRPCVehicleHardware() {
    {
@@ -52,8 +60,10 @@ GRPCVehicleHardware::~GRPCVehicleHardware() {
        mShuttingDownFlag.store(true);
    }
    mShutdownCV.notify_all();
    if (mValuePollingThread.joinable()) {
        mValuePollingThread.join();
    }
}

std::vector<aidlvhal::VehiclePropConfig> GRPCVehicleHardware::getAllPropertyConfigs() const {
    std::vector<aidlvhal::VehiclePropConfig> configs;
@@ -109,36 +119,117 @@ aidlvhal::StatusCode GRPCVehicleHardware::setValues(
aidlvhal::StatusCode GRPCVehicleHardware::getValues(
        std::shared_ptr<const GetValuesCallback> callback,
        const std::vector<aidlvhal::GetValueRequest>& requests) const {
    ::grpc::ClientContext context;
    std::vector<aidlvhal::GetValueResult> results;
    auto status = getValuesWithRetry(requests, &results, /*retryCount=*/0);
    if (status != aidlvhal::StatusCode::OK) {
        return status;
    }
    if (!results.empty()) {
        (*callback)(std::move(results));
    }
    return status;
}

aidlvhal::StatusCode GRPCVehicleHardware::getValuesWithRetry(
        const std::vector<aidlvhal::GetValueRequest>& requests,
        std::vector<aidlvhal::GetValueResult>* results, size_t retryCount) const {
    if (retryCount == MAX_RETRY_COUNT) {
        LOG(ERROR) << __func__ << ": GRPC GetValues Failed, failed to get the latest value after "
                   << retryCount << " retries";
        return aidlvhal::StatusCode::TRY_AGAIN;
    }

    proto::VehiclePropValueRequests protoRequests;
    proto::GetValueResults protoResults;
    std::unordered_map<int64_t, const aidlvhal::GetValueRequest*> requestById;
    for (const auto& request : requests) {
        auto& protoRequest = *protoRequests.add_requests();
        protoRequest.set_request_id(request.requestId);
        proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value());
        requestById[request.requestId] = &request;
    }

    // TODO(chenhaosjtuacm): Make it Async.
    ::grpc::ClientContext context;
    proto::GetValueResults protoResults;
    auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults);
    if (!grpc_status.ok()) {
        LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message();
        return aidlvhal::StatusCode::INTERNAL_ERROR;
    }
    std::vector<aidlvhal::GetValueResult> results;

    std::vector<aidlvhal::GetValueRequest> retryRequests;
    for (const auto& protoResult : protoResults.results()) {
        auto& result = results.emplace_back();
        result.requestId = protoResult.request_id();
        int64_t requestId = protoResult.request_id();
        auto it = requestById.find(requestId);
        if (it == requestById.end()) {
            LOG(ERROR) << __func__
                       << "Invalid getValue request with unknown request ID: " << requestId
                       << ", ignore";
            continue;
        }

        if (!protoResult.has_value()) {
            auto& result = results->emplace_back();
            result.requestId = requestId;
            result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
        if (protoResult.has_value()) {
            continue;
        }

        aidlvhal::VehiclePropValue value;
        proto_msg_converter::protoToAidl(protoResult.value(), &value);

        // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to reset
        // the timestamp.
        // TODO(b/350822044): Remove this once we use timestamp from proxy server.
        if (!setAndroidTimestamp(&value)) {
            // This is a rare case when we receive a property update event reflecting a new value
            // for the property before we receive the get value result. This means that the result
            // is already outdated, hence we should retry getting the latest value again.
            LOG(WARNING) << __func__ << "getValue result for propId: " << value.prop
                         << " areaId: " << value.areaId << " is oudated, retry";
            retryRequests.push_back(*(it->second));
            continue;
        }

        auto& result = results->emplace_back();
        result.requestId = requestId;
        result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
        result.prop = std::move(value);
    }

    if (retryRequests.size() != 0) {
        return getValuesWithRetry(retryRequests, results, retryCount++);
    }
    (*callback)(std::move(results));

    return aidlvhal::StatusCode::OK;
}

bool GRPCVehicleHardware::setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const {
    PropIdAreaId propIdAreaId = {
            .propId = propValue->prop,
            .areaId = propValue->areaId,
    };
    int64_t now = elapsedRealtimeNano();
    int64_t externalTimestamp = propValue->timestamp;

    {
        std::lock_guard lck(mLatestUpdateTimestampsMutex);
        auto it = mLatestUpdateTimestamps.find(propIdAreaId);
        if (it == mLatestUpdateTimestamps.end() || externalTimestamp > (it->second).first) {
            mLatestUpdateTimestamps[propIdAreaId].first = externalTimestamp;
            mLatestUpdateTimestamps[propIdAreaId].second = now;
            propValue->timestamp = now;
            return true;
        }
        if (externalTimestamp == (it->second).first) {
            propValue->timestamp = (it->second).second;
            return true;
        }
    }
    // externalTimestamp < (it->second).first, the value is outdated.
    return false;
}

void GRPCVehicleHardware::registerOnPropertyChangeEvent(
        std::unique_ptr<const PropertyChangeCallback> callback) {
    std::lock_guard lck(mCallbackMutex);
@@ -248,26 +339,44 @@ bool GRPCVehicleHardware::waitForConnected(std::chrono::milliseconds waitTime) {

void GRPCVehicleHardware::ValuePollingLoop() {
    while (!mShuttingDownFlag.load()) {
        pollValue();
        // try to reconnect
    }
}

void GRPCVehicleHardware::pollValue() {
    ::grpc::ClientContext context;

    bool rpc_stopped{false};
    std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() {
        std::unique_lock<std::mutex> lck(mShutdownMutex);
            mShutdownCV.wait(lck, [this, &rpc_stopped]() {
                return rpc_stopped || mShuttingDownFlag.load();
            });
        mShutdownCV.wait(
                lck, [this, &rpc_stopped]() { return rpc_stopped || mShuttingDownFlag.load(); });
        context.TryCancel();
    });

        auto value_stream =
                mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
    auto value_stream = mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
    LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
    proto::VehiclePropValues protoValues;
    while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) {
        std::vector<aidlvhal::VehiclePropValue> values;
        for (const auto protoValue : protoValues.values()) {
                values.push_back(aidlvhal::VehiclePropValue());
                proto_msg_converter::protoToAidl(protoValue, &values.back());
            aidlvhal::VehiclePropValue aidlValue = {};
            proto_msg_converter::protoToAidl(protoValue, &aidlValue);

            // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to
            // reset the timestamp.
            // TODO(b/350822044): Remove this once we use timestamp from proxy server.
            if (!setAndroidTimestamp(&aidlValue)) {
                LOG(WARNING) << __func__ << ": property event for propId: " << aidlValue.prop
                             << " areaId: " << aidlValue.areaId << " is outdated, ignore";
                continue;
            }

            values.push_back(std::move(aidlValue));
        }
        if (values.empty()) {
            continue;
        }
        std::shared_lock lck(mCallbackMutex);
        if (mOnPropChange) {
@@ -285,9 +394,6 @@ void GRPCVehicleHardware::ValuePollingLoop() {
    auto grpc_status = value_stream->Finish();
    // never reach here until connection lost
    LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message();

        // try to reconnect
    }
}

}  // namespace android::hardware::automotive::vehicle::virtualization
+28 −4
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@
#include <VehicleHalTypes.h>
#include <VehicleUtils.h>
#include <android-base/result.h>
#include <android-base/thread_annotations.h>

#include "VehicleServer.grpc.pb.h"
#include "VehicleServer.pb.h"
@@ -33,6 +34,7 @@
#include <shared_mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

namespace android::hardware::automotive::vehicle::virtualization {
@@ -43,9 +45,6 @@ class GRPCVehicleHardware : public IVehicleHardware {
  public:
    explicit GRPCVehicleHardware(std::string service_addr);

    // Only used for unit testing.
    explicit GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub);

    ~GRPCVehicleHardware();

    // Get all the property configs.
@@ -94,7 +93,7 @@ class GRPCVehicleHardware : public IVehicleHardware {
    std::unique_ptr<const PropertyChangeCallback> mOnPropChange;

  private:
    void ValuePollingLoop();
    friend class GRPCVehicleHardwareUnitTest;

    std::string mServiceAddr;
    std::shared_ptr<::grpc::Channel> mGrpcChannel;
@@ -106,6 +105,31 @@ class GRPCVehicleHardware : public IVehicleHardware {
    std::mutex mShutdownMutex;
    std::condition_variable mShutdownCV;
    std::atomic<bool> mShuttingDownFlag{false};

    mutable std::mutex mLatestUpdateTimestampsMutex;

    // A map from [propId, areaId] to the latest timestamp this property is updated.
    // The key is a tuple, the first element is the external timestamp (timestamp set by VHAL
    // server), the second element is the Android timestamp (elapsedRealtimeNano).
    mutable std::unordered_map<PropIdAreaId, std::pair<int64_t, int64_t>,
                               PropIdAreaIdHash> mLatestUpdateTimestamps
            GUARDED_BY(mLatestUpdateTimestampsMutex);

    // Only used for unit testing.
    GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub,
                        bool startValuePollingLoop);

    void ValuePollingLoop();
    void pollValue();

    aidlvhal::StatusCode getValuesWithRetry(const std::vector<aidlvhal::GetValueRequest>& requests,
                                            std::vector<aidlvhal::GetValueResult>* results,
                                            size_t retryCount) const;

    // Check the external timestamp of propValue against the latest updated external timestamp, if
    // this is an outdated value, return false. Otherwise, update the external timestamp to the
    // Android timestamp and return true.
    bool setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const;
};

}  // namespace android::hardware::automotive::vehicle::virtualization
+288 −76

File changed.

Preview size limit exceeded, changes collapsed.