[RFC] Races in aio call back path.

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hi all,

I found some races in the AIO call back path which is the root cause for
a few problems in the AIO code. The race is between the aio call back
path ( executed by softirqs ) and the aio_run_iocb().

(1) Suppose a retry returned -EIOCBRETRY and wait has been queued.

Now, whenever the wait event has been completed, the aio_wake_function()
is invoked by the softirqs. aio_wake_function deletes the wait entry
(iocb->ki_wait.task_list) and invokes kick_iocb() to kick the iocb and
queue it back to the run_list.

Consider a situation like:

SOFTIRQ:
	list_del_init(&wait->task_list);
          ## SOFTIRQ gets scheduled or PROCESS executes on another CPU ##
	kick_iocb(iocb);
PROCESS:
	/*
	 * Issue an additional retry to avoid waiting forever if
	 * no waits were queued (e.g. in case of a short read).
	 */
	if (list_empty(&iocb->ki_wait.task_list))
	kiocbSetKicked(iocb);

So, PROCESS might see the deleted(empty) wait list
(iocb->ki_wait.task_list) and will kick and queue it up and it will be
retried again. Now if the next retry is going to happen before SOFTIRQ
gets a chance to run, we are running into problems.

There are two possiblities:

(a) The iocb may get completed.
	If this happens before the SOFTIRQ enters kick_iocb(iocb), we are about
to kick an iocb which is no more valid. This might cause a Kernel Oops
while trying,
	spin_lock_irqsave(&ctx->ctx_lock, flags);
since the iocb->ki_ctx may be invalid.

(b) The iocb might encounter another wait condition.
	This case iocb would be queued for some events. This would cause the
SOFTIRQ hit
	WARN_ON((!list_empty(&iocb->ki_wait.task_list)));
in queue_kicked_iocb().

(2) Similar races occur due to Kicking the iocb.

