Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 3241b1d3 authored by Joe Thornber's avatar Joe Thornber Committed by Alasdair G Kergon
Browse files

dm: add persistent data library



The persistent-data library offers a re-usable framework for the storage
and management of on-disk metadata in device-mapper targets.

It's used by the thin-provisioning target in the next patch and in an
upcoming hierarchical storage target.

For further information, please read
Documentation/device-mapper/persistent-data.txt

Signed-off-by: default avatarJoe Thornber <thornber@redhat.com>
Signed-off-by: default avatarMike Snitzer <snitzer@redhat.com>
Signed-off-by: default avatarAlasdair G Kergon <agk@redhat.com>
parent 95d402f0
Loading
Loading
Loading
Loading
+84 −0
Original line number Diff line number Diff line
Introduction
============

The more-sophisticated device-mapper targets require complex metadata
that is managed in kernel.  In late 2010 we were seeing that various
different targets were rolling their own data strutures, for example:

- Mikulas Patocka's multisnap implementation
- Heinz Mauelshagen's thin provisioning target
- Another btree-based caching target posted to dm-devel
- Another multi-snapshot target based on a design of Daniel Phillips

Maintaining these data structures takes a lot of work, so if possible
we'd like to reduce the number.

The persistent-data library is an attempt to provide a re-usable
framework for people who want to store metadata in device-mapper
targets.  It's currently used by the thin-provisioning target and an
upcoming hierarchical storage target.

Overview
========

The main documentation is in the header files which can all be found
under drivers/md/persistent-data.

The block manager
-----------------

dm-block-manager.[hc]

This provides access to the data on disk in fixed sized-blocks.  There
is a read/write locking interface to prevent concurrent accesses, and
keep data that is being used in the cache.

Clients of persistent-data are unlikely to use this directly.

The transaction manager
-----------------------

dm-transaction-manager.[hc]

This restricts access to blocks and enforces copy-on-write semantics.
The only way you can get hold of a writable block through the
transaction manager is by shadowing an existing block (ie. doing
copy-on-write) or allocating a fresh one.  Shadowing is elided within
the same transaction so performance is reasonable.  The commit method
ensures that all data is flushed before it writes the superblock.
On power failure your metadata will be as it was when last committed.

The Space Maps
--------------

dm-space-map.h
dm-space-map-metadata.[hc]
dm-space-map-disk.[hc]

On-disk data structures that keep track of reference counts of blocks.
Also acts as the allocator of new blocks.  Currently two
implementations: a simpler one for managing blocks on a different
device (eg. thinly-provisioned data blocks); and one for managing
the metadata space.  The latter is complicated by the need to store
its own data within the space it's managing.

The data structures
-------------------

dm-btree.[hc]
dm-btree-remove.c
dm-btree-spine.c
dm-btree-internal.h

Currently there is only one data structure, a hierarchical btree.
There are plans to add more.  For example, something with an
array-like interface would see a lot of use.

The btree is 'hierarchical' in that you can define it to be composed
of nested btrees, and take multiple keys.  For example, the
thin-provisioning target uses a btree with two levels of nesting.
The first maps a device id to a mapping tree, and that in turn maps a
virtual block to a physical block.

Values stored in the btrees can have arbitrary size.  Keys are always
64bits, although nesting allows you to use multiple keys.
+8 −0
Original line number Diff line number Diff line
config DM_PERSISTENT_DATA
       tristate
       depends on BLK_DEV_DM && EXPERIMENTAL
       select LIBCRC32C
       select DM_BUFIO
       ---help---
	 Library providing immutable on-disk data structure support for
	 device-mapper targets such as the thin provisioning target.
+11 −0
Original line number Diff line number Diff line
obj-$(CONFIG_DM_PERSISTENT_DATA) += dm-persistent-data.o
dm-persistent-data-objs := \
	dm-block-manager.o \
	dm-space-map-checker.o \
	dm-space-map-common.o \
	dm-space-map-disk.o \
	dm-space-map-metadata.o \
	dm-transaction-manager.o \
	dm-btree.o \
	dm-btree-remove.o \
	dm-btree-spine.o
