Re: [patch] convert aio event reap to use atomic-op instead of spin_lock

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 4/11/07, Zach Brown <[email protected]> wrote:
First, I'll NAK this and all AIO patches until the patch description
says that it's been run through the regression tests that we've started
collecting in autotest.  They're trivial to run, never fear:

 cd /usr/local
 svn checkout svn://test.kernel.org/autotest/trunk/client autotest
 cd autotest
 bin/autotest tests/aio_dio_bugs/control

I ran through the autotest (with bug fix in the test code).  It passes
the regression tests. I made the following change since last rev:

* struct aio_ring stays unchanged.  I added a cast to atomic_cmpxchg()
* aio_ring_event() has grown too big, I turned that into a function
* take out white space change from previous rev.

open issue that this patch does not try to fix:
* kmap address alias needs a flush_dcache_page

Zach, does this make you more comfortable? (or maybe less iffy is the
right word?)


From: Ken Chen <[email protected]>

Resurrect an old patch that uses atomic operation to update ring buffer
index on AIO event queue.  This work allows further application/libaio
optimization to run fast path io_getevents in user space.

I've also added one more change on top of old implementation that rounds
ring buffer size to power of 2 to allow fast head/tail calculation. With
the new scheme, there is no more modulo operation on them and we simply
increment either pointer index directly.  This scheme also automatically
handles integer wrap nicely without any additional special treatment.

Patch was stress tested and in addition, tested on
autotest:aio_dio_bugs.  The results were all pass:  Output of
autotest:aio_dio_bugs:
ran for 200 seconds without error, passing
AIO read of last block in file succeeded.
aio-free-ring-with-bogus-nr-pages: Success!
aio-io-setup-with-nonwritable-context-pointer: Success!
GOOD aio_dio_bugs completed successfully


Signed-off-by: Ken Chen <[email protected]>
diff --git a/fs/aio.c b/fs/aio.c
index e4598d6..f8b8e45 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -108,8 +108,8 @@ static int aio_setup_ring(struct kioctx *ctx)
 	unsigned long size;
 	int nr_pages;
 
-	/* Compensate for the ring buffer's head/tail overlap entry */
-	nr_events += 2;	/* 1 is required, 2 for good luck */
+	/* round nr_event to next power of 2 */
+	nr_events = roundup_pow_of_two(nr_events);
 
 	size = sizeof(struct aio_ring);
 	size += sizeof(struct io_event) * nr_events;
@@ -118,8 +118,6 @@ static int aio_setup_ring(struct kioctx *ctx)
 	if (nr_pages < 0)
 		return -EINVAL;
 
-	nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
-
 	info->nr = 0;
 	info->ring_pages = info->internal_pages;
 	if (nr_pages > AIO_RING_PAGES) {
@@ -177,14 +175,16 @@ static int aio_setup_ring(struct kioctx *ctx)
 #define AIO_EVENTS_FIRST_PAGE	((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
 #define AIO_EVENTS_OFFSET	(AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
 
-#define aio_ring_event(info, nr, km) ({					\
-	unsigned pos = (nr) + AIO_EVENTS_OFFSET;			\
-	struct io_event *__event;					\
-	__event = kmap_atomic(						\
-			(info)->ring_pages[pos / AIO_EVENTS_PER_PAGE], km); \
-	__event += pos % AIO_EVENTS_PER_PAGE;				\
-	__event;							\
-})
+static struct io_event *aio_ring_event(struct aio_ring_info *info,
+					unsigned nr, enum km_type km)
+{
+	unsigned pos = (nr & (info->nr - 1)) + AIO_EVENTS_OFFSET;
+	struct io_event *event;
+
+	event = kmap_atomic(info->ring_pages[pos / AIO_EVENTS_PER_PAGE], km);
+	event += pos % AIO_EVENTS_PER_PAGE;
+	return event;
+}
 
 #define put_aio_ring_event(event, km) do {	\
 	struct io_event *__event = (event);	\
@@ -220,7 +220,6 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 
 	atomic_set(&ctx->users, 1);
 	spin_lock_init(&ctx->ctx_lock);
-	spin_lock_init(&ctx->ring_info.ring_lock);
 	init_waitqueue_head(&ctx->wait);
 
 	INIT_LIST_HEAD(&ctx->active_reqs);
