diff options
| author | Linus Torvalds <torvalds@linux-foundation.org> | 2024-01-10 10:20:08 -0800 |
|---|---|---|
| committer | Linus Torvalds <torvalds@linux-foundation.org> | 2024-01-10 10:20:08 -0800 |
| commit | 49f4810356f7d4294ad63dc70fe3c65ca3b8ada9 (patch) | |
| tree | 58e6628e4f4154c39da80637910b469ffdab55dd /net | |
| parent | d8c8e595dc31fb639bc4f8a202901afaa15bb13f (diff) | |
| parent | 17419aefcbfd9891863e8b8132f0bca9a6b2984e (diff) | |
Merge tag 'nfsd-6.8' of git://git.kernel.org/pub/scm/linux/kernel/git/cel/linux
Pull nfsd updates from Chuck Lever:
"The bulk of the patches for this release are clean-ups and minor bug
fixes.
There is one significant revert to mention: support for RDMA Read
operations in the server's RPC-over-RDMA transport implementation has
been fixed so it waits for Read completion in a way that avoids tying
up an nfsd thread. This prevents a possible DoS vector if an
RPC-over-RDMA client should become unresponsive during RDMA Read
operations.
As always I am grateful to NFSD contributors, reviewers, and testers"
* tag 'nfsd-6.8' of git://git.kernel.org/pub/scm/linux/kernel/git/cel/linux: (56 commits)
nfsd: rename nfsd_last_thread() to nfsd_destroy_serv()
SUNRPC: discard sv_refcnt, and svc_get/svc_put
svc: don't hold reference for poolstats, only mutex.
SUNRPC: remove printk when back channel request not found
svcrdma: Implement multi-stage Read completion again
svcrdma: Copy construction of svc_rqst::rq_arg to rdma_read_complete()
svcrdma: Add back svcxprt_rdma::sc_read_complete_q
svcrdma: Add back svc_rdma_recv_ctxt::rc_pages
svcrdma: Clean up comment in svc_rdma_accept()
svcrdma: Remove queue-shortening warnings
svcrdma: Remove pointer addresses shown in dprintk()
svcrdma: Optimize svc_rdma_cc_init()
svcrdma: De-duplicate completion ID initialization helpers
svcrdma: Move the svc_rdma_cc_init() call
svcrdma: Remove struct svc_rdma_read_info
svcrdma: Update the synopsis of svc_rdma_read_special()
svcrdma: Update the synopsis of svc_rdma_read_call_chunk()
svcrdma: Update synopsis of svc_rdma_read_multiple_chunks()
svcrdma: Update synopsis of svc_rdma_copy_inline_range()
svcrdma: Update the synopsis of svc_rdma_read_data_item()
...
Diffstat (limited to 'net')
| -rw-r--r-- | net/sunrpc/auth_gss/svcauth_gss.c | 16 | ||||
| -rw-r--r-- | net/sunrpc/svc.c | 15 | ||||
| -rw-r--r-- | net/sunrpc/svc_xprt.c | 32 | ||||
| -rw-r--r-- | net/sunrpc/svcauth.c | 16 | ||||
| -rw-r--r-- | net/sunrpc/svcsock.c | 14 | ||||
| -rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma.c | 32 | ||||
| -rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 11 | ||||
| -rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 211 | ||||
| -rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_rw.c | 450 | ||||
| -rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_sendto.c | 96 | ||||
| -rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_transport.c | 36 | ||||
| -rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 2 |
12 files changed, 521 insertions, 410 deletions
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c index 18734e70c5dd..24de94184700 100644 --- a/net/sunrpc/auth_gss/svcauth_gss.c +++ b/net/sunrpc/auth_gss/svcauth_gss.c @@ -866,14 +866,6 @@ svcauth_gss_unwrap_integ(struct svc_rqst *rqstp, u32 seq, struct gss_ctx *ctx) struct xdr_buf databody_integ; struct xdr_netobj checksum; - /* NFS READ normally uses splice to send data in-place. However - * the data in cache can change after the reply's MIC is computed - * but before the RPC reply is sent. To prevent the client from - * rejecting the server-computed MIC in this somewhat rare case, - * do not use splice with the GSS integrity service. - */ - clear_bit(RQ_SPLICE_OK, &rqstp->rq_flags); - /* Did we already verify the signature on the original pass through? */ if (rqstp->rq_deferred) return 0; @@ -948,8 +940,6 @@ svcauth_gss_unwrap_priv(struct svc_rqst *rqstp, u32 seq, struct gss_ctx *ctx) struct xdr_buf *buf = xdr->buf; unsigned int saved_len; - clear_bit(RQ_SPLICE_OK, &rqstp->rq_flags); - if (xdr_stream_decode_u32(xdr, &len) < 0) goto unwrap_failed; if (rqstp->rq_deferred) { @@ -2014,6 +2004,11 @@ svcauth_gss_domain_release(struct auth_domain *dom) call_rcu(&dom->rcu_head, svcauth_gss_domain_release_rcu); } +static rpc_authflavor_t svcauth_gss_pseudoflavor(struct svc_rqst *rqstp) +{ + return svcauth_gss_flavor(rqstp->rq_gssclient); +} + static struct auth_ops svcauthops_gss = { .name = "rpcsec_gss", .owner = THIS_MODULE, @@ -2022,6 +2017,7 @@ static struct auth_ops svcauthops_gss = { .release = svcauth_gss_release, .domain_release = svcauth_gss_domain_release, .set_client = svcauth_gss_set_client, + .pseudoflavor = svcauth_gss_pseudoflavor, }; static int rsi_cache_create_net(struct net *net) diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index 3f2ea7a0496f..eb5856e1351d 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -463,7 +463,6 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools, return NULL; serv->sv_name = prog->pg_name; serv->sv_program = prog; - kref_init(&serv->sv_refcnt); serv->sv_stats = prog->pg_stats; if (bufsize > RPCSVC_MAXPAYLOAD) bufsize = RPCSVC_MAXPAYLOAD; @@ -564,11 +563,13 @@ EXPORT_SYMBOL_GPL(svc_create_pooled); * protect sv_permsocks and sv_tempsocks. */ void -svc_destroy(struct kref *ref) +svc_destroy(struct svc_serv **servp) { - struct svc_serv *serv = container_of(ref, struct svc_serv, sv_refcnt); + struct svc_serv *serv = *servp; unsigned int i; + *servp = NULL; + dprintk("svc: svc_destroy(%s)\n", serv->sv_program->pg_name); timer_shutdown_sync(&serv->sv_temptimer); @@ -675,7 +676,6 @@ svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node) if (!rqstp) return ERR_PTR(-ENOMEM); - svc_get(serv); spin_lock_bh(&serv->sv_lock); serv->sv_nrthreads += 1; spin_unlock_bh(&serv->sv_lock); @@ -935,11 +935,6 @@ svc_exit_thread(struct svc_rqst *rqstp) svc_rqst_free(rqstp); - svc_put(serv); - /* That svc_put() cannot be the last, because the thread - * waiting for SP_VICTIM_REMAINS to clear must hold - * a reference. So it is still safe to access pool. - */ clear_and_wake_up_bit(SP_VICTIM_REMAINS, &pool->sp_flags); } EXPORT_SYMBOL_GPL(svc_exit_thread); @@ -1305,8 +1300,6 @@ svc_process_common(struct svc_rqst *rqstp) int rc; __be32 *p; - /* Will be turned off by GSS integrity and privacy services */ - set_bit(RQ_SPLICE_OK, &rqstp->rq_flags); /* Will be turned off only when NFSv4 Sessions are used */ set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags); clear_bit(RQ_DROPME, &rqstp->rq_flags); diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c index 1b71055fc391..b4a85a227bd7 100644 --- a/net/sunrpc/svc_xprt.c +++ b/net/sunrpc/svc_xprt.c @@ -1362,29 +1362,36 @@ int svc_xprt_names(struct svc_serv *serv, char *buf, const int buflen) } EXPORT_SYMBOL_GPL(svc_xprt_names); - /*----------------------------------------------------------------------------*/ static void *svc_pool_stats_start(struct seq_file *m, loff_t *pos) { unsigned int pidx = (unsigned int)*pos; - struct svc_serv *serv = m->private; + struct svc_info *si = m->private; dprintk("svc_pool_stats_start, *pidx=%u\n", pidx); + mutex_lock(si->mutex); + if (!pidx) return SEQ_START_TOKEN; - return (pidx > serv->sv_nrpools ? NULL : &serv->sv_pools[pidx-1]); + if (!si->serv) + return NULL; + return pidx > si->serv->sv_nrpools ? NULL + : &si->serv->sv_pools[pidx - 1]; } static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos) { struct svc_pool *pool = p; - struct svc_serv *serv = m->private; + struct svc_info *si = m->private; + struct svc_serv *serv = si->serv; dprintk("svc_pool_stats_next, *pos=%llu\n", *pos); - if (p == SEQ_START_TOKEN) { + if (!serv) { + pool = NULL; + } else if (p == SEQ_START_TOKEN) { pool = &serv->sv_pools[0]; } else { unsigned int pidx = (pool - &serv->sv_pools[0]); @@ -1399,6 +1406,9 @@ static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos) static void svc_pool_stats_stop(struct seq_file *m, void *p) { + struct svc_info *si = m->private; + + mutex_unlock(si->mutex); } static int svc_pool_stats_show(struct seq_file *m, void *p) @@ -1426,14 +1436,18 @@ static const struct seq_operations svc_pool_stats_seq_ops = { .show = svc_pool_stats_show, }; -int svc_pool_stats_open(struct svc_serv *serv, struct file *file) +int svc_pool_stats_open(struct svc_info *info, struct file *file) { + struct seq_file *seq; int err; err = seq_open(file, &svc_pool_stats_seq_ops); - if (!err) - ((struct seq_file *) file->private_data)->private = serv; - return err; + if (err) + return err; + seq = file->private_data; + seq->private = info; + + return 0; } EXPORT_SYMBOL(svc_pool_stats_open); diff --git a/net/sunrpc/svcauth.c b/net/sunrpc/svcauth.c index aa4429d0b810..1619211f0960 100644 --- a/net/sunrpc/svcauth.c +++ b/net/sunrpc/svcauth.c @@ -160,6 +160,22 @@ svc_auth_unregister(rpc_authflavor_t flavor) } EXPORT_SYMBOL_GPL(svc_auth_unregister); +/** + * svc_auth_flavor - return RPC transaction's RPC_AUTH flavor + * @rqstp: RPC transaction context + * + * Returns an RPC flavor or GSS pseudoflavor. + */ +rpc_authflavor_t svc_auth_flavor(struct svc_rqst *rqstp) +{ + struct auth_ops *aops = rqstp->rq_authop; + + if (!aops->pseudoflavor) + return aops->flavour; + return aops->pseudoflavor(rqstp); +} +EXPORT_SYMBOL_GPL(svc_auth_flavor); + /************************************************** * 'auth_domains' are stored in a hash table indexed by name. * When the last reference to an 'auth_domain' is dropped, diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index 998687421fa6..bfb2f78523a8 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -1049,18 +1049,14 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp) struct rpc_rqst *req = NULL; struct kvec *src, *dst; __be32 *p = (__be32 *)rqstp->rq_arg.head[0].iov_base; - __be32 xid; - __be32 calldir; - - xid = *p++; - calldir = *p; + __be32 xid = *p; if (!bc_xprt) return -EAGAIN; spin_lock(&bc_xprt->queue_lock); req = xprt_lookup_rqst(bc_xprt, xid); if (!req) - goto unlock_notfound; + goto unlock_eagain; memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf)); /* @@ -1077,12 +1073,6 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp) rqstp->rq_arg.len = 0; spin_unlock(&bc_xprt->queue_lock); return 0; -unlock_notfound: - printk(KERN_NOTICE - "%s: Got unrecognized reply: " - "calldir 0x%x xpt_bc_xprt %p xid %08x\n", - __func__, ntohl(calldir), - bc_xprt, ntohl(xid)); unlock_eagain: spin_unlock(&bc_xprt->queue_lock); return -EAGAIN; diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c index f0d5eeed4c88..f86970733eb0 100644 --- a/net/sunrpc/xprtrdma/svc_rdma.c +++ b/net/sunrpc/xprtrdma/svc_rdma.c @@ -256,28 +256,44 @@ out_err: return rc; } +struct workqueue_struct *svcrdma_wq; + void svc_rdma_cleanup(void) { - dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n"); svc_unreg_xprt_class(&svc_rdma_class); svc_rdma_proc_cleanup(); + if (svcrdma_wq) { + struct workqueue_struct *wq = svcrdma_wq; + + svcrdma_wq = NULL; + destroy_workqueue(wq); + } + + dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n"); } int svc_rdma_init(void) { + struct workqueue_struct *wq; int rc; - dprintk("SVCRDMA Module Init, register RPC RDMA transport\n"); - dprintk("\tsvcrdma_ord : %d\n", svcrdma_ord); - dprintk("\tmax_requests : %u\n", svcrdma_max_requests); - dprintk("\tmax_bc_requests : %u\n", svcrdma_max_bc_requests); - dprintk("\tmax_inline : %d\n", svcrdma_max_req_size); + wq = alloc_workqueue("svcrdma", WQ_UNBOUND, 0); + if (!wq) + return -ENOMEM; rc = svc_rdma_proc_init(); - if (rc) + if (rc) { + destroy_workqueue(wq); return rc; + } - /* Register RDMA with the SVC transport switch */ + svcrdma_wq = wq; svc_reg_xprt_class(&svc_rdma_class); + + dprintk("SVCRDMA Module Init, register RPC RDMA transport\n"); + dprintk("\tsvcrdma_ord : %d\n", svcrdma_ord); + dprintk("\tmax_requests : %u\n", svcrdma_max_requests); + dprintk("\tmax_bc_requests : %u\n", svcrdma_max_bc_requests); + dprintk("\tmax_inline : %d\n", svcrdma_max_req_size); return 0; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index 7420a2c990c7..c9be6778643b 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -76,15 +76,12 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst, struct svc_rdma_send_ctxt *sctxt) { - struct svc_rdma_recv_ctxt *rctxt; + struct svc_rdma_pcl empty_pcl; int ret; - rctxt = svc_rdma_recv_ctxt_get(rdma); - if (!rctxt) - return -EIO; - - ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqst->rq_snd_buf); - svc_rdma_recv_ctxt_put(rdma, rctxt); + pcl_init(&empty_pcl); + ret = svc_rdma_map_reply_msg(rdma, sctxt, &empty_pcl, &empty_pcl, + &rqst->rq_snd_buf); if (ret < 0) return -EIO; diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index 3b05f90a3e50..d72953f29258 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -115,13 +115,6 @@ svc_rdma_next_recv_ctxt(struct list_head *list) rc_list); } -static void svc_rdma_recv_cid_init(struct svcxprt_rdma *rdma, - struct rpc_rdma_cid *cid) -{ - cid->ci_queue_id = rdma->sc_rq_cq->res.id; - cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids); -} - static struct svc_rdma_recv_ctxt * svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma) { @@ -130,7 +123,7 @@ svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma) dma_addr_t addr; void *buffer; - ctxt = kmalloc_node(sizeof(*ctxt), GFP_KERNEL, node); + ctxt = kzalloc_node(sizeof(*ctxt), GFP_KERNEL, node); if (!ctxt) goto fail0; buffer = kmalloc_node(rdma->sc_max_req_size, GFP_KERNEL, node); @@ -156,6 +149,7 @@ svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma) ctxt->rc_recv_sge.length = rdma->sc_max_req_size; ctxt->rc_recv_sge.lkey = rdma->sc_pd->local_dma_lkey; ctxt->rc_recv_buf = buffer; + svc_rdma_cc_init(rdma, &ctxt->rc_cc); return ctxt; fail2: @@ -204,18 +198,11 @@ struct svc_rdma_recv_ctxt *svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma) node = llist_del_first(&rdma->sc_recv_ctxts); if (!node) - goto out_empty; - ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node); + return NULL; -out: + ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node); ctxt->rc_page_count = 0; return ctxt; - -out_empty: - ctxt = svc_rdma_recv_ctxt_alloc(rdma); - if (!ctxt) - return NULL; - goto out; } /** @@ -227,6 +214,13 @@ out_empty: void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma, struct svc_rdma_recv_ctxt *ctxt) { + svc_rdma_cc_release(rdma, &ctxt->rc_cc, DMA_FROM_DEVICE); + + /* @rc_page_count is normally zero here, but error flows + * can leave pages in @rc_pages. + */ + release_pages(ctxt->rc_pages, ctxt->rc_page_count); + pcl_free(&ctxt->rc_call_pcl); pcl_free(&ctxt->rc_read_pcl); pcl_free(&ctxt->rc_write_pcl); @@ -271,13 +265,13 @@ static bool svc_rdma_refresh_recvs(struct svcxprt_rdma *rdma, if (!ctxt) break; - trace_svcrdma_post_recv(ctxt); + trace_svcrdma_post_recv(&ctxt->rc_cid); ctxt->rc_recv_wr.next = recv_chain; recv_chain = &ctxt->rc_recv_wr; rdma->sc_pending_recvs++; } if (!recv_chain) - return false; + return true; ret = ib_post_recv(rdma->sc_qp, recv_chain, &bad_wr); if (ret) @@ -301,10 +295,27 @@ err_free: * svc_rdma_post_recvs - Post initial set of Recv WRs * @rdma: fresh svcxprt_rdma * - * Returns true if successful, otherwise false. + * Return values: + * %true: Receive Queue initialization successful + * %false: memory allocation or DMA error */ bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma) { + unsigned int total; + + /* For each credit, allocate enough recv_ctxts for one + * posted Receive and one RPC in process. + */ + total = (rdma->sc_max_requests * 2) + rdma->sc_recv_batch; + while (total--) { + struct svc_rdma_recv_ctxt *ctxt; + + ctxt = svc_rdma_recv_ctxt_alloc(rdma); + if (!ctxt) + return false; + llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts); + } + return svc_rdma_refresh_recvs(rdma, rdma->sc_max_requests); } @@ -373,6 +384,10 @@ void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma) { struct svc_rdma_recv_ctxt *ctxt; + while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_read_complete_q))) { + list_del(&ctxt->rc_list); + svc_rdma_recv_ctxt_put(rdma, ctxt); + } while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_rq_dto_q))) { list_del(&ctxt->rc_list); svc_rdma_recv_ctxt_put(rdma, ctxt); @@ -754,6 +769,122 @@ static bool svc_rdma_is_reverse_direction_reply(struct svc_xprt *xprt, return true; } +/* Finish constructing the RPC Call message in rqstp::rq_arg. + * + * The incoming RPC/RDMA message is an RDMA_MSG type message + * with a single Read chunk (only the upper layer data payload + * was conveyed via RDMA Read). + */ +static void svc_rdma_read_complete_one(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *ctxt) +{ + struct svc_rdma_chunk *chunk = pcl_first_chunk(&ctxt->rc_read_pcl); + struct xdr_buf *buf = &rqstp->rq_arg; + unsigned int length; + + /* Split the Receive buffer between the head and tail + * buffers at Read chunk's position. XDR roundup of the + * chunk is not included in either the pagelist or in + * the tail. + */ + buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position; + buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position; + buf->head[0].iov_len = chunk->ch_position; + + /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2). + * + * If the client already rounded up the chunk length, the + * length does not change. Otherwise, the length of the page + * list is increased to include XDR round-up. + * + * Currently these chunks always start at page offset 0, + * thus the rounded-up length never crosses a page boundary. + */ + buf->pages = &rqstp->rq_pages[0]; + length = xdr_align_size(chunk->ch_length); + buf->page_len = length; + buf->len += length; + buf->buflen += length; +} + +/* Finish constructing the RPC Call message in rqstp::rq_arg. + * + * The incoming RPC/RDMA message is an RDMA_MSG type message + * with payload in multiple Read chunks and no PZRC. + */ +static void svc_rdma_read_complete_multiple(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *ctxt) +{ + struct xdr_buf *buf = &rqstp->rq_arg; + + buf->len += ctxt->rc_readbytes; + buf->buflen += ctxt->rc_readbytes; + + buf->head[0].iov_base = page_address(rqstp->rq_pages[0]); + buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, ctxt->rc_readbytes); + buf->pages = &rqstp->rq_pages[1]; + buf->page_len = ctxt->rc_readbytes - buf->head[0].iov_len; +} + +/* Finish constructing the RPC Call message in rqstp::rq_arg. + * + * The incoming RPC/RDMA message is an RDMA_NOMSG type message + * (the RPC message body was conveyed via RDMA Read). + */ +static void svc_rdma_read_complete_pzrc(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *ctxt) +{ + struct xdr_buf *buf = &rqstp->rq_arg; + + buf->len += ctxt->rc_readbytes; + buf->buflen += ctxt->rc_readbytes; + + buf->head[0].iov_base = page_address(rqstp->rq_pages[0]); + buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, ctxt->rc_readbytes); + buf->pages = &rqstp->rq_pages[1]; + buf->page_len = ctxt->rc_readbytes - buf->head[0].iov_len; +} + +static noinline void svc_rdma_read_complete(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *ctxt) +{ + unsigned int i; + + /* Transfer the Read chunk pages into @rqstp.rq_pages, replacing + * the rq_pages that were already allocated for this rqstp. + */ + release_pages(rqstp->rq_respages, ctxt->rc_page_count); + for (i = 0; i < ctxt->rc_page_count; i++) + rqstp->rq_pages[i] = ctxt->rc_pages[i]; + + /* Update @rqstp's result send buffer to start after the + * last page in the RDMA Read payload. + */ + rqstp->rq_respages = &rqstp->rq_pages[ctxt->rc_page_count]; + rqstp->rq_next_page = rqstp->rq_respages + 1; + + /* Prevent svc_rdma_recv_ctxt_put() from releasing the + * pages in ctxt::rc_pages a second time. + */ + ctxt->rc_page_count = 0; + + /* Finish constructing the RPC Call message. The exact + * procedure for that depends on what kind of RPC/RDMA + * chunks were provided by the client. + */ + rqstp->rq_arg = ctxt->rc_saved_arg; + if (pcl_is_empty(&ctxt->rc_call_pcl)) { + if (ctxt->rc_read_pcl.cl_count == 1) + svc_rdma_read_complete_one(rqstp, ctxt); + else + svc_rdma_read_complete_multiple(rqstp, ctxt); + } else { + svc_rdma_read_complete_pzrc(rqstp, ctxt); + } + + trace_svcrdma_read_finished(&ctxt->rc_cid); +} + /** * svc_rdma_recvfrom - Receive an RPC call * @rqstp: request structure into which to receive an RPC Call @@ -798,8 +929,15 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) rqstp->rq_xprt_ctxt = NULL; - ctxt = NULL; spin_lock(&rdma_xprt->sc_rq_dto_lock); + ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_read_complete_q); + if (ctxt) { + list_del(&ctxt->rc_list); + spin_unlock(&rdma_xprt->sc_rq_dto_lock); + svc_xprt_received(xprt); + svc_rdma_read_complete(rqstp, ctxt); + goto complete; + } ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q); if (ctxt) list_del(&ctxt->rc_list); @@ -831,12 +969,10 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) svc_rdma_get_inv_rkey(rdma_xprt, ctxt); if (!pcl_is_empty(&ctxt->rc_read_pcl) || - !pcl_is_empty(&ctxt->rc_call_pcl)) { - ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt); - if (ret < 0) - goto out_readfail; - } + !pcl_is_empty(&ctxt->rc_call_pcl)) + goto out_readlist; +complete: rqstp->rq_xprt_ctxt = ctxt; rqstp->rq_prot = IPPROTO_MAX; svc_xprt_copy_addrs(rqstp, xprt); @@ -848,12 +984,23 @@ out_err: svc_rdma_recv_ctxt_put(rdma_xprt, ctxt); return 0; -out_readfail: - if (ret == -EINVAL) - svc_rdma_send_error(rdma_xprt, ctxt, ret); - svc_rdma_recv_ctxt_put(rdma_xprt, ctxt); - svc_xprt_deferred_close(xprt); - return -ENOTCONN; +out_readlist: + /* This @rqstp is about to be recycled. Save the work + * already done constructing the Call message in rq_arg + * so it can be restored when the RDMA Reads have + * completed. + */ + ctxt->rc_saved_arg = rqstp->rq_arg; + + ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt); + if (ret < 0) { + if (ret == -EINVAL) + svc_rdma_send_error(rdma_xprt, ctxt, ret); + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt); + svc_xprt_deferred_close(xprt); + return ret; + } + return 0; out_backchannel: svc_rdma_handle_bc_reply(rqstp, ctxt); diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c index e460e25a1d6d..c00fcce61d1e 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_rw.c +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c @@ -39,6 +39,7 @@ struct svc_rdma_rw_ctxt { struct list_head rw_list; struct rdma_rw_ctx rw_ctx; unsigned int rw_nents; + unsigned int rw_first_sgl_nents; struct sg_table rw_sg_table; struct scatterlist rw_first_sgl[]; }; @@ -53,6 +54,8 @@ svc_rdma_next_ctxt(struct list_head *list) static struct svc_rdma_rw_ctxt * svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges) { + struct ib_device *dev = rdma->sc_cm_id->device; + unsigned int first_sgl_nents = dev->attrs.max_send_sge; struct svc_rdma_rw_ctxt *ctxt; struct llist_node *node; @@ -62,32 +65,33 @@ svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges) if (node) { ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node); } else { - ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE), - GFP_KERNEL, ibdev_to_node(rdma->sc_cm_id->device)); + ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, first_sgl_nents), + GFP_KERNEL, ibdev_to_node(dev)); if (!ctxt) goto out_noctx; INIT_LIST_HEAD(&ctxt->rw_list); + ctxt->rw_first_sgl_nents = first_sgl_nents; } ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl; if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges, ctxt->rw_sg_table.sgl, - SG_CHUNK_SIZE)) + first_sgl_nents)) goto out_free; return ctxt; out_free: kfree(ctxt); out_noctx: - trace_svcrdma_no_rwctx_err(rdma, sges); + trace_svcrdma_rwctx_empty(rdma, sges); return NULL; } static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt, struct llist_head *list) { - sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE); + sg_free_table_chained(&ctxt->rw_sg_table, ctxt->rw_first_sgl_nents); llist_add(&ctxt->rw_node, list); } @@ -135,57 +139,40 @@ static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma, ctxt->rw_sg_table.sgl, ctxt->rw_nents, 0, offset, handle, direction); if (unlikely(ret < 0)) { + trace_svcrdma_dma_map_rw_err(rdma, offset, handle, + ctxt->rw_nents, ret); svc_rdma_put_rw_ctxt(rdma, ctxt); - trace_svcrdma_dma_map_rw_err(rdma, ctxt->rw_nents, ret); } return ret; } -/* A chunk context tracks all I/O for moving one Read or Write - * chunk. This is a set of rdma_rw's that handle data movement - * for all segments of one chunk. - * - * These are small, acquired with a single allocator call, and - * no more than one is needed per chunk. They are allocated on - * demand, and not cached. +/** + * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt + * @rdma: controlling transport instance + * @cc: svc_rdma_chunk_ctxt to be initialized */ -struct svc_rdma_chunk_ctxt { - struct rpc_rdma_cid cc_cid; - struct ib_cqe cc_cqe; - struct svcxprt_rdma *cc_rdma; - struct list_head cc_rwctxts; - ktime_t cc_posttime; - int cc_sqecount; - enum ib_wc_status cc_status; - struct completion cc_done; -}; - -static void svc_rdma_cc_cid_init(struct svcxprt_rdma *rdma, - struct rpc_rdma_cid *cid) +void svc_rdma_cc_init(struct svcxprt_rdma *rdma, + struct svc_rdma_chunk_ctxt *cc) { - cid->ci_queue_id = rdma->sc_sq_cq->res.id; - cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids); -} + struct rpc_rdma_cid *cid = &cc->cc_cid; -static void svc_rdma_cc_init(struct svcxprt_rdma *rdma, - struct svc_rdma_chunk_ctxt *cc) -{ - svc_rdma_cc_cid_init(rdma, &cc->cc_cid); - cc->cc_rdma = rdma; + if (unlikely(!cid->ci_completion_id)) + svc_rdma_send_cid_init(rdma, cid); INIT_LIST_HEAD(&cc->cc_rwctxts); cc->cc_sqecount = 0; } -/* - * The consumed rw_ctx's are cleaned and placed on a local llist so - * that only one atomic llist operation is needed to put them all - * back on the free list. +/** + * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt + * @rdma: controlling transport instance + * @cc: svc_rdma_chunk_ctxt to be released + * @dir: DMA direction */ -static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc, - enum dma_data_direction dir) +void svc_rdma_cc_release(struct svcxprt_rdma *rdma, + struct svc_rdma_chunk_ctxt *cc, + enum dma_data_direction dir) { - struct svcxprt_rdma *rdma = cc->cc_rdma; struct llist_node *first, *last; struct svc_rdma_rw_ctxt *ctxt; LLIST_HEAD(free); @@ -215,6 +202,8 @@ static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc, * - Stores arguments for the SGL constructor functions */ struct svc_rdma_write_info { + struct svcxprt_rdma *wi_rdma; + const struct svc_rdma_chunk *wi_chunk; /* write state of this chunk */ @@ -227,6 +216,7 @@ struct svc_rdma_write_info { unsigned int wi_next_off; struct svc_rdma_chunk_ctxt wi_cc; + struct work_struct wi_work; }; static struct svc_rdma_write_info * @@ -235,25 +225,33 @@ svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, { struct svc_rdma_write_info *info; - info = kmalloc_node(sizeof(*info), GFP_KERNEL, + info = kzalloc_node(sizeof(*info), GFP_KERNEL, ibdev_to_node(rdma->sc_cm_id->device)); if (!info) return info; + info->wi_rdma = rdma; info->wi_chunk = chunk; - info->wi_seg_off = 0; - info->wi_seg_no = 0; svc_rdma_cc_init(rdma, &info->wi_cc); info->wi_cc.cc_cqe.done = svc_rdma_write_done; return info; } -static void svc_rdma_write_info_free(struct svc_rdma_write_info *info) +static void svc_rdma_write_info_free_async(struct work_struct *work) { - svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE); + struct svc_rdma_write_info *info; + + info = container_of(work, struct svc_rdma_write_info, wi_work); + svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE); kfree(info); } +static void svc_rdma_write_info_free(struct svc_rdma_write_info *info) +{ + INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async); + queue_work(svcrdma_wq, &info->wi_work); +} + /** * svc_rdma_write_done - Write chunk completion * @cq: controlling Completion Queue @@ -263,16 +261,16 @@ static void svc_rdma_write_info_free(struct svc_rdma_write_info *info) */ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) { + struct svcxprt_rdma *rdma = cq->cq_context; struct ib_cqe *cqe = wc->wr_cqe; struct svc_rdma_chunk_ctxt *cc = container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe); - struct svcxprt_rdma *rdma = cc->cc_rdma; struct svc_rdma_write_info *info = container_of(cc, struct svc_rdma_write_info, wi_cc); switch (wc->status) { case IB_WC_SUCCESS: - trace_svcrdma_wc_write(wc, &cc->cc_cid); + trace_svcrdma_wc_write(&cc->cc_cid); break; case IB_WC_WR_FLUSH_ERR: trace_svcrdma_wc_write_flush(wc, &cc->cc_cid); @@ -289,39 +287,6 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) svc_rdma_write_info_free(info); } -/* State for pulling a Read chunk. - */ -struct svc_rdma_read_info { - struct svc_rqst *ri_rqst; - struct svc_rdma_recv_ctxt *ri_readctxt; - unsigned int ri_pageno; - unsigned int ri_pageoff; - unsigned int ri_totalbytes; - - struct svc_rdma_chunk_ctxt ri_cc; -}; - -static struct svc_rdma_read_info * -svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma) -{ - struct svc_rdma_read_info *info; - - info = kmalloc_node(sizeof(*info), GFP_KERNEL, - ibdev_to_node(rdma->sc_cm_id->device)); - if (!info) - return info; - - svc_rdma_cc_init(rdma, &info->ri_cc); - info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done; - return info; -} - -static void svc_rdma_read_info_free(struct svc_rdma_read_info *info) -{ - svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE); - kfree(info); -} - /** * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx * @cq: controlling Completion Queue @@ -330,17 +295,27 @@ static void svc_rdma_read_info_free(struct svc_rdma_read_info *info) */ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc) { + struct svcxprt_rdma *rdma = cq->cq_context; struct ib_cqe *cqe = wc->wr_cqe; struct svc_rdma_chunk_ctxt *cc = container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe); - struct svc_rdma_read_info *info; + struct svc_rdma_recv_ctxt *ctxt; + svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount); + + ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc); switch (wc->status) { case IB_WC_SUCCESS: - info = container_of(cc, struct svc_rdma_read_info, ri_cc); - trace_svcrdma_wc_read(wc, &cc->cc_cid, info->ri_totalbytes, + trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes, cc->cc_posttime); - break; + + spin_lock(&rdma->sc_rq_dto_lock); + list_a |
