Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a59644ac authored by Steve Paik's avatar Steve Paik
Browse files

Add pipe interface to DefaultVehicleHal

- Create base communications interface
- Refactor socket calls to use SocketComm class

Test: Use python scripts and custom emulator to test communications

Change-Id: Ia401587223035e748991516a2285cc31cb71a9c9
parent cef9212a
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -76,6 +76,8 @@ include $(CLEAR_VARS)
LOCAL_MODULE:= $(vhal_v2_0)-default-impl-lib
LOCAL_SRC_FILES:= \
    impl/vhal_v2_0/DefaultVehicleHal.cpp \
    impl/vhal_v2_0/PipeComm.cpp \
    impl/vhal_v2_0/SocketComm.cpp

LOCAL_C_INCLUDES := \
    $(LOCAL_PATH)/impl/vhal_v2_0
@@ -87,6 +89,7 @@ LOCAL_WHOLE_STATIC_LIBRARIES := \
    $(vhal_v2_0)-manager-lib \

LOCAL_SHARED_LIBRARIES := \
    libbase \
    libbinder \
    libhidlbase \
    libhidltransport \
@@ -99,6 +102,8 @@ LOCAL_SHARED_LIBRARIES := \
LOCAL_STATIC_LIBRARIES := \
    $(vhal_v2_0)-libproto-native \

LOCAL_CFLAGS += -Wall -Wextra -Werror

include $(BUILD_STATIC_LIBRARY)


@@ -148,6 +153,7 @@ LOCAL_SRC_FILES := \
    VehicleService.cpp

LOCAL_SHARED_LIBRARIES := \
    libbase \
    libbinder \
    libhidlbase \
    libhidltransport \
@@ -162,4 +168,6 @@ LOCAL_STATIC_LIBRARIES := \
    $(vhal_v2_0)-default-impl-lib \
    $(vhal_v2_0)-libproto-native \

LOCAL_CFLAGS += -Wall -Wextra -Werror

include $(BUILD_EXECUTABLE)
+86 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef android_hardware_automotive_vehicle_V2_0_impl_CommBase_H_
#define android_hardware_automotive_vehicle_V2_0_impl_CommBase_H_

#include <string>
#include <vector>

namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {
namespace V2_0 {

namespace impl {

/**
 * This is the communications base class.  It defines the interface used in DefaultVehicleHal to
 * send and receive data to and from the emulator.
 */
class CommBase {
public:
    virtual ~CommBase() = default;

    /**
     * Closes a connection if it is open.
     */
    virtual void stop() {}

    /**
     * Creates a connection to the other side.
     *
     * @return int Returns fd or socket number if connection is successful.
     *              Otherwise, returns -1 if no connection is availble.
     */
    virtual int connect() { return 0; }

    /**
     * Opens the communications channel.
     *
     * @return int Returns 0 if channel is opened, else -errno if failed.
     */
    virtual int open() = 0;

    /**
     * Blocking call to read data from the connection.
     *
     * @return std::vector<uint8_t> Serialized protobuf data received from emulator.  This will be
     *              an empty vector if the connection was closed or some other error occurred.
     */
    virtual std::vector<uint8_t> read() = 0;

    /**
     * Transmits a string of data to the emulator.
     *
     * @param data Serialized protobuf data to transmit.
     *
     * @return int Number of bytes transmitted, or -1 if failed.
     */
    virtual int write(const std::vector<uint8_t>& data) = 0;
};

}  // impl

}  // namespace V2_0
}  // namespace vehicle
}  // namespace automotive
}  // namespace hardware
}  // namespace android


#endif  // android_hardware_automotive_vehicle_V2_0_impl_CommBase_H_
+61 −112
Original line number Diff line number Diff line
@@ -18,14 +18,14 @@
#include <android/log.h>

#include <algorithm>
#include <netinet/in.h>
#include <sys/socket.h>
#include <android-base/properties.h>
#include <cstdio>

#include "DefaultVehicleHal.h"
#include "PipeComm.h"
#include "SocketComm.h"
#include "VehicleHalProto.pb.h"

#define DEBUG_SOCKET    (33452)