+620 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2011 Red Hat, Inc.
 *
 * This file is released under the GPL.
 */
#include "dm-block-manager.h"
#include "dm-persistent-data-internal.h"
#include "../dm-bufio.h"

#include <linux/crc32c.h>
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/rwsem.h>
#include <linux/device-mapper.h>
#include <linux/stacktrace.h>

#define DM_MSG_PREFIX "block manager"

/*----------------------------------------------------------------*/

/*
 * This is a read/write semaphore with a couple of differences.
 *
 * i) There is a restriction on the number of concurrent read locks that
 * may be held at once.  This is just an implementation detail.
 *
 * ii) Recursive locking attempts are detected and return EINVAL.  A stack
 * trace is also emitted for the previous lock aquisition.
 *
 * iii) Priority is given to write locks.
 */
#define MAX_HOLDERS 4
#define MAX_STACK 10

typedef unsigned long stack_entries[MAX_STACK];

struct block_lock {
	spinlock_t lock;
	__s32 count;
	struct list_head waiters;
	struct task_struct *holders[MAX_HOLDERS];

#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
	struct stack_trace traces[MAX_HOLDERS];
	stack_entries entries[MAX_HOLDERS];
#endif
};

struct waiter {
	struct list_head list;
	struct task_struct *task;
	int wants_write;
};

static unsigned __find_holder(struct block_lock *lock,
			      struct task_struct *task)
{
	unsigned i;

	for (i = 0; i < MAX_HOLDERS; i++)
		if (lock->holders[i] == task)
			break;

	BUG_ON(i == MAX_HOLDERS);
	return i;
}

/* call this *after* you increment lock->count */
static void __add_holder(struct block_lock *lock, struct task_struct *task)
{
	unsigned h = __find_holder(lock, NULL);
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
	struct stack_trace *t;
#endif

	get_task_struct(task);
	lock->holders[h] = task;

#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
	t = lock->traces + h;
	t->nr_entries = 0;
	t->max_entries = MAX_STACK;
	t->entries = lock->entries[h];
	t->skip = 2;
	save_stack_trace(t);
#endif
}

/* call this *before* you decrement lock->count */
static void __del_holder(struct block_lock *lock, struct task_struct *task)
{
	unsigned h = __find_holder(lock, task);
	lock->holders[h] = NULL;
	put_task_struct(task);
}

static int __check_holder(struct block_lock *lock)
{
	unsigned i;
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
	static struct stack_trace t;
	static stack_entries entries;
#endif

	for (i = 0; i < MAX_HOLDERS; i++) {
		if (lock->holders[i] == current) {
			DMERR("recursive lock detected in pool metadata");
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
			DMERR("previously held here:");
			print_stack_trace(lock->traces + i, 4);

			DMERR("subsequent aquisition attempted here:");
			t.nr_entries = 0;
			t.max_entries = MAX_STACK;
			t.entries = entries;
			t.skip = 3;
			save_stack_trace(&t);
			print_stack_trace(&t, 4);
#endif
			return -EINVAL;
		}
	}

	return 0;
}

static void __wait(struct waiter *w)
{
	for (;;) {
		set_task_state(current, TASK_UNINTERRUPTIBLE);

		if (!w->task)
			break;

		schedule();
	}

	set_task_state(current, TASK_RUNNING);
}

static void __wake_waiter(struct waiter *w)
{
	struct task_struct *task;

	list_del(&w->list);
	task = w->task;
	smp_mb();
	w->task = NULL;
	wake_up_process(task);
}

/*
 * We either wake a few readers or a single writer.
 */
static void __wake_many(struct block_lock *lock)
{
	struct waiter *w, *tmp;

	BUG_ON(lock->count < 0);
	list_for_each_entry_safe(w, tmp, &lock->waiters, list) {
		if (lock->count >= MAX_HOLDERS)
			return;

		if (w->wants_write) {
			if (lock->count > 0)
				return; /* still read locked */

			lock->count = -1;
			__add_holder(lock, w->task);
			__wake_waiter(w);
			return;
		}

		lock->count++;
		__add_holder(lock, w->task);
		__wake_waiter(w);
	}
}

