diff options
Diffstat (limited to 'net/sunrpc/svc_xprt.c')
| -rw-r--r-- | net/sunrpc/svc_xprt.c | 987 |
1 files changed, 509 insertions, 478 deletions
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c index d16a8b423c20..6973184ff667 100644 --- a/net/sunrpc/svc_xprt.c +++ b/net/sunrpc/svc_xprt.c @@ -1,3 +1,4 @@ +// SPDX-License-Identifier: GPL-2.0-only /* * linux/net/sunrpc/svc_xprt.c * @@ -5,9 +6,9 @@ */ #include <linux/sched.h> +#include <linux/sched/mm.h> #include <linux/errno.h> #include <linux/freezer.h> -#include <linux/kthread.h> #include <linux/slab.h> #include <net/sock.h> #include <linux/sunrpc/addr.h> @@ -15,6 +16,7 @@ #include <linux/sunrpc/svc_xprt.h> #include <linux/sunrpc/svcsock.h> #include <linux/sunrpc/xprt.h> +#include <linux/sunrpc/bc_xprt.h> #include <linux/module.h> #include <linux/netdevice.h> #include <trace/events/sunrpc.h> @@ -28,13 +30,13 @@ module_param(svc_rpc_per_connection_limit, uint, 0644); static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt); static int svc_deferred_recv(struct svc_rqst *rqstp); static struct cache_deferred_req *svc_defer(struct cache_req *req); -static void svc_age_temp_xprts(unsigned long closure); +static void svc_age_temp_xprts(struct timer_list *t); static void svc_delete_xprt(struct svc_xprt *xprt); /* apparently the "standard" is that clients close * idle connections after 5 minutes, servers after * 6 minutes - * http://www.connectathon.org/talks96/nfstcp.pdf + * http://nfsv4bat.org/Documents/ConnectAThon/1996/nfstcp.pdf */ static int svc_conn_age_period = 6*60; @@ -44,7 +46,6 @@ static LIST_HEAD(svc_xprt_class_list); /* SMP locking strategy: * - * svc_pool->sp_lock protects most of the fields of that pool. * svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt. * when both need to be taken (rare), svc_serv->sv_lock is first. * The "service mutex" protects svc_serv->sv_nrthread. @@ -72,13 +73,18 @@ static LIST_HEAD(svc_xprt_class_list); * that no other thread will be using the transport or will * try to set XPT_DEAD. */ + +/** + * svc_reg_xprt_class - Register a server-side RPC transport class + * @xcl: New transport class to be registered + * + * Returns zero on success; otherwise a negative errno is returned. + */ int svc_reg_xprt_class(struct svc_xprt_class *xcl) { struct svc_xprt_class *cl; int res = -EEXIST; - dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name); - INIT_LIST_HEAD(&xcl->xcl_list); spin_lock(&svc_xprt_class_lock); /* Make sure there isn't already a class with the same name */ @@ -94,17 +100,30 @@ out: } EXPORT_SYMBOL_GPL(svc_reg_xprt_class); +/** + * svc_unreg_xprt_class - Unregister a server-side RPC transport class + * @xcl: Transport class to be unregistered + * + */ void svc_unreg_xprt_class(struct svc_xprt_class *xcl) { - dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name); spin_lock(&svc_xprt_class_lock); list_del_init(&xcl->xcl_list); spin_unlock(&svc_xprt_class_lock); } EXPORT_SYMBOL_GPL(svc_unreg_xprt_class); -/* - * Format the transport list for printing +/** + * svc_print_xprts - Format the transport list for printing + * @buf: target buffer for formatted address + * @maxlen: length of target buffer + * + * Fills in @buf with a string containing a list of transport names, each name + * terminated with '\n'. If the buffer is too small, some entries may be + * missing, but it is guaranteed that all lines in the output buffer are + * complete. + * + * Returns positive length of the filled-in string. */ int svc_print_xprts(char *buf, int maxlen) { @@ -117,9 +136,9 @@ int svc_print_xprts(char *buf, int maxlen) list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) { int slen; - sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload); - slen = strlen(tmpstr); - if (len + slen > maxlen) + slen = snprintf(tmpstr, sizeof(tmpstr), "%s %d\n", + xcl->xcl_name, xcl->xcl_max_payload); + if (slen >= sizeof(tmpstr) || len + slen >= maxlen) break; len += slen; strcat(buf, tmpstr); @@ -129,6 +148,21 @@ int svc_print_xprts(char *buf, int maxlen) return len; } +/** + * svc_xprt_deferred_close - Close a transport + * @xprt: transport instance + * + * Used in contexts that need to defer the work of shutting down + * the transport to an nfsd thread. + */ +void svc_xprt_deferred_close(struct svc_xprt *xprt) +{ + trace_svc_xprt_close(xprt); + if (!test_and_set_bit(XPT_CLOSE, &xprt->xpt_flags)) + svc_xprt_enqueue(xprt); +} +EXPORT_SYMBOL_GPL(svc_xprt_deferred_close); + static void svc_xprt_free(struct kref *kref) { struct svc_xprt *xprt = @@ -136,12 +170,14 @@ static void svc_xprt_free(struct kref *kref) struct module *owner = xprt->xpt_class->xcl_owner; if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags)) svcauth_unix_info_release(xprt); - put_net(xprt->xpt_net); + put_cred(xprt->xpt_cred); + put_net_track(xprt->xpt_net, &xprt->ns_tracker); /* See comment on corresponding get in xs_setup_bc_tcp(): */ if (xprt->xpt_bc_xprt) xprt_put(xprt->xpt_bc_xprt); if (xprt->xpt_bc_xps) xprt_switch_put(xprt->xpt_bc_xps); + trace_svc_xprt_free(xprt); xprt->xpt_ops->xpo_free(xprt); module_put(owner); } @@ -165,66 +201,27 @@ void svc_xprt_init(struct net *net, struct svc_xprt_class *xcl, kref_init(&xprt->xpt_ref); xprt->xpt_server = serv; INIT_LIST_HEAD(&xprt->xpt_list); - INIT_LIST_HEAD(&xprt->xpt_ready); INIT_LIST_HEAD(&xprt->xpt_deferred); INIT_LIST_HEAD(&xprt->xpt_users); mutex_init(&xprt->xpt_mutex); spin_lock_init(&xprt->xpt_lock); set_bit(XPT_BUSY, &xprt->xpt_flags); - rpc_init_wait_queue(&xprt->xpt_bc_pending, "xpt_bc_pending"); - xprt->xpt_net = get_net(net); + xprt->xpt_net = get_net_track(net, &xprt->ns_tracker, GFP_ATOMIC); + strcpy(xprt->xpt_remotebuf, "uninitialized"); } EXPORT_SYMBOL_GPL(svc_xprt_init); -static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl, - struct svc_serv *serv, - struct net *net, - const int family, - const unsigned short port, - int flags) -{ - struct sockaddr_in sin = { - .sin_family = AF_INET, - .sin_addr.s_addr = htonl(INADDR_ANY), - .sin_port = htons(port), - }; -#if IS_ENABLED(CONFIG_IPV6) - struct sockaddr_in6 sin6 = { - .sin6_family = AF_INET6, - .sin6_addr = IN6ADDR_ANY_INIT, - .sin6_port = htons(port), - }; -#endif - struct sockaddr *sap; - size_t len; - - switch (family) { - case PF_INET: - sap = (struct sockaddr *)&sin; - len = sizeof(sin); - break; -#if IS_ENABLED(CONFIG_IPV6) - case PF_INET6: - sap = (struct sockaddr *)&sin6; - len = sizeof(sin6); - break; -#endif - default: - return ERR_PTR(-EAFNOSUPPORT); - } - - return xcl->xcl_ops->xpo_create(serv, net, sap, len, flags); -} - -/* - * svc_xprt_received conditionally queues the transport for processing - * by another thread. The caller must hold the XPT_BUSY bit and must +/** + * svc_xprt_received - start next receiver thread + * @xprt: controlling transport + * + * The caller must hold the XPT_BUSY bit and must * not thereafter touch transport data. * * Note: XPT_DATA only gets cleared when a read-attempt finds no (or * insufficient) data. */ -static void svc_xprt_received(struct svc_xprt *xprt) +void svc_xprt_received(struct svc_xprt *xprt) { if (!test_bit(XPT_BUSY, &xprt->xpt_flags)) { WARN_ONCE(1, "xprt=0x%p already busy!", xprt); @@ -232,14 +229,15 @@ static void svc_xprt_received(struct svc_xprt *xprt) } /* As soon as we clear busy, the xprt could be closed and - * 'put', so we need a reference to call svc_enqueue_xprt with: + * 'put', so we need a reference to call svc_xprt_enqueue with: */ svc_xprt_get(xprt); smp_mb__before_atomic(); clear_bit(XPT_BUSY, &xprt->xpt_flags); - xprt->xpt_server->sv_ops->svo_enqueue_xprt(xprt); + svc_xprt_enqueue(xprt); svc_xprt_put(xprt); } +EXPORT_SYMBOL_GPL(svc_xprt_received); void svc_add_new_perm_xprt(struct svc_serv *serv, struct svc_xprt *new) { @@ -250,9 +248,9 @@ void svc_add_new_perm_xprt(struct svc_serv *serv, struct svc_xprt *new) svc_xprt_received(new); } -int _svc_create_xprt(struct svc_serv *serv, const char *xprt_name, - struct net *net, const int family, - const unsigned short port, int flags) +static int _svc_xprt_create(struct svc_serv *serv, const char *xprt_name, + struct net *net, struct sockaddr *sap, + size_t len, int flags, const struct cred *cred) { struct svc_xprt_class *xcl; @@ -268,11 +266,15 @@ int _svc_create_xprt(struct svc_serv *serv, const char *xprt_name, goto err; spin_unlock(&svc_xprt_class_lock); - newxprt = __svc_xpo_create(xcl, serv, net, family, port, flags); + newxprt = xcl->xcl_ops->xpo_create(serv, net, sap, len, flags); if (IS_ERR(newxprt)) { + trace_svc_xprt_create_err(serv->sv_programs->pg_name, + xcl->xcl_name, sap, len, + newxprt); module_put(xcl->xcl_owner); return PTR_ERR(newxprt); } + newxprt->xpt_cred = get_cred(cred); svc_add_new_perm_xprt(serv, newxprt); newport = svc_xprt_local_port(newxprt); return newport; @@ -284,24 +286,95 @@ int _svc_create_xprt(struct svc_serv *serv, const char *xprt_name, return -EPROTONOSUPPORT; } -int svc_create_xprt(struct svc_serv *serv, const char *xprt_name, - struct net *net, const int family, - const unsigned short port, int flags) +/** + * svc_xprt_create_from_sa - Add a new listener to @serv from socket address + * @serv: target RPC service + * @xprt_name: transport class name + * @net: network namespace + * @sap: socket address pointer + * @flags: SVC_SOCK flags + * @cred: credential to bind to this transport + * + * Return local xprt port on success or %-EPROTONOSUPPORT on failure + */ +int svc_xprt_create_from_sa(struct svc_serv *serv, const char *xprt_name, + struct net *net, struct sockaddr *sap, + int flags, const struct cred *cred) { + size_t len; int err; - dprintk("svc: creating transport %s[%d]\n", xprt_name, port); - err = _svc_create_xprt(serv, xprt_name, net, family, port, flags); + switch (sap->sa_family) { + case AF_INET: + len = sizeof(struct sockaddr_in); + break; +#if IS_ENABLED(CONFIG_IPV6) + case AF_INET6: + len = sizeof(struct sockaddr_in6); + break; +#endif + default: + return -EAFNOSUPPORT; + } + + err = _svc_xprt_create(serv, xprt_name, net, sap, len, flags, cred); if (err == -EPROTONOSUPPORT) { request_module("svc%s", xprt_name); - err = _svc_create_xprt(serv, xprt_name, net, family, port, flags); + err = _svc_xprt_create(serv, xprt_name, net, sap, len, flags, + cred); } - if (err) - dprintk("svc: transport %s not found, err %d\n", - xprt_name, err); + return err; } -EXPORT_SYMBOL_GPL(svc_create_xprt); +EXPORT_SYMBOL_GPL(svc_xprt_create_from_sa); + +/** + * svc_xprt_create - Add a new listener to @serv + * @serv: target RPC service + * @xprt_name: transport class name + * @net: network namespace + * @family: network address family + * @port: listener port + * @flags: SVC_SOCK flags + * @cred: credential to bind to this transport + * + * Return local xprt port on success or %-EPROTONOSUPPORT on failure + */ +int svc_xprt_create(struct svc_serv *serv, const char *xprt_name, + struct net *net, const int family, + const unsigned short port, int flags, + const struct cred *cred) +{ + struct sockaddr_in sin = { + .sin_family = AF_INET, + .sin_addr.s_addr = htonl(INADDR_ANY), + .sin_port = htons(port), + }; +#if IS_ENABLED(CONFIG_IPV6) + struct sockaddr_in6 sin6 = { + .sin6_family = AF_INET6, + .sin6_addr = IN6ADDR_ANY_INIT, + .sin6_port = htons(port), + }; +#endif + struct sockaddr *sap; + + switch (family) { + case PF_INET: + sap = (struct sockaddr *)&sin; + break; +#if IS_ENABLED(CONFIG_IPV6) + case PF_INET6: + sap = (struct sockaddr *)&sin6; + break; +#endif + default: + return -EAFNOSUPPORT; + } + + return svc_xprt_create_from_sa(serv, xprt_name, net, sap, flags, cred); +} +EXPORT_SYMBOL_GPL(svc_xprt_create); /* * Copy the local and remote xprt addresses to the rqstp structure @@ -357,15 +430,32 @@ static void svc_xprt_release_slot(struct svc_rqst *rqstp) struct svc_xprt *xprt = rqstp->rq_xprt; if (test_and_clear_bit(RQ_DATA, &rqstp->rq_flags)) { atomic_dec(&xprt->xpt_nr_rqsts); + smp_wmb(); /* See smp_rmb() in svc_xprt_ready() */ svc_xprt_enqueue(xprt); } } -static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt) +static bool svc_xprt_ready(struct svc_xprt *xprt) { - if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE))) + unsigned long xpt_flags; + + /* + * If another cpu has recently updated xpt_flags, + * sk_sock->flags, xpt_reserved, or xpt_nr_rqsts, we need to + * know about it; otherwise it's possible that both that cpu and + * this one could call svc_xprt_enqueue() without either + * svc_xprt_enqueue() recognizing that the conditions below + * are satisfied, and we could stall indefinitely: + */ + smp_rmb(); + xpt_flags = READ_ONCE(xprt->xpt_flags); + + trace_svc_xprt_enqueue(xprt, xpt_flags); + if (xpt_flags & BIT(XPT_BUSY)) + return false; + if (xpt_flags & (BIT(XPT_CONN) | BIT(XPT_CLOSE) | BIT(XPT_HANDSHAKE))) return true; - if (xprt->xpt_flags & ((1<<XPT_DATA)|(1<<XPT_DEFERRED))) { + if (xpt_flags & (BIT(XPT_DATA) | BIT(XPT_DEFERRED))) { if (xprt->xpt_ops->xpo_has_wspace(xprt) && svc_xprt_slots_in_range(xprt)) return true; @@ -375,100 +465,33 @@ static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt) return false; } -void svc_xprt_do_enqueue(struct svc_xprt *xprt) +/** + * svc_xprt_enqueue - Queue a transport on an idle nfsd thread + * @xprt: transport with data pending + * + */ +void svc_xprt_enqueue(struct svc_xprt *xprt) { struct svc_pool *pool; - struct svc_rqst *rqstp = NULL; - int cpu; - bool queued = false; - if (!svc_xprt_has_something_to_do(xprt)) - goto out; + if (!svc_xprt_ready(xprt)) + return; /* Mark transport as busy. It will remain in this state until * the provider calls svc_xprt_received. We update XPT_BUSY * atomically because it also guards against trying to enqueue * the transport twice. */ - if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) { - /* Don't enqueue transport while already enqueued */ - dprintk("svc: transport %p busy, not enqueued\n", xprt); - goto out; - } - - cpu = get_cpu(); - pool = svc_pool_for_cpu(xprt->xpt_server, cpu); - - atomic_long_inc(&pool->sp_stats.packets); - -redo_search: - /* find a thread for this xprt */ - rcu_read_lock(); - list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) { - /* Do a lockless check first */ - if (test_bit(RQ_BUSY, &rqstp->rq_flags)) - continue; - - /* - * Once the xprt has been queued, it can only be dequeued by - * the task that intends to service it. All we can do at that - * point is to try to wake this thread back up so that it can - * do so. - */ - if (!queued) { - spin_lock_bh(&rqstp->rq_lock); - if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) { - /* already busy, move on... */ - spin_unlock_bh(&rqstp->rq_lock); - continue; - } - - /* this one will do */ - rqstp->rq_xprt = xprt; - svc_xprt_get(xprt); - spin_unlock_bh(&rqstp->rq_lock); - } - rcu_read_unlock(); + if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) + return; - atomic_long_inc(&pool->sp_stats.threads_woken); - wake_up_process(rqstp->rq_task); - put_cpu(); - goto out; - } - rcu_read_unlock(); + pool = svc_pool_for_cpu(xprt->xpt_server); - /* - * We didn't find an idle thread to use, so we need to queue the xprt. - * Do so and then search again. If we find one, we can't hook this one - * up to it directly but we can wake the thread up in the hopes that it - * will pick it up once it searches for a xprt to service. - */ - if (!queued) { - queued = true; - dprintk("svc: transport %p put into queue\n", xprt); - spin_lock_bh(&pool->sp_lock); - list_add_tail(&xprt->xpt_ready, &pool->sp_sockets); - pool->sp_stats.sockets_queued++; - spin_unlock_bh(&pool->sp_lock); - goto redo_search; - } - rqstp = NULL; - put_cpu(); -out: - trace_svc_xprt_do_enqueue(xprt, rqstp); -} -EXPORT_SYMBOL_GPL(svc_xprt_do_enqueue); + percpu_counter_inc(&pool->sp_sockets_queued); + xprt->xpt_qtime = ktime_get(); + lwq_enqueue(&xprt->xpt_ready, &pool->sp_xprts); -/* - * Queue up a transport with data pending. If there are idle nfsd - * processes, wake 'em up. - * - */ -void svc_xprt_enqueue(struct svc_xprt *xprt) -{ - if (test_bit(XPT_BUSY, &xprt->xpt_flags)) - return; - xprt->xpt_server->sv_ops->svo_enqueue_xprt(xprt); + svc_pool_wake_idle_thread(pool); } EXPORT_SYMBOL_GPL(svc_xprt_enqueue); @@ -479,22 +502,9 @@ static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool) { struct svc_xprt *xprt = NULL; - if (list_empty(&pool->sp_sockets)) - goto out; - - spin_lock_bh(&pool->sp_lock); - if (likely(!list_empty(&pool->sp_sockets))) { - xprt = list_first_entry(&pool->sp_sockets, - struct svc_xprt, xpt_ready); - list_del_init(&xprt->xpt_ready); + xprt = lwq_dequeue(&pool->sp_xprts, struct svc_xprt, xpt_ready); + if (xprt) svc_xprt_get(xprt); - - dprintk("svc: transport %p dequeued, inuse=%d\n", - xprt, kref_read(&xprt->xpt_ref)); - } - spin_unlock_bh(&pool->sp_lock); -out: - trace_svc_xprt_dequeue(xprt); return xprt; } @@ -510,28 +520,39 @@ out: */ void svc_reserve(struct svc_rqst *rqstp, int space) { + struct svc_xprt *xprt = rqstp->rq_xprt; + space += rqstp->rq_res.head[0].iov_len; - if (space < rqstp->rq_reserved) { - struct svc_xprt *xprt = rqstp->rq_xprt; + if (xprt && space < rqstp->rq_reserved) { atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved); rqstp->rq_reserved = space; - + smp_wmb(); /* See smp_rmb() in svc_xprt_ready() */ svc_xprt_enqueue(xprt); } } EXPORT_SYMBOL_GPL(svc_reserve); +static void free_deferred(struct svc_xprt *xprt, struct svc_deferred_req *dr) +{ + if (!dr) + return; + + xprt->xpt_ops->xpo_release_ctxt(xprt, dr->xprt_ctxt); + kfree(dr); +} + static void svc_xprt_release(struct svc_rqst *rqstp) { struct svc_xprt *xprt = rqstp->rq_xprt; - rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp); + xprt->xpt_ops->xpo_release_ctxt(xprt, rqstp->rq_xprt_ctxt); + rqstp->rq_xprt_ctxt = NULL; - kfree(rqstp->rq_deferred); + free_deferred(xprt, rqstp->rq_deferred); rqstp->rq_deferred = NULL; - svc_free_res_pages(rqstp); + svc_rqst_release_pages(rqstp); rqstp->rq_res.page_len = 0; rqstp->rq_res.page_base = 0; @@ -552,7 +573,10 @@ static void svc_xprt_release(struct svc_rqst *rqstp) svc_xprt_put(xprt); } -/* +/** + * svc_wake_up - Wake up a service thread for non-transport work + * @serv: RPC service + * * Some svc_serv's will have occasional work to do, even when a xprt is not * waiting to be serviced. This function is there to "kick" a task in one of * those services so that it can wake up and do that work. Note that we only @@ -561,28 +585,10 @@ static void svc_xprt_release(struct svc_rqst *rqstp) */ void svc_wake_up(struct svc_serv *serv) { - struct svc_rqst *rqstp; - struct svc_pool *pool; - - pool = &serv->sv_pools[0]; - - rcu_read_lock(); - list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) { - /* skip any that aren't queued */ - if (test_bit(RQ_BUSY, &rqstp->rq_flags)) - continue; - rcu_read_unlock(); - dprintk("svc: daemon %p woken up.\n", rqstp); - wake_up_process(rqstp->rq_task); - trace_svc_wake_up(rqstp->rq_task->pid); - return; - } - rcu_read_unlock(); + struct svc_pool *pool = &serv->sv_pools[0]; - /* No free entries available */ set_bit(SP_TASK_PENDING, &pool->sp_flags); - smp_wmb(); - trace_svc_wake_up(0); + svc_pool_wake_idle_thread(pool); } EXPORT_SYMBOL_GPL(svc_wake_up); @@ -601,7 +607,8 @@ int svc_port_is_privileged(struct sockaddr *sin) } /* - * Make sure that we don't have too many active connections. If we have, + * Make sure that we don't have too many connections that have not yet + * demonstrated that they have access to the NFS server. If we have, * something must be dropped. It's not clear what will happen if we allow * "too many" connections, but when dealing with network-facing software, * we have to code defensively. Here we do that by imposing hard limits. @@ -613,34 +620,26 @@ int svc_port_is_privileged(struct sockaddr *sin) * The only somewhat efficient mechanism would be if drop old * connections from the same IP first. But right now we don't even * record the client IP in svc_sock. - * - * single-threaded services that expect a lot of clients will probably - * need to set sv_maxconn to override the default value which is based - * on the number of threads */ static void svc_check_conn_limits(struct svc_serv *serv) { - unsigned int limit = serv->sv_maxconn ? serv->sv_maxconn : - (serv->sv_nrthreads+3) * 20; - - if (serv->sv_tmpcnt > limit) { - struct svc_xprt *xprt = NULL; + if (serv->sv_tmpcnt > XPT_MAX_TMP_CONN) { + struct svc_xprt *xprt = NULL, *xprti; spin_lock_bh(&serv->sv_lock); if (!list_empty(&serv->sv_tempsocks)) { - /* Try to help the admin */ - net_notice_ratelimited("%s: too many open connections, consider increasing the %s\n", - serv->sv_name, serv->sv_maxconn ? - "max number of connections" : - "number of threads"); /* * Always select the oldest connection. It's not fair, - * but so is life + * but nor is life. */ - xprt = list_entry(serv->sv_tempsocks.prev, - struct svc_xprt, - xpt_list); - set_bit(XPT_CLOSE, &xprt->xpt_flags); - svc_xprt_get(xprt); + list_for_each_entry_reverse(xprti, &serv->sv_tempsocks, + xpt_list) { + if (!test_bit(XPT_PEER_VALID, &xprti->xpt_flags)) { + xprt = xprti; + set_bit(XPT_CLOSE, &xprt->xpt_flags); + svc_xprt_get(xprt); + break; + } + } } spin_unlock_bh(&serv->sv_lock); @@ -651,39 +650,30 @@ static void svc_check_conn_limits(struct svc_serv *serv) } } -static int svc_alloc_arg(struct svc_rqst *rqstp) +static bool svc_alloc_arg(struct svc_rqst *rqstp) { - struct svc_serv *serv = rqstp->rq_server; - struct xdr_buf *arg; - int pages; - int i; + struct xdr_buf *arg = &rqstp->rq_arg; + unsigned long pages, filled, ret; + + pages = rqstp->rq_maxpages; + for (filled = 0; filled < pages; filled = ret) { + ret = alloc_pages_bulk(GFP_KERNEL, pages, rqstp->rq_pages); + if (ret > filled) + /* Made progress, don't sleep yet */ + continue; - /* now allocate needed pages. If we get a failure, sleep briefly */ - pages = (serv->sv_max_mesg + 2 * PAGE_SIZE) >> PAGE_SHIFT; - if (pages > RPCSVC_MAXPAGES) { - pr_warn_once("svc: warning: pages=%u > RPCSVC_MAXPAGES=%lu\n", - pages, RPCSVC_MAXPAGES); - /* use as many pages as possible */ - pages = RPCSVC_MAXPAGES; - } - for (i = 0; i < pages ; i++) - while (rqstp->rq_pages[i] == NULL) { - struct page *p = alloc_page(GFP_KERNEL); - if (!p) { - set_current_state(TASK_INTERRUPTIBLE); - if (signalled() || kthread_should_stop()) { - set_current_state(TASK_RUNNING); - return -EINTR; - } - schedule_timeout(msecs_to_jiffies(500)); - } - rqstp->rq_pages[i] = p; + set_current_state(TASK_IDLE); + if (svc_thread_should_stop(rqstp)) { + set_current_state(TASK_RUNNING); + return false; } - rqstp->rq_page_end = &rqstp->rq_pages[i]; - rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */ + trace_svc_alloc_arg_err(pages, ret); + memalloc_retry_wait(GFP_KERNEL); + } + rqstp->rq_page_end = &rqstp->rq_pages[pages]; + rqstp->rq_pages[pages] = NULL; /* this might be seen in nfsd_splice_actor() */ /* Make arg->head point to first page and arg->pages point to rest */ - arg = &rqstp->rq_arg; arg->head[0].iov_base = page_address(rqstp->rq_pages[0]); arg->head[0].iov_len = PAGE_SIZE; arg->pages = rqstp->rq_pages + 1; @@ -692,89 +682,66 @@ static int svc_alloc_arg(struct svc_rqst *rqstp) arg->page_len = (pages-2)*PAGE_SIZE; arg->len = (pages-1)*PAGE_SIZE; arg->tail[0].iov_len = 0; - return 0; + + rqstp->rq_xid = xdr_zero; + return true; } static bool -rqst_should_sleep(struct svc_rqst *rqstp) +svc_thread_should_sleep(struct svc_rqst *rqstp) { struct svc_pool *pool = rqstp->rq_pool; /* did someone call svc_wake_up? */ - if (test_and_clear_bit(SP_TASK_PENDING, &pool->sp_flags)) + if (test_bit(SP_TASK_PENDING, &pool->sp_flags)) return false; /* was a socket queued? */ - if (!list_empty(&pool->sp_sockets)) + if (!lwq_empty(&pool->sp_xprts)) return false; /* are we shutting down? */ - if (signalled() || kthread_should_stop()) + if (svc_thread_should_stop(rqstp)) return false; - /* are we freezing? */ - if (freezing(current)) - return false; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + if (svc_is_backchannel(rqstp)) { + if (!lwq_empty(&rqstp->rq_server->sv_cb_list)) + return false; + } +#endif return true; } -static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout) +static void svc_thread_wait_for_work(struct svc_rqst *rqstp) { - struct svc_xprt *xprt; - struct svc_pool *pool = rqstp->rq_pool; - long time_left = 0; - - /* rq_xprt should be clear on entry */ - WARN_ON_ONCE(rqstp->rq_xprt); - - /* Normally we will wait up to 5 seconds for any required - * cache information to be provided. - */ - rqstp->rq_chandle.thread_wait = 5*HZ; - - xprt = svc_xprt_dequeue(pool); - if (xprt) { - rqstp->rq_xprt = xprt; - - /* As there is a shortage of threads and this request - * had to be queued, don't allow the thread to wait so - * long for cache updates. - */ - rqstp->rq_chandle.thread_wait = 1*HZ; - clear_bit(SP_TASK_PENDING, &pool->sp_flags); - return xprt; - } - - /* - * We have to be able to interrupt this wait - * to bring down the daemons ... - */ - set_current_state(TASK_INTERRUPTIBLE); - clear_bit(RQ_BUSY, &rqstp->rq_flags); - smp_mb(); - - if (likely(rqst_should_sleep(rqstp))) - time_left = schedule_timeout(timeout); - else + struct svc_pool *pool = rqstp->rq_pool; + + if (svc_thread_should_sleep(rqstp)) { + set_current_state(TASK_IDLE | TASK_FREEZABLE); + llist_add(&rqstp->rq_idle, &pool->sp_idle_threads); + if (likely(svc_thread_should_sleep(rqstp))) + schedule(); + + while (!llist_del_first_this(&pool->sp_idle_threads, + &rqstp->rq_idle)) { + /* Work just became available. This thread can only + * handle it after removing rqstp from the idle + * list. If that attempt failed, some other thread + * must have queued itself after finding no + * work to do, so that thread has taken responsibly + * for this new work. This thread can safely sleep + * until woken again. + */ + schedule(); + set_current_state(TASK_IDLE | TASK_FREEZABLE); + } __set_current_state(TASK_RUNNING); - + } else { + cond_resched(); + } try_to_freeze(); - - spin_lock_bh(&rqstp->rq_lock); - set_bit(RQ_BUSY, &rqstp->rq_flags); - spin_unlock_bh(&rqstp->rq_lock); - - xprt = rqstp->rq_xprt; - if (xprt != NULL) - return xprt; - - if (!time_left) - atomic_long_inc(&pool->sp_stats.threads_timedout); - - if (signalled() || kthread_should_stop()) - return ERR_PTR(-EINTR); - return ERR_PTR(-EAGAIN); } static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt) @@ -785,8 +752,7 @@ static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt serv->sv_tmpcnt++; if (serv->sv_temptimer.function == NULL) { /* setup timer to age temp transports */ - setup_timer(&serv->sv_temptimer, svc_age_temp_xprts, - (unsigned long)serv); + serv->sv_temptimer.function = svc_age_temp_xprts; mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ); } @@ -794,13 +760,12 @@ static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt svc_xprt_received(newxpt); } -static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt) +static void svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt) { struct svc_serv *serv = rqstp->rq_server; int len = 0; if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) { - dprintk("svc_recv: found XPT_CLOSE\n"); if (test_and_clear_bit(XPT_KILL_TEMP, &xprt->xpt_flags)) xprt->xpt_ops->xpo_kill_temp_xprt(xprt); svc_delete_xprt(xprt); @@ -816,153 +781,148 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt) __module_get(xprt->xpt_class->xcl_owner); svc_check_conn_limits(xprt->xpt_server); newxpt = xprt->xpt_ops->xpo_accept(xprt); - if (newxpt) + if (newxpt) { + newxpt->xpt_cred = get_cred(xprt->xpt_cred); svc_add_new_temp_xprt(serv, newxpt); - else + trace_svc_xprt_accept(newxpt, serv->sv_name); + } else { module_put(xprt->xpt_class->xcl_owner); + } + svc_xprt_received(xprt); + } else if (test_bit(XPT_HANDSHAKE, &xprt->xpt_flags)) { + xprt->xpt_ops->xpo_handshake(xprt); + svc_xprt_received(xprt); } else if (svc_xprt_reserve_slot(rqstp, xprt)) { /* XPT_DATA|XPT_DEFERRED case: */ - dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n", - rqstp, rqstp->rq_pool->sp_id, xprt, - kref_read(&xprt->xpt_ref)); rqstp->rq_deferred = svc_deferred_dequeue(xprt); if (rqstp->rq_deferred) len = svc_deferred_recv(rqstp); else len = xprt->xpt_ops->xpo_recvfrom(rqstp); - dprintk("svc: got len=%d\n", len); rqstp->rq_reserved = serv->sv_max_mesg; atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved); - } - /* clear XPT_BUSY: */ - svc_xprt_received(xprt); + if (len <= 0) + goto out; + + trace_svc_xdr_recvfrom(&rqstp->rq_arg); + + clear_bit(XPT_OLD, &xprt->xpt_flags); + + rqstp->rq_chandle.defer = svc_defer; + + if (serv->sv_stats) + serv->sv_stats->netcnt++; + percpu_counter_inc(&rqstp->rq_pool->sp_messages_arrived); + rqstp->rq_stime = ktime_get(); + svc_process(rqstp); + } else + svc_xprt_received(xprt); + out: - trace_svc_handle_xprt(xprt, len); - return len; + rqstp->rq_res.len = 0; + svc_xprt_release(rqstp); } -/* - * Receive the next request on any transport. This code is carefully - * organised not to touch any cachelines in the shared svc_serv - * structure, only cachelines in the local svc_pool. - */ -int svc_recv(struct svc_rqst *rqstp, long timeout) +static void svc_thread_wake_next(struct svc_rqst *rqstp) { - struct svc_xprt *xprt = NULL; - struct svc_serv *serv = rqstp->rq_server; - int len, err; + if (!svc_thread_should_sleep(rqstp)) + /* More work pending after I dequeued some, + * wake another worker + */ + svc_pool_wake_idle_thread(rqstp->rq_pool); +} - dprintk("svc: server %p waiting for data (to = %ld)\n", - rqstp, timeout); +/** + * svc_recv - Receive and process the next request on any transport + * @rqstp: an idle RPC service thread + * + * This code is carefully organised not to touch any cachelines in + * the shared svc_serv structure, only cachelines in the local + * svc_pool. + */ +void svc_recv(struct svc_rqst *rqstp) +{ + struct svc_pool *pool = rqstp->rq_pool; - if (rqstp->rq_xprt) - printk(KERN_ERR - "svc_recv: service %p, transport not NULL!\n", - rqstp); + if (!svc_alloc_arg(rqstp)) + return; - err = svc_alloc_arg(rqstp); - if (err) - goto out; + svc_thread_wait_for_work(rqstp); - try_to_freeze(); - cond_resched(); - err = -EINTR; - if (signalled() || kthread_should_stop()) - goto out; + clear_bit(SP_TASK_PENDING, &pool->sp_flags); - xprt = svc_get_next_xprt(rqstp, timeout); - if (IS_ERR(xprt)) { - err = PTR_ERR(xprt); - goto out; + if (svc_thread_should_stop(rqstp)) { + svc_thread_wake_next(rqstp); + return; } - len = svc_handle_xprt(rqstp, xprt); + rqstp->rq_xprt = svc_xprt_dequeue(pool); + if (rqstp->rq_xprt) { + struct svc_xprt *xprt = rqstp->rq_xprt; - /* No data, incomplete (TCP) read, or accept() */ - err = -EAGAIN; - if (len <= 0) - goto out_release; + svc_thread_wake_next(rqstp); + /* Normally we will wait up to 5 seconds for any required + * cache information to be provided. When there are no + * idle threads, we reduce the wait time. + */ + if (pool->sp_idle_threads.first) + rqstp->rq_chandle.thread_wait = 5 * HZ; + else + rqstp->rq_chandle.thread_wait = 1 * HZ; - clear_bit(XPT_OLD, &xprt->xpt_flags); + trace_svc_xprt_dequeue(rqstp); + svc_handle_xprt(rqstp, xprt); + } - if (xprt->xpt_ops->xpo_secure_port(rqstp)) - set_bit(RQ_SECURE, &rqstp->rq_flags); - else - clear_bit(RQ_SECURE, &rqstp->rq_flags); - rqstp->rq_chandle.defer = svc_defer; - rqstp->rq_xid = svc_getu32(&rqstp->rq_arg.head[0]); +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + if (svc_is_backchannel(rqstp)) { + struct svc_serv *serv = rqstp->rq_server; + struct rpc_rqst *req; - if (serv->sv_stats) - serv->sv_stats->netcnt++; - trace_svc_recv(rqstp, len); - return len; -out_release: - rqstp->rq_res.len = 0; - svc_xprt_release(rqstp); -out: - trace_svc_recv(rqstp, err); - return err; + req = lwq_dequeue(&serv->sv_cb_list, + struct rpc_rqst, rq_bc_list); + if (req) { + svc_thread_wake_next(rqstp); + svc_process_bc(req, rqstp); + } + } +#endif } EXPORT_SYMBOL_GPL(svc_recv); -/* - * Drop request - */ -void svc_drop(struct svc_rqst *rqstp) -{ - trace_svc_drop(rqstp); - dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt); - svc_xprt_release(rqstp); -} -EXPORT_SYMBOL_GPL(svc_drop); - -/* - * Return reply to client. +/** + * svc_send - Return reply to client + * @rqstp: RPC transaction context + * */ -int svc_send(struct svc_rqst *rqstp) +void svc_send(struct svc_rqst *rqstp) { struct svc_xprt *xprt; - int len = -EFAULT; struct xdr_buf *xb; + int status; xprt = rqstp->rq_xprt; - if (!xprt) - goto out; - - /* release the receive skb before sending the reply */ - rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp); /* calculate over-all length */ xb = &rqstp->rq_res; xb->len = xb->head[0].iov_len + xb->page_len + xb->tail[0].iov_len; + trace_svc_xdr_sendto(rqstp->rq_xid, xb); + trace_svc_stats_latency(rqstp); - /* Grab mutex to serialize outgoing data. */ - mutex_lock(&xprt->xpt_mutex); - if (test_bit(XPT_DEAD, &xprt->xpt_flags) - || test_bit(XPT_CLOSE, &xprt->xpt_flags)) - len = -ENOTCONN; - else - len = xprt->xpt_ops->xpo_sendto(rqstp); - mutex_unlock(&xprt->xpt_mutex); - rpc_wake_up(&xprt->xpt_bc_pending); - svc_xprt_release(rqstp); + status = xprt->xpt_ops->xpo_sendto(rqstp); - if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN) - len = 0; -out: - trace_svc_send(rqstp, len); - return len; + trace_svc_send(rqstp, status); } /* * Timer function to close old temporary transports, using * a mark-and-sweep algorithm. */ -static void svc_age_temp_xprts(unsigned long closure) +static void svc_age_temp_xprts(struct timer_list *t) { - struct svc_serv *serv = (struct svc_serv *)closure; + struct svc_serv *serv = timer_container_of(serv, t, sv_temptimer); struct svc_xprt *xprt; struct list_head *le, *next; @@ -1040,7 +1000,7 @@ static void call_xpt_users(struct svc_xprt *xprt) spin_lock(&xprt->xpt_lock); while (!list_empty(&xprt->xpt_users)) { u = list_first_entry(&xprt->xpt_users, struct svc_xpt_user, list); - list_del(&u->list); + list_del_init(&u->list); u->callback(u); } spin_unlock(&xprt->xpt_lock); @@ -1054,29 +1014,49 @@ static void svc_delete_xprt(struct svc_xprt *xprt) struct svc_serv *serv = xprt->xpt_server; struct svc_deferred_req *dr; - /* Only do this once */ + /* unregister with rpcbind for when transport type is TCP or UDP. + */ + if (test_bit(XPT_RPCB_UNREG, &xprt->xpt_flags)) { + struct svc_sock *svsk = container_of(xprt, struct svc_sock, + sk_xprt); + struct socket *sock = svsk->sk_sock; + + if (svc_register(serv, xprt->xpt_net, sock->sk->sk_family, + sock->sk->sk_protocol, 0) < 0) + pr_warn("failed to unregister %s with rpcbind\n", + xprt->xpt_class->xcl_name); + } + if (test_and_set_bit(XPT_DEAD, &xprt->xpt_flags)) - BUG(); + return; - dprintk("svc: svc_delete_xprt(%p)\n", xprt); + trace_svc_xprt_detach(xprt); xprt->xpt_ops->xpo_detach(xprt); + if (xprt->xpt_bc_xprt) + xprt->xpt_bc_xprt->ops->close(xprt->xpt_bc_xprt); spin_lock_bh(&serv->sv_lock); list_del_init(&xprt->xpt_list); - WARN_ON_ONCE(!list_empty(&xprt->xpt_ready)); - if (test_bit(XPT_TEMP, &xprt->xpt_flags)) + if (test_bit(XPT_TEMP, &xprt->xpt_flags) && + !test_bit(XPT_PEER_VALID, &xprt->xpt_flags)) serv->sv_tmpcnt--; spin_unlock_bh(&serv->sv_lock); while ((dr = svc_deferred_dequeue(xprt)) != NULL) - kfree(dr); + free_deferred(xprt, dr); call_xpt_users(xprt); svc_xprt_put(xprt); } -void svc_close_xprt(struct svc_xprt *xprt) +/** + * svc_xprt_close - Close a client connection + * @xprt: transport to disconnect + * + */ +void svc_xprt_close(struct svc_xprt *xprt) { + trace_svc_xprt_close(xprt); set_bit(XPT_CLOSE, &xprt->xpt_flags); if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) /* someone else will have to effect the close */ @@ -1089,14 +1069,14 @@ void svc_close_xprt(struct svc_xprt *xprt) */ svc_delete_xprt(xprt); } -EXPORT_SYMBOL_GPL(svc_close_xprt); +EXPORT_SYMBOL_GPL(svc_xprt_close); static int svc_close_list(struct svc_serv *serv, struct list_head *xprt_list, struct net *net) { struct svc_xprt *xprt; int ret = 0; - spin_lock(&serv->sv_lock); + spin_lock_bh(&serv->sv_lock); list_for_each_entry(xprt, xprt_list, xpt_list) { if (xprt->xpt_net != net) continue; @@ -1104,44 +1084,39 @@ static int svc_close_list(struct svc_serv *serv, struct list_head *xprt_list, st set_bit(XPT_CLOSE, &xprt->xpt_flags); svc_xprt_enqueue(xprt); } - spin_unlock(&serv->sv_lock); + spin_unlock_bh(&serv->sv_lock); return ret; } -static struct svc_xprt *svc_dequeue_net(struct svc_serv *serv, struct net *net) +static void svc_clean_up_xprts(struct svc_serv *serv, struct net *net) { - struct svc_pool *pool; struct svc_xprt *xprt; - struct svc_xprt *tmp; int i; for (i = 0; i < serv->sv_nrpools; i++) { - pool = &serv->sv_pools[i]; - - spin_lock_bh(&pool->sp_lock); - list_for_each_entry_safe(xprt, tmp, &pool->sp_sockets, xpt_ready) { - if (xprt->xpt_net != net) - continue; - list_del_init(&xprt->xpt_ready); - spin_unlock_bh(&pool->sp_lock); - return xprt; + struct svc_pool *pool = &serv->sv_pools[i]; + struct llist_node *q, **t1, *t2; + + q = lwq_dequeue_all(&pool->sp_xprts); + lwq_for_each_safe(xprt, t1, t2, &q, xpt_ready) { + if (xprt->xpt_net == net) { + set_bit(XPT_CLOSE, &xprt->xpt_flags); + svc_delete_xprt(xprt); + xprt = NULL; + } } - spin_unlock_bh(&pool->sp_lock); - } - return NULL; -} -static void svc_clean_up_xprts(struct svc_serv *serv, struct net *net) -{ - struct svc_xprt *xprt; - - while ((xprt = svc_dequeue_net(serv, net))) { - set_bit(XPT_CLOSE, &xprt->xpt_flags); - svc_delete_xprt(xprt); + if (q) + lwq_enqueue_batch(q, &pool->sp_xprts); } } -/* +/** + * svc_xprt_destroy_all - Destroy transports associated with @serv + * @serv: RPC service to be shut down + * @net: target network namespace + * @unregister: true if it is OK to unregister the destroyed xprts + * * Server threads may still be running (especially in the case where the * service is still running in other network namespaces). * @@ -1153,7 +1128,8 @@ static void svc_clean_up_xprts(struct svc_serv *serv, struct net *net) * threads, we may need to wait a little while and then check again to * see if they're done. */ -void svc_close_net(struct svc_serv *serv, struct net *net) +void svc_xprt_destroy_all(struct svc_serv *serv, struct net *net, + bool unregister) { int delay = 0; @@ -1163,7 +1139,11 @@ void svc_close_net(struct svc_serv *serv, struct net *net) svc_clean_up_xprts(serv, net); msleep(delay++); } + + if (unregister) + svc_rpcb_cleanup(serv, net); } +EXPORT_SYMBOL_GPL(svc_xprt_destroy_all); /* * Handle defer and revisit of requests @@ -1179,16 +1159,15 @@ static void svc_revisit(struct cache_deferred_req *dreq, int too_many) set_bit(XPT_DEFERRED, &xprt->xpt_flags); if (too_many || test_bit(XPT_DEAD, &xprt->xpt_flags)) { spin_unlock(&xprt->xpt_lock); - dprintk("revisit canceled\n"); + trace_svc_defer_drop(dr); + free_deferred(xprt, dr); svc_xprt_put(xprt); - trace_svc_drop_deferred(dr); - kfree(dr); return; } - dprintk("revisit queued\n"); dr->xprt = NULL; list_add(&dr->handle.recent, &xprt->xpt_deferred); spin_unlock(&xprt->xpt_lock); + trace_svc_defer_queue(dr); svc_xprt_enqueue(xprt); svc_xprt_put(xprt); } @@ -1227,44 +1206,50 @@ static struct cache_deferred_req *svc_defer(struct cache_req *req) dr->addrlen = rqstp->rq_addrlen; dr->daddr = rqstp->rq_daddr; dr->argslen = rqstp->rq_arg.len >> 2; - dr->xprt_hlen = rqstp->rq_xprt_hlen; /* back up head to the start of the buffer and copy */ skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len; memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip, dr->argslen << 2); } + dr->xprt_ctxt = rqstp->rq_xprt_ctxt; + rqstp->rq_xprt_ctxt = NULL; + trace_svc_defer(rqstp); svc_xprt_get(rqstp->rq_xprt); dr->xprt = rqstp->rq_xprt; set_bit(RQ_DROPME, &rqstp->rq_flags); dr->handle.revisit = svc_revisit; - trace_svc_defer(rqstp); return &dr->handle; } /* * recv data from a deferred request into an active one */ -static int svc_deferred_recv(struct svc_rqst *rqstp) +static noinline int svc_deferred_recv(struct svc_rqst *rqstp) { struct svc_deferred_req *dr = rqstp->rq_deferred; + trace_svc_defer_recv(dr); + /* setup iov_base past transport header */ - rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2); + rqstp->rq_arg.head[0].iov_base = dr->args; /* The iov_len does not include the transport header bytes */ - rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen; + rqstp->rq_arg.head[0].iov_len = dr->argslen << 2; rqstp->rq_arg.page_len = 0; /* The rq_arg.len includes the transport header bytes */ - rqstp->rq_arg.len = dr->argslen<<2; + rqstp->rq_arg.len = dr->argslen << 2; rqstp->rq_prot = dr->prot; memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen); rqstp->rq_addrlen = dr->addrlen; /* Save off transport header len in case we get deferred again */ - rqstp->rq_xprt_hlen = dr->xprt_hlen; rqstp->rq_daddr = dr->daddr; rqstp->rq_respages = rqstp->rq_pages; - return (dr->argslen<<2) - dr->xprt_hlen; + rqstp->rq_xprt_ctxt = dr->xprt_ctxt; + + dr->xprt_ctxt = NULL; + svc_xprt_received(rqstp->rq_xprt); + return dr->argslen << 2; } @@ -1280,7 +1265,6 @@ static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt) struct svc_deferred_req, handle.recent); list_del_init(&dr->handle.recent); - trace_svc_revisit_deferred(dr); } else clear_bit(XPT_DEFERRED, &xprt->xpt_flags); spin_unlock(&xprt->xpt_lock); @@ -1288,6 +1272,40 @@ static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt) } /** + * svc_find_listener - find an RPC transport instance + * @serv: pointer to svc_serv to search + * @xcl_name: C string containing transport's class name + * @net: owner net pointer + * @sa: sockaddr containing address + * + * Return the transport instance pointer for the endpoint accepting + * connections/peer traffic from the specified transport class, + * and matching sockaddr. + */ +struct svc_xprt *svc_find_listener(struct svc_serv *serv, const char *xcl_name, + struct net *net, const struct sockaddr *sa) +{ + struct svc_xprt *xprt; + struct svc_xprt *found = NULL; + + spin_lock_bh(&serv->sv_lock); + list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) { + if (xprt->xpt_net != net) + continue; + if (strcmp(xprt->xpt_class->xcl_name, xcl_name)) + continue; + if (!rpc_cmp_addr_port(sa, (struct sockaddr *)&xprt->xpt_local)) + continue; + found = xprt; + svc_xprt_get(xprt); + break; + } + spin_unlock_bh(&serv->sv_lock); + return found; +} +EXPORT_SYMBOL_GPL(svc_find_listener); + +/** * svc_find_xprt - find an RPC transport instance * @serv: pointer to svc_serv to search * @xcl_name: C string containing transport's class name @@ -1390,29 +1408,36 @@ int svc_xprt_names(struct svc_serv *serv, char *buf, const int buflen) } EXPORT_SYMBOL_GPL(svc_xprt_names); - /*----------------------------------------------------------------------------*/ static void *svc_pool_stats_start(struct seq_file *m, loff_t *pos) { unsigned int pidx = (unsigned int)*pos; - struct svc_serv *serv = m->private; + struct svc_info *si = m->private; dprintk("svc_pool_stats_start, *pidx=%u\n", pidx); + mutex_lock(si->mutex); + if (!pidx) return SEQ_START_TOKEN; - return (pidx > serv->sv_nrpools ? NULL : &serv->sv_pools[pidx-1]); + if (!si->serv) + return NULL; + return pidx > si->serv->sv_nrpools ? NULL + : &si->serv->sv_pools[pidx - 1]; } static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos) { struct svc_pool *pool = p; - struct svc_serv *serv = m->private; + struct svc_info *si = m->private; + struct svc_serv *serv = si->serv; dprintk("svc_pool_stats_next, *pos=%llu\n", *pos); - if (p == SEQ_START_TOKEN) { + if (!serv) { + pool = NULL; + } else if (p == SEQ_START_TOKEN) { pool = &serv->sv_pools[0]; } else { unsigned int pidx = (pool - &serv->sv_pools[0]); @@ -1427,6 +1452,9 @@ static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos) static void svc_pool_stats_stop(struct seq_file *m, void *p) { + struct svc_info *si = m->private; + + mutex_unlock(si->mutex); } static int svc_pool_stats_show(struct seq_file *m, void *p) @@ -1438,12 +1466,11 @@ static int svc_pool_stats_show(struct seq_file *m, void *p) return 0; } - seq_printf(m, "%u %lu %lu %lu %lu\n", - pool->sp_id, - (unsigned long)atomic_long_read(&pool->sp_stats.packets), - pool->sp_stats.sockets_queued, - (unsigned long)atomic_long_read(&pool->sp_stats.threads_woken), - (unsigned long)atomic_long_read(&pool->sp_stats.threads_timedout)); + seq_printf(m, "%u %llu %llu %llu 0\n", + pool->sp_id, + percpu_counter_sum_positive(&pool->sp_messages_arrived), + percpu_counter_sum_positive(&pool->sp_sockets_queued), + percpu_counter_sum_positive(&pool->sp_threads_woken)); return 0; } @@ -1455,14 +1482,18 @@ static const struct seq_operations svc_pool_stats_seq_ops = { .show = svc_pool_stats_show, }; -int svc_pool_stats_open(struct svc_serv *serv, struct file *file) +int svc_pool_stats_open(struct svc_info *info, struct file *file) { + struct seq_file *seq; int err; err = seq_open(file, &svc_pool_stats_seq_ops); - if (!err) - ((struct seq_file *) file->private_data)->private = serv; - return err; + if (err) + return err; + seq = file->private_data; + seq->private = info; + + return 0; } EXPORT_SYMBOL(svc_pool_stats_open); |
