[PATCH] Enable remote RCU callback processing on SMP systems

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Reposting this patch that I'd submitted back on 1/26.  Any thoughts on
the basic approach in addition to the code itself?  Is it possible to
get this applied?  Note that this should have negligible effect on
those not configuring remote rcu callback processing.


The purpose of the patch I'm proposing is to reduce unexpected rcu
callback latencies on selected cpus.  Latencies caused by processing
certain rcu callbacks, such as file_free_rcu(), can be quite
substantial.  This was previously mentioned in the following posts:

	lkml/2005/10/20/62
	lkml/2005/12/20/93


This patch enables specified cpus to be set up for remote rcu callback
processing.  This can be done either at boot time (with the "remotercu="
parameter) or dynamically (with rcu_set_remote_rcu() and
rcu_clear_remote_rcu()).

The cpus set up for remote callback processing do not process their own
rcu callbacks, but still go through the normal quiescent state processing.
No locking is added on the remote callback cpus themselves.  The cpus
that are not set up as remote callback cpus pull the donelists of
callbacks from the next remote callback cpu and process them.

Signed-off-by: Dimitri Sivanich <[email protected]>

Index: linux/kernel/rcupdate.c
===================================================================
--- linux.orig/kernel/rcupdate.c	2006-01-26 11:42:39.057950013 -0600
+++ linux/kernel/rcupdate.c	2006-01-26 11:42:55.847318916 -0600
@@ -103,6 +103,91 @@ static atomic_t rcu_barrier_cpu_count;
 static struct semaphore rcu_barrier_sema;
 static struct completion rcu_barrier_completion;
 