static void bl_init(struct block_lock *lock)
{
	int i;

	spin_lock_init(&lock->lock);
	lock->count = 0;
	INIT_LIST_HEAD(&lock->waiters);
	for (i = 0; i < MAX_HOLDERS; i++)
		lock->holders[i] = NULL;
}

static int __available_for_read(struct block_lock *lock)
{
	return lock->count >= 0 &&
		lock->count < MAX_HOLDERS &&
		list_empty(&lock->waiters);
}

static int bl_down_read(struct block_lock *lock)
{
	int r;
	struct waiter w;

	spin_lock(&lock->lock);
	r = __check_holder(lock);
	if (r) {
		spin_unlock(&lock->lock);
		return r;
	}

	if (__available_for_read(lock)) {
		lock->count++;
		__add_holder(lock, current);
		spin_unlock(&lock->lock);
		return 0;
	}

	get_task_struct(current);

	w.task = current;
	w.wants_write = 0;
	list_add_tail(&w.list, &lock->waiters);
	spin_unlock(&lock->lock);

	__wait(&w);
	put_task_struct(current);
	return 0;
}

static int bl_down_read_nonblock(struct block_lock *lock)
{
	int r;

	spin_lock(&lock->lock);
	r = __check_holder(lock);
	if (r)
		goto out;

	if (__available_for_read(lock)) {
		lock->count++;
		__add_holder(lock, current);
		r = 0;
	} else
		r = -EWOULDBLOCK;

out:
	spin_unlock(&lock->lock);
	return r;
}

static void bl_up_read(struct block_lock *lock)
{
	spin_lock(&lock->lock);
	BUG_ON(lock->count <= 0);
	__del_holder(lock, current);
	--lock->count;
	if (!list_empty(&lock->waiters))
		__wake_many(lock);
	spin_unlock(&lock->lock);
}

static int bl_down_write(struct block_lock *lock)
{
	int r;
	struct waiter w;

	spin_lock(&lock->lock);
	r = __check_holder(lock);
	if (r) {
		spin_unlock(&lock->lock);
		return r;
	}

	if (lock->count == 0 && list_empty(&lock->waiters)) {
		lock->count = -1;
		__add_holder(lock, current);
		spin_unlock(&lock->lock);
		return 0;
	}

	get_task_struct(current);
	w.task = current;
	w.wants_write = 1;

	/*
	 * Writers given priority. We know there's only one mutator in the
	 * system, so ignoring the ordering reversal.
	 */
	list_add(&w.list, &lock->waiters);
	spin_unlock(&lock->lock);

	__wait(&w);
	put_task_struct(current);

	return 0;
}

static void bl_up_write(struct block_lock *lock)
{
	spin_lock(&lock->lock);
	__del_holder(lock, current);
	lock->count = 0;
	if (!list_empty(&lock->waiters))
		__wake_many(lock);
	spin_unlock(&lock->lock);
}

static void report_recursive_bug(dm_block_t b, int r)
{
	if (r == -EINVAL)
		DMERR("recursive acquisition of block %llu requested.",
		      (unsigned long long) b);
}

/*----------------------------------------------------------------*/

/*
 * Block manager is currently implemented using dm-bufio.  struct
 * dm_block_manager and struct dm_block map directly onto a couple of
 * structs in the bufio interface.  I want to retain the freedom to move
 * away from bufio in the future.  So these structs are just cast within
 * this .c file, rather than making it through to the public interface.
 */
static struct dm_buffer *to_buffer(struct dm_block *b)
{
	return (struct dm_buffer *) b;
}

static struct dm_bufio_client *to_bufio(struct dm_block_manager *bm)
{
	return (struct dm_bufio_client *) bm;
}

dm_block_t dm_block_location(struct dm_block *b)
{
	return dm_bufio_get_block_number(to_buffer(b));
}
EXPORT_SYMBOL_GPL(dm_block_location);

