Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 5be0389d authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov
Browse files

ceph: re-send AIO write request when getting -EOLDSNAP error



When receiving -EOLDSNAP from OSD, we need to re-send corresponding
write request. Due to locking issue, we can send new request inside
another OSD request's complete callback. So we use worker to re-send
request for AIO write.

Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent c8fe9b17
Loading
Loading
Loading
Loading
+86 −4
Original line number Diff line number Diff line
@@ -554,9 +554,17 @@ struct ceph_aio_request {
	struct list_head osd_reqs;
	unsigned num_reqs;
	atomic_t pending_reqs;
	struct timespec mtime;
	struct ceph_cap_flush *prealloc_cf;
};

struct ceph_aio_work {
	struct work_struct work;
	struct ceph_osd_request *req;
};

static void ceph_aio_retry_work(struct work_struct *work);

static void ceph_aio_complete(struct inode *inode,
			      struct ceph_aio_request *aio_req)
{
@@ -614,10 +622,19 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req,
	     inode, rc, osd_data->length);

	if (rc == -EOLDSNAPC) {
		BUG_ON(1);
		struct ceph_aio_work *aio_work;
		BUG_ON(!aio_req->write);

		aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS);
		if (aio_work) {
			INIT_WORK(&aio_work->work, ceph_aio_retry_work);
			aio_work->req = req;
			queue_work(ceph_inode_to_client(inode)->wb_wq,
				   &aio_work->work);
			return;
		}

	if (!aio_req->write) {
		rc = -ENOMEM;
	} else if (!aio_req->write) {
		if (rc == -ENOENT)
			rc = 0;
		if (rc >= 0 && osd_data->length > rc) {
@@ -653,6 +670,69 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req,
	return;
}

static void ceph_aio_retry_work(struct work_struct *work)
{
	struct ceph_aio_work *aio_work =
		container_of(work, struct ceph_aio_work, work);
	struct ceph_osd_request *orig_req = aio_work->req;
	struct ceph_aio_request *aio_req = orig_req->r_priv;
	struct inode *inode = orig_req->r_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;
	struct ceph_osd_request *req;
	int ret;

	spin_lock(&ci->i_ceph_lock);
	if (__ceph_have_pending_cap_snap(ci)) {
		struct ceph_cap_snap *capsnap =
			list_last_entry(&ci->i_cap_snaps,
					struct ceph_cap_snap,
					ci_item);
		snapc = ceph_get_snap_context(capsnap->context);
	} else {
		BUG_ON(!ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
	}
	spin_unlock(&ci->i_ceph_lock);

	req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
			false, GFP_NOFS);
	if (IS_ERR(req)) {
		ret = PTR_ERR(req);
		req = orig_req;
		goto out;
	}

	req->r_flags =	CEPH_OSD_FLAG_ORDERSNAP |
			CEPH_OSD_FLAG_ONDISK |
			CEPH_OSD_FLAG_WRITE;
	req->r_base_oloc = orig_req->r_base_oloc;
	req->r_base_oid = orig_req->r_base_oid;

	req->r_ops[0] = orig_req->r_ops[0];
	osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);

	ceph_osdc_build_request(req, req->r_ops[0].extent.offset,
				snapc, CEPH_NOSNAP, &aio_req->mtime);

	ceph_put_snap_context(snapc);
	ceph_osdc_put_request(orig_req);

	req->r_callback = ceph_aio_complete_req;
	req->r_inode = inode;
	req->r_priv = aio_req;

	ret = ceph_osdc_start_request(req->r_osdc, req, false);
out:
	if (ret < 0) {
		BUG_ON(ret == -EOLDSNAPC);
		req->r_result = ret;
		ceph_aio_complete_req(req, NULL);
	}

	kfree(aio_work);
}

/*
 * Write commit request unsafe callback, called to tell us when a
 * request is unsafe (that is, in flight--has been handed to the
@@ -772,6 +852,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
				aio_req->write = write;
				INIT_LIST_HEAD(&aio_req->osd_reqs);
				if (write) {
					aio_req->mtime = mtime;
					swap(aio_req->prealloc_cf, *pcf);
				}
			}
@@ -867,6 +948,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
				ret = ceph_osdc_start_request(req->r_osdc,
							      req, false);
			if (ret < 0) {
				BUG_ON(ret == -EOLDSNAPC);
				req->r_result = ret;
				ceph_aio_complete_req(req, NULL);
			}