+/*
+ * Remote callback processing setup, to improve determinism on chosen cpus.
+ * rcu_remote_online_cpus(_m_) stores in _m_ the remote cpus that are online.
+ */
+#define rcu_remote_online_cpus(_m_) \
+	cpus_and((_m_), cpu_remotercu_map, cpu_online_map)
+
+static int cpu_remotercu_next = -1;	/* remote cpu picked last; starts at -1 */
+static cpumask_t cpu_remotercu_map = CPU_MASK_NONE;	/* cpus set up as remote */
+static spinlock_t cpu_remotercu_lock = SPIN_LOCK_UNLOCKED; /* taken around run-time updates of the two above */
+
+static inline int is_remote_rcu(void)	/* is this cpu online and remote? */
+{
+	int cpu = smp_processor_id();
+
+	return cpu_isset(cpu, cpu_remotercu_map) && cpu_online(cpu);
+}
+
+/*
+ * Nonzero if at least one online cpu is set up for remote callback processing.
+ */
+static inline int rcu_remote_rcus(void)
+{
+	return cpus_intersects(cpu_remotercu_map, cpu_online_map);
+}
+
+/* Round-robin over the online remote cpus: returns the next one, or -1 if none. */
+static inline int rcu_next_remotercu(void)
+{
+	cpumask_t candidates;
+	unsigned long flags;
+	int picked;
+
+	rcu_remote_online_cpus(candidates);
+	if (cpus_empty(candidates))
+		return -1;
+	spin_lock_irqsave(&cpu_remotercu_lock, flags);
+	picked = next_cpu(cpu_remotercu_next, candidates);
+	/* Nothing after the previous pick: wrap around to the first candidate. */
+	if (picked >= NR_CPUS)
+		picked = first_cpu(candidates);
+	cpu_remotercu_next = picked;
+	spin_unlock_irqrestore(&cpu_remotercu_lock, flags);
+
+	return picked;
+}
+
+/* Mark @cpu for remote callback processing.  Returns 0, or 1 if @cpu is invalid. */
+int rcu_set_remote_rcu(int cpu) {
+	unsigned long flags;
+
+	if (cpu < 0 || cpu >= NR_CPUS)	/* signed id: also reject negatives */
+		return 1;
+	spin_lock_irqsave(&cpu_remotercu_lock, flags);
+	cpu_set(cpu, cpu_remotercu_map);
+	spin_unlock_irqrestore(&cpu_remotercu_lock, flags);
+	return 0;
+}
+EXPORT_SYMBOL(rcu_set_remote_rcu);
+
+void rcu_clear_remote_rcu(int cpu) {	/* stop treating @cpu as a remote cpu */
+	unsigned long flags;
+
+	if (cpu >= 0 && cpu < NR_CPUS) {	/* ignore ids outside the mask */
+		spin_lock_irqsave(&cpu_remotercu_lock, flags);
+		cpu_clear(cpu, cpu_remotercu_map);
+		spin_unlock_irqrestore(&cpu_remotercu_lock, flags);
+	}
+}
+EXPORT_SYMBOL(rcu_clear_remote_rcu);
+
+/* Boot option "remotercu=<cpu>[,<cpu>...]": set up the remote callback cpu mask. */
+static int __init rcu_remotercu_cpu_setup(char *str)
+{
+	int cpus[NR_CPUS + 1], i;	/* get_options() keeps the count in [0] */
+
+	get_options(str, ARRAY_SIZE(cpus), cpus);
+	cpus_clear(cpu_remotercu_map);
+	for (i = 1; i <= cpus[0]; i++)
+		rcu_set_remote_rcu(cpus[i]);	/* rejects ids >= NR_CPUS */
+	cpu_remotercu_next = first_cpu(cpu_remotercu_map);
+	return 1;
+}
+__setup("remotercu=", rcu_remotercu_cpu_setup);
+
 /**
  * call_rcu_bh - Queue an RCU for invocation after a quicker grace period.
  * @head: structure to be used for queueing the RCU updates.
@@ -351,6 +436,8 @@ static void rcu_offline_cpu(int cpu)
 	struct rcu_data *this_rdp = &get_cpu_var(rcu_data);
 	struct rcu_data *this_bh_rdp = &get_cpu_var(rcu_bh_data);
 
+	rcu_clear_remote_rcu(cpu);
+
 	__rcu_offline_cpu(this_rdp, &rcu_ctrlblk,
 					&per_cpu(rcu_data, cpu));
 	__rcu_offline_cpu(this_bh_rdp, &rcu_bh_ctrlblk,
@@ -411,14 +498,62 @@ static void __rcu_process_callbacks(stru
 		local_irq_enable();
 	}
 	rcu_check_quiescent_state(rcp, rdp);
-	if (rdp->donelist)
+	/* Prevent remote cpu's from accessing donelist */
+	if (likely(!is_remote_rcu()))
+		rdp->doneself = 0;
+	if (rdp->donelist && !is_remote_rcu()) {
 		rcu_do_batch(rdp);
+	}
+	if (unlikely(is_remote_rcu()))
+		rdp->doneself = 1;
+}
+
+/* Run the finished callbacks of the next remote cpu on this non-remote cpu. */
+static inline void __rcu_process_remote_callbacks(void)
+{
+	struct rcu_data *rdp;
+	struct rcu_head *list = NULL;
+	struct rcu_head *list_bh = NULL;
+	struct rcu_head *next;
+	int cpu;
+
+	/* Nothing to do without remote cpus, or when this cpu is itself remote. */
+	if (likely(!rcu_remote_rcus() || is_remote_rcu()))
+		return;
+	cpu = rcu_next_remotercu();
+	if (unlikely(cpu == -1))	/* just in case: no remote cpu left */
+		return;
+	rdp = &per_cpu(rcu_data, cpu);
+	/*
+	 * Only take lists the remote cpu has flagged via ->doneself.  The xchg
+	 * prevents multiple cpus from processing the same list simultaneously.
+	 */
+	if (rdp->doneself && (list = xchg(&rdp->donelist, NULL)) != NULL) {
+		rdp->count = 0;
+		rdp->donetail = &rdp->donelist;
+	}
+	rdp = &per_cpu(rcu_bh_data, cpu);
+	if (rdp->doneself && (list_bh = xchg(&rdp->donelist, NULL)) != NULL) {
+		rdp->count = 0;
+		rdp->donetail = &rdp->donelist;
+	}
+
+	/* A callback may free its own rcu_head, so fetch ->next before calling it. */
+	for (; list; list = next) {
+		next = list->next;
+		list->func(list);
+	}
+	for (; list_bh; list_bh = next) {
+		next = list_bh->next;
+		list_bh->func(list_bh);
+	}
+}
 
 static void rcu_process_callbacks(unsigned long unused)
 {
 	__rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
 	__rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
+	__rcu_process_remote_callbacks();	/* returns at once on remote cpus */
 }
 
 static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
@@ -447,6 +582,9 @@ static int __rcu_pending(struct rcu_ctrl
 
 int rcu_pending(int cpu)
 {
+	/* While any online cpu is remote, non-remote cpus always schedule the tasklet. */
+	if (unlikely(rcu_remote_rcus()) && !is_remote_rcu())
+		tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
 	return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
 		__rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
 }
Index: linux/include/linux/rcupdate.h
===================================================================
--- linux.orig/include/linux/rcupdate.h	2006-01-26 11:42:39.065761747 -0600
+++ linux/include/linux/rcupdate.h	2006-01-26 11:42:55.863918850 -0600
@@ -103,6 +103,7 @@ struct rcu_data {
 	struct rcu_head **curtail;
 	struct rcu_head *donelist;
 	struct rcu_head **donetail;
+	int doneself;
 	int cpu;
 	struct rcu_head barrier;
 };

-
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

[Index of Archives]     [Kernel Newbies]     [Netfilter]     [Bugtraq]     [Photo]     [Stuff]     [Gimp]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Video 4 Linux]     [Linux for the blind]     [Linux Resources]
  Powered by Linux