Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 94d36a5c authored by Sharvil Nanavati's avatar Sharvil Nanavati
Browse files

Implement a fixed queue.

Change-Id: Ifad7605d0b6e1a57f4767f9de1bed7e99284ded7
parent c29d94ab
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -6,7 +6,9 @@ LOCAL_C_INCLUDES := \
    $(LOCAL_PATH)/include

LOCAL_SRC_FILES := \
    ./src/list.c
    ./src/fixed_queue.c \
    ./src/list.c \
    ./src/semaphore.c

LOCAL_CFLAGS := -std=c99 -Wall -Werror
LOCAL_MODULE := libosi
+17 −0
Original line number Diff line number Diff line
#pragma once

#include "list.h"

struct fixed_queue_t;
typedef struct fixed_queue_t fixed_queue_t;

typedef void (*fixed_queue_free_cb)(void *data);

fixed_queue_t *fixed_queue_new(size_t capacity);

// Freeing a queue that is currently in use (i.e. has waiters
// blocked on it) resuls in undefined behaviour.
void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb);

void fixed_queue_enqueue(fixed_queue_t *queue, void *data);
void *fixed_queue_dequeue(fixed_queue_t *queue);
+10 −0
Original line number Diff line number Diff line
#pragma once

struct semaphore_t;
typedef struct semaphore_t semaphore_t;

semaphore_t *semaphore_new(unsigned int value);
void semaphore_free(semaphore_t *semaphore);

void semaphore_wait(semaphore_t *semaphore);
void semaphore_post(semaphore_t *semaphore);
+94 −0
Original line number Diff line number Diff line
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

#include "fixed_queue.h"
#include "list.h"
#include "osi.h"
#include "semaphore.h"

typedef struct fixed_queue_t {
  list_t *list;
  semaphore_t *enqueue_sem;
  semaphore_t *dequeue_sem;
  pthread_mutex_t lock;
  size_t capacity;
} fixed_queue_t;

fixed_queue_t *fixed_queue_new(size_t capacity) {
  fixed_queue_t *ret = calloc(1, sizeof(fixed_queue_t));
  if (!ret)
    goto error;

  ret->list = list_new(NULL);
  if (!ret->list)
    goto error;

  ret->enqueue_sem = semaphore_new(capacity);
  if (!ret->enqueue_sem)
    goto error;

  ret->dequeue_sem = semaphore_new(0);
  if (!ret->dequeue_sem)
    goto error;

  pthread_mutex_init(&ret->lock, NULL);
  ret->capacity = capacity;

  return ret;

error:;
  if (ret) {
    list_free(ret->list);
    semaphore_free(ret->enqueue_sem);
    semaphore_free(ret->dequeue_sem);
  }

  free(ret);
  return NULL;
}

void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
  if (!queue)
    return;

  if (free_cb)
    for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node))
      free_cb(list_node(node));

  list_free(queue->list);
  semaphore_free(queue->enqueue_sem);
  semaphore_free(queue->dequeue_sem);
  pthread_mutex_destroy(&queue->lock);
  free(queue);
}

void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
  assert(queue != NULL);
  assert(data != NULL);

  semaphore_wait(queue->enqueue_sem);

  pthread_mutex_lock(&queue->lock);
  list_append(queue->list, data);
  pthread_mutex_unlock(&queue->lock);

  semaphore_post(queue->dequeue_sem);
}

void *fixed_queue_dequeue(fixed_queue_t *queue) {
  assert(queue != NULL);

  void *ret;

  semaphore_wait(queue->dequeue_sem);

  pthread_mutex_lock(&queue->lock);
  ret = list_front(queue->list);
  list_remove(queue->list, ret);
  pthread_mutex_unlock(&queue->lock);

  semaphore_post(queue->enqueue_sem);

  return ret;
}
+53 −0
Original line number Diff line number Diff line
#define LOG_TAG "osi_semaphore"

#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/eventfd.h>
#include <utils/Log.h>

#include "semaphore.h"

#if !defined(EFD_SEMAPHORE)
#  define EFD_SEMAPHORE (1 << 0)
#endif

struct semaphore_t {
  int fd;
};

semaphore_t *semaphore_new(unsigned int value) {
  semaphore_t *ret = malloc(sizeof(semaphore_t));
  if (ret) {
    ret->fd = eventfd(value, EFD_SEMAPHORE);
    if (ret->fd == -1) {
      ALOGE("%s unable to allocate semaphore: %s", __func__, strerror(errno));
      free(ret);
      ret = NULL;
    }
  }
  return ret;
}

void semaphore_free(semaphore_t *semaphore) {
  if (semaphore->fd != -1)
    close(semaphore->fd);
  free(semaphore);
}

void semaphore_wait(semaphore_t *semaphore) {
  assert(semaphore != NULL);
  assert(semaphore->fd != -1);

  uint64_t value;
  if (eventfd_read(semaphore->fd, &value) == -1)
    ALOGE("%s unable to wait on semaphore: %s", __func__, strerror(errno));
}

void semaphore_post(semaphore_t *semaphore) {
  assert(semaphore != NULL);
  assert(semaphore->fd != -1);

  if (eventfd_write(semaphore->fd, 1ULL) == -1)
    ALOGE("%s unable to post to semaphore: %s", __func__, strerror(errno));
}