namespace android {
namespace hardware {
namespace automotive {
@@ -293,10 +293,8 @@ void DefaultVehicleHal::initObd2FreezeFrame(VehiclePropConfig& propConfig) {
void DefaultVehicleHal::parseRxProtoBuf(std::vector<uint8_t>& msg) {
    emulator::EmulatorMessage rxMsg;
    emulator::EmulatorMessage respMsg;
    std::string str(reinterpret_cast<const char*>(msg.data()), msg.size());

    rxMsg.ParseFromString(str);

    if (rxMsg.ParseFromArray(msg.data(), msg.size())) {
        switch (rxMsg.msg_type()) {
        case emulator::GET_CONFIG_CMD:
            doGetConfig(rxMsg, respMsg);
@@ -321,6 +319,9 @@ void DefaultVehicleHal::parseRxProtoBuf(std::vector<uint8_t>& msg) {

        // Send the reply
        txMsg(respMsg);
    } else {
        ALOGE("%s: ParseFromString() failed. msgSize=%d", __FUNCTION__, static_cast<int>(msg.size()));
    }
}

// Copies internal VehiclePropConfig data structure to protobuf VehiclePropConfig
@@ -415,94 +416,50 @@ void DefaultVehicleHal::populateProtoVehiclePropValue(emulator::VehiclePropValue
    }
}

void DefaultVehicleHal::rxMsg(void) {
void DefaultVehicleHal::rxMsg() {
    int  numBytes = 0;
    int32_t msgSize;
    do {
        // This is a variable length message.
        // Read the number of bytes to rx over the socket
        numBytes = read(mCurSocket, &msgSize, sizeof(msgSize));

        if (numBytes != sizeof(msgSize)) {
            // This happens when connection is closed
            ALOGD("%s: numBytes=%d, expected=4", __FUNCTION__, numBytes);
            break;
        }

        std::vector<uint8_t> msg = std::vector<uint8_t>(msgSize);

        numBytes = read(mCurSocket, msg.data(), msgSize);
    while (mExit == 0) {
        std::vector<uint8_t> msg = mComm->read();

        if ((numBytes == msgSize) && (msgSize > 0)) {
        if (msg.size() > 0) {
            // Received a message.
            parseRxProtoBuf(msg);
        } else {
            // This happens when connection is closed
            ALOGD("%s: numBytes=%d, msgSize=%d", __FUNCTION__, numBytes, msgSize);
            ALOGD("%s: numBytes=%d, msgSize=%d", __FUNCTION__, numBytes,
                  static_cast<int32_t>(msg.size()));
            break;
        }
    } while (mExit == 0);
    }

void DefaultVehicleHal::rxThread(void) {
    // Initialize the socket
    {
        int retVal;
        struct sockaddr_in servAddr;

        mSocket = socket(AF_INET, SOCK_STREAM, 0);
        if (mSocket < 0) {
            ALOGE("%s: socket() failed, mSocket=%d, errno=%d", __FUNCTION__, mSocket, errno);
            mSocket = -1;
            return;
}

        bzero(&servAddr, sizeof(servAddr));
        servAddr.sin_family = AF_INET;
        servAddr.sin_addr.s_addr = INADDR_ANY;
        servAddr.sin_port = htons(DEBUG_SOCKET);
void DefaultVehicleHal::rxThread() {
    bool isEmulator = android::base::GetBoolProperty("ro.kernel.qemu", false);

        retVal = bind(mSocket, reinterpret_cast<struct sockaddr*>(&servAddr), sizeof(servAddr));
        if(retVal < 0) {
            ALOGE("%s: Error on binding: retVal=%d, errno=%d", __FUNCTION__, retVal, errno);
            close(mSocket);
            mSocket = -1;
            return;
    if (isEmulator) {
        // Initialize pipe to Emulator
        mComm.reset(new PipeComm);
    } else {
        // Initialize socket over ADB
        mComm.reset(new SocketComm);
    }

        listen(mSocket, 1);

        // Set the socket to be non-blocking so we can poll it continouously
        fcntl(mSocket, F_SETFL, O_NONBLOCK);
    }
    int retVal = mComm->open();

    if (retVal == 0) {
        // Comms are properly opened
        while (mExit == 0) {
        struct sockaddr_in cliAddr;
        socklen_t cliLen = sizeof(cliAddr);
        int cSocket = accept(mSocket, reinterpret_cast<struct sockaddr*>(&cliAddr), &cliLen);
            retVal = mComm->connect();

        if (cSocket >= 0) {
            {
                std::lock_guard<std::mutex> lock(mTxMutex);
                mCurSocket = cSocket;
            }
            ALOGD("%s: Incoming connection received on socket %d", __FUNCTION__, cSocket);
            if (retVal >= 0) {
                rxMsg();
            ALOGD("%s: Connection terminated on socket %d", __FUNCTION__, cSocket);
            {
                std::lock_guard<std::mutex> lock(mTxMutex);
                mCurSocket = -1;
            }
            }

        // TODO:  Use a blocking socket?
        // Check every 100ms for a new socket connection
            // Check every 100ms for a new connection
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }

    // Shutdown the socket
    close(mSocket);
    mSocket = -1;
    }
}

// This function sets the default value of a property if we are interested in setting it.
@@ -569,21 +526,15 @@ void DefaultVehicleHal::setDefaultValue(VehiclePropValue* prop) {

// Transmit a reply back to the emulator
void DefaultVehicleHal::txMsg(emulator::EmulatorMessage& txMsg) {
    std::string msgString;
    int numBytes = txMsg.ByteSize();
    std::vector<uint8_t> msg(numBytes);

    if (txMsg.SerializeToString(&msgString)) {
        int32_t msgLen = msgString.length();
    if (txMsg.SerializeToArray(msg.data(), msg.size())) {
        int retVal = 0;

        // TODO:  Prepend the message length to the string without a copy
        msgString.insert(0, reinterpret_cast<char*>(&msgLen), 4);

        // Send the message
        {
            std::lock_guard<std::mutex> lock(mTxMutex);
            if (mCurSocket != -1) {
                retVal = write(mCurSocket, msgString.data(), msgString.size());
            }
        if (mExit == 0) {
            mComm->write(msg);
        }

        if (retVal < 0) {
@@ -683,9 +634,7 @@ StatusCode DefaultVehicleHal::set(const VehiclePropValue& propValue) {
// Parse supported properties list and generate vector of property values to hold current values.
void DefaultVehicleHal::onCreate() {
    // Initialize member variables
    mCurSocket = -1;
    mExit = 0;
    mSocket = -1;

    // Get the list of configurations supported by this HAL
    std::vector<VehiclePropConfig> configs = listProperties();
+7 −12
Original line number Diff line number Diff line
@@ -23,6 +23,9 @@

#include <utils/SystemClock.h>

#include "CommBase.h"
#include "VehicleHalProto.pb.h"

#include <vhal_v2_0/VehicleHal.h>
#include <vhal_v2_0/Obd2SensorStore.h>

@@ -45,13 +48,7 @@ public:
        mExit = 1;

        // Close emulator socket if it is open
        {
            std::lock_guard<std::mutex> lock(mTxMutex);
            if (mCurSocket != -1) {
                close(mCurSocket);
                mCurSocket = -1;
            }
        }
        mComm->stop();

        mThread.join();
    }
@@ -94,8 +91,8 @@ private:
    void populateProtoVehiclePropValue(emulator::VehiclePropValue* protoVal,
                                       const VehiclePropValue* val);
    void setDefaultValue(VehiclePropValue* prop);
    void rxMsg(void);
    void rxThread(void);
    void rxMsg();
    void rxThread();
    void txMsg(emulator::EmulatorMessage& txMsg);
    StatusCode updateProperty(const VehiclePropValue& propValue);
    StatusCode fillObd2LiveFrame(VehiclePropValue* v);
@@ -106,14 +103,12 @@ private:
private:
    // TODO:  Use a hashtable to support indexing props
    std::vector<std::unique_ptr<VehiclePropValue>> mProps;
    std::atomic<int> mCurSocket;
    std::atomic<int> mExit;
    std::unique_ptr<VehiclePropValue> mLiveObd2Frame {nullptr};
    std::vector<std::unique_ptr<VehiclePropValue>> mFreezeObd2Frames;
    std::mutex mPropsMutex;
    int mSocket;
    std::mutex mTxMutex;
    std::thread mThread;
    std::unique_ptr<CommBase> mComm{nullptr};
};

}  // impl
+106 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "PipeComm"

#include <android/hardware/automotive/vehicle/2.0/IVehicle.h>
#include <android/log.h>
#include <system/qemu_pipe.h>

#include "PipeComm.h"

#define CAR_SERVICE_NAME "pipe:qemud:car"


namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {
namespace V2_0 {

namespace impl {

PipeComm::PipeComm() {
    // Initialize member vars
    mPipeFd = -1;
}


int PipeComm::open() {
    int fd = qemu_pipe_open(CAR_SERVICE_NAME);

    if (fd < 0) {
        ALOGE("%s: Could not open connection to service: %s %d", __FUNCTION__, strerror(errno), fd);
        return -errno;
    }

    ALOGI("%s: OPENED PIPE, fd=%d", __FUNCTION__, fd);
    mPipeFd = fd;
    return 0;
}

std::vector<uint8_t> PipeComm::read() {
    static constexpr int MAX_RX_MSG_SZ = 2048;
    std::vector<uint8_t> msg = std::vector<uint8_t>(MAX_RX_MSG_SZ);
    int numBytes;

    numBytes = qemu_pipe_frame_recv(mPipeFd, msg.data(), msg.size());

    if (numBytes == MAX_RX_MSG_SZ) {
        ALOGE("%s:  Received max size = %d", __FUNCTION__, MAX_RX_MSG_SZ);
    } else if (numBytes > 0) {
        msg.resize(numBytes);
        return msg;
    } else {
        ALOGD("%s: Connection terminated on pipe %d, numBytes=%d", __FUNCTION__, mPipeFd, numBytes);
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mPipeFd = -1;
        }
    }

    return std::vector<uint8_t>();
}

int PipeComm::write(const std::vector<uint8_t>& data) {
    int retVal = 0;

    {
        std::lock_guard<std::mutex> lock(mMutex);
        if (mPipeFd != -1) {
            retVal = qemu_pipe_frame_send(mPipeFd, data.data(), data.size());
        }
    }

    if (retVal < 0) {
        retVal = -errno;
        ALOGE("%s:  send_cmd: (fd=%d): ERROR: %s", __FUNCTION__, mPipeFd, strerror(errno));
    }

    return retVal;
}


}  // impl

}  // namespace V2_0
}  // namespace vehicle
}  // namespace automotive
}  // namespace hardware
}  // namespace android


Loading