Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f5f18465 authored by Sharvil Nanavati's avatar Sharvil Nanavati
Browse files

Implement the reactor pattern in C.

This code will form the basis of most select-based event loops in
bluedroid. It provides a thread-safe abort routine and a separation
between the dispatcher and event handler code.

Change-Id: I6f1c033d18f045ba273187dab607c209dfe32d30
parent d05e8d4e
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -9,6 +9,7 @@ LOCAL_SRC_FILES := \
    ./src/config.c \
    ./src/fixed_queue.c \
    ./src/list.c \
    ./src/reactor.c \
    ./src/semaphore.c

LOCAL_CFLAGS := -std=c99 -Wall -Werror
@@ -28,7 +29,8 @@ LOCAL_C_INCLUDES := \

LOCAL_SRC_FILES := \
    ./test/config_test.cpp \
    ./test/list_test.cpp
    ./test/list_test.cpp \
    ./test/reactor_test.cpp

LOCAL_CFLAGS := -Wall -Werror
LOCAL_MODULE := ositests
+2 −0
Original line number Diff line number Diff line
@@ -5,3 +5,5 @@

#define UNUSED_ATTR __attribute__((unused))
#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))

typedef uint32_t timeout_t;
+89 −0
Original line number Diff line number Diff line
/******************************************************************************
 *
 *  Copyright (C) 2014 Google, Inc.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at:
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 ******************************************************************************/

#pragma once

#include <stdbool.h>
#include <stdint.h>

#include "osi.h"

// This module implements the Reactor pattern.
// See http://en.wikipedia.org/wiki/Reactor_pattern for details.

struct reactor_t;
typedef struct reactor_t reactor_t;

struct reactor_object_t;
typedef struct reactor_object_t reactor_object_t;

// Enumerates the types of events a reactor object is interested
// in responding to.
typedef enum {
  REACTOR_INTEREST_READ  = 1,
  REACTOR_INTEREST_WRITE = 2,
  REACTOR_INTEREST_READ_WRITE = 3,
} reactor_interest_t;

// Enumerates the reasons a reactor has stopped.
typedef enum {
  REACTOR_STATUS_STOP,     // |reactor_stop| was called.
  REACTOR_STATUS_TIMEOUT,  // a timeout was specified and the reactor timed out.
  REACTOR_STATUS_ERROR,    // there was an error during the operation.
  REACTOR_STATUS_DONE,     // the reactor completed its work (for the _run_once* variants).
} reactor_status_t;

struct reactor_object_t {
  void *context;                       // a context that's passed back to the *_ready functions.
  int fd;                              // the file descriptor to monitor for events.
  reactor_interest_t interest;         // the event types to monitor the file descriptor for.

  void (*read_ready)(void *context);   // function to call when the file descriptor becomes readable.
  void (*write_ready)(void *context);  // function to call when the file descriptor becomes writeable.
};

// Creates a new reactor object. Returns NULL on failure. The returned object
// must be freed by calling |reactor_free|.
reactor_t *reactor_new(void);

// Frees a reactor object created with |reactor_new|. |reactor| may be NULL.
void reactor_free(reactor_t *reactor);

// Starts the reactor. This function blocks the caller until |reactor_stop| is called
// from another thread or in a callback. |reactor| may not be NULL.
reactor_status_t reactor_start(reactor_t *reactor);

// Runs one iteration of the reactor. This function blocks until at least one registered object
// becomes ready. |reactor| may not be NULL.
reactor_status_t reactor_run_once(reactor_t *reactor);

// Same as |reactor_run_once| with a bounded wait time in case no object becomes ready.
reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms);

// Immediately unblocks the reactor. This function is safe to call from any thread.
// |reactor| may not be NULL.
void reactor_stop(reactor_t *reactor);

