Re: [take33 10/10] kevent: Kevent based AIO (aio_sendfile()/aio_sendfile_path()).

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Since you are implementing new APIs here, have you considered adding an
aio_sendfilev() to be able to send a header with the data?

Regards
Suparna

On Wed, Jan 17, 2007 at 09:30:35AM +0300, Evgeniy Polyakov wrote:
> 
> Kevent based AIO (aio_sendfile()/aio_sendfile_path()).
> 
> aio_sendfile()/aio_sendfile_path() consists of two major parts: the AIO
> state machine and the page processing code.
> The former is just a small subsystem which allows callbacks to be queued
> for invocation in process context by a pool of kernel
> threads. It allows caches of callbacks to be queued to the local thread
> or to any other specified thread. Each cache of callbacks is processed as
> long as there are callbacks in it; callbacks can requeue themselves into
> the same cache.
> 
> The real work is done in the page processing code - code which populates
> pages into the VFS cache and then sends the pages to the destination socket
> via ->sendpage(). Unlike the previous aio_sendfile() implementation, the new
> one does not require low-level filesystem specific callbacks (->get_block())
> at all; instead I extended struct address_space_operations to contain a new
> member called ->aio_readpages(), which is exactly the same as ->readpages()
> (read: mpage_readpages()) except for different BIO allocation and submission
> routines. I changed mpage_readpages() to pass mpage_alloc() and
> mpage_bio_submit() to a new function called __mpage_readpages(), which is
> exactly the old mpage_readpages() but invokes the provided callbacks instead
> of calling the old functions directly. mpage_readpages_aio() provides kevent
> specific callbacks, which call the old functions but with different destructor
> callbacks, which are essentially the same, except that they reschedule AIO processing.
> 
> aio_sendfile_path() is essentially aio_sendfile(), except that it takes
> source filename as parameter and returns opened file descriptor.
> 
> A benchmark transferring 100 1MB files (files are already in the VFS cache) using
> sync sendfile() against aio_sendfile_path() shows about a 10MB/sec performance win
> (78 MB/s vs 66-72 MB/s over a 1 Gb network; the sending server is a single-processor
> AMD Athlon 64 3500+) for aio_sendfile_path().
> 
> The AIO state machine is a base for network AIO (which becomes
> quite trivial), but I will not start the implementation until the
> roadmap for kevent as a whole and for the AIO implementation becomes clearer.
> 
> Signed-off-by: Evgeniy Polyakov <[email protected]>
> 
> diff --git a/fs/bio.c b/fs/bio.c
> index 7618bcb..291e7e8 100644
> --- a/fs/bio.c
> +++ b/fs/bio.c
> @@ -120,7 +120,7 @@ void bio_free(struct bio *bio, struct bio_set *bio_set)
>  /*
>   * default destructor for a bio allocated with bio_alloc_bioset()
>   */
> -static void bio_fs_destructor(struct bio *bio)
> +void bio_fs_destructor(struct bio *bio)
>  {
>  	bio_free(bio, fs_bio_set);
>  }
> diff --git a/fs/ext3/inode.c b/fs/ext3/inode.c
> index beaf25f..f08c957 100644
> --- a/fs/ext3/inode.c
> +++ b/fs/ext3/inode.c
> @@ -1650,6 +1650,13 @@ ext3_readpages(struct file *file, struct address_space *mapping,
>  	return mpage_readpages(mapping, pages, nr_pages, ext3_get_block);
>  }
> 
> +static int
> +ext3_readpages_aio(struct file *file, struct address_space *mapping,
> +		struct list_head *pages, unsigned nr_pages, void *priv)
> +{
> +	return mpage_readpages_aio(mapping, pages, nr_pages, ext3_get_block, priv);
> +}
> +
>  static void ext3_invalidatepage(struct page *page, unsigned long offset)
>  {
>  	journal_t *journal = EXT3_JOURNAL(page->mapping->host);
> @@ -1768,6 +1775,7 @@ static int ext3_journalled_set_page_dirty(struct page *page)
>  }
> 
>  static const struct address_space_operations ext3_ordered_aops = {
> +	.aio_readpages	= ext3_readpages_aio,
>  	.readpage	= ext3_readpage,
>  	.readpages	= ext3_readpages,
>  	.writepage	= ext3_ordered_writepage,
> diff --git a/fs/mpage.c b/fs/mpage.c
> index 692a3e5..e5ba44b 100644
> --- a/fs/mpage.c
> +++ b/fs/mpage.c
> @@ -102,7 +102,7 @@ static struct bio *mpage_bio_submit(int rw, struct bio *bio)
>  static struct bio *
>  mpage_alloc(struct block_device *bdev,
>  		sector_t first_sector, int nr_vecs,
> -		gfp_t gfp_flags)
> +		gfp_t gfp_flags, void *priv)
>  {
>  	struct bio *bio;
> 
> @@ -116,6 +116,7 @@ mpage_alloc(struct block_device *bdev,
>  	if (bio) {
>  		bio->bi_bdev = bdev;
>  		bio->bi_sector = first_sector;
> +		bio->bi_private = priv;
>  	}
>  	return bio;
>  }
> @@ -175,7 +176,10 @@ map_buffer_to_page(struct page *page, struct buffer_head *bh, int page_block)
>  static struct bio *
>  do_mpage_readpage(struct bio *bio, struct page *page, unsigned nr_pages,
>  		sector_t *last_block_in_bio, struct buffer_head *map_bh,
> -		unsigned long *first_logical_block, get_block_t get_block)
> +		unsigned long *first_logical_block, get_block_t get_block,
> +		struct bio *(*alloc)(struct block_device *bdev, sector_t first_sector,
> +			int nr_vecs, gfp_t gfp_flags, void *priv),
> +		struct bio *(*submit)(int rw, struct bio *bio), void *priv)
>  {
>  	struct inode *inode = page->mapping->host;
>  	const unsigned blkbits = inode->i_blkbits;
> @@ -302,25 +306,25 @@ do_mpage_readpage(struct bio *bio, struct page *page, unsigned nr_pages,
>  	 * This page will go to BIO.  Do we need to send this BIO off first?
>  	 */
>  	if (bio && (*last_block_in_bio != blocks[0] - 1))
> -		bio = mpage_bio_submit(READ, bio);
> +		bio = submit(READ, bio);
> 
>  alloc_new:
>  	if (bio == NULL) {
> -		bio = mpage_alloc(bdev, blocks[0] << (blkbits - 9),
> +		bio = alloc(bdev, blocks[0] << (blkbits - 9),
>  			  	min_t(int, nr_pages, bio_get_nr_vecs(bdev)),
> -				GFP_KERNEL);
> +				GFP_KERNEL, priv);
>  		if (bio == NULL)
>  			goto confused;
>  	}
> 
>  	length = first_hole << blkbits;
>  	if (bio_add_page(bio, page, length, 0) < length) {
> -		bio = mpage_bio_submit(READ, bio);
> +		bio = submit(READ, bio);
>  		goto alloc_new;
>  	}
> 
>  	if (buffer_boundary(map_bh) || (first_hole != blocks_per_page))
> -		bio = mpage_bio_submit(READ, bio);
> +		bio = submit(READ, bio);
>  	else
>  		*last_block_in_bio = blocks[blocks_per_page - 1];
>  out:
> @@ -328,7 +332,7 @@ out:
> 
>  confused:
>  	if (bio)
> -		bio = mpage_bio_submit(READ, bio);
> +		bio = submit(READ, bio);
>  	if (!PageUptodate(page))
>  	        block_read_full_page(page, get_block);
>  	else
> @@ -336,6 +340,48 @@ confused:
>  	goto out;
>  }
> 
> +int
> +__mpage_readpages(struct address_space *mapping, struct list_head *pages,
> +				unsigned nr_pages, get_block_t get_block,
> +		struct bio *(*alloc)(struct block_device *bdev, sector_t first_sector,
> +			int nr_vecs, gfp_t gfp_flags, void *priv),
> +		struct bio *(*submit)(int rw, struct bio *bio),
> +		void *priv)
> +{
> +	struct bio *bio = NULL;
> +	unsigned page_idx;
> +	sector_t last_block_in_bio = 0;
> +	struct pagevec lru_pvec;
> +	struct buffer_head map_bh;
> +	unsigned long first_logical_block = 0;
> +
> +	clear_buffer_mapped(&map_bh);
> +	pagevec_init(&lru_pvec, 0);
> +	for (page_idx = 0; page_idx < nr_pages; page_idx++) {
> +		struct page *page = list_entry(pages->prev, struct page, lru);
> +
> +		prefetchw(&page->flags);
> +		list_del(&page->lru);
> +		if (!add_to_page_cache(page, mapping,
> +					page->index, GFP_KERNEL)) {
> +			bio = do_mpage_readpage(bio, page,
> +					nr_pages - page_idx,
> +					&last_block_in_bio, &map_bh,
> +					&first_logical_block,
> +					get_block, alloc, submit, priv);
> +			if (!pagevec_add(&lru_pvec, page))
> +				__pagevec_lru_add(&lru_pvec);
> +		} else {
> +			page_cache_release(page);
> +		}
> +	}
> +	pagevec_lru_add(&lru_pvec);
> +	BUG_ON(!list_empty(pages));
> +	if (bio)
> +		submit(READ, bio);
> +	return 0;
> +}
> +
>  /**
>   * mpage_readpages - populate an address space with some pages, and
>   *                       start reads against them.
> @@ -386,40 +432,28 @@ int
>  mpage_readpages(struct address_space *mapping, struct list_head *pages,
>  				unsigned nr_pages, get_block_t get_block)
>  {
> -	struct bio *bio = NULL;
> -	unsigned page_idx;
> -	sector_t last_block_in_bio = 0;
> -	struct pagevec lru_pvec;
> -	struct buffer_head map_bh;
> -	unsigned long first_logical_block = 0;
> +	return __mpage_readpages(mapping, pages, nr_pages, get_block,
> +			mpage_alloc, mpage_bio_submit, NULL);
> +}
> +EXPORT_SYMBOL(mpage_readpages);
> 
> -	clear_buffer_mapped(&map_bh);
> -	pagevec_init(&lru_pvec, 0);
> -	for (page_idx = 0; page_idx < nr_pages; page_idx++) {
> -		struct page *page = list_entry(pages->prev, struct page, lru);
> +#ifdef CONFIG_KEVENT_AIO
> +extern struct bio *kaio_mpage_alloc(struct block_device *bdev, sector_t first_sector,
> +		int nr_vecs, gfp_t gfp_flags, void *priv);
> +extern struct bio *kaio_mpage_bio_submit(int rw, struct bio *bio);
> +#else
> +#define kaio_mpage_alloc	mpage_alloc
> +#define kaio_mpage_bio_submit	mpage_bio_submit
> +#endif
> 
> -		prefetchw(&page->flags);
> -		list_del(&page->lru);
> -		if (!add_to_page_cache(page, mapping,
> -					page->index, GFP_KERNEL)) {
> -			bio = do_mpage_readpage(bio, page,
> -					nr_pages - page_idx,
> -					&last_block_in_bio, &map_bh,
> -					&first_logical_block,
> -					get_block);
> -			if (!pagevec_add(&lru_pvec, page))
> -				__pagevec_lru_add(&lru_pvec);
> -		} else {
> -			page_cache_release(page);
> -		}
> -	}
> -	pagevec_lru_add(&lru_pvec);
> -	BUG_ON(!list_empty(pages));
> -	if (bio)
> -		mpage_bio_submit(READ, bio);
> -	return 0;
> +int
> +mpage_readpages_aio(struct address_space *mapping, struct list_head *pages,
> +				unsigned nr_pages, get_block_t get_block, void *priv)
> +{
> +	return __mpage_readpages(mapping, pages, nr_pages, get_block,
> +			kaio_mpage_alloc, kaio_mpage_bio_submit, priv);
>  }
> -EXPORT_SYMBOL(mpage_readpages);
> +EXPORT_SYMBOL(mpage_readpages_aio);
> 
>  /*
>   * This isn't called much at all
> @@ -433,7 +467,8 @@ int mpage_readpage(struct page *page, get_block_t get_block)
> 
>  	clear_buffer_mapped(&map_bh);
>  	bio = do_mpage_readpage(bio, page, 1, &last_block_in_bio,
> -			&map_bh, &first_logical_block, get_block);
> +			&map_bh, &first_logical_block, get_block,
> +			mpage_alloc, mpage_bio_submit, NULL);
>  	if (bio)
>  		mpage_bio_submit(READ, bio);
>  	return 0;
> @@ -595,7 +630,7 @@ page_is_mapped:
>  alloc_new:
>  	if (bio == NULL) {
>  		bio = mpage_alloc(bdev, blocks[0] << (blkbits - 9),
> -				bio_get_nr_vecs(bdev), GFP_NOFS|__GFP_HIGH);
> +				bio_get_nr_vecs(bdev), GFP_NOFS|__GFP_HIGH, NULL);
>  		if (bio == NULL)
>  			goto confused;
>  	}
> diff --git a/include/linux/mpage.h b/include/linux/mpage.h
> index cc5fb75..accdbdd 100644
> --- a/include/linux/mpage.h
> +++ b/include/linux/mpage.h
> @@ -16,6 +16,8 @@ typedef int (writepage_t)(struct page *page, struct writeback_control *wbc);
> 
>  int mpage_readpages(struct address_space *mapping, struct list_head *pages,
>  				unsigned nr_pages, get_block_t get_block);
> +int mpage_readpages_aio(struct address_space *mapping, struct list_head *pages,
> +				unsigned nr_pages, get_block_t get_block, void *priv);
>  int mpage_readpage(struct page *page, get_block_t get_block);
>  int mpage_writepages(struct address_space *mapping,
>  		struct writeback_control *wbc, get_block_t get_block);
> diff --git a/kernel/kevent/kevent_aio.c b/kernel/kevent/kevent_aio.c
> new file mode 100644
> index 0000000..d4c1c5f
> --- /dev/null
> +++ b/kernel/kevent/kevent_aio.c
> @@ -0,0 +1,881 @@
> +/*
> + * 2006 Copyright (c) Evgeniy Polyakov <[email protected]>
> + * All rights reserved.
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
> + */
> +
> +#include <linux/kernel.h>
> +#include <linux/module.h>
> +#include <linux/types.h>
> +#include <linux/spinlock.h>
> +#include <linux/list.h>
> +#include <linux/kthread.h>
> +#include <linux/slab.h>
> +#include <linux/bio.h>
> +#include <linux/pagemap.h>
> +#include <linux/file.h>
> +#include <linux/swap.h>
> +#include <linux/kevent.h>
> +
> +#define KAIO_CALL_NUM		8
> +#define KAIO_THREAD_NUM		8
> +
> +//#define KEVENT_AIO_DEBUG
> +
> +#ifdef KEVENT_AIO_DEBUG
> +#define dprintk(f, a...) printk(f, ##a)
> +#else
> +#define dprintk(f, a...) do {} while (0)
> +#endif
> +
> +struct kaio_thread
> +{
> +	struct list_head	req_list;
> +	spinlock_t		req_lock;
> +	struct task_struct	*thread;
> +	int			refcnt;
> +	wait_queue_head_t 	wait;
> +};
> +
> +extern struct file_operations kevent_user_fops;
> +static DEFINE_PER_CPU(u32, kaio_req_counter);
> +static DEFINE_PER_CPU(int, kaio_req_cpu);
> +
> +/*
> + * Array of working threads.
> + * It can only be accessed under RCU protection,
> + * so threads reference counters are not atomic.
> + */
> +static struct kaio_thread *kaio_threads[KAIO_THREAD_NUM] __read_mostly;
> +static struct kmem_cache *kaio_req_cache __read_mostly;
> +static struct kmem_cache *kaio_priv_cache __read_mostly;
> +
> +struct kaio_req;
> +typedef int (* kaio_callback)(struct kaio_req *req, int direct);
> +
> +#define KAIO_REQ_PENDING	0
> +
> +/*
> + * Cache of kaio request callbacks.
> + * It is not allowed to change the same cache entry
> + * simultaneously (for example it is forbidden to add entries
> + * in parallel).
> + *
> + * When cache entry is scheduled for execution in one of the threads,
> + * it is forbidden to access it, since it will be freed when
> + * all callbacks have been invoked.
> + *
> + * It is possible to add callbacks into this cache from callbacks itself.
> + */
> +struct kaio_req
> +{
> +	struct list_head	req_entry;
> +	kaio_callback		call[KAIO_CALL_NUM];
> +	int			read_idx, add_idx;
> +	int			cpu;
> +	long			flags;
> +	atomic_t		refcnt;
> +	void			(*destructor)(struct kaio_req *);
> +	void			*priv;
> +};
> +
> +/*
> + * Returns pointer to thread entry for given index.
> + * Must be called under RCU protection.
> + */
> +static struct kaio_thread *kaio_get_thread(int cpu)
> +{
> +	struct kaio_thread *th;
> +
> +	if (cpu == -1) {
> +#if 1
> +		int *cnt = &__get_cpu_var(kaio_req_cpu);
> +		cpu = *cnt;
> +
> +		*cnt = *cnt + 1;
> +		if (*cnt >= KAIO_THREAD_NUM)
> +			*cnt = 0;
> +#else
> +		cpu = 0;
> +#endif
> +	}
> +
> +	if (unlikely(cpu >= KAIO_THREAD_NUM || !kaio_threads[cpu]))
> +		return NULL;
> +
> +	th = kaio_threads[cpu];
> +	th->refcnt++;
> +
> +	return th;
> +}
> +
> +/*
> + * Drops reference counter for given thread.
> + * Must be called under RCU protection.
> + */
> +static inline void kaio_put_thread(struct kaio_thread *th)
> +{
> +	th->refcnt--;
> +}
> +
> +void kaio_schedule_req(struct kaio_req *req)
> +{
> +	struct kaio_thread *th;
> +	unsigned long flags;
> +
> +	rcu_read_lock();
> +	th = kaio_get_thread(req->cpu);
> +	if (!th) {
> +		req->cpu = -1;
> +		th = kaio_get_thread(-1);
> +		BUG_ON(!th);
> +	}
> +
> +	if (!test_and_set_bit(KAIO_REQ_PENDING, &req->flags)) {
> +		spin_lock_irqsave(&th->req_lock, flags);
> +		list_add_tail(&req->req_entry, &th->req_list);
> +		spin_unlock_irqrestore(&th->req_lock, flags);
> +	}
> +
> +	wake_up(&th->wait);
> +
> +	kaio_put_thread(th);
> +	rcu_read_unlock();
> +}
> +
> +EXPORT_SYMBOL_GPL(kaio_schedule_req);
> +
> +static inline void kaio_req_get(struct kaio_req *req)
> +{
> +	atomic_inc(&req->refcnt);
> +}
> +
> +static inline int kaio_req_put(struct kaio_req *req)
> +{
> +	if (atomic_dec_and_test(&req->refcnt)) {
> +		dprintk("%s: freeing req: %p, priv: %p.\n", __func__, req, req->priv);
> +		if (req->destructor)
> +			req->destructor(req);
> +		kmem_cache_free(kaio_req_cache, req);
> +		return 1;
> +	}
> +
> +	return 0;
> +}
> +
> +/*
> + * Append a call request into cache.
> + * Returns -EOVERFLOW in case cache is full, and 0 otherwise.
> + */
> +int kaio_append_call(struct kaio_req *req, kaio_callback call)
> +{
> +	if ((req->add_idx + 1 == req->read_idx) ||
> +			((req->add_idx + 1 == KAIO_CALL_NUM) && req->read_idx == 0))
> +		return -EOVERFLOW;
> +
> +	req->call[req->add_idx] = call;
> +
> +	dprintk("%s: req: %p, read_idx: %d, add_idx: %d, call: %p [%p].\n",
> +			__func__, req, req->read_idx, req->add_idx,
> +			req->call[req->read_idx], req->call[req->add_idx]);
> +	if (++req->add_idx == KAIO_CALL_NUM)
> +		req->add_idx = 0;
> +
> +	kaio_req_get(req);
> +	
> +	return 0;
> +}
> +
> +EXPORT_SYMBOL_GPL(kaio_append_call);
> +
> +/*
> + * Adds one call request into given cache.
> + * If cache is NULL or full, allocate new one.
> + */
> +struct kaio_req *kaio_add_call(struct kaio_req *req, kaio_callback call, int cpu, gfp_t gflags)
> +{
> +	if (req && !kaio_append_call(req, call)) {
> +		kaio_schedule_req(req);
> +		return req;
> +	}
> +
> +	req = kmem_cache_alloc(kaio_req_cache, gflags);
> +	if (!req)
> +		return NULL;
> +
> +	memset(req->call, 0, sizeof(req->call));
> +
> +	req->destructor = NULL;
> +	req->cpu = cpu;
> +	req->call[0] = call;
> +	req->add_idx = 1;
> +	req->read_idx = 0;
> +	req->flags = 0;
> +	atomic_set(&req->refcnt, 1);
> +
> +	dprintk("%s: req: %p, call: %p [%p].\n", __func__, req, call, req->call[0]);
> +
> +	return req;
> +}
> +
> +EXPORT_SYMBOL_GPL(kaio_add_call);
> +
> +/*
> + * Call appropriate callbacks in cache.
> + * This can only be called by working threads, which means that cache
> + * is filled (probably partially) and are not even accessible from
> + * the originator of requests, which means that cache will be freed
> + * when all callbacks are invoked.
> + *
> + * Callback itself can reschedule new callback into the same cache.
> + *
> + * If callback returns negative value, the whole cache will be freed.
> + * If positive value is returned, then further processing is stopped,
> + * so cache can be queued into the end of the processing FIFO by callback.
> + * If zero is returned, next callback will be invoked if any.
> + */
> +static int kaio_call(struct kaio_req *req)
> +{
> +	int err = -EINVAL;
> +
> +	if (likely(req->add_idx != req->read_idx)) {
> +		dprintk("%s: req: %p, read_idx: %d, add_idx: %d, call: %p [%p].\n",
> +				__func__, req, req->read_idx, req->add_idx,
> +				req->call[req->read_idx], req->call[0]);
> +		err = (*req->call[req->read_idx])(req, 0);
> +		if (++req->read_idx == KAIO_CALL_NUM)
> +			req->read_idx = 0;
> +
> +		if (kaio_req_put(req))
> +			err = 0;
> +	}
> +	return err;
> +}
> +
> +static int kaio_thread_process(void *data)
> +{
> +	struct kaio_thread *th = data;
> +	unsigned long flags;
> +	struct kaio_req *req, *first;
> +	DECLARE_WAITQUEUE(wait, current);
> +	int err;
> +
> +	add_wait_queue_exclusive(&th->wait, &wait);
> +
> +	while (!kthread_should_stop()) {
> +		first = req = NULL;
> +		do {
> +			req = NULL;
> +			spin_lock_irqsave(&th->req_lock, flags);
> +			if (!list_empty(&th->req_list)) {
> +				req = list_entry(th->req_list.prev, struct kaio_req, req_entry);
> +				if (first != req)
> +					list_del(&req->req_entry);
> +			}
> +			spin_unlock_irqrestore(&th->req_lock, flags);
> +				
> +			if (!first)
> +				first = req;
> +			else if (first == req)
> +				break;
> +
> +			if (req) {
> +				err = 0;
> +				while ((req->read_idx != req->add_idx) && !kthread_should_stop()) {
> +					dprintk("%s: req: %p, read_idx: %d, add_idx: %d, err: %d.\n",
> +							__func__, req, req->read_idx, req->add_idx, err);
> +					err = kaio_call(req);
> +					if (err != 0)
> +						break;
> +				}
> +
> +				if (err > 0) {
> +					spin_lock_irqsave(&th->req_lock, flags);
> +					list_add_tail(&req->req_entry, &th->req_list);
> +					spin_unlock_irqrestore(&th->req_lock, flags);
> +				}
> +			}
> +		} while (req);
> +		__set_current_state(TASK_INTERRUPTIBLE);
> +		schedule_timeout(HZ);
> +		__set_current_state(TASK_RUNNING);
> +	}
> +	
> +	remove_wait_queue(&th->wait, &wait);
> +
> +	return 0;
> +}
> +
> +struct kaio_private
> +{
> +	union {
> +		void 		*sptr;
> +		__u64		sdata;
> +	};
> +	union {
> +		void 		*dptr;
> +		__u64		ddata;
> +	};
> +	__u64			offset, processed;
> +	__u64			count, limit;
> +	struct kevent_user	*kevent_user;
> +};
> +
> +extern void bio_fs_destructor(struct bio *bio);
> +
> +static void kaio_bio_destructor(struct bio *bio)
> +{
> +	dprintk("%s: bio=%p, num=%u.\n", __func__, bio, bio->bi_vcnt);
> +	bio_fs_destructor(bio);
> +}
> +
> +static int kaio_read_send_pages(struct kaio_req *req, int direct);
> +
> +static int kaio_mpage_end_io_read(struct bio *bio, unsigned int bytes_done, int err)
> +{
> +	const int uptodate = test_bit(BIO_UPTODATE, &bio->bi_flags);
> +	struct bio_vec *bvec = bio->bi_io_vec + bio->bi_vcnt - 1;
> +	struct kaio_req *req = bio->bi_private;
> +
> +	if (bio->bi_size)
> +		return 1;
> +
> +	do {
> +		struct page *page = bvec->bv_page;
> +
> +		if (--bvec >= bio->bi_io_vec)
> +			prefetchw(&bvec->bv_page->flags);
> +
> +		if (uptodate) {
> +			SetPageUptodate(page);
> +		} else {
> +			ClearPageUptodate(page);
> +			SetPageError(page);
> +		}
> +		unlock_page(page);
> +	} while (bvec >= bio->bi_io_vec);
> +
> +	dprintk("%s: bio: %p, req: %p, pending: %d.\n",
> +			__func__, bio, req, test_bit(KAIO_REQ_PENDING, &req->flags));
> +
> +	kaio_append_call(req, kaio_read_send_pages);
> +	kaio_req_put(req);
> +	kaio_schedule_req(req);
> +
> +	bio_put(bio);
> +	return 0;
> +}
> +
> +struct bio *kaio_mpage_bio_submit(int rw, struct bio *bio)
> +{
> +	if (bio) {
> +		bio->bi_end_io = kaio_mpage_end_io_read;
> +		dprintk("%s: bio=%p, num=%u.\n", __func__, bio, bio->bi_vcnt);
> +		submit_bio(READ, bio);
> +	}
> +	return NULL;
> +}
> +
> +struct bio *kaio_mpage_alloc(struct block_device *bdev,
> +		sector_t first_sector, int nr_vecs, gfp_t gfp_flags, void *priv)
> +{
> +	struct bio *bio;
> +
> +	bio = bio_alloc(gfp_flags, nr_vecs);
> +
> +	if (bio == NULL && (current->flags & PF_MEMALLOC)) {
> +		while (!bio && (nr_vecs /= 2))
> +			bio = bio_alloc(gfp_flags, nr_vecs);
> +	}
> +
> +	if (bio) {
> +		struct kaio_req *req = priv;
> +
> +		bio->bi_bdev = bdev;
> +		bio->bi_sector = first_sector;
> +		bio->bi_private = priv;
> +		bio->bi_destructor = kaio_bio_destructor;
> +		kaio_req_get(req);
> +		dprintk("%s: bio: %p, req: %p, num: %d.\n", __func__, bio, priv, nr_vecs);
> +	}
> +	return bio;
> +}
> +
> +static ssize_t kaio_vfs_read_actor(struct kaio_private *priv, struct page *page, size_t len)
> +{
> +	struct socket *sock = priv->dptr;
> +	struct file *file = sock->file;
> +	
> +	return file->f_op->sendpage(file, page, 0, len, &file->f_pos, 1);
> +}
> +
> +static int kaio_vfs_read(struct kaio_private *priv,
> +		ssize_t (*actor)(struct kaio_private *, struct page *, size_t))
> +{
> +	struct address_space *mapping;
> +	struct file *file = priv->sptr;
> +	ssize_t actor_size;
> +	loff_t isize;
> +	int i = 0, pg_num;
> +
> +	mapping = file->f_mapping;
> +	isize = i_size_read(file->f_dentry->d_inode);
> +
> +	if (priv->processed >= isize) {
> +		priv->count = 0;
> +		return 0;
> +	}
> +	priv->count = isize - priv->processed;
> +	pg_num = ALIGN(min_t(u64, isize, priv->count), PAGE_SIZE) >> PAGE_SHIFT;
> +	
> +	dprintk("%s: start: priv: %p, ret: %d, num: %d, count: %Lu, offset: %Lu, processed: %Lu.\n",
> +			__func__, priv, i, pg_num, priv->count, priv->offset, priv->processed);
> +
> +	for (i=0; i<pg_num && priv->count; ++i) {
> +		struct page *page;
> +		size_t nr = PAGE_CACHE_SIZE;
> +
> +		page = find_get_page(mapping, priv->processed >> PAGE_CACHE_SHIFT);
> +		if (unlikely(page == NULL))
> +			break;
> +		if (!PageUptodate(page)) {
> +			dprintk("%s: %2d: page=%p, processed=%Lu, count=%Lu not uptodate.\n",
> +					__func__, i, page, priv->processed, priv->count);
> +			page_cache_release(page);
> +			break;
> +		}
> +
> +		if (mapping_writably_mapped(mapping))
> +			flush_dcache_page(page);
> +
> +		mark_page_accessed(page);
> +
> +		if (nr + priv->processed > isize)
> +			nr = isize - priv->processed;
> +		if (nr > priv->count)
> +			nr = priv->count;
> +
> +		actor_size = actor(priv, page, nr);
> +		if (actor_size < 0) {
> +			page_cache_release(page);
> +			i = (int)actor_size;
> +			break;
> +		}
> +
> +		page_cache_release(page);
> +
> +		priv->processed += actor_size;
> +		priv->count -= actor_size;
> +	}
> +
> +	if (!priv->count)
> +		i = pg_num;
> +
> +	dprintk("%s: end: priv: %p, ret: %d, num: %d, count: %Lu, offset: %Lu, processed: %Lu.\n",
> +			__func__, priv, i, pg_num, priv->count, priv->offset, priv->processed);
> +
> +	return i;
> +}
> +
> +static int kaio_read_send_pages(struct kaio_req *req, int direct)
> +{
> +	struct kaio_private *priv = req->priv;
> +	struct file *file = priv->sptr;
> +	struct address_space *mapping = file->f_mapping;
> +	struct page *page;
> +	int err, i, num;
> +	u64 offset;
> +	LIST_HEAD(page_pool);
> +
> +	err = kaio_vfs_read(priv, &kaio_vfs_read_actor);
> +	if (err < 0)
> +		return err;
> +
> +	if (err == 0) {
> +		priv->limit >>= 1;
> +	} else {
> +		if (priv->limit)
> +			priv->limit <<= 1;
> +		else
> +			priv->limit = 8;
> +	}
> +
> +	if (priv->offset < priv->processed)
> +		priv->offset = priv->processed;
> +
> +	if (!priv->count) {
> +		kevent_storage_ready(&priv->kevent_user->st, NULL, KEVENT_MASK_ALL);
> +		return 0;
> +	}
> +
> +	if (priv->offset >= priv->processed + priv->count) {
> +		kaio_append_call(req, kaio_read_send_pages);
> +		return 0;
> +	}
> +
> +	num = min_t(int, max_sane_readahead(priv->limit),
> +			ALIGN(priv->count, PAGE_SIZE) >> PAGE_SHIFT);
> +
> +	offset = priv->offset;
> +	for (i=0; i<num; ++i) {
> +		page = page_cache_alloc_cold(mapping);
> +		if (!page)
> +			break;
> +
> +		page->index = priv->offset >> PAGE_CACHE_SHIFT;
> +		list_add(&page->lru, &page_pool);
> +
> +		priv->offset += PAGE_CACHE_SIZE;
> +	}
> +
> +	dprintk("%s: submit: req: %p, priv: %p, offset: %Lu, num: %d, limit: %Lu.\n",
> +			__func__, req, priv, offset, i, priv->limit);
> +
> +	err = mapping->a_ops->aio_readpages(file, mapping, &page_pool, i, req);
> +	if (err) {
> +		dprintk("%s: kevent_mpage_readpages failed: err=%d, count=%Lu.\n",
> +				__func__, err, priv->count);
> +		kaio_schedule_req(req);
> +		return err;
> +	}
> +
> +	return 1;
> +}
> +
> +static int kaio_add_kevent(int fd, struct kaio_req *req)
> +{
> +	struct ukevent uk;
> +	struct file *file;
> +	struct kevent_user *u;
> +	int err, need_fput = 0;
> +	u32 *cnt;
> +
> +	file = fget_light(fd, &need_fput);
> +	if (!file) {
> +		err = -EBADF;
> +		goto err_out;
> +	}
> +
> +	if (file->f_op != &kevent_user_fops) {
> +		err = -EINVAL;
> +		goto err_out_fput;
> +	}
> +
> +	u = file->private_data;
> +
> +	memset(&uk, 0, sizeof(struct ukevent));
> +
> +	uk.event = KEVENT_MASK_ALL;
> +	uk.type = KEVENT_AIO;
> +
> +	preempt_disable();
> +	uk.id.raw[0] = smp_processor_id();
> +	cnt = &__get_cpu_var(kaio_req_counter);
> +	uk.id.raw[1] = *cnt;
> +	*cnt = *cnt + 1;
> +	preempt_enable();
> +
> +	uk.req_flags = KEVENT_REQ_ONESHOT | KEVENT_REQ_ALWAYS_QUEUE;
> +	uk.ptr = req;
> +
> +	err = kevent_user_add_ukevent(&uk, u);
> +	if (err)
> +		goto err_out_fput;
> +
> +	kevent_user_get(u);
> +
> +	fput_light(file, need_fput);
> +
> +	return 0;
> +
> +err_out_fput:
> +	fput_light(file, need_fput);
> +err_out:
> +	return err;
> +}
> +
> +static void kaio_destructor(struct kaio_req *req)
> +{
> +	struct kaio_private *priv = req->priv;
> +	struct socket *sock = priv->dptr;
> +	struct file *file = priv->sptr;
> +
> +	fput(file);
> +	sockfd_put(sock);
> +
> +	kevent_storage_ready(&priv->kevent_user->st, NULL, KEVENT_MASK_ALL);
> +	kevent_user_put(priv->kevent_user);
> +
> +	kmem_cache_free(kaio_priv_cache, req->priv);
> +}
> +
> +static struct kaio_req *kaio_sendfile(int kevent_fd, int sock_fd, struct file *file, off_t offset, size_t count)
> +{
> +	struct kaio_req *req;
> +	struct socket *sock;
> +	struct kaio_private *priv;
> +	int err;
> +
> +	sock = sockfd_lookup(sock_fd, &err);
> +	if (!sock)
> +		goto err_out_exit;
> +
> +	priv = kmem_cache_alloc(kaio_priv_cache, GFP_KERNEL);
> +	if (!priv)
> +		goto err_out_sput;
> +
> +	priv->sptr = file;
> +	priv->dptr = sock;
> +	priv->offset = offset;
> +	priv->count = min_t(u64, i_size_read(file->f_dentry->d_inode), count);
> +	priv->processed = offset;
> +	priv->limit = 128;
> +
> +	req = kaio_add_call(NULL, &kaio_read_send_pages, -1, GFP_KERNEL);
> +	if (!req)
> +		goto err_out_free;
> +
> +	req->destructor = kaio_destructor;
> +	req->priv = priv;
> +
> +	err = kaio_add_kevent(kevent_fd, req);
> +
> +	dprintk("%s: req: %p, priv: %p, call: %p [%p], offset: %Lu, processed: %Lu, count: %Lu, err: %d.\n",
> +			__func__, req, priv, &kaio_read_send_pages,
> +			kaio_read_send_pages, priv->offset, priv->processed, priv->count, err);
> +
> +	if (err)
> +		goto err_out_remove;
> +
> +	kaio_schedule_req(req);
> +
> +	return req;
> +
> +err_out_remove:
> +	/* It is safe to just free the object since it is guaranteed that it was not
> +	 * queued for processing.
> +	 */
> +	kmem_cache_free(kaio_req_cache, req);
> +err_out_free:
> +	kmem_cache_free(kaio_priv_cache, priv);
> +err_out_sput:
> +	sockfd_put(sock);
> +err_out_exit:
> +	return NULL;
> +
> +}
> +
> +asmlinkage long sys_aio_sendfile(int kevent_fd, int sock_fd, int in_fd, off_t offset, size_t count)
> +{
> +	struct kaio_req *req;
> +	struct file *file;
> +	int err;
> +
> +	file = fget(in_fd);
> +	if (!file) {
> +		err = -EBADF;
> +		goto err_out_exit;
> +	}
> +
> +	req = kaio_sendfile(kevent_fd, sock_fd, file, offset, count);
> +	if (!req) {
> +		err = -EINVAL;
> +		goto err_out_fput;
> +	}
> +
> +	return (long)req;
> +
> +err_out_fput:
> +	fput(file);
> +err_out_exit:
> +	return err;
> +}
> +
> +asmlinkage long sys_aio_sendfile_path(int kevent_fd, int sock_fd, char __user *filename, off_t offset, size_t count)
> +{
> +	char *tmp = getname(filename);
> +	int fd = PTR_ERR(tmp);
> +	int flags = O_RDONLY, err;
> +	struct nameidata nd;
> +	struct file *file;
> +	struct kaio_req *req;
> +
> +	if (force_o_largefile())
> +		flags = O_LARGEFILE;
> +
> +	if (IS_ERR(tmp)) {
> +		err = fd;
> +		goto err_out_exit;
> +	}
> +
> +	fd = get_unused_fd();
> +	if (fd < 0) {
> +		err = fd;
> +		goto err_out_put_name;
> +	}
> +
> +	if ((flags+1) & O_ACCMODE)
> +		flags++;
> +
> +	err = open_namei(AT_FDCWD, tmp, flags, 0400, &nd);
> +	if (err)
> +		goto err_out_fdput;
> +
> +	file = nameidata_to_filp(&nd, flags);
> +	if (!file)
> +		goto err_out_fdput;
> +
> +	/* One reference will be released in sys_close(),
> +	 * second one through req->destructor()
> +	 */
> +	atomic_inc(&file->f_count);
> +
> +	req = kaio_sendfile(kevent_fd, sock_fd, file, offset, count);
> +	if (!req) {
> +		err = -EINVAL;
> +		goto err_out_fput;
> +	}
> +
> +	fd_install(fd, file);
> +
> +	return fd;
> +
> +err_out_fput:
> +	fput(file);
> +	fput(file);
> +err_out_fdput:
> +	put_unused_fd(fd);
> +err_out_put_name:
> +	putname(tmp);
> +err_out_exit:
> +	return err;
> +}
> +
> +static int kevent_aio_callback(struct kevent *k)
> +{
> +	struct kaio_req *req = k->event.ptr;
> +	struct kaio_private *priv = req->priv;
> +
> +	if (!priv->count) {
> +		__u32 *processed = (__u32 *)&priv->processed;
> +		k->event.ret_data[0] = processed[0];
> +		k->event.ret_data[1] = processed[1];
> +		return 1;
> +	}
> +
> +	return 0;
> +}
> +
> +int kevent_aio_enqueue(struct kevent *k)
> +{
> +	int err;
> +	struct kaio_req *req = k->event.ptr;
> +	struct kaio_private *priv = req->priv;
> +
> +	err = kevent_storage_enqueue(&k->user->st, k);
> +	if (err)
> +		goto err_out_exit;
> +
> +	priv->kevent_user = k->user;
> +	if (k->event.req_flags & KEVENT_REQ_ALWAYS_QUEUE)
> +		kevent_requeue(k);
> +
> +	return 0;
> +
> +err_out_exit:
> +	return err;
> +}
> +
> +int kevent_aio_dequeue(struct kevent *k)
> +{
> +	kevent_storage_dequeue(k->st, k);
> +
> +	return 0;
> +}
> +
> +static void kaio_thread_stop(struct kaio_thread *th)
> +{
> +	kthread_stop(th->thread);
> +	kfree(th);
> +}
> +
> +static int kaio_thread_start(struct kaio_thread **thp, int num)
> +{
> +	struct kaio_thread *th;
> +
> +	th = kzalloc(sizeof(struct kaio_thread), GFP_KERNEL);
> +	if (!th)
> +		return -ENOMEM;
> +
> +	th->refcnt = 1;
> +	spin_lock_init(&th->req_lock);
> +	INIT_LIST_HEAD(&th->req_list);
> +	init_waitqueue_head(&th->wait);
> +
> +	th->thread = kthread_run(kaio_thread_process, th, "kaio/%d", num);
> +	if (IS_ERR(th->thread)) {
> +		int err = PTR_ERR(th->thread);
> +		kfree(th);
> +		return err;
> +	}
> +
> +	*thp = th;
> +	wmb();
> +
> +	return 0;
> +}
> +
> +static int __init kevent_init_aio(void)
> +{
> +	struct kevent_callbacks sc = {
> +		.callback = &kevent_aio_callback,
> +		.enqueue = &kevent_aio_enqueue,
> +		.dequeue = &kevent_aio_dequeue,
> +		.flags = 0,
> +	};
> +	int err, i;
> +
> +	kaio_req_cache = kmem_cache_create("kaio_req", sizeof(struct kaio_req),
> +			0, SLAB_PANIC, NULL, NULL);
> +	kaio_priv_cache = kmem_cache_create("kaio_priv", sizeof(struct kaio_private),
> +			0, SLAB_PANIC, NULL, NULL);
> +
> +	memset(kaio_threads, 0, sizeof(kaio_threads));
> +
> +	for (i=0; i<KAIO_THREAD_NUM; ++i) {
> +		err = kaio_thread_start(&kaio_threads[i], i);
> +		if (err)
> +			goto err_out_stop;
> +	}
> +
> +	err = kevent_add_callbacks(&sc, KEVENT_AIO);
> +	if (err)
> +		goto err_out_stop;
> +
> +	return 0;
> +
> +err_out_stop:
> +	while (--i >= 0) {
> +		struct kaio_thread *th = kaio_threads[i];
> +
> +		kaio_threads[i] = NULL;
> +		wmb();
> +
> +		kaio_thread_stop(th);
> +	}
> +	return err;
> +}
> +module_init(kevent_init_aio);
> 
> -
> To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
> the body of a message to [email protected]
> More majordomo info at  http://vger.kernel.org/majordomo-info.html

-- 
Suparna Bhattacharya ([email protected])
Linux Technology Center
IBM Software Lab, India

-
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

[Index of Archives]     [Kernel Newbies]     [Netfilter]     [Bugtraq]     [Photo]     [Stuff]     [Gimp]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Video 4 Linux]     [Linux for the blind]     [Linux Resources]
  Powered by Linux