Re: [PATCH linux-2.6 01/04] brsem: implement big reader semaphore

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



01_brsem_implement_brsem.patch

	This patch implements big reader semaphore - a rwsem with very
	cheap reader-side operations and very expensive writer-side
	operations.  For details, please read comments at the top of
	kernel/brsem.c.

Signed-off-by: Tejun Heo <[email protected]>

 include/linux/brsem.h |  202 +++++++++++
 init/main.c           |    3 
 kernel/Makefile       |    2 
 kernel/brsem.c        |  869 ++++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 1075 insertions(+), 1 deletion(-)

Index: linux-work/include/linux/brsem.h
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ linux-work/include/linux/brsem.h	2005-09-25 15:42:03.000000000 +0900
@@ -0,0 +1,202 @@
+#ifndef __LINUX_BRSEM_H
+#define __LINUX_BRSEM_H
+
+/*
+ * include/linux/brsem.h		- Big reader rw semaphore
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation; either version 2 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
+ * 02111-1307, USA.
+ *
+ * Copyright (C) 2005 Tejun Heo <[email protected]>
+ *
+ * See kernel/brsem.c for more information.
+ */
+
+#include <linux/spinlock.h>
+#include <linux/interrupt.h>
+#include <linux/wait.h>
+#include <linux/workqueue.h>
+#include <linux/percpu.h>
+#include <asm/semaphore.h>
+
+DECLARE_PER_CPU(int *, brsem_rcnt_ar);
+
+struct brsem {
+	int idx;
+	spinlock_t lock;
+	long long master_rcnt;
+	wait_queue_head_t read_wait;
+	wait_queue_head_t write_wait;
+	struct semaphore write_mutex;
+	struct work_struct async_work;
+	unsigned flags;
+};
+
+enum {
+	/*
+	 * sem->flags, protected by sem->lock
+	 */
+	BRSEM_F_ALLOCATED		= 0x0001,
+	BRSEM_F_BYPASS			= 0x0002,
+
+	/* Async todo flags */
+	BRSEM_F_ASYNC_UPWRITE		= 0x0100,
+	BRSEM_F_ASYNC_DESTROY		= 0x0200,
+	BRSEM_F_ASYNC_MASK		= 0x0300,
+};
+
+/*
+ * brsem subsys initialization routines, called from init/main.c
+ */
+void brsem_init_early(void);
+void brsem_init(void);
+
+/*
+ * The following initializer and __create_brsem() are for cases where
+ * brsem should be used before brsem_init_early() is finished.
+ */
+#define BRSEM_BYPASS_INITIALIZER	{ .idx = 0, .flags = BRSEM_F_BYPASS }
+
+struct brsem *__create_brsem(struct brsem *sem);
+
+int __brsem_down_read_slow(struct brsem *sem, int interruptible);
+int __brsem_down_read_trylock_slow(struct brsem *sem);
+void __brsem_up_read_slow(struct brsem *sem);
+
+static inline int *__brsem_rcnt(struct brsem *sem)
+{
+	return __get_cpu_var(brsem_rcnt_ar) + sem->idx;
+}
+
+/*
+ * The following *_crit functions can be changed to use
+ * local_irq_disable/enable on architectures where irq switching is
+ * cheaper than bh switching.  See brsem.c for more information.
+ */
+static inline void __brsem_enter_crit(void)
+{
+	local_bh_disable();
+}
+
+static inline void __brsem_leave_crit(void)
+{
+	if (!irqs_disabled())		/* is this cheap on all archs? */
+		local_bh_enable();
+	else
+		__local_bh_enable();
+}
+
+static inline int __brsem_down_read(struct brsem *sem, int interruptible)
+{
+	int *p;
+	might_sleep();
+	__brsem_enter_crit();	/* left here or by the slow path */
+	p = __brsem_rcnt(sem);	/* INT_MIN/INT_MAX need the slow path */
+	if (*p != INT_MIN && *p != INT_MAX) {	/* "*p - 1" would overflow */
+		(*p)++;
+		__brsem_leave_crit();
+		return 0;
+	}
+	return __brsem_down_read_slow(sem, interruptible);
+}
+
+/*
+ * Public functions
+ */
+
+/**
+ * create_brsem - allocates and initializes a new brsem.  Returns
+ * pointer to the new brsem on success, NULL on failure
+ *
+ * This function may sleep.
+ */
+static inline struct brsem *create_brsem(void)
+{
+	return __create_brsem(NULL);
+}
+
+void destroy_brsem(struct brsem *brsem);
+
+/**
+ * brsem_down_read - read lock the specified brsem
+ *
+ * This function may sleep.
+ */
+static inline void brsem_down_read(struct brsem *sem)
+{
+	__brsem_down_read(sem, 0);
+}
+
+/**
+ * brsem_down_read_interruptible - read lock the specified brsem
+ * (interruptible).  Returns 0 on success and -EINTR if interrupted.
+ *
+ * This function is identical to brsem_down_read except that it may be
+ * interrupted by signal.  This function may sleep.
+ */
+static inline int brsem_down_read_interruptible(struct brsem *sem)
+{
+	return __brsem_down_read(sem, 1);
+}
+
+/**
+ * brsem_down_read_trylock - try to read lock the specified brsem.
+ * Returns 1 on success and 0 on failure.
+ *
+ * If the specified brsem can be acquired without sleeping, it's
+ * acquired and 1 is returned; otherwise, 0 is returned.  This
+ * function doesn't sleep and can be called from anywhere except for
+ * irq handlers.
+ */
+static inline int brsem_down_read_trylock(struct brsem *sem)
+{
+	int *p;
+	BUG_ON(in_irq());
+	__brsem_enter_crit();
+	p = __brsem_rcnt(sem);
+	if (*p != INT_MIN && *p != INT_MAX) {	/* "*p - 1" would overflow */
+		(*p)++;
+		__brsem_leave_crit();
+		return 1;
+	}
+	return __brsem_down_read_trylock_slow(sem);
+}
+
+/**
+ * brsem_up_read - read unlock the specified brsem
+ *
+ * This function doesn't sleep and can be called from anywhere except
+ * for irq handlers.
+ */
+static inline void brsem_up_read(struct brsem *sem)
+{
+	int *p;
+	BUG_ON(in_irq());
+	__brsem_enter_crit();
+	p = __brsem_rcnt(sem);
+	if (*p > INT_MIN + 1) {		/* *p != INT_MIN && *p != INT_MIN + 1 */
+		(*p)--;
+		__brsem_leave_crit();
+		return;
+	}
+	__brsem_up_read_slow(sem);
+}
+
+void brsem_down_write(struct brsem *sem);
+int brsem_down_write_interruptible(struct brsem *sem);
+void brsem_up_write(struct brsem *sem);
+void brsem_up_write_async(struct brsem *sem);
+
+#endif /* __LINUX_BRSEM_H */
Index: linux-work/kernel/Makefile
===================================================================
--- linux-work.orig/kernel/Makefile	2005-09-25 15:26:33.000000000 +0900
+++ linux-work/kernel/Makefile	2005-09-25 15:42:03.000000000 +0900
@@ -7,7 +7,7 @@ obj-y     = sched.o fork.o exec_domain.o
 	    sysctl.o capability.o ptrace.o timer.o user.o \
 	    signal.o sys.o kmod.o workqueue.o pid.o \
 	    rcupdate.o intermodule.o extable.o params.o posix-timers.o \