// Registers an object with the reactor. |obj| is neither copied nor is its ownership transferred
// so the pointer must remain valid until it is unregistered with |reactor_unregister|. Neither
// |reactor| nor |obj| may be NULL.
void reactor_register(reactor_t *reactor, reactor_object_t *obj);

// Unregisters a previously registered object with the |reactor|. Neither |reactor| nor |obj|
// may be NULL.
void reactor_unregister(reactor_t *reactor, reactor_object_t *obj);
+174 −0
Original line number Diff line number Diff line
/******************************************************************************
 *
 *  Copyright (C) 2014 Google, Inc.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at:
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 ******************************************************************************/

#define LOG_TAG "bt_osi_reactor"

#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/eventfd.h>
#include <sys/select.h>
#include <utils/Log.h>

#include "list.h"
#include "reactor.h"

#if !defined(EFD_SEMAPHORE)
#  define EFD_SEMAPHORE (1 << 0)
#endif

struct reactor_t {
  int event_fd;
  list_t *objects;
};

static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv);

reactor_t *reactor_new(void) {
  reactor_t *ret = (reactor_t *)calloc(1, sizeof(reactor_t));
  if (!ret)
    return NULL;

  ret->event_fd = eventfd(0, EFD_SEMAPHORE);
  if (ret->event_fd == -1) {
    ALOGE("%s unable to create eventfd: %s", __func__, strerror(errno));
    goto error;
  }

  ret->objects = list_new(NULL);
  if (!ret->objects)
    goto error;

  return ret;

error:;
  list_free(ret->objects);
  close(ret->event_fd);
  free(ret);
  return NULL;
}

void reactor_free(reactor_t *reactor) {
  if (!reactor)
    return;

  list_free(reactor->objects);
  close(reactor->event_fd);
  free(reactor);
}

reactor_status_t reactor_start(reactor_t *reactor) {
  assert(reactor != NULL);
  return run_reactor(reactor, 0, NULL);
}

reactor_status_t reactor_run_once(reactor_t *reactor) {
  assert(reactor != NULL);
  return run_reactor(reactor, 1, NULL);
}

reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms) {
  assert(reactor != NULL);

  struct timeval tv;
  tv.tv_sec = timeout_ms / 1000;
  tv.tv_usec = (timeout_ms % 1000) * 1000;
  return run_reactor(reactor, 1, &tv);
}

void reactor_stop(reactor_t *reactor) {
  assert(reactor != NULL);

  eventfd_write(reactor->event_fd, 1);
}

void reactor_register(reactor_t *reactor, reactor_object_t *obj) {
  assert(reactor != NULL);
  assert(obj != NULL);

  list_append(reactor->objects, obj);
}

void reactor_unregister(reactor_t *reactor, reactor_object_t *obj) {
  assert(reactor != NULL);
  assert(obj != NULL);

  list_remove(reactor->objects, obj);
}