i.e,
SOFTIRQ: [in kick_iocb()]
		if(!kiocbTryKick(iocb)) {
	# If we get scheduled here. And PROCESS reaches as below #
			queue_kicked_iocb(iocb);
PROCESS: [ in aio_run_iocb() ]
		/* will make __queue_kicked_iocb succeed from here on */
		INIT_LIST_HEAD(&iocb->ki_run_list);
		/* we must queue the next iteration ourselves, if it
		 * has already been kicked */
		if (kiocbIsKicked(iocb)) { <- Will be true see ( kicked by kick_iocb)
			__queue_kicked_iocb(iocb);
		}

Thus iocb will get queued and may be retried. This again can cause the
problems described above.

Conclusion
===========

 From this I conclude that the following operations in call back path
for async iocbs should be atomic.

(*) Deletion of the wait queue entry.
(*) Kicking the iocb.
(*) Queue back to run_list. ( which is already atomic with ctx_lock held ).

Also the check for waits being queued in aio_run_iocb() should be done
with the ctx_lock held.

Solution
========
I have created a patch which makes above operations to be done with the
iocb->ki_ctx->ctx_lock held. Also, I have moved the check :

  > if (list_empty(&iocb->ki_wait.task_list))
  >	kiocbSetKicked(iocb);

to be done under the ctx_lock held.

Since we already holds a lock before calling queue_kicked_iocb(), there
is no need for a queue_kicked_iocb anymore. So I have renamed the
__queue_kicked_iocb to queue_kicked_iocb.

Testing
========
I tested the proposed patch in 2.6.13-rc3-mm1 on a 2-way Intel Xeon(with HT) with aio-stress and fsx-linux test cases.
The tests ran fine.


Comments ?

regards,

Suzuki K P
Linux Technology Centre
IBM India Software Labs
Bangalore.

"Sorrow keeps u Human ,Failure Keeps u Humble,Success keeps u Glowing.
   But only God Keeps u Going..... "



--- linux-2.6.13-rc3/fs/aio.c.old	2005-07-27 10:19:21.000000000 -0700
+++ linux-2.6.13-rc3/fs/aio.c	2005-07-27 10:22:27.000000000 -0700
@@ -609,7 +609,7 @@ static void unuse_mm(struct mm_struct *m
  * Should be called with the spin lock iocb->ki_ctx->ctx_lock
  * held
  */
-static inline int __queue_kicked_iocb(struct kiocb *iocb)
+static inline int queue_kicked_iocb(struct kiocb *iocb)
 {
 	struct kioctx *ctx = iocb->ki_ctx;
 
@@ -724,13 +724,6 @@ static ssize_t aio_run_iocb(struct kiocb
 			aio_complete(iocb, ret, 0);
 			/* must not access the iocb after this */
 		}
-	} else {
-		/*
-		 * Issue an additional retry to avoid waiting forever if
-		 * no waits were queued (e.g. in case of a short read).
-		 */
-		if (list_empty(&iocb->ki_wait.task_list))
-			kiocbSetKicked(iocb);
 	}
 out:
 	spin_lock_irq(&ctx->ctx_lock);
@@ -741,17 +734,23 @@ out:
 		 * and know that there is more left to go,
 		 * this is where we let go so that a subsequent
 		 * "kick" can start the next iteration
+		 *
+		 * Issue an additional retry to avoid waiting forever if
+		 * no waits were queued (e.g. in case of a short read).
+		 * (Should be done with ctx_lock held.)
 		 */
 
-		/* will make __queue_kicked_iocb succeed from here on */
+		if (list_empty(&iocb->ki_wait.task_list))
+			kiocbSetKicked(iocb);
+		/* will make queue_kicked_iocb succeed from here on */
 		INIT_LIST_HEAD(&iocb->ki_run_list);
 		/* we must queue the next iteration ourselves, if it
 		 * has already been kicked */
 		if (kiocbIsKicked(iocb)) {
-			__queue_kicked_iocb(iocb);
+			queue_kicked_iocb(iocb);
 
 			/*
-			 * __queue_kicked_iocb will always return 1 here, because
+			 * queue_kicked_iocb will always return 1 here, because
 			 * iocb->ki_run_list is empty at this point so it should
 			 * be safe to unconditionally queue the context into the
 			 * work queue.
@@ -870,21 +869,31 @@ static void aio_kick_handler(void *data)
 
 
 /*
- * Called by kick_iocb to queue the kiocb for retry
- * and if required activate the aio work queue to process
- * it
+ * Kicking an async iocb.
+ * The following operations for the async iocbs should be atomic
+ * to avoid races with the aio_run_iocb() code.
+ * (1) Deleting the wait queue entry.
+ * (2) Kicking the iocb.
+ * (3) Queue the iocb back to run_list.
+ * Holds the ctx->ctx_lock to avoid races.
  */
-static void queue_kicked_iocb(struct kiocb *iocb)
+static void kick_async_iocb(struct kiocb *iocb)
 {
  	struct kioctx	*ctx = iocb->ki_ctx;
 	unsigned long flags;
 	int run = 0;
-
-	WARN_ON((!list_empty(&iocb->ki_wait.task_list)));
-
+	
 	spin_lock_irqsave(&ctx->ctx_lock, flags);
-	run = __queue_kicked_iocb(iocb);
+	list_del_init(&iocb->ki_wait.task_list);
+	/* If its already kicked we shouldn't queue it again */
+	if (!kiocbTryKick(iocb)) {
+		run = queue_kicked_iocb(iocb);
+	}
 	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+
+	/* Activate the aio worker queue if we have successfully queued
+	 * the iocb, so that it can be processed
+	 */
 	if (run)
 		aio_queue_work(ctx);
 }
@@ -901,15 +910,13 @@ void fastcall kick_iocb(struct kiocb *io
 	/* sync iocbs are easy: they can only ever be executing from a 
 	 * single context. */
 	if (is_sync_kiocb(iocb)) {
+		list_del_init(&iocb->ki_wait.task_list);
 		kiocbSetKicked(iocb);
 	        wake_up_process(iocb->ki_obj.tsk);
 		return;
-	}
-
-	/* If its already kicked we shouldn't queue it again */
-	if (!kiocbTryKick(iocb)) {
-		queue_kicked_iocb(iocb);
-	}
+	} else 
+		kick_async_iocb(iocb);
+	
 }
 EXPORT_SYMBOL(kick_iocb);
 
@@ -1461,7 +1468,6 @@ static int aio_wake_function(wait_queue_
 {
 	struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);
 
-	list_del_init(&wait->task_list);
 	kick_iocb(iocb);
 	return 1;
 }

[Index of Archives]     [Kernel Newbies]     [Netfilter]     [Bugtraq]     [Photo]     [Gimp]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Video 4 Linux]     [Linux for the blind]
  Powered by Linux