@@ -927,7 +926,7 @@ int fastcall aio_complete(struct kiocb *iocb, long res, long res2)
 	struct aio_ring	*ring;
 	struct io_event	*event;
 	unsigned long	flags;
-	unsigned long	tail;
+	unsigned	tail;
 	int		ret;
 
 	/*
@@ -969,8 +968,6 @@ int fastcall aio_complete(struct kiocb *iocb, long res, long res2)
 
 	tail = info->tail;
 	event = aio_ring_event(info, tail, KM_IRQ0);
-	if (++tail >= info->nr)
-		tail = 0;
 
 	event->obj = (u64)(unsigned long)iocb->ki_obj.user;
 	event->data = iocb->ki_user_data;
@@ -986,13 +983,14 @@ int fastcall aio_complete(struct kiocb *iocb, long res, long res2)
 	 */
 	smp_wmb();	/* make event visible before updating tail */
 
+	tail++;
 	info->tail = tail;
 	ring->tail = tail;
 
 	put_aio_ring_event(event, KM_IRQ0);
 	kunmap_atomic(ring, KM_IRQ1);
 
-	pr_debug("added to ring %p at [%lu]\n", iocb, tail);
+	pr_debug("added to ring %p at [%u]\n", iocb, tail);
 put_rq:
 	/* everything turned out well, dispose of the aiocb. */
 	ret = __aio_put_req(ctx, iocb);
@@ -1007,39 +1005,35 @@ put_rq:
 /* aio_read_evt
  *	Pull an event off of the ioctx's event ring.  Returns the number of 
  *	events fetched (0 or 1 ;-)
- *	FIXME: make this use cmpxchg.
  *	TODO: make the ringbuffer user mmap()able (requires FIXME).
  */
 static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
 {
 	struct aio_ring_info *info = &ioctx->ring_info;
 	struct aio_ring *ring;
-	unsigned long head;
-	int ret = 0;
+	struct io_event *evp;
+	unsigned head;
+	int ret;
 
 	ring = kmap_atomic(info->ring_pages[0], KM_USER0);
 	dprintk("in aio_read_evt h%lu t%lu m%lu\n",
 		 (unsigned long)ring->head, (unsigned long)ring->tail,
 		 (unsigned long)ring->nr);
 
-	if (ring->head == ring->tail)
-		goto out;
-
-	spin_lock(&info->ring_lock);
-
-	head = ring->head % info->nr;
-	if (head != ring->tail) {
-		struct io_event *evp = aio_ring_event(info, head, KM_USER1);
+	do {
+		head = ring->head;
+		if (head == ring->tail) {
+			ret = 0;
+			break;
+		}
+		evp = aio_ring_event(info, head, KM_USER1);
 		*ent = *evp;
-		head = (head + 1) % info->nr;
 		smp_mb(); /* finish reading the event before updatng the head */
-		ring->head = head;
 		ret = 1;
 		put_aio_ring_event(evp, KM_USER1);
-	}
-	spin_unlock(&info->ring_lock);
+	} while (head != atomic_cmpxchg((atomic_t *) &ring->head,
+					head, head + 1));
 
-out:
 	kunmap_atomic(ring, KM_USER0);
 	dprintk("leaving aio_read_evt: %d  h%lu t%lu\n", ret,
 		 (unsigned long)ring->head, (unsigned long)ring->tail);
diff --git a/include/linux/aio.h b/include/linux/aio.h
index a30ef13..0cd66a3 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -156,7 +156,7 @@ struct aio_ring {
 	struct io_event		io_events[0];
 }; /* 128 bytes + ring size */
 
-#define aio_ring_avail(info, ring)	(((ring)->head + (info)->nr - 1 - (ring)->tail) % (info)->nr)
+#define aio_ring_avail(info, ring) ((info)->nr + (ring)->head - (ring)->tail)
 
 #define AIO_RING_PAGES	8
 struct aio_ring_info {
@@ -164,7 +164,6 @@ struct aio_ring_info {
 	unsigned long		mmap_size;
 
 	struct page		**ring_pages;
-	spinlock_t		ring_lock;
 	long			nr_pages;
 
 	unsigned		nr, tail;

[Index of Archives]     [Kernel Newbies]     [Netfilter]     [Bugtraq]     [Photo]     [Stuff]     [Gimp]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Video 4 Linux]     [Linux for the blind]     [Linux Resources]
  Powered by Linux