Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ad961a19 authored by Sharvil Nanavati's avatar Sharvil Nanavati Committed by Sharvil Nanavati
Browse files

Extend the threading library to back thread with an event queue.

This change allows arbitrary functions to be called on a given
thread which will clean up much of the dispatch code in bluedroid.
Looking forward, this code will be extended to allow additional objects
and queues to be attached to a thread for more customizable dispatch.

Change-Id: Id3a16256c264e3d35e6db5a562cb0e7762676457
parent bfd08910
Loading
Loading
Loading
Loading
+23 −8
Original line number Diff line number Diff line
@@ -20,16 +20,31 @@

#define THREAD_NAME_MAX 16

struct thread_t;
typedef struct thread_t thread_t;
typedef void (*thread_fn)(void *context);

typedef void *(*thread_start_cb) (void *);
// Creates and starts a new thread with the given name. Only THREAD_NAME_MAX
// bytes from |name| will be assigned to the newly-created thread. Returns a
// thread object if the thread was successfully started, NULL otherwise. The
// returned thread object must be freed with |thread_free|. |name| may not
// be NULL.
thread_t *thread_new(const char *name);

// Lifecycle
thread_t *thread_create(const char *name,
                         thread_start_cb start_routine, void *arg);
int thread_join(thread_t *thread, void **retval);
// Frees the given |thread|. If the thread is still running, it is stopped
// and the calling thread will block until |thread| terminates. |thread|
// may be NULL.
void thread_free(thread_t *thread);

// Query
pid_t thread_id(const thread_t *thread);
// Call |func| with the argument |context| on |thread|. This function typically
// does not block unless there are an excessive number of functions posted to
// |thread| that have not been dispatched yet. Neither |thread| nor |func| may
// be NULL. |context| may be NULL.
bool thread_post(thread_t *thread, thread_fn func, void *context);

// Requests |thread| to stop. Only |thread_free| and |thread_name| may be called
// after calling |thread_stop|. This function is guaranteed to not block.
// |thread| may not be NULL.
void thread_stop(thread_t *thread);

// Returns the name of the given |thread|. |thread| may not be NULL.
const char *thread_name(const thread_t *thread);
+126 −51
Original line number Diff line number Diff line
@@ -19,75 +19,59 @@
#define LOG_TAG "osi_thread"

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <sys/prctl.h>
#include <sys/types.h>
#include <utils/Log.h>

#include "fixed_queue.h"
#include "reactor.h"
#include "semaphore.h"
#include "thread.h"

typedef struct thread_t {
struct thread_t {
  pthread_t pthread;
  pid_t tid;
  char name[THREAD_NAME_MAX + 1];
} thread_t;

pid_t thread_id(const thread_t *thread) {
  assert(thread != NULL);
  return thread->tid;
}

const char *thread_name(const thread_t *thread) {
  assert(thread != NULL);
  return thread->name;
}
  reactor_t *reactor;
  fixed_queue_t *work_queue;
};

struct start_arg {
  thread_t *thread;
  semaphore_t *start_sem;
  int error;
  thread_start_cb start_routine;
  void *arg;
};

static void *run_thread(void *start_arg) {
  assert(start_arg != NULL);
typedef struct {
  thread_fn func;
  void *context;
} work_item_t;

  struct start_arg *start = start_arg;
  thread_t *thread = start->thread;
static void *run_thread(void *start_arg);
static void work_queue_read_cb(void *context);

  assert(thread != NULL);
static const size_t WORK_QUEUE_CAPACITY = 128;

  if (prctl(PR_SET_NAME, (unsigned long)thread->name) == -1) {
    ALOGE("%s unable to set thread name: %s", __func__, strerror(errno));
    start->error = errno;
    semaphore_post(start->start_sem);
    return NULL;
  }
  thread->tid = gettid();

  // Cache local values because we are about to let thread_create
  // continue
  thread_start_cb start_routine = start->start_routine;
  void *arg = start->arg;

  semaphore_post(start->start_sem);
  return start_routine(arg);
}

