Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 26222462 authored by Sharvil Nanavati's avatar Sharvil Nanavati Committed by Android Git Automerger
Browse files

am f5f18465: Implement the reactor pattern in C.

* commit 'f5f18465':
  Implement the reactor pattern in C.
parents bda908ac f5f18465
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -9,6 +9,7 @@ LOCAL_SRC_FILES := \
    ./src/config.c \
    ./src/fixed_queue.c \
    ./src/list.c \
    ./src/reactor.c \
    ./src/semaphore.c

LOCAL_CFLAGS := -std=c99 -Wall -Werror
@@ -28,7 +29,8 @@ LOCAL_C_INCLUDES := \

LOCAL_SRC_FILES := \
    ./test/config_test.cpp \
    ./test/list_test.cpp
    ./test/list_test.cpp \
    ./test/reactor_test.cpp

LOCAL_CFLAGS := -Wall -Werror
LOCAL_MODULE := ositests
+2 −0
Original line number Diff line number Diff line
@@ -5,3 +5,5 @@

#define UNUSED_ATTR __attribute__((unused))
#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))

typedef uint32_t timeout_t;
+89 −0
Original line number Diff line number Diff line
/******************************************************************************
 *
 *  Copyright (C) 2014 Google, Inc.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at:
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 ******************************************************************************/

#pragma once

#include <stdbool.h>
#include <stdint.h>

#include "osi.h"

// This module implements the Reactor pattern.
// See http://en.wikipedia.org/wiki/Reactor_pattern for details.

struct reactor_t;
typedef struct reactor_t reactor_t;

struct reactor_object_t;
typedef struct reactor_object_t reactor_object_t;

// Enumerates the types of events a reactor object is interested
// in responding to.
typedef enum {
  REACTOR_INTEREST_READ  = 1,
  REACTOR_INTEREST_WRITE = 2,
  REACTOR_INTEREST_READ_WRITE = 3,
} reactor_interest_t;

// Enumerates the reasons a reactor has stopped.
typedef enum {
  REACTOR_STATUS_STOP,     // |reactor_stop| was called.
  REACTOR_STATUS_TIMEOUT,  // a timeout was specified and the reactor timed out.
  REACTOR_STATUS_ERROR,    // there was an error during the operation.
  REACTOR_STATUS_DONE,     // the reactor completed its work (for the _run_once* variants).
} reactor_status_t;

struct reactor_object_t {
  void *context;                       // a context that's passed back to the *_ready functions.
  int fd;                              // the file descriptor to monitor for events.
  reactor_interest_t interest;         // the event types to monitor the file descriptor for.

  void (*read_ready)(void *context);   // function to call when the file descriptor becomes readable.
  void (*write_ready)(void *context);  // function to call when the file descriptor becomes writeable.
};

// Creates a new reactor object. Returns NULL on failure. The returned object
// must be freed by calling |reactor_free|.
reactor_t *reactor_new(void);

// Frees a reactor object created with |reactor_new|. |reactor| may be NULL.
void reactor_free(reactor_t *reactor);

// Starts the reactor. This function blocks the caller until |reactor_stop| is called
// from another thread or in a callback. |reactor| may not be NULL.
reactor_status_t reactor_start(reactor_t *reactor);

// Runs one iteration of the reactor. This function blocks until at least one registered object
// becomes ready. |reactor| may not be NULL.
reactor_status_t reactor_run_once(reactor_t *reactor);

// Same as |reactor_run_once| with a bounded wait time in case no object becomes ready.
reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms);

// Immediately unblocks the reactor. This function is safe to call from any thread.
// |reactor| may not be NULL.
void reactor_stop(reactor_t *reactor);

// Registers an object with the reactor. |obj| is neither copied nor is its ownership transferred
// so the pointer must remain valid until it is unregistered with |reactor_unregister|. Neither
// |reactor| nor |obj| may be NULL.
void reactor_register(reactor_t *reactor, reactor_object_t *obj);