// Runs the reactor loop for a maximum of |iterations| with the given timeout, |tv|.
// 0 |iterations| means loop forever.
// NULL |tv| means no timeout (block until an event occurs).
// |reactor| may not be NULL.
static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv) {
  assert(reactor != NULL);

  for (int i = 0; iterations == 0 || i < iterations; ++i) {
    fd_set read_set;
    fd_set write_set;
    FD_ZERO(&read_set);
    FD_ZERO(&write_set);
    FD_SET(reactor->event_fd, &read_set);

    int max_fd = reactor->event_fd;
    for (const list_node_t *iter = list_begin(reactor->objects); iter != list_end(reactor->objects); iter = list_next(iter)) {
      reactor_object_t *object = (reactor_object_t *)list_node(iter);
      int fd = object->fd;
      reactor_interest_t interest = object->interest;
      if (interest & REACTOR_INTEREST_READ)
        FD_SET(fd, &read_set);
      if (interest & REACTOR_INTEREST_WRITE)
        FD_SET(fd, &write_set);
      if (fd > max_fd)
        max_fd = fd;
    }

    int ret;
    do {
      ret = select(max_fd + 1, &read_set, &write_set, NULL, tv);
    } while (ret == -1 && errno == EINTR);

    if (ret == -1) {
      ALOGE("%s error in select: %s", __func__, strerror(errno));
      return REACTOR_STATUS_ERROR;
    }

    if (ret == 0)
      return REACTOR_STATUS_TIMEOUT;

    if (FD_ISSET(reactor->event_fd, &read_set)) {
      eventfd_t value;
      eventfd_read(reactor->event_fd, &value);
      return REACTOR_STATUS_STOP;
    }

    for (const list_node_t *iter = list_begin(reactor->objects); ret > 0 && iter != list_end(reactor->objects); iter = list_next(iter)) {
      reactor_object_t *object = (reactor_object_t *)list_node(iter);
      int fd = object->fd;
      if (FD_ISSET(fd, &read_set)) {
        object->read_ready(object->context);
        --ret;
      }
      if (FD_ISSET(fd, &write_set)) {
        object->write_ready(object->context);
        --ret;
      }
    }
  }
  return REACTOR_STATUS_DONE;
}
+101 −0
Original line number Diff line number Diff line
#include <gtest/gtest.h>
#include <pthread.h>
#include <sys/time.h>
#include <unistd.h>

extern "C" {
#include "reactor.h"
}

static pthread_t thread;
static volatile bool thread_running;

static void *reactor_thread(void *ptr) {
  reactor_t *reactor = (reactor_t *)ptr;

  thread_running = true;
  reactor_start(reactor);
  thread_running = false;

  return NULL;
}

static void spawn_reactor_thread(reactor_t *reactor) {
  int ret = pthread_create(&thread, NULL, reactor_thread, reactor);
  EXPECT_EQ(ret, 0);
}

static void join_reactor_thread() {
  pthread_join(thread, NULL);
}

static uint64_t get_timestamp(void) {
  struct timeval tv;
  gettimeofday(&tv, NULL);
  return tv.tv_sec * 1000 + tv.tv_usec / 1000;
}

TEST(ReactorTest, reactor_new) {
  reactor_t *reactor = reactor_new();
  EXPECT_TRUE(reactor != NULL);
  reactor_free(reactor);
}

TEST(ReactorTest, reactor_free_null) {
  reactor_free(NULL);
}

TEST(ReactorTest, reactor_stop_start) {
  reactor_t *reactor = reactor_new();
  reactor_stop(reactor);
  reactor_start(reactor);
  reactor_free(reactor);
}

TEST(ReactorTest, reactor_repeated_stop_start) {
  reactor_t *reactor = reactor_new();
  for (int i = 0; i < 10; ++i) {
    reactor_stop(reactor);
    reactor_start(reactor);
  }
  reactor_free(reactor);
}

TEST(ReactorTest, reactor_multi_stop_start) {
  reactor_t *reactor = reactor_new();

  reactor_stop(reactor);
  reactor_stop(reactor);
  reactor_stop(reactor);

  reactor_start(reactor);
  reactor_start(reactor);
  reactor_start(reactor);

  reactor_free(reactor);
}

TEST(ReactorTest, reactor_start_wait_stop) {
  reactor_t *reactor = reactor_new();

  spawn_reactor_thread(reactor);
  usleep(50 * 1000);
  EXPECT_TRUE(thread_running);

  reactor_stop(reactor);
  join_reactor_thread();
  EXPECT_FALSE(thread_running);

  reactor_free(reactor);
}

TEST(ReactorTest, reactor_run_once_timeout) {
  reactor_t *reactor = reactor_new();

  uint64_t start = get_timestamp();
  reactor_status_t status = reactor_run_once_timeout(reactor, 50);
  EXPECT_GE(get_timestamp() - start, 50);
  EXPECT_EQ(status, REACTOR_STATUS_TIMEOUT);

  reactor_free(reactor);
}