void *dm_block_data(struct dm_block *b)
{
	return dm_bufio_get_block_data(to_buffer(b));
}
EXPORT_SYMBOL_GPL(dm_block_data);

struct buffer_aux {
	struct dm_block_validator *validator;
	struct block_lock lock;
	int write_locked;
};

static void dm_block_manager_alloc_callback(struct dm_buffer *buf)
{
	struct buffer_aux *aux = dm_bufio_get_aux_data(buf);
	aux->validator = NULL;
	bl_init(&aux->lock);
}

static void dm_block_manager_write_callback(struct dm_buffer *buf)
{
	struct buffer_aux *aux = dm_bufio_get_aux_data(buf);
	if (aux->validator) {
		aux->validator->prepare_for_write(aux->validator, (struct dm_block *) buf,
			 dm_bufio_get_block_size(dm_bufio_get_client(buf)));
	}
}

/*----------------------------------------------------------------
 * Public interface
 *--------------------------------------------------------------*/
struct dm_block_manager *dm_block_manager_create(struct block_device *bdev,
						 unsigned block_size,
						 unsigned cache_size,
						 unsigned max_held_per_thread)
{
	return (struct dm_block_manager *)
		dm_bufio_client_create(bdev, block_size, max_held_per_thread,
				       sizeof(struct buffer_aux),
				       dm_block_manager_alloc_callback,
				       dm_block_manager_write_callback);
}
EXPORT_SYMBOL_GPL(dm_block_manager_create);

void dm_block_manager_destroy(struct dm_block_manager *bm)
{
	return dm_bufio_client_destroy(to_bufio(bm));
}
EXPORT_SYMBOL_GPL(dm_block_manager_destroy);

unsigned dm_bm_block_size(struct dm_block_manager *bm)
{
	return dm_bufio_get_block_size(to_bufio(bm));
}
EXPORT_SYMBOL_GPL(dm_bm_block_size);

dm_block_t dm_bm_nr_blocks(struct dm_block_manager *bm)
{
	return dm_bufio_get_device_size(to_bufio(bm));
}

static int dm_bm_validate_buffer(struct dm_block_manager *bm,
				 struct dm_buffer *buf,
				 struct buffer_aux *aux,
				 struct dm_block_validator *v)
{
	if (unlikely(!aux->validator)) {
		int r;
		if (!v)
			return 0;
		r = v->check(v, (struct dm_block *) buf, dm_bufio_get_block_size(to_bufio(bm)));
		if (unlikely(r))
			return r;
		aux->validator = v;
	} else {
		if (unlikely(aux->validator != v)) {
			DMERR("validator mismatch (old=%s vs new=%s) for block %llu",
				aux->validator->name, v ? v->name : "NULL",
				(unsigned long long)
					dm_bufio_get_block_number(buf));
			return -EINVAL;
		}
	}

	return 0;
}
int dm_bm_read_lock(struct dm_block_manager *bm, dm_block_t b,
		    struct dm_block_validator *v,
		    struct dm_block **result)
{
	struct buffer_aux *aux;
	void *p;
	int r;

	p = dm_bufio_read(to_bufio(bm), b, (struct dm_buffer **) result);
	if (unlikely(IS_ERR(p)))
		return PTR_ERR(p);

	aux = dm_bufio_get_aux_data(to_buffer(*result));
	r = bl_down_read(&aux->lock);
	if (unlikely(r)) {
		dm_bufio_release(to_buffer(*result));
		report_recursive_bug(b, r);
		return r;
	}

	aux->write_locked = 0;

	r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
	if (unlikely(r)) {
		bl_up_read(&aux->lock);
		dm_bufio_release(to_buffer(*result));
		return r;
	}

	return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_read_lock);

int dm_bm_write_lock(struct dm_block_manager *bm,
		     dm_block_t b, struct dm_block_validator *v,
		     struct dm_block **result)
{
	struct buffer_aux *aux;
	void *p;
	int r;

	p = dm_bufio_read(to_bufio(bm), b, (struct dm_buffer **) result);
	if (unlikely(IS_ERR(p)))
		return PTR_ERR(p);