// Unregisters a previously registered object with the |reactor|. Neither |reactor| nor |obj|
// may be NULL.
void reactor_unregister(reactor_t *reactor, reactor_object_t *obj);
+174 −0
Original line number Diff line number Diff line
/******************************************************************************
 *
 *  Copyright (C) 2014 Google, Inc.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at:
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 ******************************************************************************/

#define LOG_TAG "bt_osi_reactor"

#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/eventfd.h>
#include <sys/select.h>
#include <utils/Log.h>

#include "list.h"
#include "reactor.h"

#if !defined(EFD_SEMAPHORE)
#  define EFD_SEMAPHORE (1 << 0)
#endif

struct reactor_t {
  int event_fd;
  list_t *objects;
};

static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv);

reactor_t *reactor_new(void) {
  reactor_t *ret = (reactor_t *)calloc(1, sizeof(reactor_t));
  if (!ret)
    return NULL;

  ret->event_fd = eventfd(0, EFD_SEMAPHORE);
  if (ret->event_fd == -1) {
    ALOGE("%s unable to create eventfd: %s", __func__, strerror(errno));
    goto error;
  }

  ret->objects = list_new(NULL);
  if (!ret->objects)
    goto error;

  return ret;

error:;
  list_free(ret->objects);
  close(ret->event_fd);
  free(ret);
  return NULL;
}

void reactor_free(reactor_t *reactor) {
  if (!reactor)
    return;

  list_free(reactor->objects);
  close(reactor->event_fd);
  free(reactor);
}

reactor_status_t reactor_start(reactor_t *reactor) {
  assert(reactor != NULL);
  return run_reactor(reactor, 0, NULL);
}

reactor_status_t reactor_run_once(reactor_t *reactor) {
  assert(reactor != NULL);
  return run_reactor(reactor, 1, NULL);
}

reactor_status_t reactor_run_once_timeout(reactor_t *reactor, timeout_t timeout_ms) {
  assert(reactor != NULL);

  struct timeval tv;
  tv.tv_sec = timeout_ms / 1000;
  tv.tv_usec = (timeout_ms % 1000) * 1000;
  return run_reactor(reactor, 1, &tv);
}

void reactor_stop(reactor_t *reactor) {
  assert(reactor != NULL);

  eventfd_write(reactor->event_fd, 1);
}

void reactor_register(reactor_t *reactor, reactor_object_t *obj) {
  assert(reactor != NULL);
  assert(obj != NULL);

  list_append(reactor->objects, obj);
}

void reactor_unregister(reactor_t *reactor, reactor_object_t *obj) {
  assert(reactor != NULL);
  assert(obj != NULL);

  list_remove(reactor->objects, obj);
}