thread_t *thread_create(const char *name,
                        thread_start_cb start_routine, void *arg) {
thread_t *thread_new(const char *name) {
  assert(name != NULL);
  assert(start_routine != NULL);

  // Start is on the stack, but we use a semaphore, so it's safe
  struct start_arg start;
  thread_t *ret;
  ret = calloc(1, sizeof(thread_t));
  thread_t *ret = calloc(1, sizeof(thread_t));
  if (!ret)
    goto error;

  ret->reactor = reactor_new();
  if (!ret->reactor)
    goto error;

  ret->work_queue = fixed_queue_new(WORK_QUEUE_CAPACITY);
  if (!ret->work_queue)
    goto error;

  struct start_arg start;
  start.start_sem = semaphore_new(0);
  if (!start.start_sem)
    goto error;
@@ -95,23 +79,114 @@ thread_t *thread_create(const char *name,
  strncpy(ret->name, name, THREAD_NAME_MAX);
  start.thread = ret;
  start.error = 0;
  start.start_routine = start_routine;
  start.arg = arg;
  pthread_create(&ret->pthread, NULL, run_thread, &start);
  semaphore_wait(start.start_sem);
  semaphore_free(start.start_sem);
  if (start.error)
    goto error;
  return ret;

error:;
  semaphore_free(start.start_sem);
  if (ret) {
    fixed_queue_free(ret->work_queue, free);
    reactor_free(ret->reactor);
  }
  free(ret);
  return NULL;
}

int thread_join(thread_t *thread, void **retval) {
  int ret = pthread_join(thread->pthread, retval);
  if (!ret)
void thread_free(thread_t *thread) {
  if (!thread)
    return;

  thread_stop(thread);
  pthread_join(thread->pthread, NULL);
  fixed_queue_free(thread->work_queue, free);
  reactor_free(thread->reactor);
  free(thread);
  return ret;
}

bool thread_post(thread_t *thread, thread_fn func, void *context) {
  assert(thread != NULL);
  assert(func != NULL);

  // TODO(sharvil): if the current thread == |thread| and we've run out
  // of queue space, we should abort this operation, otherwise we'll
  // deadlock.

  // Queue item is freed either when the queue itself is destroyed
  // or when the item is removed from the queue for dispatch.
  work_item_t *item = (work_item_t *)malloc(sizeof(work_item_t));
  if (!item) {
    ALOGE("%s unable to allocate memory: %s", __func__, strerror(errno));
    return false;
  }
  item->func = func;
  item->context = context;
  fixed_queue_enqueue(thread->work_queue, item);
  return true;
}

void thread_stop(thread_t *thread) {
  assert(thread != NULL);
  reactor_stop(thread->reactor);
}

const char *thread_name(const thread_t *thread) {
  assert(thread != NULL);
  return thread->name;
}

static void *run_thread(void *start_arg) {
  assert(start_arg != NULL);

  struct start_arg *start = start_arg;
  thread_t *thread = start->thread;

  assert(thread != NULL);

  if (prctl(PR_SET_NAME, (unsigned long)thread->name) == -1) {
    ALOGE("%s unable to set thread name: %s", __func__, strerror(errno));
    start->error = errno;
    semaphore_post(start->start_sem);
    return NULL;
  }
  thread->tid = gettid();

  semaphore_post(start->start_sem);

  reactor_object_t work_queue_object;
  work_queue_object.context = thread->work_queue;
  work_queue_object.fd = fixed_queue_get_dequeue_fd(thread->work_queue);
  work_queue_object.interest = REACTOR_INTEREST_READ;
  work_queue_object.read_ready = work_queue_read_cb;

  reactor_register(thread->reactor, &work_queue_object);
  reactor_start(thread->reactor);

  // Make sure we dispatch all queued work items before exiting the thread.
  // This allows a caller to safely tear down by enqueuing a teardown
  // work item and then joining the thread.
  size_t count = 0;
  work_item_t *item = fixed_queue_try_dequeue(thread->work_queue);
  while (item && count <= WORK_QUEUE_CAPACITY) {
    item->func(item->context);
    free(item);
    item = fixed_queue_try_dequeue(thread->work_queue);
    ++count;
  }

  if (count > WORK_QUEUE_CAPACITY)
    ALOGD("%s growing event queue on shutdown.", __func__);

  return NULL;
}

static void work_queue_read_cb(void *context) {
  assert(context != NULL);

  fixed_queue_t *queue = (fixed_queue_t *)context;
  work_item_t *item = fixed_queue_dequeue(queue);
  item->func(item->context);
  free(item);
}
+11 −24
Original line number Diff line number Diff line
@@ -5,44 +5,31 @@ extern "C" {
#include "osi.h"
}

void *start_routine(void *arg)
{
  return arg;
}

TEST(ThreadTest, test_new_simple) {
  thread_t *thread = thread_create("test_thread", &start_routine, NULL);
  thread_t *thread = thread_new("test_thread");
  ASSERT_TRUE(thread != NULL);
  thread_join(thread, NULL);
  thread_free(thread);
}

TEST(ThreadTest, test_join_simple) {
  thread_t *thread = thread_create("test_thread", &start_routine, NULL);
  thread_join(thread, NULL);
TEST(ThreadTest, test_free_simple) {
  thread_t *thread = thread_new("test_thread");
  thread_free(thread);
}

TEST(ThreadTest, test_name) {
  thread_t *thread = thread_create("test_name", &start_routine, NULL);
  thread_t *thread = thread_new("test_name");
  ASSERT_STREQ(thread_name(thread), "test_name");
  thread_join(thread, NULL);
  thread_free(thread);
}

TEST(ThreadTest, test_long_name) {
  thread_t *thread = thread_create("0123456789abcdef", &start_routine, NULL);
  thread_t *thread = thread_new("0123456789abcdef");
  ASSERT_STREQ("0123456789abcdef", thread_name(thread));
  thread_join(thread, NULL);
  thread_free(thread);
}

TEST(ThreadTest, test_very_long_name) {
  thread_t *thread = thread_create("0123456789abcdefg", &start_routine, NULL);
  thread_t *thread = thread_new("0123456789abcdefg");
  ASSERT_STREQ("0123456789abcdef", thread_name(thread));
  thread_join(thread, NULL);
}

TEST(ThreadTest, test_return) {
  int arg = 10;
  void *ret;
  thread_t *thread = thread_create("test", &start_routine, &arg);
  thread_join(thread, &ret);
  ASSERT_EQ(ret, &arg);
  thread_free(thread);
}