	aux = dm_bufio_get_aux_data(to_buffer(*result));
	r = bl_down_write(&aux->lock);
	if (r) {
		dm_bufio_release(to_buffer(*result));
		report_recursive_bug(b, r);
		return r;
	}

	aux->write_locked = 1;

	r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
	if (unlikely(r)) {
		bl_up_write(&aux->lock);
		dm_bufio_release(to_buffer(*result));
		return r;
	}

	return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_write_lock);

int dm_bm_read_try_lock(struct dm_block_manager *bm,
			dm_block_t b, struct dm_block_validator *v,
			struct dm_block **result)
{
	struct buffer_aux *aux;
	void *p;
	int r;

	p = dm_bufio_get(to_bufio(bm), b, (struct dm_buffer **) result);
	if (unlikely(IS_ERR(p)))
		return PTR_ERR(p);
	if (unlikely(!p))
		return -EWOULDBLOCK;

	aux = dm_bufio_get_aux_data(to_buffer(*result));
	r = bl_down_read_nonblock(&aux->lock);
	if (r < 0) {
		dm_bufio_release(to_buffer(*result));
		report_recursive_bug(b, r);
		return r;
	}
	aux->write_locked = 0;

	r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
	if (unlikely(r)) {
		bl_up_read(&aux->lock);
		dm_bufio_release(to_buffer(*result));
		return r;
	}

	return 0;
}

int dm_bm_write_lock_zero(struct dm_block_manager *bm,
			  dm_block_t b, struct dm_block_validator *v,
			  struct dm_block **result)
{
	int r;
	struct buffer_aux *aux;
	void *p;

	p = dm_bufio_new(to_bufio(bm), b, (struct dm_buffer **) result);
	if (unlikely(IS_ERR(p)))
		return PTR_ERR(p);

	memset(p, 0, dm_bm_block_size(bm));

	aux = dm_bufio_get_aux_data(to_buffer(*result));
	r = bl_down_write(&aux->lock);
	if (r) {
		dm_bufio_release(to_buffer(*result));
		return r;
	}

	aux->write_locked = 1;
	aux->validator = v;

	return 0;
}

int dm_bm_unlock(struct dm_block *b)
{
	struct buffer_aux *aux;
	aux = dm_bufio_get_aux_data(to_buffer(b));

	if (aux->write_locked) {
		dm_bufio_mark_buffer_dirty(to_buffer(b));
		bl_up_write(&aux->lock);
	} else
		bl_up_read(&aux->lock);

	dm_bufio_release(to_buffer(b));

	return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_unlock);

int dm_bm_unlock_move(struct dm_block *b, dm_block_t n)
{
	struct buffer_aux *aux;

	aux = dm_bufio_get_aux_data(to_buffer(b));

	if (aux->write_locked) {
		dm_bufio_mark_buffer_dirty(to_buffer(b));
		bl_up_write(&aux->lock);
	} else
		bl_up_read(&aux->lock);

	dm_bufio_release_move(to_buffer(b), n);
	return 0;
}

int dm_bm_flush_and_unlock(struct dm_block_manager *bm,
			   struct dm_block *superblock)
{
	int r;

	r = dm_bufio_write_dirty_buffers(to_bufio(bm));
	if (unlikely(r))
		return r;
	r = dm_bufio_issue_flush(to_bufio(bm));
	if (unlikely(r))
		return r;

	dm_bm_unlock(superblock);

	r = dm_bufio_write_dirty_buffers(to_bufio(bm));
	if (unlikely(r))
		return r;
	r = dm_bufio_issue_flush(to_bufio(bm));
	if (unlikely(r))
		return r;

	return 0;
}

u32 dm_bm_checksum(const void *data, size_t len, u32 init_xor)
{
	return crc32c(~(u32) 0, data, len) ^ init_xor;
}
EXPORT_SYMBOL_GPL(dm_bm_checksum);

/*----------------------------------------------------------------*/

MODULE_LICENSE("GPL");
MODULE_AUTHOR("Joe Thornber <dm-devel@redhat.com>");
MODULE_DESCRIPTION("Immutable metadata library for dm");