-	    kthread.o wait.o kfifo.o sys_ni.o posix-cpu-timers.o
+	    kthread.o wait.o kfifo.o sys_ni.o posix-cpu-timers.o brsem.o
 
 obj-$(CONFIG_FUTEX) += futex.o
 obj-$(CONFIG_GENERIC_ISA_DMA) += dma.o
Index: linux-work/kernel/brsem.c
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ linux-work/kernel/brsem.c	2005-09-25 15:42:03.000000000 +0900
@@ -0,0 +1,869 @@
+/*
+ * kernel/brsem.c			- Big reader rw semaphore
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation; either version 2 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
+ * 02111-1307, USA.
+ *
+ * Copyright (C) 2005 Tejun Heo <[email protected]>
+ *
+ * This file implements big-reader rw semaphore.
+ *
+ * Read locking and unlocking are very cheap - no shared word, no
+ * atomic operation, no memory barrier, just local bh enable/disable,
+ * some cpu-local arithmetic and conditionals.  Of course, we can't
+ * cheat mother nature, so writers take all the overhead plus some
+ * extra.  Write locking and unlocking involve issuing workqueue works
+ * to all active CPUs and waiting for them.
+ *
+ * DO NOT use brsem if updates are frequent.  brsem is intended to be
+ * used for things like file system remount <-> open/write
+ * synchronization or cpu hotplug synchronization, where writer side is
+ * _very_ rare while the reader side can be frequent.
+ *
+ * brsem is semantically identical to rwsem except for the following.
+ *
+ * a. All non-sleeping functions - destroy_brsem,
+ *    brsem_down_read_trylock, brsem_up_read and brsem_up_write_async
+ *    - cannot be called from irq handlers.  They can be called from
+ *    bh or while irq and/or bh are disabled (in_softirq ||
+ *    irqs_disabled) but cannot be called from irq handlers (in_irq).
+ *
+ *    This is because brsem uses bh disable/enable to achieve
+ *    exclusion on local cpu.  This choice is made because switching
+ *    local irq is quite expensive on some architectures.  On
+ *    architectures where local irq switching is cheaper than bh
+ *    switching, changing __brsem_enter/leave_crit to use
+ *    local_irq_disable/enable would be a good idea.  (maybe
+ *    __ARCH_CHEAP_LOCAL_IRQ_OPS)
+ *
+ * b. brsem_up_write needs to sleep.  If write unlocking needs to be
+ *    done while in_atomic, brsem_up_write_async should be used.
+ *
+ *    A dedicated workqueue is used for brsem_up_write_async as
+ *    otherwise deadlock can occur if a work on the same workqueue
+ *    releases a brsem while holding a spinlock and then tries to
+ *    regrab it after releasing the spinlock.
+ *
+ *    Note: destroy_brsem also operates asynchronously using the same
+ *    brsem workqueue.
+ *
+ * brsem is different from rcu in that
+ *
+ * a. being a semaphore not a spinlock, readers and writers can sleep
+ *    while holding it.
+ *
+ * b. it actually synchronizes and can be used where transient stale
+ *    data cannot be tolerated or working around is too cumbersome.
+ *
+ * brsem is different from seqlock in that
+ *
+ * a. both readers and writers can sleep while holding it.
+ *
+ * b. it actually synchronizes and can be used where reader side retry
+ *    is impossible or impractical.
+ *
+ * On UP machines, brsem can be trivially replaced by rwsem with
+ * simple macro redirections once rwsem supports interruptible down
+ * operations.
+ */
+
+#include <linux/brsem.h>
+
+#include <linux/idr.h>
+#include <linux/slab.h>
+#include <linux/vmalloc.h>
+#include <linux/notifier.h>
+#include <linux/cpu.h>
+#include <linux/module.h>
+#include <linux/completion.h>
+#include <asm/bitops.h>
+#include <asm/atomic.h>
+
+enum {
+	BRSEM_INITIAL_RCNT_ARLEN	= 32,
+};
+
+static int initial_rcnt_ar[BRSEM_INITIAL_RCNT_ARLEN] __initdata = { INT_MIN };
+
+static int brsem_initialized;
+static spinlock_t brsem_alloc_lock = SPIN_LOCK_UNLOCKED;
+static int brsem_len = BRSEM_INITIAL_RCNT_ARLEN;
+static DEFINE_IDR(brsem_idr);
+
+DEFINE_PER_CPU(int *, brsem_rcnt_ar) = initial_rcnt_ar;
+static DEFINE_PER_CPU(int, brsem_rcnt_arlen) = BRSEM_INITIAL_RCNT_ARLEN;
+
+static struct workqueue_struct *brsem_async_wq;
+
+static void do_async_jobs(void *data);
+
+#define is_bypass(sem) \
+	unlikely((sem)->idx == 0 && (sem)->flags & BRSEM_F_BYPASS)
+
+#define check_idx(sem) \
+	BUG_ON((sem)->idx <= 0 || (sem)->idx >= brsem_len)
+
+static inline void spin_lock_crit(spinlock_t *spin)
+{
+	__brsem_enter_crit();
+	spin_lock(spin);
+}
+
+static inline void spin_unlock_crit(spinlock_t *spin)
+{
+	spin_unlock(spin);
+	__brsem_leave_crit();
+}
+
+/*
+ * Call on all cpus
+ */
+struct coac_work_arg {
+	void (*func)(void *);
+	void *data;
+	atomic_t remaining;
+	struct completion completion;
+};
+
+static void coac_work_fn(void *data)
+{
+	struct coac_work_arg *arg = data;
+
+	arg->func(arg->data);
+
+	smp_mb__before_atomic_dec();
+
+	if (atomic_dec_and_test(&arg->remaining))
+		complete(&arg->completion);
+}
+
+static void coac_schedule_work_per_cpu(void *data)
+{
+	struct work_struct *works = data;
+
+	schedule_work(works + smp_processor_id());
+}
+
+static void call_on_all_cpus(void (*func)(void *), void *data)
+{
+	static DECLARE_MUTEX(coac_mutex); /* serializes uses of coac_works */
+	static struct work_struct coac_works[NR_CPUS];
+	struct coac_work_arg coac_arg;
+	int cpu, local_cpu = -1;
+
+	/*
+	 * coac_works is shared and the online cpu set must stay stable, so
+	 * take both locks before counting cpus and initializing the works.
+	 */
+	down(&coac_mutex);
+	lock_cpu_hotplug();
+
+	coac_arg.func = func;
+	coac_arg.data = data;
+	atomic_set(&coac_arg.remaining, num_online_cpus());
+	init_completion(&coac_arg.completion);
+
+	for_each_online_cpu(cpu)
+		INIT_WORK(coac_works + cpu, coac_work_fn, &coac_arg);
+
+	/*
+	 * On keventd, scheduling work and waiting for it will deadlock,
+	 * so @func is invoked directly.  Otherwise that's impossible: we
+	 * aren't cpu-bound and @func can't run with preemption disabled.
+	 */
+	preempt_disable();
+	if (current_is_keventd())
+		local_cpu = smp_processor_id();
+
+	smp_call_function(coac_schedule_work_per_cpu, coac_works, 1, 0);
+
+	if (local_cpu >= 0) {
+		preempt_enable();
+		func(data);
+		if (atomic_dec_and_test(&coac_arg.remaining))
+			smp_mb__after_atomic_dec();
+		else
+			wait_for_completion(&coac_arg.completion);
+	} else {
+		coac_schedule_work_per_cpu(coac_works);
+		preempt_enable();
+		wait_for_completion(&coac_arg.completion);
+	}
+
+	unlock_cpu_hotplug();
+	up(&coac_mutex);
+}
+
+/* Small arrays come from kmalloc, anything above a page from vmalloc. */
+static void *alloc_rcnt_ar(size_t size)
+{
+	if (size > PAGE_SIZE)
+		return vmalloc(size);
+	return kmalloc(size, GFP_KERNEL);
+}
+
+static void free_rcnt_ar(void *ptr, size_t size)
+{
+	if (size > PAGE_SIZE)	/* same size test as alloc_rcnt_ar() */
+		vfree(ptr);
+	else
+		kfree(ptr);
+}
+
+/*
+ * While expanding rcnt_ar's, substitution should be done locally on
+ * each cpu.  Note that rcnt_ar's are also allocated by each cpu.
+ * This is simpler to implement and NUMA friendly.
+ *
+ * Once expanded, rcnt_ar's are never shrunk, not even when
+ * expansion on other cpus fails.  Per-cpu rcnt_arlen's are kept to
+ * maintain integrity and avoid unnecessary reallocation.  Note that
+ * per-cpu rcnt_arlen's are also necessary when processing cpu
+ * hot-plug events.
+ */
+struct expand_brsem_arg {
+	int new_len;
+	atomic_t failed;
+};
+
+static void expand_brsem_cpucb(void *data)
+{
+	struct expand_brsem_arg *ebarg = data;
+	int len, new_len;
+	size_t size, new_size;
+	int *rcnt_ar, *new_rcnt_ar;
+
+	len = __get_cpu_var(brsem_rcnt_arlen);
+	size = sizeof(rcnt_ar[0]) * len;
+	rcnt_ar = __get_cpu_var(brsem_rcnt_ar);
+
+	new_len = ebarg->new_len;
+	if (len >= new_len)
+		return;
+	new_size = sizeof(new_rcnt_ar[0]) * new_len;
+	new_rcnt_ar = alloc_rcnt_ar(new_size);
+	if (!new_rcnt_ar)
+		goto fail;
+
+	memset((void *)new_rcnt_ar + size, 0, new_size - size);
+
+	__brsem_enter_crit();
+	memcpy(new_rcnt_ar, rcnt_ar, size);
+	__get_cpu_var(brsem_rcnt_ar) = new_rcnt_ar;
+	__get_cpu_var(brsem_rcnt_arlen) = new_len;
+	__brsem_leave_crit();
+
+	free_rcnt_ar(rcnt_ar, size);
+
+	return;
+ fail:
+	atomic_inc(&ebarg->failed);
+}
+
+static int expand_brsem(int target_idx)
+{
+	static DECLARE_MUTEX(expand_mutex);
+	int new_len, res;
+	struct expand_brsem_arg ebarg;
+
+	/*
+	 * brsem rcnt_ar's cannot be expanded until brsem is fully
+	 * initialized.  If the following BUG_ON is ever hit, bump
+	 * BRSEM_INITIAL_RCNT_ARLEN.
+	 */
+	BUG_ON(!brsem_initialized);
+
+	down(&expand_mutex);
+
+	new_len = brsem_len;
+	while (new_len <= target_idx) {
+		if (new_len > INT_MAX / 2) {
+			/* Doubling would overflow int, refuse */
+			WARN_ON(1);
+			res = -EBUSY;
+			goto out;
+		}
+		new_len <<= 1;
+	}
+
+	res = 0;
+	if (new_len <= brsem_len)
+		goto out;
+
+	ebarg.new_len = new_len;
+	atomic_set(&ebarg.failed, 0);
+
+	call_on_all_cpus(expand_brsem_cpucb, &ebarg);
+
+	res = -ENOMEM;
+	if (atomic_read(&ebarg.failed))
+		goto out;
+
+	brsem_len = new_len;
+	res = 0;
+ out:
+	up(&expand_mutex);
+	return res;
+}
+
+struct brsem *__create_brsem(struct brsem *sem)
+{
+	struct brsem *allocated_sem = NULL;
+	int idx, res;
+
+	if (!sem) {
+		sem = allocated_sem = kzalloc(sizeof(*sem), GFP_KERNEL);
+		if (!sem)
+			goto out;
+	}
+
+	do {
+		res = idr_pre_get(&brsem_idr, GFP_KERNEL);
+		if (res < 0)
+			goto out_free;
+		spin_lock(&brsem_alloc_lock);
+		res = idr_get_new_above(&brsem_idr, sem, 1, &idx);
+		spin_unlock(&brsem_alloc_lock);
+	} while (res == -EAGAIN);
+
+	if (res < 0)
+		goto out_free;
+
+	while (idx >= brsem_len) {
+		res = expand_brsem(idx);
+		if (res < 0)
+			goto out_idr_remove;
+	}
+
+	sem->idx = idx;
+	spin_lock_init(&sem->lock);
+	init_waitqueue_head(&sem->read_wait);
+	init_waitqueue_head(&sem->write_wait);
+	init_MUTEX(&sem->write_mutex);
+	INIT_WORK(&sem->async_work, do_async_jobs, sem);
+	sem->flags |= allocated_sem ? BRSEM_F_ALLOCATED : 0;
+
+	goto out;
+
+ out_idr_remove:
+	spin_lock(&brsem_alloc_lock);
+	idr_remove(&brsem_idr, idx);
+	spin_unlock(&brsem_alloc_lock);
+ out_free:
+	kfree(allocated_sem);
+	sem = NULL;
+ out:
+	return sem;
+}
+
+/*
+ * This is the heart and soul of brsem write operations.  sync_brsem()
+ * does two things atomically (w.r.t. each cpu) - collecting all
+ * cpu-local reader counts into sem->master_rcnt, and resetting or
+ * locking those rcnts.
+ *
+ * Locking per-cpu rcnts is done by setting them to INT_MIN.  All
+ * reader-side fast path functions fall back to slow path when they
+ * encounter INT_MIN, allowing inter-cpu synchronization and rwsem
+ * semantics to be implemented in slow path proper.
+ */
+struct sync_brsem_arg {
+	struct brsem *sem;
+	int write_locking;
+};
+
+static void sync_brsem_cpucb(void *data)
+{
+	struct sync_brsem_arg *sbarg = data;
+	struct brsem *sem = sbarg->sem;
+	int *p = __brsem_rcnt(sem);
+
+	__brsem_enter_crit();
+
+	/* collect rcnt */
+	if (*p != 0 && *p != INT_MIN) {
+		spin_lock(&sem->lock);
+		sem->master_rcnt += *p;
+		spin_unlock(&sem->lock);
+	}
+
+	/* lock or unlock rcnt */
+	*p = sbarg->write_locking ? INT_MIN : 0;
+
+	__brsem_leave_crit();
+}
+
+static void sync_brsem(struct brsem *sem, int write_locking)
+{
+	int cpu;
+
+	if (brsem_initialized) {
+		struct sync_brsem_arg sbarg = { sem, write_locking };
+		call_on_all_cpus(sync_brsem_cpucb, &sbarg);
+		return;
+	}
+
+	/*
+	 * Workqueue is not operational yet.  Fortunately, we're still
+	 * single threaded.  Sync manually.  Note that lockings are
+	 * not necessary here.  They're done just for consistency.
+	 */
+	lock_cpu_hotplug();
+	spin_lock_crit(&sem->lock);
+	for_each_online_cpu(cpu) {
+		int *p = per_cpu(brsem_rcnt_ar, cpu) + sem->idx;
+		if (*p != INT_MIN)
+			sem->master_rcnt += *p;
+		*p = write_locking ? INT_MIN : 0;
+	}
+	spin_unlock_crit(&sem->lock);
+	unlock_cpu_hotplug();
+}
+
+static void do_destroy_brsem(struct brsem *sem)
+{
+	check_idx(sem);
+
+	sync_brsem(sem, 0);
+	BUG_ON(sem->master_rcnt != 0);
+
+	spin_lock(&brsem_alloc_lock);
+
+	BUG_ON(idr_find(&brsem_idr, sem->idx) != sem);
+	idr_remove(&brsem_idr, sem->idx);
+
+	spin_unlock(&brsem_alloc_lock);
+
+	sem->idx = 0;
+
+	if (sem->flags & BRSEM_F_ALLOCATED)
+		kfree(sem);
+}
+
+static void do_async_jobs(void *data)
+{
+	struct brsem *sem = data;
+	unsigned flags;
+
+	spin_lock_crit(&sem->lock);
+	flags = sem->flags & BRSEM_F_ASYNC_MASK;
+	sem->flags ^= flags;
+	spin_unlock_crit(&sem->lock);
+
+	if (flags & BRSEM_F_ASYNC_UPWRITE)
+		brsem_up_write(sem);
+
+	if (flags & BRSEM_F_ASYNC_DESTROY)
+		do_destroy_brsem(sem);
+}
+
+static void queue_async(struct brsem *sem, unsigned todo)
+{
+	spin_lock_crit(&sem->lock);
+
+	BUG_ON(todo & ~BRSEM_F_ASYNC_MASK || sem->flags & todo);
+	sem->flags |= todo;
+
+	queue_work(brsem_async_wq, &sem->async_work);
+
+	/* sem->lock must be released after the last reference */
+	spin_unlock_crit(&sem->lock);
+}
+
+/**
+ * destroy_brsem - destroy and free the specified brsem
+ *
+ * This function doesn't sleep and can be called from anywhere except
+ * for irq handlers.  See comment at the top of this file for more
+ * information regarding async operations.
+ */
+void destroy_brsem(struct brsem *sem)
+{
+	queue_async(sem, BRSEM_F_ASYNC_DESTROY);
+}
+
+int __brsem_down_read_slow(struct brsem *sem, int interruptible)
+{
+	int *p;
+	DEFINE_WAIT(wait);
+
+	if (is_bypass(sem))
+		goto out;
+	check_idx(sem);
+ retry:
+	p = __brsem_rcnt(sem);
+	switch (*p) {
+	case INT_MIN:
+		/* writer(s) present */
+		prepare_to_wait(&sem->read_wait, &wait,
+				interruptible ? TASK_INTERRUPTIBLE
+					      : TASK_UNINTERRUPTIBLE);
+		__brsem_leave_crit();
+
+		schedule();
+		finish_wait(&sem->read_wait, &wait);
+
+		if (interruptible && signal_pending(current))
+			return -EINTR;
+
+		__brsem_enter_crit();
+		goto retry;
+
+	case INT_MAX:
+		/* local rcnt overflow, dump into master rcnt */
+		spin_lock(&sem->lock);
+		sem->master_rcnt += *p;
+		*p = 1;
+		spin_unlock(&sem->lock);
+		break;
+
+	default:
+		(*p)++;
+	}
+ out:
+	__brsem_leave_crit();
+	return 0;
+}
+
+int __brsem_down_read_trylock_slow(struct brsem *sem)
+{
+	int *p = __brsem_rcnt(sem);
+	int res = 1;
+
+	if (is_bypass(sem))
+		goto out;
+	check_idx(sem);
+
+	if (*p == INT_MIN) {
+		/* writer(s) present */
+		res = 0;
+	} else {
+		/* local rcnt overflow, dump into master rcnt */
+		spin_lock(&sem->lock);
+		sem->master_rcnt += *p;
+		*p = 1;
+		spin_unlock(&sem->lock);
+	}
+ out:
+	__brsem_leave_crit();
+	return res;
+}
+
+void __brsem_up_read_slow(struct brsem *sem)
+{
+	int *p = __brsem_rcnt(sem);
+
+	if (is_bypass(sem))
+		goto out;
+	check_idx(sem);
+
+	spin_lock(&sem->lock);
+
+	if (*p == INT_MIN) {
+		/* writer(s) present.  */
+		if (--sem->master_rcnt == 0)
+			wake_up(&sem->write_wait);
+	} else {
+		/* local rcnt underflow, dump into master rcnt */
+		sem->master_rcnt += *p;
+		*p = -1;
+	}
+
+	spin_unlock(&sem->lock);
+ out:
+	__brsem_leave_crit();
+}
+
+static int __brsem_down_write(struct brsem *sem, int interruptible)
+{
+	DEFINE_WAIT(wait);
+	int res;
+
+	if (is_bypass(sem))
+		return 0;
+	check_idx(sem);
+
+	if (interruptible) {
+		res = down_interruptible(&sem->write_mutex);
+		if (res < 0)
+			return res;
+	} else
+		down(&sem->write_mutex);
+
+	sync_brsem(sem, 1);
+
+	/*
+	 * Wait for the remaining readers.  master_rcnt is rechecked after
+	 * each wakeup as being woken doesn't prove that it reached zero.
+	 */
+	spin_lock_crit(&sem->lock);
+	while (sem->master_rcnt) {
+		BUG_ON(sem->master_rcnt < 0);
+		prepare_to_wait(&sem->write_wait, &wait,
+				interruptible ? TASK_INTERRUPTIBLE
+					      : TASK_UNINTERRUPTIBLE);
+		spin_unlock_crit(&sem->lock);
+		schedule();
+		finish_wait(&sem->write_wait, &wait);
+
+		if (interruptible && signal_pending(current)) {
+			sync_brsem(sem, 0);
+			wake_up_all(&sem->read_wait);
+			up(&sem->write_mutex);
+			return -EINTR;
+		}
+		spin_lock_crit(&sem->lock);
+	}
+	spin_unlock_crit(&sem->lock);	/* we got the lock */
+
+	return 0;
+}
+
+/**
+ * brsem_down_write - write lock the specified brsem
+ *
+ * This function may sleep.
+ */
+void brsem_down_write(struct brsem *sem)
+{
+	int res = __brsem_down_write(sem, 0);
+	BUG_ON(res != 0);
+}
+
+/**
+ * brsem_down_write_interruptible - write lock the specified brsem
+ * (interruptible).  Returns 0 on success and -EINTR if interrupted.
+ *
+ * This function is identical to brsem_down_write except that it may
+ * be interrupted by signal.  This function may sleep.
+ */
+int brsem_down_write_interruptible(struct brsem *sem)
+{
+	return __brsem_down_write(sem, 1);
+}
+
+/**
+ * brsem_up_write - write unlock the specified brsem
+ *
+ * This function may sleep.
+ */
+void brsem_up_write(struct brsem *sem)
+{
+	if (is_bypass(sem))
+		return;
+	check_idx(sem);
+
+	BUG_ON(sem->master_rcnt);
+	sync_brsem(sem, 0);
+	wake_up_all(&sem->read_wait);
+	up(&sem->write_mutex);
+}
+
+/**
+ * brsem_up_write_async - write unlock the specified brsem asynchronously
+ *
+ * This function schedules write unlock of the specified brsem.  It
+ * can be called from anywhere except irq handlers.  See comment at
+ * the top of this file for more information regarding async
+ * operations.
+ */
+void brsem_up_write_async(struct brsem *sem)
+{
+	queue_async(sem, BRSEM_F_ASYNC_UPWRITE);
+}
+
+static int __cpuinit brsem_cpu_callback(struct notifier_block *nfb,
+					unsigned long action, void *data)
+{
+	int cpu = (unsigned long)data, online_cpu;
+	int *rcnt_ar, *new_rcnt_ar, len, i;
+	struct brsem *sem;
+
+	switch (action) {
+	case CPU_UP_PREPARE:
+		/*
+		 * Using any online cpu's rcnt_arlen as reference is
+		 * safe because it is protected with cpu hotplug lock.
+		 */
+		online_cpu = any_online_cpu(CPU_MASK_ALL);
+		rcnt_ar = per_cpu(brsem_rcnt_ar, online_cpu);
+		len = per_cpu(brsem_rcnt_arlen, online_cpu);
+
+		new_rcnt_ar = alloc_rcnt_ar(sizeof(new_rcnt_ar[0]) * len);
+		if (!new_rcnt_ar)
+			return NOTIFY_BAD;
+
+		/*
+		 * Transition between INT_MIN and any other value is
+		 * protected by cpu hotplug lock.
+		 */
+		for (i = 0; i < len; i++)
+			new_rcnt_ar[i] = rcnt_ar[i] == INT_MIN ? INT_MIN : 0;
+
+		BUG_ON(per_cpu(brsem_rcnt_ar, cpu) ||
+		       per_cpu(brsem_rcnt_arlen, cpu));
+
+		per_cpu(brsem_rcnt_ar, cpu) = new_rcnt_ar;
+		per_cpu(brsem_rcnt_arlen, cpu) = len;
+		break;
+
+#ifdef CONFIG_HOTPLUG_CPU
+	case CPU_UP_CANCELED:
+	case CPU_DEAD:
+		rcnt_ar = per_cpu(brsem_rcnt_ar, cpu);
+		len = per_cpu(brsem_rcnt_arlen, cpu);
+		if (rcnt_ar == NULL)
+			break;
+
+		per_cpu(brsem_rcnt_ar, cpu) = NULL;
+		per_cpu(brsem_rcnt_arlen, cpu) = 0;
+
+		for (i = 0; i < len; i++) {
+			if (rcnt_ar[i] == 0 || rcnt_ar[i] == INT_MIN)
+				continue;
+
+			spin_lock(&brsem_alloc_lock);
+			sem = idr_find(&brsem_idr, i);
+			spin_unlock(&brsem_alloc_lock);
+
+			BUG_ON(!sem || sem->idx != i);
+
+			/*
+			 * All inter-cpu synchronizations occur while
+			 * rcnt_ar is INT_MIN, so the following
+			 * doesn't interfere with any.
+			 */
+			spin_lock_crit(&sem->lock);
+			sem->master_rcnt += rcnt_ar[i];
+			spin_unlock_crit(&sem->lock);
+		}
+
+		free_rcnt_ar(rcnt_ar, sizeof(rcnt_ar[0]) * len);
+		break;
+#endif
+	}
+
+	return NOTIFY_OK;
+}
+
+static struct notifier_block brsem_cpu_notifier =
+	{ brsem_cpu_callback, NULL, 0 };
+
+/*
+ * brsem is initialized in three stages to make it useable as early as
+ * possible in the booting process.
+ *
+ * 1. per_cpu area setup
+ *
+ *  This happens _very_ early in the booting process.  Once this is
+ *  done, all statically allocated brsems initialized with
+ *  BRSEM_BYPASS_INITIALIZER can be passed to brsem functions.  At
+ *  this stage, these brsems don't do anything.  All operations on
+ *  them succeed unconditionally.
+ *
+ *  As only one thread runs on only one cpu in this stage, bypassing
+ *  locking mechanism doesn't cause any problem.
+ *
+ *  brsems cannot be initialized with __create_brsem() or created with
+ *  create_brsem() in this stage.
+ *
+ * 2. brsem_init_early
+ *
+ *  This is done as soon as slab allocator is online.  CPU notifier is
+ *  installed and brsem_idr is initialized.  Workqueue is not working
+ *  yet but the kernel runs only one thread until this stage making
+ *  access to rcnt_ar's of other cpus safe.
+ *
+ *  In this stage, brsem properly operates (not of much use as we're
+ *  still single threaded) and brsems can be initialized or allocated.
+ *
+ * 3. brsem_init
+ *
+ *  After workqueue is ready, brsem_init() is called and brsem becomes
+ *  fully operational.
+ *
+ * Unless brsems are needed before stage 2, users of brsem
+ * don't have to worry about initialization.  Simply calling
+ * create_brsem() works.
+ *
+ * However, if a brsem is needed before stage 2, it needs to be
+ * allocated manually (either statically or with alloc_bootmem) and
+ * initialized with BRSEM_BYPASS_INITIALIZER before the first use.
+ * The brsem must be initialized with __create_brsem() once stage 2 is
+ * reached.  The initialization can be done at anytime after stage 2
+ * but doing it while things are still single-threaded is strongly
+ * recommended.
+ *
+ * *CAUTION* When initializing a brsem with __create_brsem(), the
+ * brsem MUST NOT have any writer or reader.  brsem doesn't keep track
+ * of its holders while bypassing and initializing it while holders
+ * are present will screw brsem when they release the brsem.
+ */
+void __init brsem_init_early(void)
+{
+	int cpu;
+
+	/*
+	 * per-cpu initializer linked initial rcnt_ar to all cpus.
+	 * Bang all except cpu0.
+	 */
+	for (cpu = 1; cpu < NR_CPUS; cpu++) {
+		per_cpu(brsem_rcnt_ar, cpu) = NULL;
+		per_cpu(brsem_rcnt_arlen, cpu) = 0;
+	}
+
+	register_cpu_notifier(&brsem_cpu_notifier);
+	idr_init(&brsem_idr);
+}
+
+static void __init dummy_cpucb(void *data)
+{
+	/* nothing */
+}
+
+void __init brsem_init(void)
+{
+	int *rcnt_ar, *new_rcnt_ar;
+	size_t size;
+
+	/* Replace cpu0's __initdata rcnt_ar with kmalloc'ed one */
+	rcnt_ar = per_cpu(brsem_rcnt_ar, 0);
+
+	size = sizeof(new_rcnt_ar[0]) * BRSEM_INITIAL_RCNT_ARLEN;
+	new_rcnt_ar = kmalloc(size, GFP_KERNEL);
+	BUG_ON(!new_rcnt_ar);
+
+	memcpy(new_rcnt_ar, rcnt_ar, size);
+	per_cpu(brsem_rcnt_ar, 0) = new_rcnt_ar;
+
+	/* Create async workqueue */
+	brsem_async_wq = create_singlethread_workqueue("brsem");
+	BUG_ON(!brsem_async_wq);
+
+	/* CPU's are all online now and workqueue is working */
+	brsem_initialized = 1;
+
+	/* Make sure other cpus see above change */
+	call_on_all_cpus(dummy_cpucb, NULL);
+}
+
+EXPORT_SYMBOL(__create_brsem);
+EXPORT_SYMBOL(destroy_brsem);
+EXPORT_SYMBOL(__brsem_down_read_slow);
+EXPORT_SYMBOL(__brsem_down_read_trylock_slow);
+EXPORT_SYMBOL(__brsem_up_read_slow);
+EXPORT_SYMBOL(brsem_down_write);
+EXPORT_SYMBOL(brsem_down_write_interruptible);
+EXPORT_SYMBOL(brsem_up_write);
Index: linux-work/init/main.c
===================================================================
--- linux-work.orig/init/main.c	2005-09-25 15:26:33.000000000 +0900
+++ linux-work/init/main.c	2005-09-25 15:42:03.000000000 +0900
@@ -35,6 +35,7 @@
 #include <linux/kernel_stat.h>
 #include <linux/security.h>
 #include <linux/workqueue.h>
+#include <linux/brsem.h>
 #include <linux/profile.h>
 #include <linux/rcupdate.h>
 #include <linux/moduleparam.h>
@@ -511,6 +512,7 @@ asmlinkage void __init start_kernel(void
 	kmem_cache_init();
 	setup_per_cpu_pageset();
 	numa_policy_init();
+	brsem_init_early();
 	if (late_time_init)
 		late_time_init();
 	calibrate_delay();
@@ -605,6 +607,7 @@ static void __init do_basic_setup(void)
 {
 	/* drivers will send hotplug events */
 	init_workqueues();
+	brsem_init();
 	usermodehelper_init();
 	driver_init();
 

-
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

[Index of Archives]     [Kernel Newbies]     [Netfilter]     [Bugtraq]     [Photo]     [Gimp]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Video 4 Linux]     [Linux for the blind]
  Powered by Linux