Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 7c272194 authored by Sage Weil's avatar Sage Weil
Browse files

ceph: make readpages fully async



When we get a ->readpages() aop, submit async reads for all page ranges
in the provided page list.  Lock the pages immediately, so that VFS/MM
will block until the reads complete.

Signed-off-by: default avatarSage Weil <sage@newdream.net>
parent c3b92c87
Loading
Loading
Loading
Loading
+115 −70
Original line number Diff line number Diff line
@@ -228,102 +228,147 @@ static int ceph_readpage(struct file *filp, struct page *page)
}

/*
 * Build a vector of contiguous pages from the provided page list.
 * Finish an async read(ahead) op.
 */
static struct page **page_vector_from_list(struct list_head *page_list,
					   unsigned *nr_pages)
static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct page **pages;
	struct page *page;
	int next_index, contig_pages = 0;
	struct inode *inode = req->r_inode;
	struct ceph_osd_reply_head *replyhead;
	int rc, bytes;
	int i;

	/* build page vector */
	pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS);
	if (!pages)
		return ERR_PTR(-ENOMEM);
	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	rc = le32_to_cpu(replyhead->result);
	bytes = le32_to_cpu(msg->hdr.data_len);

	BUG_ON(list_empty(page_list));
	next_index = list_entry(page_list->prev, struct page, lru)->index;
	list_for_each_entry_reverse(page, page_list, lru) {
		if (page->index == next_index) {
			dout("readpages page %d %p\n", contig_pages, page);
			pages[contig_pages] = page;
			contig_pages++;
			next_index++;
		} else {
			break;
	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);

	/* unlock all pages, zeroing any data we didn't read */
	for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
		struct page *page = req->r_pages[i];

		if (bytes < (int)PAGE_CACHE_SIZE) {
			/* zero (remainder of) page */
			int s = bytes < 0 ? 0 : bytes;
			zero_user_segment(page, s, PAGE_CACHE_SIZE);
		}
 		dout("finish_read %p uptodate %p idx %lu\n", inode, page,
		     page->index);
		flush_dcache_page(page);
		SetPageUptodate(page);
		unlock_page(page);
		page_cache_release(page);
	}
	*nr_pages = contig_pages;
	return pages;
	kfree(req->r_pages);
}

/*
 * Read multiple pages.  Leave pages we don't read + unlock in page_list;
 * the caller (VM) cleans them up.
 * start an async read(ahead) operation.  return nr_pages we submitted
 * a read for on success, or negative error code.
 */
static int ceph_readpages(struct file *file, struct address_space *mapping,
			  struct list_head *page_list, unsigned nr_pages)
static int start_read(struct inode *inode, struct list_head *page_list)
{
	struct inode *inode = file->f_dentry->d_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_client *osdc =
		&ceph_inode_to_client(inode)->client->osdc;
	int rc = 0;
	struct page **pages;
	loff_t offset;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct page *page = list_entry(page_list->prev, struct page, lru);
	struct ceph_osd_request *req;
	u64 off;
	u64 len;
	int i;
	struct page **pages;
	pgoff_t next_index;
	int nr_pages = 0;
	int ret;

	dout("readpages %p file %p nr_pages %d\n",
	     inode, file, nr_pages);

	pages = page_vector_from_list(page_list, &nr_pages);
	if (IS_ERR(pages))
		return PTR_ERR(pages);
	off = page->index << PAGE_CACHE_SHIFT;

	/* guess read extent */
	offset = pages[0]->index << PAGE_CACHE_SHIFT;
	/* count pages */
	next_index = page->index;
	list_for_each_entry_reverse(page, page_list, lru) {
		if (page->index != next_index)
			break;
		nr_pages++;
		next_index++;
	}
	len = nr_pages << PAGE_CACHE_SHIFT;
	rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
				 offset, &len,
				 ci->i_truncate_seq, ci->i_truncate_size,
				 pages, nr_pages, 0);
	if (rc == -ENOENT)
		rc = 0;
	if (rc < 0)
		goto out;
	dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
	     off, len);

	for (; !list_empty(page_list) && len > 0;
	     rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) {
		struct page *page =
			list_entry(page_list->prev, struct page, lru);
	req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
				    off, &len,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0,
				    ci->i_truncate_seq, ci->i_truncate_size,
				    NULL, false, 1, 0);
	if (!req)
		return -ENOMEM;

	/* build page vector */
	nr_pages = len >> PAGE_CACHE_SHIFT;
	pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
	ret = -ENOMEM;
	if (!pages)
		goto out;
	for (i = 0; i < nr_pages; ++i) {
		page = list_entry(page_list->prev, struct page, lru);
		BUG_ON(PageLocked(page));
		list_del(&page->lru);
		
		if (rc < (int)PAGE_CACHE_SIZE) {
			/* zero (remainder of) page */
			int s = rc < 0 ? 0 : rc;
			zero_user_segment(page, s, PAGE_CACHE_SIZE);
		}

		if (add_to_page_cache_lru(page, mapping, page->index,
 		dout("start_read %p adding %p idx %lu\n", inode, page,
		     page->index);
		if (add_to_page_cache_lru(page, &inode->i_data, page->index,
					  GFP_NOFS)) {
			page_cache_release(page);
			dout("readpages %p add_to_page_cache failed %p\n",
			dout("start_read %p add_to_page_cache failed %p\n",
			     inode, page);
			continue;
			nr_pages = i;
			goto out_pages;
		}
		dout("readpages %p adding %p idx %lu\n", inode, page,
		     page->index);
		flush_dcache_page(page);
		SetPageUptodate(page);
		unlock_page(page);
		page_cache_release(page);
		pages[i] = page;
	}
	rc = 0;
	req->r_pages = pages;
	req->r_num_pages = nr_pages;
	req->r_callback = finish_read;
	req->r_inode = inode;

out:
	dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto out_pages;
	ceph_osdc_put_request(req);
	return nr_pages;

out_pages:
	ceph_release_page_vector(pages, nr_pages);
	kfree(pages);
out:
	ceph_osdc_put_request(req);
	return ret;
}


/*
 * Read multiple pages.  Leave pages we don't read + unlock in page_list;
 * the caller (VM) cleans them up.
 */
static int ceph_readpages(struct file *file, struct address_space *mapping,
			  struct list_head *page_list, unsigned nr_pages)
{
	struct inode *inode = file->f_dentry->d_inode;
	int rc = 0;

	dout("readpages %p file %p nr_pages %d\n", inode, file, nr_pages);
	while (!list_empty(page_list)) {
		rc = start_read(inode, page_list);
		if (rc < 0)
			goto out;
		BUG_ON(rc == 0);
	}
out:
	dout("readpages %p file %p ret %d\n", inode, file, rc);
	return rc;
}