/*----------------------------------------------------------------*/
+123 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2011 Red Hat, Inc.
 *
 * This file is released under the GPL.
 */

#ifndef _LINUX_DM_BLOCK_MANAGER_H
#define _LINUX_DM_BLOCK_MANAGER_H

#include <linux/types.h>
#include <linux/blkdev.h>

/*----------------------------------------------------------------*/

/*
 * Block number.
 */
typedef uint64_t dm_block_t;
struct dm_block;

dm_block_t dm_block_location(struct dm_block *b);
void *dm_block_data(struct dm_block *b);

/*----------------------------------------------------------------*/

/*
 * @name should be a unique identifier for the block manager, no longer
 * than 32 chars.
 *
 * @max_held_per_thread should be the maximum number of locks, read or
 * write, that an individual thread holds at any one time.
 */
struct dm_block_manager;
struct dm_block_manager *dm_block_manager_create(
	struct block_device *bdev, unsigned block_size,
	unsigned cache_size, unsigned max_held_per_thread);
void dm_block_manager_destroy(struct dm_block_manager *bm);

unsigned dm_bm_block_size(struct dm_block_manager *bm);
dm_block_t dm_bm_nr_blocks(struct dm_block_manager *bm);

/*----------------------------------------------------------------*/

/*
 * The validator allows the caller to verify newly-read data and modify
 * the data just before writing, e.g. to calculate checksums.  It's
 * important to be consistent with your use of validators.  The only time
 * you can change validators is if you call dm_bm_write_lock_zero.
 */
struct dm_block_validator {
	const char *name;
	void (*prepare_for_write)(struct dm_block_validator *v, struct dm_block *b, size_t block_size);

	/*
	 * Return 0 if the checksum is valid or < 0 on error.
	 */
	int (*check)(struct dm_block_validator *v, struct dm_block *b, size_t block_size);
};

/*----------------------------------------------------------------*/

/*
 * You can have multiple concurrent readers or a single writer holding a
 * block lock.
 */

/*
 * dm_bm_lock() locks a block and returns through @result a pointer to
 * memory that holds a copy of that block.  If you have write-locked the
 * block then any changes you make to memory pointed to by @result will be
 * written back to the disk sometime after dm_bm_unlock is called.
 */
int dm_bm_read_lock(struct dm_block_manager *bm, dm_block_t b,
		    struct dm_block_validator *v,
		    struct dm_block **result);

int dm_bm_write_lock(struct dm_block_manager *bm, dm_block_t b,
		     struct dm_block_validator *v,
		     struct dm_block **result);

/*
 * The *_try_lock variants return -EWOULDBLOCK if the block isn't
 * available immediately.
 */
int dm_bm_read_try_lock(struct dm_block_manager *bm, dm_block_t b,
			struct dm_block_validator *v,
			struct dm_block **result);

/*
 * Use dm_bm_write_lock_zero() when you know you're going to
 * overwrite the block completely.  It saves a disk read.
 */
int dm_bm_write_lock_zero(struct dm_block_manager *bm, dm_block_t b,
			  struct dm_block_validator *v,
			  struct dm_block **result);

int dm_bm_unlock(struct dm_block *b);

/*
 * An optimisation; we often want to copy a block's contents to a new
 * block.  eg, as part of the shadowing operation.  It's far better for
 * bufio to do this move behind the scenes than hold 2 locks and memcpy the
 * data.
 */
int dm_bm_unlock_move(struct dm_block *b, dm_block_t n);

/*
 * It's a common idiom to have a superblock that should be committed last.
 *
 * @superblock should be write-locked on entry. It will be unlocked during
 * this function.  All dirty blocks are guaranteed to be written and flushed
 * before the superblock.
 *
 * This method always blocks.
 */
int dm_bm_flush_and_unlock(struct dm_block_manager *bm,
			   struct dm_block *superblock);

u32 dm_bm_checksum(const void *data, size_t len, u32 init_xor);

/*----------------------------------------------------------------*/

#endif	/* _LINUX_DM_BLOCK_MANAGER_H */
Loading