Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ccf9b815 authored by Jerry Zhang's avatar Jerry Zhang Committed by android-build-merger
Browse files

Merge "Switch to a thread pool for aio operations"

am: dd946e87

Change-Id: I5ef02f77c0320d88e4f81b21e986511de6d9bfaa
parents e85263ca dd946e87
Loading
Loading
Loading
Loading
+86 −16
Original line number Original line Diff line number Diff line
@@ -15,42 +15,109 @@
 */
 */


#include <android-base/logging.h>
#include <android-base/logging.h>
#include <condition_variable>
#include <memory>
#include <memory>
#include <mutex>
#include <pthread.h>
#include <queue>
#include <queue>
#include <thread>
#include <unistd.h>
#include <unistd.h>


#include "PosixAsyncIO.h"
#include "PosixAsyncIO.h"


namespace {
namespace {


void read_func(struct aiocb *aiocbp) {
std::thread gWorkerThread;
    aiocbp->ret = TEMP_FAILURE_RETRY(pread(aiocbp->aio_fildes,
std::deque<struct aiocb*> gWorkQueue;
bool gSuspended = true;
int gAiocbRefcount = 0;
std::mutex gLock;
std::condition_variable gWait;

void work_func(void *) {
    pthread_setname_np(pthread_self(), "AsyncIO work");
    while (true) {
        struct aiocb *aiocbp;
        {
            std::unique_lock<std::mutex> lk(gLock);
            gWait.wait(lk, []{return gWorkQueue.size() > 0 || gSuspended;});
            if (gSuspended)
                return;
            aiocbp = gWorkQueue.back();
            gWorkQueue.pop_back();
        }
        CHECK(aiocbp->queued);
        int ret;
        if (aiocbp->read) {
            ret = TEMP_FAILURE_RETRY(pread(aiocbp->aio_fildes,
                    aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset));
        } else {
            ret = TEMP_FAILURE_RETRY(pwrite(aiocbp->aio_fildes,
               aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset));
               aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset));
    if (aiocbp->ret == -1) aiocbp->error = errno;
        }
        {
            std::unique_lock<std::mutex> lk(aiocbp->lock);
            aiocbp->ret = ret;
            if (aiocbp->ret == -1) {
                aiocbp->error = errno;
            }
            aiocbp->queued = false;
        }
        aiocbp->cv.notify_all();
    }
}
}


void write_func(struct aiocb *aiocbp) {
int aio_add(struct aiocb *aiocbp) {
    aiocbp->ret = TEMP_FAILURE_RETRY(pwrite(aiocbp->aio_fildes,
    CHECK(!aiocbp->queued);
                aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset));
    aiocbp->queued = true;
    if (aiocbp->ret == -1) aiocbp->error = errno;
    {
        std::unique_lock<std::mutex> lk(gLock);
        gWorkQueue.push_front(aiocbp);
    }
    gWait.notify_one();
    return 0;
}
}


} // end anonymous namespace
} // end anonymous namespace


aiocb::aiocb() {
    this->ret = 0;
    this->queued = false;
    {
        std::unique_lock<std::mutex> lk(gLock);
        if (gAiocbRefcount == 0) {
            CHECK(gWorkQueue.size() == 0);
            CHECK(gSuspended);
            gSuspended = false;
            gWorkerThread = std::thread(work_func, nullptr);
        }
        gAiocbRefcount++;
    }
}

aiocb::~aiocb() {
aiocb::~aiocb() {
    CHECK(!thread.joinable());
    CHECK(!this->queued);
    {
        std::unique_lock<std::mutex> lk(gLock);
        CHECK(!gSuspended);
        if (gAiocbRefcount == 1) {
            CHECK(gWorkQueue.size() == 0);
            gSuspended = true;
            lk.unlock();
            gWait.notify_one();
            gWorkerThread.join();
            lk.lock();
        }
        gAiocbRefcount--;
    }
}
}


int aio_read(struct aiocb *aiocbp) {
int aio_read(struct aiocb *aiocbp) {
    aiocbp->thread = std::thread(read_func, aiocbp);
    aiocbp->read = true;
    return 0;
    return aio_add(aiocbp);
}
}


int aio_write(struct aiocb *aiocbp) {
int aio_write(struct aiocb *aiocbp) {
    aiocbp->thread = std::thread(write_func, aiocbp);
    aiocbp->read = false;
    return 0;
    return aio_add(aiocbp);
}
}


int aio_error(const struct aiocb *aiocbp) {
int aio_error(const struct aiocb *aiocbp) {
@@ -64,7 +131,10 @@ ssize_t aio_return(struct aiocb *aiocbp) {
int aio_suspend(struct aiocb *aiocbp[], int n,
int aio_suspend(struct aiocb *aiocbp[], int n,
        const struct timespec *) {
        const struct timespec *) {
    for (int i = 0; i < n; i++) {
    for (int i = 0; i < n; i++) {
        aiocbp[i]->thread.join();
        {
            std::unique_lock<std::mutex> lk(aiocbp[i]->lock);
            aiocbp[i]->cv.wait(lk, [aiocbp, i]{return !aiocbp[i]->queued;});
        }
    }
    }
    return 0;
    return 0;
}
}
+8 −2
Original line number Original line Diff line number Diff line
@@ -17,10 +17,11 @@
#ifndef _POSIXASYNCIO_H
#ifndef _POSIXASYNCIO_H
#define _POSIXASYNCIO_H
#define _POSIXASYNCIO_H


#include <condition_variable>
#include <mutex>
#include <sys/cdefs.h>
#include <sys/cdefs.h>
#include <sys/types.h>
#include <sys/types.h>
#include <time.h>
#include <time.h>
#include <thread>
#include <unistd.h>
#include <unistd.h>


/**
/**
@@ -35,10 +36,15 @@ struct aiocb {
    size_t aio_nbytes;
    size_t aio_nbytes;


    // Used internally
    // Used internally
    std::thread thread;
    bool read;
    bool queued;
    ssize_t ret;
    ssize_t ret;
    int error;
    int error;


    std::mutex lock;
    std::condition_variable cv;

    aiocb();
    ~aiocb();
    ~aiocb();
};
};