From 8618e30bc14b06bfafa0f164cca7b0e06451f88a Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 8 Oct 2012 20:37:30 -0700 Subject: rbd: let con_work() handle backoff Both ceph_fault() and con_work() include handling for imposing a delay before doing further processing on a faulted connection. The latter is used only if ceph_fault() is unable to. Instead, just let con_work() always be responsible for implementing the delay. After setting up the delay value, set the BACKOFF flag on the connection unconditionally and call queue_con() to ensure con_work() will get called to handle it. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index cad0d17ec45e..973c16c20c42 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2398,24 +2398,8 @@ static void ceph_fault(struct ceph_connection *con) con->delay = BASE_DELAY_INTERVAL; else if (con->delay < MAX_DELAY_INTERVAL) con->delay *= 2; - con->ops->get(con); - if (queue_delayed_work(ceph_msgr_wq, &con->work, - round_jiffies_relative(con->delay))) { - dout("fault queued %p delay %lu\n", con, con->delay); - } else { - con->ops->put(con); - dout("fault failed to queue %p delay %lu, backoff\n", - con, con->delay); - /* - * In many cases we see a socket state change - * while con_work is running and end up - * queuing (non-delayed) work, such that we - * can't backoff with a delay. Set a flag so - * that when con_work restarts we schedule the - * delay then. - */ - set_bit(CON_FLAG_BACKOFF, &con->flags); - } + set_bit(CON_FLAG_BACKOFF, &con->flags); + queue_con(con); } out_unlock: -- cgit v1.2.3 From 802c6d967fbdcd2cbc91b917425661bb8bbfaade Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 8 Oct 2012 20:37:30 -0700 Subject: rbd: define common queue_con_delay() This patch defines a single function, queue_con_delay() to call queue_delayed_work() for a connection. It basically generalizes what was previously queue_con() by adding the delay argument. queue_con() is now a simple helper that passes 0 for its delay. queue_con_delay() returns 0 if it queued work or an errno if it did not for some reason. If con_work() finds the BACKOFF flag set for a connection, it now calls queue_con_delay() to handle arranging to start again after a delay. Note about connection reference counts: con_work() only ever gets called as a work item function. At the time that work is scheduled, a reference to the connection is acquired, and the corresponding con_work() call is then responsible for dropping that reference before it returns. Previously, the backoff handling inside con_work() silently handed off its reference to delayed work it scheduled. Now that queue_con_delay() is used, a new reference is acquired for the newly-scheduled work, and the original reference is dropped by the con->ops->put() call at the end of the function. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 38 +++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 973c16c20c42..66f6f56bcb23 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2244,22 +2244,33 @@ bad_tag: /* - * Atomically queue work on a connection. Bump @con reference to - * avoid races with connection teardown. + * Atomically queue work on a connection after the specified delay. + * Bump @con reference to avoid races with connection teardown. + * Returns 0 if work was queued, or an error code otherwise. */ -static void queue_con(struct ceph_connection *con) +static int queue_con_delay(struct ceph_connection *con, unsigned long delay) { if (!con->ops->get(con)) { - dout("queue_con %p ref count 0\n", con); - return; + dout("%s %p ref count 0\n", __func__, con); + + return -ENOENT; } - if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) { - dout("queue_con %p - already queued\n", con); + if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) { + dout("%s %p - already queued\n", __func__, con); con->ops->put(con); - } else { - dout("queue_con %p\n", con); + + return -EBUSY; } + + dout("%s %p %lu\n", __func__, con, delay); + + return 0; +} + +static void queue_con(struct ceph_connection *con) +{ + (void) queue_con_delay(con, 0); } /* @@ -2294,14 +2305,11 @@ restart: if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) { dout("con_work %p backing off\n", con); - if (queue_delayed_work(ceph_msgr_wq, &con->work, - round_jiffies_relative(con->delay))) { - dout("con_work %p backoff %lu\n", con, con->delay); - mutex_unlock(&con->mutex); - return; - } else { + ret = queue_con_delay(con, round_jiffies_relative(con->delay)); + if (ret) { dout("con_work %p FAILED to back off %lu\n", con, con->delay); + BUG_ON(ret == -ENOENT); set_bit(CON_FLAG_BACKOFF, &con->flags); } goto done; -- cgit v1.2.3 From 9478554ae5d21d65e948a3eff4ee2a8ad30d70e9 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 9 Oct 2012 13:50:17 -0700 Subject: rbd: define rbd_update_mapping_size() Encapsulate the code that handles updating the size of a mapping after an rbd image has been refreshed. This is done in anticipation of the next patch, which will make this common code for format 1 and 2 images. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index bb3d9be3b1b4..b64125d1d7bd 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1716,6 +1716,19 @@ static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev) __rbd_remove_snap_dev(snap); } +static void rbd_update_mapping_size(struct rbd_device *rbd_dev) +{ + sector_t size; + + if (rbd_dev->mapping.snap_id != CEPH_NOSNAP) + return; + + size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE; + dout("setting size to %llu sectors", (unsigned long long) size); + rbd_dev->mapping.size = (u64) size; + set_capacity(rbd_dev->disk, size); +} + /* * only read the first part of the ondisk header, without the snaps info */ @@ -1730,17 +1743,9 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver) down_write(&rbd_dev->header_rwsem); - /* resized? */ - if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) { - sector_t size = (sector_t) h.image_size / SECTOR_SIZE; - - if (size != (sector_t) rbd_dev->mapping.size) { - dout("setting size to %llu sectors", - (unsigned long long) size); - rbd_dev->mapping.size = (u64) size; - set_capacity(rbd_dev->disk, size); - } - } + /* Update image size, and check for resize of mapped image */ + rbd_dev->header.image_size = h.image_size; + rbd_update_mapping_size(rbd_dev); /* rbd_dev->header.object_prefix shouldn't change */ kfree(rbd_dev->header.snap_sizes); -- cgit v1.2.3 From 117973fb4c91f3fd913127577e9f71b3aa6cb556 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 31 Aug 2012 17:29:55 -0500 Subject: rbd: define rbd_dev_v2_refresh() Define a new function rbd_dev_v2_refresh() to update/refresh the snapshot context for a format version 2 rbd image. This function will update anything that is not fixed for the life of an rbd image--at the moment this is mainly the snapshot context and (for a base mapping) the size. Update rbd_refresh_header() so it selects which function to use based on the image format. Rename __rbd_refresh_header() to be rbd_dev_v1_refresh() to be consistent with the naming of its version 2 counterpart. Similarly rename rbd_refresh_header() to be rbd_dev_refresh(). Unrelated--we use rbd_image_format_valid() here. Delete the other use of it, which was primarily put in place to ensure that function was referenced at the time it was defined. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 55 +++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index b64125d1d7bd..f11b839166ef 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -268,7 +268,8 @@ static void rbd_put_dev(struct rbd_device *rbd_dev) put_device(&rbd_dev->dev); } -static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver); +static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver); +static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver); static int rbd_open(struct block_device *bdev, fmode_t mode) { @@ -1304,7 +1305,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n", rbd_dev->header_name, (unsigned long long) notify_id, (unsigned int) opcode); - rc = rbd_refresh_header(rbd_dev, &hver); + rc = rbd_dev_refresh(rbd_dev, &hver); if (rc) pr_warning(RBD_DRV_NAME "%d got notification but failed to " " update snaps: %d\n", rbd_dev->major, rc); @@ -1732,7 +1733,7 @@ static void rbd_update_mapping_size(struct rbd_device *rbd_dev) /* * only read the first part of the ondisk header, without the snaps info */ -static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver) +static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver) { int ret; struct rbd_image_header h; @@ -1773,12 +1774,16 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver) return ret; } -static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver) +static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver) { int ret; + rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); - ret = __rbd_refresh_header(rbd_dev, hver); + if (rbd_dev->image_format == 1) + ret = rbd_dev_v1_refresh(rbd_dev, hver); + else + ret = rbd_dev_v2_refresh(rbd_dev, hver); mutex_unlock(&ctl_mutex); return ret; @@ -1938,7 +1943,7 @@ static ssize_t rbd_image_refresh(struct device *dev, struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); int ret; - ret = rbd_refresh_header(rbd_dev, NULL); + ret = rbd_dev_refresh(rbd_dev, NULL); return ret < 0 ? ret : size; } @@ -2402,6 +2407,41 @@ static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which, return ERR_PTR(-EINVAL); } +static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver) +{ + int ret; + __u8 obj_order; + + down_write(&rbd_dev->header_rwsem); + + /* Grab old order first, to see if it changes */ + + obj_order = rbd_dev->header.obj_order, + ret = rbd_dev_v2_image_size(rbd_dev); + if (ret) + goto out; + if (rbd_dev->header.obj_order != obj_order) { + ret = -EIO; + goto out; + } + rbd_update_mapping_size(rbd_dev); + + ret = rbd_dev_v2_snap_context(rbd_dev, hver); + dout("rbd_dev_v2_snap_context returned %d\n", ret); + if (ret) + goto out; + ret = rbd_dev_snaps_update(rbd_dev); + dout("rbd_dev_snaps_update returned %d\n", ret); + if (ret) + goto out; + ret = rbd_dev_snaps_register(rbd_dev); + dout("rbd_dev_snaps_register returned %d\n", ret); +out: + up_write(&rbd_dev->header_rwsem); + + return ret; +} + /* * Scan the rbd device's current snapshot list and compare it to the * newly-received snapshot context. Remove any existing snapshots @@ -2564,7 +2604,7 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev) do { ret = rbd_req_sync_watch(rbd_dev); if (ret == -ERANGE) { - rc = rbd_refresh_header(rbd_dev, NULL); + rc = rbd_dev_refresh(rbd_dev, NULL); if (rc < 0) return rc; } @@ -3045,7 +3085,6 @@ static ssize_t rbd_add(struct bus_type *bus, rc = rbd_dev_probe(rbd_dev); if (rc < 0) goto err_out_client; - rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); /* no need to lock here, as rbd_dev is not registered yet */ rc = rbd_dev_snaps_update(rbd_dev); -- cgit v1.2.3 From d889140c4a1c5edb6a7bd90392b9d878bfaccfb6 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 9 Oct 2012 13:50:17 -0700 Subject: rbd: implement feature checks Version 2 images have two sets of feature bit fields. The first indicates features possibly used by the image. The second indicates features that the client *must* support in order to use the image. When an image (or snapshot) is first examined, we need to make sure that the local implementation supports the image's required features. If not, fail the probe for the image. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index f11b839166ef..0f260a6e97c4 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -70,6 +70,14 @@ #define RBD_IMAGE_ID_LEN_MAX 64 #define RBD_OBJ_PREFIX_LEN_MAX 64 +/* Feature bits */ + +#define RBD_FEATURE_LAYERING 1 + +/* Features supported by this (client software) implementation. */ + +#define RBD_FEATURES_ALL (0) + /* * An RBD device name will be "rbd#", where the "rbd" comes from * RBD_DRV_NAME above, and # is a unique integer identifier. @@ -2226,6 +2234,7 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, __le64 features; __le64 incompat; } features_buf = { 0 }; + u64 incompat; int ret; ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, @@ -2236,6 +2245,11 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); if (ret < 0) return ret; + + incompat = le64_to_cpu(features_buf.incompat); + if (incompat & ~RBD_FEATURES_ALL) + return -ENOTSUPP; + *snap_features = le64_to_cpu(features_buf.features); dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", @@ -2977,7 +2991,7 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev) if (ret < 0) goto out_err; - /* Get the features for the image */ + /* Get the and check features for the image */ ret = rbd_dev_v2_features(rbd_dev); if (ret < 0) -- cgit v1.2.3 From 35152979e6181b1fbb4b61c3ff641c14df53ad66 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 31 Aug 2012 17:29:55 -0500 Subject: rbd: activate v2 image support Now that v2 images support is fully implemented, have rbd_dev_v2_probe() return 0 to indicate a successful probe. (Note that an image that implements layering will fail the probe early because of the feature chekc.) Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 0f260a6e97c4..8f56d37637a7 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -3014,7 +3014,7 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev) dout("discovered version 2 image, header name is %s\n", rbd_dev->header_name); - return -ENOTSUPP; + return 0; out_err: kfree(rbd_dev->header_name); rbd_dev->header_name = NULL; -- cgit v1.2.3 From 0f9831a89310cebba52d3f526e6cc5c2e403e6f1 Mon Sep 17 00:00:00 2001 From: David Zafman Date: Thu, 18 Oct 2012 14:01:43 -0700 Subject: ceph: fix dentry reference leak in encode_fh() Call to d_find_alias() needs a corresponding dput() This fixes http://tracker.newdream.net/issues/3271 Signed-off-by: David Zafman Reviewed-by: Sage Weil --- fs/ceph/export.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fs/ceph/export.c b/fs/ceph/export.c index 8e1b60e557b6..862887004d20 100644 --- a/fs/ceph/export.c +++ b/fs/ceph/export.c @@ -90,6 +90,8 @@ static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len, *max_len = handle_length; type = 255; } + if (dentry) + dput(dentry); return type; } -- cgit v1.2.3 From 7246240c7c186542f73af4fadc744d66440f616f Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 25 Oct 2012 08:49:41 -0700 Subject: libceph: avoid NULL kref_put from NULL alloc_msg return The ceph_on_in_msg_alloc() method calls the ->alloc_msg() helper which may return NULL. It also drops con->mutex while it allocates a message, which means that the connection state may change (e.g., get closed). If that happens, we clean up and bail out. Avoid calling ceph_msg_put() on a NULL return value and triggering a crash. This was observed when an ->alloc_msg() call races with a timeout that resends a zillion messages and resets the connection, and ->alloc_msg() returns NULL (because the request was resent to another target). Fixes http://tracker.newdream.net/issues/3342 Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 66f6f56bcb23..1041114453db 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2742,7 +2742,8 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) msg = con->ops->alloc_msg(con, hdr, skip); mutex_lock(&con->mutex); if (con->state != CON_STATE_OPEN) { - ceph_msg_put(msg); + if (msg) + ceph_msg_put(msg); return -EAGAIN; } con->in_msg = msg; -- cgit v1.2.3 From b000056a5a8d3f5a4a9fb80184a7ec14f86a43d4 Mon Sep 17 00:00:00 2001 From: David Zafman Date: Thu, 25 Oct 2012 10:23:46 -0700 Subject: ceph: Fix NULL ptr crash in strlen() set_request_path_attr() checks for NULL ptr before calling strlen() This fixes http://tracker.newdream.net/issues/3404 Signed-off-by: David Zafman Reviewed-by: Sage Weil --- fs/ceph/mds_client.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 1bcf712655d9..62d2342eb267 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1590,7 +1590,7 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, } else if (rpath || rino) { *ino = rino; *ppath = rpath; - *pathlen = strlen(rpath); + *pathlen = rpath ? strlen(rpath) : 0; dout(" path %.*s\n", *pathlen, rpath); } -- cgit v1.2.3 From b213e0b1a62637b2a9395a34349b13d73ca2b90a Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 10 Oct 2012 21:19:13 -0700 Subject: rbd: fix bug in rbd_dev_id_put() In rbd_dev_id_put(), there's a loop that's intended to determine the maximum device id in use. But it isn't doing that at all, the effect of how it's written is to simply use the just-put id number, which ignores whole purpose of this function. Fix the bug. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 8f56d37637a7..4a16464ad5bb 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -2680,8 +2680,8 @@ static void rbd_dev_id_put(struct rbd_device *rbd_dev) struct rbd_device *rbd_dev; rbd_dev = list_entry(tmp, struct rbd_device, node); - if (rbd_id > max_id) - max_id = rbd_id; + if (rbd_dev->dev_id > max_id) + max_id = rbd_dev->dev_id; } spin_unlock(&rbd_dev_list_lock); -- cgit v1.2.3 From a0ea3a40fd20b8c66381f747c454f89d6d1f50d4 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 10 Oct 2012 21:19:13 -0700 Subject: rbd: zero return code in rbd_dev_image_id() When rbd_dev_probe() calls rbd_dev_image_id() it expects to get a 0 return code if successful, but it is getting a positive value. The reason is that rbd_dev_image_id() returns the value it gets from rbd_req_sync_exec(), which returns the number of bytes read in as a result of the request. (This ultimately comes from ceph_copy_from_page_vector() in rbd_req_sync_op()). Force the return value to 0 when successful in rbd_dev_image_id(). Do the same in rbd_dev_v2_object_prefix(). Signed-off-by: Alex Elder Reviewed-by: Josh Durgin Reviewed-by: Dan Mick --- drivers/block/rbd.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 4a16464ad5bb..94d613208c4f 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -2207,6 +2207,7 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); if (ret < 0) goto out; + ret = 0; /* rbd_req_sync_exec() can return positive */ p = reply_buf; rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, @@ -2900,6 +2901,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); if (ret < 0) goto out; + ret = 0; /* rbd_req_sync_exec() can return positive */ p = response; rbd_dev->image_id = ceph_extract_encoded_string(&p, -- cgit v1.2.3 From be466c1cc36621590ef17b05a6d342dfd33f7280 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 22 Oct 2012 11:31:26 -0500 Subject: rbd: fix read-only option name The name of the "read-only" mapping option was inadvertently changed in this commit: f84344f3 rbd: separate mapping info in rbd_dev Revert that hunk to return it to what it should be. Signed-off-by: Alex Elder Reviewed-by: Dan Mick Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 94d613208c4f..5d9e2cc5c51e 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -397,7 +397,7 @@ enum { static match_table_t rbd_opts_tokens = { /* int args above */ /* string args above */ - {Opt_read_only, "mapping.read_only"}, + {Opt_read_only, "read_only"}, {Opt_read_only, "ro"}, /* Alternate spelling */ {Opt_read_write, "read_write"}, {Opt_read_write, "rw"}, /* Alternate spelling */ -- cgit v1.2.3 From 13f4042c05b6a1a638ccab3f0cabdb84993803a2 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 10 Oct 2012 18:59:29 -0700 Subject: rbd: kill rbd_req_{read,write}() Both rbd_req_read() and rbd_req_write() are simple wrapper routines for rbd_do_op(), and each is only called once. Replace each wrapper call with a direct call to rbd_do_op(), and get rid of the wrapper functions. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 60 ++++++++++++----------------------------------------- 1 file changed, 13 insertions(+), 47 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 5d9e2cc5c51e..8f2c39ad3bb0 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1210,41 +1210,6 @@ done: return ret; } -/* - * Request async osd write - */ -static int rbd_req_write(struct request *rq, - struct rbd_device *rbd_dev, - struct ceph_snap_context *snapc, - u64 ofs, u64 len, - struct bio *bio, - struct rbd_req_coll *coll, - int coll_index) -{ - return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP, - CEPH_OSD_OP_WRITE, - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, - ofs, len, bio, coll, coll_index); -} - -/* - * Request async osd read - */ -static int rbd_req_read(struct request *rq, - struct rbd_device *rbd_dev, - u64 snapid, - u64 ofs, u64 len, - struct bio *bio, - struct rbd_req_coll *coll, - int coll_index) -{ - return rbd_do_op(rq, rbd_dev, NULL, - snapid, - CEPH_OSD_OP_READ, - CEPH_OSD_FLAG_READ, - ofs, len, bio, coll, coll_index); -} - /* * Request sync osd read */ @@ -1550,21 +1515,22 @@ static void rbd_rq_fn(struct request_queue *q) goto next_seg; } - /* init OSD command: write or read */ if (do_write) - rbd_req_write(rq, rbd_dev, - snapc, - ofs, - op_size, bio, - coll, cur_seg); + (void) rbd_do_op(rq, rbd_dev, + snapc, CEPH_NOSNAP, + CEPH_OSD_OP_WRITE, + CEPH_OSD_FLAG_WRITE | + CEPH_OSD_FLAG_ONDISK, + ofs, op_size, bio, + coll, cur_seg); else - rbd_req_read(rq, rbd_dev, - rbd_dev->mapping.snap_id, - ofs, - op_size, bio, - coll, cur_seg); - + (void) rbd_do_op(rq, rbd_dev, + NULL, rbd_dev->mapping.snap_id, + CEPH_OSD_OP_READ, + CEPH_OSD_FLAG_READ, + ofs, op_size, bio, + coll, cur_seg); next_seg: size -= op_size; ofs += op_size; -- cgit v1.2.3 From ff2e4bb5b32f89c455979a4222a9b78007cde254 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 10 Oct 2012 18:59:29 -0700 Subject: rbd: drop rbd_do_op() opcode and flags The only callers of rbd_do_op() are in rbd_rq_fn(), where call one is used for writes and the other used for reads. The request passed to rbd_do_op() already encodes the I/O direction, and that information can be used inside the function to set the opcode and flags value (rather than passing them in as arguments). So get rid of the opcode and flags arguments to rbd_do_op(). Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 8f2c39ad3bb0..a29c6d2a49ad 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1164,7 +1164,6 @@ static int rbd_do_op(struct request *rq, struct rbd_device *rbd_dev, struct ceph_snap_context *snapc, u64 snapid, - int opcode, int flags, u64 ofs, u64 len, struct bio *bio, struct rbd_req_coll *coll, @@ -1176,6 +1175,8 @@ static int rbd_do_op(struct request *rq, int ret; struct ceph_osd_req_op *ops; u32 payload_len; + int opcode; + int flags; seg_name = rbd_segment_name(rbd_dev, ofs); if (!seg_name) @@ -1183,7 +1184,15 @@ static int rbd_do_op(struct request *rq, seg_len = rbd_segment_length(rbd_dev, ofs, len); seg_ofs = rbd_segment_offset(rbd_dev, ofs); - payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0); + if (rq_data_dir(rq) == WRITE) { + opcode = CEPH_OSD_OP_WRITE; + flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK; + payload_len = seg_len; + } else { + opcode = CEPH_OSD_OP_READ; + flags = CEPH_OSD_FLAG_READ; + payload_len = 0; + } ret = -ENOMEM; ops = rbd_create_rw_ops(1, opcode, payload_len); @@ -1519,16 +1528,11 @@ static void rbd_rq_fn(struct request_queue *q) if (do_write) (void) rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP, - CEPH_OSD_OP_WRITE, - CEPH_OSD_FLAG_WRITE | - CEPH_OSD_FLAG_ONDISK, ofs, op_size, bio, coll, cur_seg); else (void) rbd_do_op(rq, rbd_dev, NULL, rbd_dev->mapping.snap_id, - CEPH_OSD_OP_READ, - CEPH_OSD_FLAG_READ, ofs, op_size, bio, coll, cur_seg); next_seg: -- cgit v1.2.3 From 4634246db8cb2e5117ef7c682efcc383fa3354f8 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 10 Oct 2012 18:59:29 -0700 Subject: rbd: consolidate rbd_do_op() calls The two calls to rbd_do_op() from rbd_rq_fn() differ only in the value passed for the snapshot id and the snapshot context. For reads the snapshot always comes from the mapping, and for writes the snapshot id is always CEPH_NOSNAP. The snapshot context is always null for reads. For writes, the snapshot context always comes from the rbd header, but it is acquired under protection of header semaphore and could change thereafter, so we can't simply use what's available inside rbd_do_op(). Eliminate the snapid parameter from rbd_do_op(), and set it based on the I/O direction inside that function instead. Always pass the snapshot context acquired in the caller, but reset it to a null pointer inside rbd_do_op() if the operation is a read. As a result, there is no difference in the read and write calls to rbd_do_op() made in rbd_rq_fn(), so just call it unconditionally. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index a29c6d2a49ad..d0328835bbd9 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1163,7 +1163,6 @@ done: static int rbd_do_op(struct request *rq, struct rbd_device *rbd_dev, struct ceph_snap_context *snapc, - u64 snapid, u64 ofs, u64 len, struct bio *bio, struct rbd_req_coll *coll, @@ -1177,6 +1176,7 @@ static int rbd_do_op(struct request *rq, u32 payload_len; int opcode; int flags; + u64 snapid; seg_name = rbd_segment_name(rbd_dev, ofs); if (!seg_name) @@ -1187,10 +1187,13 @@ static int rbd_do_op(struct request *rq, if (rq_data_dir(rq) == WRITE) { opcode = CEPH_OSD_OP_WRITE; flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK; + snapid = CEPH_NOSNAP; payload_len = seg_len; } else { opcode = CEPH_OSD_OP_READ; flags = CEPH_OSD_FLAG_READ; + snapc = NULL; + snapid = rbd_dev->mapping.snap_id; payload_len = 0; } @@ -1518,24 +1521,13 @@ static void rbd_rq_fn(struct request_queue *q) kref_get(&coll->kref); bio = bio_chain_clone(&rq_bio, &next_bio, &bp, op_size, GFP_ATOMIC); - if (!bio) { + if (bio) + (void) rbd_do_op(rq, rbd_dev, snapc, + ofs, op_size, + bio, coll, cur_seg); + else rbd_coll_end_req_index(rq, coll, cur_seg, -ENOMEM, op_size); - goto next_seg; - } - - /* init OSD command: write or read */ - if (do_write) - (void) rbd_do_op(rq, rbd_dev, - snapc, CEPH_NOSNAP, - ofs, op_size, bio, - coll, cur_seg); - else - (void) rbd_do_op(rq, rbd_dev, - NULL, rbd_dev->mapping.snap_id, - ofs, op_size, bio, - coll, cur_seg); -next_seg: size -= op_size; ofs += op_size; -- cgit v1.2.3 From db2388b6ee40a949084e4cdddc3b0a4357068a62 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Sat, 20 Oct 2012 22:17:27 -0500 Subject: rbd: verify rbd image order value This adds a verification that an rbd image's object order is within the upper and lower bounds supported by this implementation. It must be at least 9 (SECTOR_SHIFT), because the Linux bio system assumes that minimum granularity. It also must be less than 32 (at the moment anyway) because there exist spots in the code that store the size of a "segment" (object backing an rbd image) in a signed int variable, which can be 32 bits including the sign. We should be able to relax this limit once we've verified the code uses 64-bit types where needed. Note that the CLI tool already limits the order to the range 12-25. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index d0328835bbd9..4734446c3b5b 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -533,6 +533,16 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk) if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) return false; + /* The bio layer requires at least sector-sized I/O */ + + if (ondisk->options.order < SECTOR_SHIFT) + return false; + + /* If we use u64 in a few spots we may be able to loosen this */ + + if (ondisk->options.order > 8 * sizeof (int) - 1) + return false; + /* * The size of a snapshot header has to fit in a size_t, and * that limits the number of snapshots. -- cgit v1.2.3 From d4b125e9eb43babd14538ba61718e3db71a98d29 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 3 Jul 2012 16:01:19 -0500 Subject: rbd: increase maximum snapshot name length Change RBD_MAX_SNAP_NAME_LEN to be based on NAME_MAX. That is a practical limit for the length of a snapshot name (based on the presence of a directory using the name under /sys/bus/rbd to represent the snapshot). The /sys entry is created by prefixing it with "snap_"; define that prefix symbolically, and take its length into account in defining the snapshot name length limit. Enforce the limit in rbd_add_parse_args(). Also delete a dout() call in that function that was not meant to be committed. Signed-off-by: Alex Elder Reviewed-by: Dan Mick Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 4734446c3b5b..4858d925b95e 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -61,7 +61,10 @@ #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ -#define RBD_MAX_SNAP_NAME_LEN 32 +#define RBD_SNAP_DEV_NAME_PREFIX "snap_" +#define RBD_MAX_SNAP_NAME_LEN \ + (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1)) + #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */ #define RBD_MAX_OPT_LEN 1024 @@ -2063,7 +2066,7 @@ static int rbd_register_snap_dev(struct rbd_snap *snap, dev->type = &rbd_snap_device_type; dev->parent = parent; dev->release = rbd_snap_dev_release; - dev_set_name(dev, "snap_%s", snap->name); + dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name); dout("%s: registering device for snapshot %s\n", __func__, snap->name); ret = device_register(dev); @@ -2797,8 +2800,13 @@ static char *rbd_add_parse_args(struct rbd_device *rbd_dev, if (!rbd_dev->image_name) goto out_err; - /* Snapshot name is optional */ + /* Snapshot name is optional; default is to use "head" */ + len = next_token(&buf); + if (len > RBD_MAX_SNAP_NAME_LEN) { + err_ptr = ERR_PTR(-ENAMETOOLONG); + goto out_err; + } if (!len) { buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */ len = sizeof (RBD_SNAP_HEAD_NAME) - 1; @@ -2809,8 +2817,6 @@ static char *rbd_add_parse_args(struct rbd_device *rbd_dev, memcpy(snap_name, buf, len); *(snap_name + len) = '\0'; -dout(" SNAP_NAME is <%s>, len is %zd\n", snap_name, len); - return snap_name; out_err: -- cgit v1.2.3 From e5cfeed281a842a37c9da84bad2911c9b470347e Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Sat, 20 Oct 2012 22:17:27 -0500 Subject: rbd: simplify rbd_merge_bvec() The aim of this patch is to make what's going on rbd_merge_bvec() a bit more obvious than it was before. This was an issue when a recent btrfs bug led us to question whether the merge function was working correctly. Use "obj" rather than "chunk" to indicate the units whose boundaries we care about we call (rados) "objects". Signed-off-by: Alex Elder Reviewed-by: Dan Mick --- drivers/block/rbd.c | 51 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 4858d925b95e..76fbfa120064 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1566,22 +1566,41 @@ static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, struct bio_vec *bvec) { struct rbd_device *rbd_dev = q->queuedata; - unsigned int chunk_sectors; - sector_t sector; - unsigned int bio_sectors; - int max; - - chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT); - sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev); - bio_sectors = bmd->bi_size >> SECTOR_SHIFT; - - max = (chunk_sectors - ((sector & (chunk_sectors - 1)) - + bio_sectors)) << SECTOR_SHIFT; - if (max < 0) - max = 0; /* bio_add cannot handle a negative return */ - if (max <= bvec->bv_len && bio_sectors == 0) - return bvec->bv_len; - return max; + sector_t sector_offset; + sector_t sectors_per_obj; + sector_t obj_sector_offset; + int ret; + + /* + * Find how far into its rbd object the partition-relative + * bio start sector is to offset relative to the enclosing + * device. + */ + sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector; + sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT); + obj_sector_offset = sector_offset & (sectors_per_obj - 1); + + /* + * Compute the number of bytes from that offset to the end + * of the object. Account for what's already used by the bio. + */ + ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT; + if (ret > bmd->bi_size) + ret -= bmd->bi_size; + else + ret = 0; + + /* + * Don't send back more than was asked for. And if the bio + * was empty, let the whole thing through because: "Note + * that a block device *must* allow a single page to be + * added to an empty bio." + */ + rbd_assert(bvec->bv_len <= PAGE_SIZE); + if (ret > (int) bvec->bv_len || !bmd->bi_size) + ret = (int) bvec->bv_len; + + return ret; } static void rbd_free_disk(struct rbd_device *rbd_dev) -- cgit v1.2.3 From 069a4b5690a952e74157fd489833c71c73f012b3 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 22 Oct 2012 11:31:27 -0500 Subject: rbd: kill rbd_device->rbd_opts The rbd_device structure has an embedded rbd_options structure. Such a structure is needed to work with the generic ceph argument parsing code, but there's no need to keep it around once argument parsing is done. Use a local variable to hold the rbd options used in parsing in rbd_get_client(), and just transfer its content (it's just a read_only flag) into the field in the rbd_mapping sub-structure that requires that information. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin Reviewed-by: Dan Mick --- drivers/block/rbd.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 76fbfa120064..c800047f5835 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -184,7 +184,6 @@ struct rbd_device { struct gendisk *disk; /* blkdev's gendisk and rq */ u32 image_format; /* Either 1 or 2 */ - struct rbd_options rbd_opts; struct rbd_client *rbd_client; char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ @@ -456,18 +455,24 @@ static int parse_rbd_opts_token(char *c, void *private) static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, size_t mon_addr_len, char *options) { - struct rbd_options *rbd_opts = &rbd_dev->rbd_opts; + struct rbd_options rbd_opts; struct ceph_options *ceph_opts; struct rbd_client *rbdc; - rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; + /* Initialize all rbd options to the defaults */ + + rbd_opts.read_only = RBD_READ_ONLY_DEFAULT; ceph_opts = ceph_parse_options(options, mon_addr, mon_addr + mon_addr_len, - parse_rbd_opts_token, rbd_opts); + parse_rbd_opts_token, &rbd_opts); if (IS_ERR(ceph_opts)) return PTR_ERR(ceph_opts); + /* Record the parsed rbd options */ + + rbd_dev->mapping.read_only = rbd_opts.read_only; + rbdc = rbd_client_find(ceph_opts); if (rbdc) { /* using an existing client */ @@ -685,7 +690,6 @@ static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name) rbd_dev->mapping.size = rbd_dev->header.image_size; rbd_dev->mapping.features = rbd_dev->header.features; rbd_dev->mapping.snap_exists = false; - rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only; ret = 0; } else { ret = snap_by_name(rbd_dev, snap_name); -- cgit v1.2.3 From 0ed7285e0001b960c888e5455ae982025210ed3d Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 29 Oct 2012 11:01:42 -0700 Subject: libceph: fix osdmap decode error paths Ensure that we set the err value correctly so that we do not pass a 0 value to ERR_PTR and confuse the calling code. (In particular, osd_client.c handle_map() will BUG(!newmap)). Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/osdmap.c | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 5433fb0eb3c6..f552aa48fd9e 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -645,10 +645,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) ceph_decode_32_safe(p, end, max, bad); while (max--) { ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad); + err = -ENOMEM; pi = kzalloc(sizeof(*pi), GFP_NOFS); if (!pi) goto bad; pi->id = ceph_decode_32(p); + err = -EINVAL; ev = ceph_decode_8(p); /* encoding version */ if (ev > CEPH_PG_POOL_VERSION) { pr_warning("got unknown v %d > %d of ceph_pg_pool\n", @@ -664,8 +666,13 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) __insert_pg_pool(&map->pg_pools, pi); } - if (version >= 5 && __decode_pool_names(p, end, map) < 0) - goto bad; + if (version >= 5) { + err = __decode_pool_names(p, end, map); + if (err < 0) { + dout("fail to decode pool names"); + goto bad; + } + } ceph_decode_32_safe(p, end, map->pool_max, bad); @@ -745,7 +752,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) return map; bad: - dout("osdmap_decode fail\n"); + dout("osdmap_decode fail err %d\n", err); ceph_osdmap_destroy(map); return ERR_PTR(err); } @@ -839,6 +846,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, if (ev > CEPH_PG_POOL_VERSION) { pr_warning("got unknown v %d > %d of ceph_pg_pool\n", ev, CEPH_PG_POOL_VERSION); + err = -EINVAL; goto bad; } pi = __lookup_pg_pool(&map->pg_pools, pool); @@ -855,8 +863,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, if (err < 0) goto bad; } - if (version >= 5 && __decode_pool_names(p, end, map) < 0) - goto bad; + if (version >= 5) { + err = __decode_pool_names(p, end, map); + if (err < 0) + goto bad; + } /* old_pool */ ceph_decode_32_safe(p, end, len, bad); @@ -932,15 +943,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, (void) __remove_pg_mapping(&map->pg_temp, pgid); /* insert */ - if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) { - err = -EINVAL; + err = -EINVAL; + if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) goto bad; - } + err = -ENOMEM; pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); - if (!pg) { - err = -ENOMEM; + if (!pg) goto bad; - } pg->pgid = pgid; pg->len = pglen; for (j = 0; j < pglen; j++) -- cgit v1.2.3 From f7760dad286829682a8d36f4563ab20a65732414 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Sat, 20 Oct 2012 22:17:27 -0500 Subject: rbd: simplify rbd_rq_fn() When processing a request, rbd_rq_fn() makes clones of the bio's in the request's bio chain and submits the results to osd's to be satisfied. If a request bio straddles the boundary between objects backing the rbd image, it must be represented by two cloned bio's, one for the first part (at the end of one object) and one for the second (at the beginning of the next object). This has been handled by a function bio_chain_clone(), which includes an interface only a mother could love, and which has been found to have other problems. This patch defines two new fairly generic bio functions (one which replaces bio_chain_clone()) to help out the situation, and then revises rbd_rq_fn() to make use of them. First, bio_clone_range() clones a portion of a single bio, starting at a given offset within the bio and including only as many bytes as requested. As a convenience, a request to clone the entire bio is passed directly to bio_clone(). Second, bio_chain_clone_range() performs a similar function, producing a chain of cloned bio's covering a sub-range of the source chain. No bio_pair structures are used, and if successful the result will represent exactly the specified range. Using bio_chain_clone_range() makes bio_rq_fn() a little easier to understand, because it avoids the need to pass very much state information between consecutive calls. By avoiding the need to track a bio_pair structure, it also eliminates the problem described here: http://tracker.newdream.net/issues/2933 Note that a block request (and therefore the complete length of a bio chain processed in rbd_rq_fn()) is an unsigned int, while the result of rbd_segment_length() is u64. This change makes this range trunctation explicit, and trips a bug if the the segment boundary is too far off. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 231 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 152 insertions(+), 79 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index c800047f5835..cc06c55875b9 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -826,77 +826,144 @@ static void zero_bio_chain(struct bio *chain, int start_ofs) } /* - * bio_chain_clone - clone a chain of bios up to a certain length. - * might return a bio_pair that will need to be released. + * Clone a portion of a bio, starting at the given byte offset + * and continuing for the number of bytes indicated. */ -static struct bio *bio_chain_clone(struct bio **old, struct bio **next, - struct bio_pair **bp, - int len, gfp_t gfpmask) -{ - struct bio *old_chain = *old; - struct bio *new_chain = NULL; - struct bio *tail; - int total = 0; - - if (*bp) { - bio_pair_release(*bp); - *bp = NULL; - } +static struct bio *bio_clone_range(struct bio *bio_src, + unsigned int offset, + unsigned int len, + gfp_t gfpmask) +{ + struct bio_vec *bv; + unsigned int resid; + unsigned short idx; + unsigned int voff; + unsigned short end_idx; + unsigned short vcnt; + struct bio *bio; - while (old_chain && (total < len)) { - struct bio *tmp; + /* Handle the easy case for the caller */ - tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs); - if (!tmp) - goto err_out; - gfpmask &= ~__GFP_WAIT; /* can't wait after the first */ + if (!offset && len == bio_src->bi_size) + return bio_clone(bio_src, gfpmask); - if (total + old_chain->bi_size > len) { - struct bio_pair *bp; + if (WARN_ON_ONCE(!len)) + return NULL; + if (WARN_ON_ONCE(len > bio_src->bi_size)) + return NULL; + if (WARN_ON_ONCE(offset > bio_src->bi_size - len)) + return NULL; - /* - * this split can only happen with a single paged bio, - * split_bio will BUG_ON if this is not the case - */ - dout("bio_chain_clone split! total=%d remaining=%d" - "bi_size=%u\n", - total, len - total, old_chain->bi_size); + /* Find first affected segment... */ - /* split the bio. We'll release it either in the next - call, or it will have to be released outside */ - bp = bio_split(old_chain, (len - total) / SECTOR_SIZE); - if (!bp) - goto err_out; + resid = offset; + __bio_for_each_segment(bv, bio_src, idx, 0) { + if (resid < bv->bv_len) + break; + resid -= bv->bv_len; + } + voff = resid; - __bio_clone(tmp, &bp->bio1); + /* ...and the last affected segment */ - *next = &bp->bio2; - } else { - __bio_clone(tmp, old_chain); - *next = old_chain->bi_next; - } + resid += len; + __bio_for_each_segment(bv, bio_src, end_idx, idx) { + if (resid <= bv->bv_len) + break; + resid -= bv->bv_len; + } + vcnt = end_idx - idx + 1; + + /* Build the clone */ + + bio = bio_alloc(gfpmask, (unsigned int) vcnt); + if (!bio) + return NULL; /* ENOMEM */ - tmp->bi_bdev = NULL; - tmp->bi_next = NULL; - if (new_chain) - tail->bi_next = tmp; - else - new_chain = tmp; - tail = tmp; - old_chain = old_chain->bi_next; + bio->bi_bdev = bio_src->bi_bdev; + bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT); + bio->bi_rw = bio_src->bi_rw; + bio->bi_flags |= 1 << BIO_CLONED; - total += tmp->bi_size; + /* + * Copy over our part of the bio_vec, then update the first + * and last (or only) entries. + */ + memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx], + vcnt * sizeof (struct bio_vec)); + bio->bi_io_vec[0].bv_offset += voff; + if (vcnt > 1) { + bio->bi_io_vec[0].bv_len -= voff; + bio->bi_io_vec[vcnt - 1].bv_len = resid; + } else { + bio->bi_io_vec[0].bv_len = len; } - rbd_assert(total == len); + bio->bi_vcnt = vcnt; + bio->bi_size = len; + bio->bi_idx = 0; + + return bio; +} + +/* + * Clone a portion of a bio chain, starting at the given byte offset + * into the first bio in the source chain and continuing for the + * number of bytes indicated. The result is another bio chain of + * exactly the given length, or a null pointer on error. + * + * The bio_src and offset parameters are both in-out. On entry they + * refer to the first source bio and the offset into that bio where + * the start of data to be cloned is located. + * + * On return, bio_src is updated to refer to the bio in the source + * chain that contains first un-cloned byte, and *offset will + * contain the offset of that byte within that bio. + */ +static struct bio *bio_chain_clone_range(struct bio **bio_src, + unsigned int *offset, + unsigned int len, + gfp_t gfpmask) +{ + struct bio *bi = *bio_src; + unsigned int off = *offset; + struct bio *chain = NULL; + struct bio **end; + + /* Build up a chain of clone bios up to the limit */ + + if (!bi || off >= bi->bi_size || !len) + return NULL; /* Nothing to clone */ - *old = old_chain; + end = &chain; + while (len) { + unsigned int bi_size; + struct bio *bio; + + if (!bi) + goto out_err; /* EINVAL; ran out of bio's */ + bi_size = min_t(unsigned int, bi->bi_size - off, len); + bio = bio_clone_range(bi, off, bi_size, gfpmask); + if (!bio) + goto out_err; /* ENOMEM */ + + *end = bio; + end = &bio->bi_next; + + off += bi_size; + if (off == bi->bi_size) { + bi = bi->bi_next; + off = 0; + } + len -= bi_size; + } + *bio_src = bi; + *offset = off; - return new_chain; + return chain; +out_err: + bio_chain_put(chain); -err_out: - dout("bio_chain_clone with err\n"); - bio_chain_put(new_chain); return NULL; } @@ -1014,8 +1081,9 @@ static int rbd_do_request(struct request *rq, req_data->coll_index = coll_index; } - dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name, - (unsigned long long) ofs, (unsigned long long) len); + dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n", + object_name, (unsigned long long) ofs, + (unsigned long long) len, coll, coll_index); osdc = &rbd_dev->rbd_client->client->osdc; req = ceph_osdc_alloc_request(osdc, flags, snapc, ops, @@ -1463,18 +1531,16 @@ static void rbd_rq_fn(struct request_queue *q) { struct rbd_device *rbd_dev = q->queuedata; struct request *rq; - struct bio_pair *bp = NULL; while ((rq = blk_fetch_request(q))) { struct bio *bio; - struct bio *rq_bio, *next_bio = NULL; bool do_write; unsigned int size; - u64 op_size = 0; u64 ofs; int num_segs, cur_seg = 0; struct rbd_req_coll *coll; struct ceph_snap_context *snapc; + unsigned int bio_offset; dout("fetched request\n"); @@ -1486,10 +1552,6 @@ static void rbd_rq_fn(struct request_queue *q) /* deduce our operation (read, write) */ do_write = (rq_data_dir(rq) == WRITE); - - size = blk_rq_bytes(rq); - ofs = blk_rq_pos(rq) * SECTOR_SIZE; - rq_bio = rq->bio; if (do_write && rbd_dev->mapping.read_only) { __blk_end_request_all(rq, -EROFS); continue; @@ -1512,6 +1574,10 @@ static void rbd_rq_fn(struct request_queue *q) up_read(&rbd_dev->header_rwsem); + size = blk_rq_bytes(rq); + ofs = blk_rq_pos(rq) * SECTOR_SIZE; + bio = rq->bio; + dout("%s 0x%x bytes at 0x%llx\n", do_write ? "write" : "read", size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE); @@ -1531,30 +1597,37 @@ static void rbd_rq_fn(struct request_queue *q) continue; } + bio_offset = 0; do { - /* a bio clone to be passed down to OSD req */ + u64 limit = rbd_segment_length(rbd_dev, ofs, size); + unsigned int chain_size; + struct bio *bio_chain; + + BUG_ON(limit > (u64) UINT_MAX); + chain_size = (unsigned int) limit; dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt); - op_size = rbd_segment_length(rbd_dev, ofs, size); + kref_get(&coll->kref); - bio = bio_chain_clone(&rq_bio, &next_bio, &bp, - op_size, GFP_ATOMIC); - if (bio) + + /* Pass a cloned bio chain via an osd request */ + + bio_chain = bio_chain_clone_range(&bio, + &bio_offset, chain_size, + GFP_ATOMIC); + if (bio_chain) (void) rbd_do_op(rq, rbd_dev, snapc, - ofs, op_size, - bio, coll, cur_seg); + ofs, chain_size, + bio_chain, coll, cur_seg); else rbd_coll_end_req_index(rq, coll, cur_seg, - -ENOMEM, op_size); - size -= op_size; - ofs += op_size; + -ENOMEM, chain_size); + size -= chain_size; + ofs += chain_size; cur_seg++; - rq_bio = next_bio; } while (size > 0); kref_put(&coll->kref, rbd_coll_release); - if (bp) - bio_pair_release(bp); spin_lock_irq(q->queue_lock); ceph_put_snap_context(snapc); @@ -1564,7 +1637,7 @@ static void rbd_rq_fn(struct request_queue *q) /* * a queue callback. Makes sure that we don't create a bio that spans across * multiple osd objects. One exception would be with a single page bios, - * which we handle later at bio_chain_clone + * which we handle later at bio_chain_clone_range() */ static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, struct bio_vec *bvec) -- cgit v1.2.3 From 41f38c2b2f8b66b176a0e548ef06294343a7bfa2 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 25 Oct 2012 23:34:40 -0500 Subject: rbd: remove snapshots on error in rbd_add() If rbd_dev_snaps_update() has ever been called for an rbd device structure there could be snapshot structures on its snaps list. In rbd_add(), this function is called but a subsequent error path neglected to clean up any of these snapshots. Add a call to rbd_remove_all_snaps() in the appropriate spot to remedy this. Change a couple of error labels to be a little clearer while there. Drop the leading underscores from the function name; there's nothing special about that function that they might signify. As suggested in review, the leading underscores in __rbd_remove_snap_dev() have been removed as well. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index cc06c55875b9..c7681d46cf86 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -228,7 +228,7 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev); static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); static void rbd_dev_release(struct device *dev); -static void __rbd_remove_snap_dev(struct rbd_snap *snap); +static void rbd_remove_snap_dev(struct rbd_snap *snap); static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count); @@ -1787,13 +1787,13 @@ static int rbd_read_header(struct rbd_device *rbd_dev, return ret; } -static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev) +static void rbd_remove_all_snaps(struct rbd_device *rbd_dev) { struct rbd_snap *snap; struct rbd_snap *next; list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) - __rbd_remove_snap_dev(snap); + rbd_remove_snap_dev(snap); } static void rbd_update_mapping_size(struct rbd_device *rbd_dev) @@ -2146,7 +2146,7 @@ static bool rbd_snap_registered(struct rbd_snap *snap) return ret; } -static void __rbd_remove_snap_dev(struct rbd_snap *snap) +static void rbd_remove_snap_dev(struct rbd_snap *snap) { list_del(&snap->node); if (device_is_registered(&snap->dev)) @@ -2569,7 +2569,7 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev) if (rbd_dev->mapping.snap_id == snap->id) rbd_dev->mapping.snap_exists = false; - __rbd_remove_snap_dev(snap); + rbd_remove_snap_dev(snap); dout("%ssnap id %llu has been removed\n", rbd_dev->mapping.snap_id == snap->id ? "mapped " : "", @@ -3179,11 +3179,11 @@ static ssize_t rbd_add(struct bus_type *bus, /* no need to lock here, as rbd_dev is not registered yet */ rc = rbd_dev_snaps_update(rbd_dev); if (rc) - goto err_out_header; + goto err_out_probe; rc = rbd_dev_set_mapping(rbd_dev, snap_name); if (rc) - goto err_out_header; + goto err_out_snaps; /* generate unique id: find highest unique id, add one */ rbd_dev_id_get(rbd_dev); @@ -3247,7 +3247,9 @@ err_out_blkdev: unregister_blkdev(rbd_dev->major, rbd_dev->name); err_out_id: rbd_dev_id_put(rbd_dev); -err_out_header: +err_out_snaps: + rbd_remove_all_snaps(rbd_dev); +err_out_probe: rbd_header_free(&rbd_dev->header); err_out_client: kfree(rbd_dev->header_name); @@ -3345,7 +3347,7 @@ static ssize_t rbd_remove(struct bus_type *bus, goto done; } - __rbd_remove_all_snaps(rbd_dev); + rbd_remove_all_snaps(rbd_dev); rbd_bus_del_dev(rbd_dev); done: -- cgit v1.2.3 From 86992098e7fdb97d01feb51495a952b264a55b7c Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 25 Oct 2012 23:34:41 -0500 Subject: rbd: make pool_id a 64 bit value If a format 2 image has a parent, its pool id will be specified using a 64-bit value. Change the pool id we save for an image to match that. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index c7681d46cf86..ef82e9091f4d 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -197,7 +197,7 @@ struct rbd_device { size_t image_name_len; char *header_name; char *pool_name; - int pool_id; + u64 pool_id; struct ceph_osd_event *watch_event; struct ceph_osd_request *watch_request; @@ -1113,7 +1113,7 @@ static int rbd_do_request(struct request *rq, layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); layout->fl_stripe_count = cpu_to_le32(1); layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); - layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id); + layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->pool_id); ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno, req, ops); rbd_assert(ret == 0); @@ -1982,7 +1982,7 @@ static ssize_t rbd_pool_id_show(struct device *dev, { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - return sprintf(buf, "%d\n", rbd_dev->pool_id); + return sprintf(buf, "%llu\n", (unsigned long long) rbd_dev->pool_id); } static ssize_t rbd_name_show(struct device *dev, @@ -3170,7 +3170,7 @@ static ssize_t rbd_add(struct bus_type *bus, rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name); if (rc < 0) goto err_out_client; - rbd_dev->pool_id = rc; + rbd_dev->pool_id = (u64) rc; rc = rbd_dev_probe(rbd_dev); if (rc < 0) -- cgit v1.2.3 From 971f839a7670197366c04e99472943532caeb0dc Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 25 Oct 2012 23:34:41 -0500 Subject: rbd: move snap info out of rbd_mapping struct Moving the snap_id and snap_name fields into the separate rbd_mapping structure was misguided. (And in time, perhaps we'll do away with that structure altogether...) Move these fields back into struct rbd_device. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index ef82e9091f4d..7d28ce33056f 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -166,8 +166,6 @@ struct rbd_snap { }; struct rbd_mapping { - char *snap_name; - u64 snap_id; u64 size; u64 features; bool snap_exists; @@ -199,6 +197,9 @@ struct rbd_device { char *pool_name; u64 pool_id; + char *snap_name; + u64 snap_id; + struct ceph_osd_event *watch_event; struct ceph_osd_request *watch_request; @@ -669,7 +670,7 @@ static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name) list_for_each_entry(snap, &rbd_dev->snaps, node) { if (!strcmp(snap_name, snap->name)) { - rbd_dev->mapping.snap_id = snap->id; + rbd_dev->snap_id = snap->id; rbd_dev->mapping.size = snap->size; rbd_dev->mapping.features = snap->features; @@ -686,7 +687,7 @@ static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name) if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME, sizeof (RBD_SNAP_HEAD_NAME))) { - rbd_dev->mapping.snap_id = CEPH_NOSNAP; + rbd_dev->snap_id = CEPH_NOSNAP; rbd_dev->mapping.size = rbd_dev->header.image_size; rbd_dev->mapping.features = rbd_dev->header.features; rbd_dev->mapping.snap_exists = false; @@ -698,7 +699,7 @@ static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name) rbd_dev->mapping.snap_exists = true; rbd_dev->mapping.read_only = true; } - rbd_dev->mapping.snap_name = snap_name; + rbd_dev->snap_name = snap_name; done: return ret; } @@ -1278,7 +1279,7 @@ static int rbd_do_op(struct request *rq, opcode = CEPH_OSD_OP_READ; flags = CEPH_OSD_FLAG_READ; snapc = NULL; - snapid = rbd_dev->mapping.snap_id; + snapid = rbd_dev->snap_id; payload_len = 0; } @@ -1561,7 +1562,7 @@ static void rbd_rq_fn(struct request_queue *q) down_read(&rbd_dev->header_rwsem); - if (rbd_dev->mapping.snap_id != CEPH_NOSNAP && + if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->mapping.snap_exists) { up_read(&rbd_dev->header_rwsem); dout("request for non-existent snapshot"); @@ -1800,7 +1801,7 @@ static void rbd_update_mapping_size(struct rbd_device *rbd_dev) { sector_t size; - if (rbd_dev->mapping.snap_id != CEPH_NOSNAP) + if (rbd_dev->snap_id != CEPH_NOSNAP) return; size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE; @@ -2011,7 +2012,7 @@ static ssize_t rbd_snap_show(struct device *dev, { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name); + return sprintf(buf, "%s\n", rbd_dev->snap_name); } static ssize_t rbd_image_refresh(struct device *dev, @@ -2567,12 +2568,11 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev) /* Existing snapshot not in the new snap context */ - if (rbd_dev->mapping.snap_id == snap->id) + if (rbd_dev->snap_id == snap->id) rbd_dev->mapping.snap_exists = false; rbd_remove_snap_dev(snap); dout("%ssnap id %llu has been removed\n", - rbd_dev->mapping.snap_id == snap->id ? - "mapped " : "", + rbd_dev->snap_id == snap->id ? "mapped " : "", (unsigned long long) snap->id); /* Done with this list entry; advance */ @@ -3256,7 +3256,7 @@ err_out_client: rbd_put_client(rbd_dev); kfree(rbd_dev->image_id); err_out_args: - kfree(rbd_dev->mapping.snap_name); + kfree(rbd_dev->snap_name); kfree(rbd_dev->image_name); kfree(rbd_dev->pool_name); err_out_mem: @@ -3309,7 +3309,7 @@ static void rbd_dev_release(struct device *dev) rbd_header_free(&rbd_dev->header); /* done with the id, and with the rbd_dev */ - kfree(rbd_dev->mapping.snap_name); + kfree(rbd_dev->snap_name); kfree(rbd_dev->image_id); kfree(rbd_dev->header_name); kfree(rbd_dev->pool_name); -- cgit v1.2.3 From daba5fdb4c469838dcee4b8dd4fecf7be69fa218 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 26 Oct 2012 17:25:23 -0500 Subject: rbd: rename snap_exists field A Boolean field "snap_exists" in an rbd mapping is used to indicate whether a mapped snapshot has been removed from an image's snapshot context, to stop sending requests for that snapshot as soon as we know it's gone. Generalize the interpretation of this field so it applies to non-snapshot (i.e. "head") mappings. That is, define its value to be false until the mapping has been set, and then define it to be true for both snapshot mappings or head mappings. Rename the field "exists" to reflect the broader interpretation. The rbd_mapping structure