// Runs the reactor loop for a maximum of |iterations| with the given timeout, |tv|.
// 0 |iterations| means loop forever.
// NULL |tv| means no timeout (block until an event occurs).
// |reactor| may not be NULL.
static reactor_status_t run_reactor(reactor_t *reactor, int iterations, struct timeval *tv) {
  assert(reactor != NULL);

  for (int i = 0; iterations == 0 || i < iterations; ++i) {
    fd_set read_set;
    fd_set write_set;
    FD_ZERO(&read_set);
    FD_ZERO(&write_set);
    FD_SET(reactor->event_fd, &read_set);

    int max_fd = reactor->event_fd;
    for (const list_node_t *iter = list_begin(reactor->objects); iter != list_end(reactor->objects); iter = list_next(iter)) {
      reactor_object_t *object = (reactor_object_t *)list_node(iter);
      int fd = object->fd;
      reactor_interest_t interest = object->interest;
      if (interest & REACTOR_INTEREST_READ)
        FD_SET(fd, &read_set);
      if (interest & REACTOR_INTEREST_WRITE)
        FD_SET(fd, &write_set);
      if (fd > max_fd)
        max_fd = fd;
    }

    int ret;
    do {
      ret = select(max_fd + 1, &read_set, &write_set, NULL, tv);
    } while (ret == -1 && errno == EINTR);

    if (ret == -1) {
      ALOGE("%s error in select: %s", __func__, strerror(errno));
      return REACTOR_STATUS_ERROR;
    }

    if (ret == 0)
      return REACTOR_STATUS_TIMEOUT;

    if (FD_ISSET(reactor->event_fd, &read_set)) {
      eventfd_t value;
      eventfd_read(reactor->event_fd, &value);
      return REACTOR_STATUS_STOP;
    }

    for (const list_node_t *iter = list_begin(reactor->objects); ret > 0 && iter != list_end(reactor->objects); iter = list_next(iter)) {
      reactor_object_t *object = (reactor_object_t *)list_node(iter);
      int fd = object->fd;
      if (FD_ISSET(fd, &read_set)) {
        object->read_ready(object->context);
        --ret;
      }
      if (FD_ISSET(fd, &write_set)) {
        object->write_ready(object->context);
        --ret;
      }
    }
  }
  return REACTOR_STATUS_DONE;
}
+101 −0
Original line number Diff line number Diff line
#include <gtest/gtest.h>
#include <pthread.h>
#include <sys/time.h>
#include <unistd.h>

extern "C" {
#include "reactor.h"
}

static pthread_t thread;
static volatile bool thread_running;

static void *reactor_thread(void *ptr) {
  reactor_t *reactor = (reactor_t *)ptr;

  thread_running = true;
  reactor_start(reactor);
  thread_running = false;

  return NULL;
}

static void spawn_reactor_thread(reactor_t *reactor) {
  int ret = pthread_create(&thread, NULL, reactor_thread, reactor);
  EXPECT_EQ(ret, 0);
}

static void join_reactor_thread() {
  pthread_join(thread, NULL);
}

static uint64_t get_timestamp(void) {
  struct timeval tv;
  gettimeofday(&tv, NULL);
  return tv.tv_sec * 1000 + tv.tv_usec / 1000;
}

TEST(ReactorTest, reactor_new) {
  reactor_t *reactor = reactor_new();
  EXPECT_TRUE(reactor != NULL);
  reactor_free(reactor);
}

TEST(ReactorTest, reactor_free_null) {
  reactor_free(NULL);
}

TEST(ReactorTest, reactor_stop_start) {
  reactor_t *reactor = reactor_new();
  reactor_stop(reactor);
  reactor_start(reactor);
  reactor_free(reactor);
}

TEST(ReactorTest, reactor_repeated_stop_start) {
  reactor_t *reactor = reactor_new();
  for (int i = 0; i < 10; ++i) {
    reactor_stop(reactor);
    reactor_start(reactor);
  }
  reactor_free(reactor);
}

TEST(ReactorTest, reactor_multi_stop_start) {
  reactor_t *reactor = reactor_new();

  reactor_stop(reactor);
  reactor_stop(reactor);
  reactor_stop(reactor);

  reactor_start(reactor);
  reactor_start(reactor);
  reactor_start(reactor);

  reactor_free(reactor);
}

TEST(ReactorTest, reactor_start_wait_stop) {
  reactor_t *reactor = reactor_new();

  spawn_reactor_thread(reactor);
  usleep(50 * 1000);
  EXPECT_TRUE(thread_running);

  reactor_stop(reactor);
  join_reactor_thread();
  EXPECT_FALSE(thread_running);

  reactor_free(reactor);
}

TEST(ReactorTest, reactor_run_once_timeout) {
  reactor_t *reactor = reactor_new();

  uint64_t start = get_timestamp();
  reactor_status_t status = reactor_run_once_timeout(reactor, 50);
  EXPECT_GE(get_timestamp() - start, 50);
  EXPECT_EQ(status, REACTOR_STATUS_TIMEOUT);

  reactor_free(reactor);
}