summaryrefslogtreecommitdiff
path: root/net/sunrpc/svc_xprt.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/svc_xprt.c')
-rw-r--r--net/sunrpc/svc_xprt.c913
1 files changed, 498 insertions, 415 deletions
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 4eb8fbf2508d..6973184ff667 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -1,3 +1,4 @@
+// SPDX-License-Identifier: GPL-2.0-only
/*
* linux/net/sunrpc/svc_xprt.c
*
@@ -5,9 +6,9 @@
*/
#include <linux/sched.h>
+#include <linux/sched/mm.h>
#include <linux/errno.h>
#include <linux/freezer.h>
-#include <linux/kthread.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <linux/sunrpc/addr.h>
@@ -15,6 +16,7 @@
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/xprt.h>
+#include <linux/sunrpc/bc_xprt.h>
#include <linux/module.h>
#include <linux/netdevice.h>
#include <trace/events/sunrpc.h>
@@ -34,7 +36,7 @@ static void svc_delete_xprt(struct svc_xprt *xprt);
/* apparently the "standard" is that clients close
* idle connections after 5 minutes, servers after
* 6 minutes
- * http://www.connectathon.org/talks96/nfstcp.pdf
+ * http://nfsv4bat.org/Documents/ConnectAThon/1996/nfstcp.pdf
*/
static int svc_conn_age_period = 6*60;
@@ -44,7 +46,6 @@ static LIST_HEAD(svc_xprt_class_list);
/* SMP locking strategy:
*
- * svc_pool->sp_lock protects most of the fields of that pool.
* svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
* when both need to be taken (rare), svc_serv->sv_lock is first.
* The "service mutex" protects svc_serv->sv_nrthread.
@@ -72,13 +73,18 @@ static LIST_HEAD(svc_xprt_class_list);
* that no other thread will be using the transport or will
* try to set XPT_DEAD.
*/
+
+/**
+ * svc_reg_xprt_class - Register a server-side RPC transport class
+ * @xcl: New transport class to be registered
+ *
+ * Returns zero on success; otherwise a negative errno is returned.
+ */
int svc_reg_xprt_class(struct svc_xprt_class *xcl)
{
struct svc_xprt_class *cl;
int res = -EEXIST;
- dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name);
-
INIT_LIST_HEAD(&xcl->xcl_list);
spin_lock(&svc_xprt_class_lock);
/* Make sure there isn't already a class with the same name */
@@ -94,17 +100,30 @@ out:
}
EXPORT_SYMBOL_GPL(svc_reg_xprt_class);
+/**
+ * svc_unreg_xprt_class - Unregister a server-side RPC transport class
+ * @xcl: Transport class to be unregistered
+ *
+ */
void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
{
- dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name);
spin_lock(&svc_xprt_class_lock);
list_del_init(&xcl->xcl_list);
spin_unlock(&svc_xprt_class_lock);
}
EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);
-/*
- * Format the transport list for printing
+/**
+ * svc_print_xprts - Format the transport list for printing
+ * @buf: target buffer for formatted address
+ * @maxlen: length of target buffer
+ *
+ * Fills in @buf with a string containing a list of transport names, each name
+ * terminated with '\n'. If the buffer is too small, some entries may be
+ * missing, but it is guaranteed that all lines in the output buffer are
+ * complete.
+ *
+ * Returns positive length of the filled-in string.
*/
int svc_print_xprts(char *buf, int maxlen)
{
@@ -117,9 +136,9 @@ int svc_print_xprts(char *buf, int maxlen)
list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
int slen;
- sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
- slen = strlen(tmpstr);
- if (len + slen > maxlen)
+ slen = snprintf(tmpstr, sizeof(tmpstr), "%s %d\n",
+ xcl->xcl_name, xcl->xcl_max_payload);
+ if (slen >= sizeof(tmpstr) || len + slen >= maxlen)
break;
len += slen;
strcat(buf, tmpstr);
@@ -129,6 +148,21 @@ int svc_print_xprts(char *buf, int maxlen)
return len;
}
+/**
+ * svc_xprt_deferred_close - Close a transport
+ * @xprt: transport instance
+ *
+ * Used in contexts that need to defer the work of shutting down
+ * the transport to an nfsd thread.
+ */
+void svc_xprt_deferred_close(struct svc_xprt *xprt)
+{
+ trace_svc_xprt_close(xprt);
+ if (!test_and_set_bit(XPT_CLOSE, &xprt->xpt_flags))
+ svc_xprt_enqueue(xprt);
+}
+EXPORT_SYMBOL_GPL(svc_xprt_deferred_close);
+
static void svc_xprt_free(struct kref *kref)
{
struct svc_xprt *xprt =
@@ -136,12 +170,14 @@ static void svc_xprt_free(struct kref *kref)
struct module *owner = xprt->xpt_class->xcl_owner;
if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags))
svcauth_unix_info_release(xprt);
- put_net(xprt->xpt_net);
+ put_cred(xprt->xpt_cred);
+ put_net_track(xprt->xpt_net, &xprt->ns_tracker);
/* See comment on corresponding get in xs_setup_bc_tcp(): */
if (xprt->xpt_bc_xprt)
xprt_put(xprt->xpt_bc_xprt);
if (xprt->xpt_bc_xps)
xprt_switch_put(xprt->xpt_bc_xps);
+ trace_svc_xprt_free(xprt);
xprt->xpt_ops->xpo_free(xprt);
module_put(owner);
}
@@ -165,66 +201,27 @@ void svc_xprt_init(struct net *net, struct svc_xprt_class *xcl,
kref_init(&xprt->xpt_ref);
xprt->xpt_server = serv;
INIT_LIST_HEAD(&xprt->xpt_list);
- INIT_LIST_HEAD(&xprt->xpt_ready);
INIT_LIST_HEAD(&xprt->xpt_deferred);
INIT_LIST_HEAD(&xprt->xpt_users);
mutex_init(&xprt->xpt_mutex);
spin_lock_init(&xprt->xpt_lock);
set_bit(XPT_BUSY, &xprt->xpt_flags);
- xprt->xpt_net = get_net(net);
+ xprt->xpt_net = get_net_track(net, &xprt->ns_tracker, GFP_ATOMIC);
strcpy(xprt->xpt_remotebuf, "uninitialized");
}
EXPORT_SYMBOL_GPL(svc_xprt_init);
-static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl,
- struct svc_serv *serv,
- struct net *net,
- const int family,
- const unsigned short port,
- int flags)
-{
- struct sockaddr_in sin = {
- .sin_family = AF_INET,
- .sin_addr.s_addr = htonl(INADDR_ANY),
- .sin_port = htons(port),
- };
-#if IS_ENABLED(CONFIG_IPV6)
- struct sockaddr_in6 sin6 = {
- .sin6_family = AF_INET6,
- .sin6_addr = IN6ADDR_ANY_INIT,
- .sin6_port = htons(port),
- };
-#endif
- struct sockaddr *sap;
- size_t len;
-
- switch (family) {
- case PF_INET:
- sap = (struct sockaddr *)&sin;
- len = sizeof(sin);
- break;
-#if IS_ENABLED(CONFIG_IPV6)
- case PF_INET6:
- sap = (struct sockaddr *)&sin6;
- len = sizeof(sin6);
- break;
-#endif
- default:
- return ERR_PTR(-EAFNOSUPPORT);
- }
-
- return xcl->xcl_ops->xpo_create(serv, net, sap, len, flags);
-}
-
-/*
- * svc_xprt_received conditionally queues the transport for processing
- * by another thread. The caller must hold the XPT_BUSY bit and must
+/**
+ * svc_xprt_received - start next receiver thread
+ * @xprt: controlling transport
+ *
+ * The caller must hold the XPT_BUSY bit and must
* not thereafter touch transport data.
*
* Note: XPT_DATA only gets cleared when a read-attempt finds no (or
* insufficient) data.
*/
-static void svc_xprt_received(struct svc_xprt *xprt)
+void svc_xprt_received(struct svc_xprt *xprt)
{
if (!test_bit(XPT_BUSY, &xprt->xpt_flags)) {
WARN_ONCE(1, "xprt=0x%p already busy!", xprt);
@@ -232,14 +229,15 @@ static void svc_xprt_received(struct svc_xprt *xprt)
}
/* As soon as we clear busy, the xprt could be closed and
- * 'put', so we need a reference to call svc_enqueue_xprt with:
+ * 'put', so we need a reference to call svc_xprt_enqueue with:
*/
svc_xprt_get(xprt);
smp_mb__before_atomic();
clear_bit(XPT_BUSY, &xprt->xpt_flags);
- xprt->xpt_server->sv_ops->svo_enqueue_xprt(xprt);
+ svc_xprt_enqueue(xprt);
svc_xprt_put(xprt);
}
+EXPORT_SYMBOL_GPL(svc_xprt_received);
void svc_add_new_perm_xprt(struct svc_serv *serv, struct svc_xprt *new)
{
@@ -250,9 +248,9 @@ void svc_add_new_perm_xprt(struct svc_serv *serv, struct svc_xprt *new)
svc_xprt_received(new);
}
-static int _svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
- struct net *net, const int family,
- const unsigned short port, int flags)
+static int _svc_xprt_create(struct svc_serv *serv, const char *xprt_name,
+ struct net *net, struct sockaddr *sap,
+ size_t len, int flags, const struct cred *cred)
{
struct svc_xprt_class *xcl;
@@ -268,11 +266,15 @@ static int _svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
goto err;
spin_unlock(&svc_xprt_class_lock);
- newxprt = __svc_xpo_create(xcl, serv, net, family, port, flags);
+ newxprt = xcl->xcl_ops->xpo_create(serv, net, sap, len, flags);
if (IS_ERR(newxprt)) {
+ trace_svc_xprt_create_err(serv->sv_programs->pg_name,
+ xcl->xcl_name, sap, len,
+ newxprt);
module_put(xcl->xcl_owner);
return PTR_ERR(newxprt);
}
+ newxprt->xpt_cred = get_cred(cred);
svc_add_new_perm_xprt(serv, newxprt);
newport = svc_xprt_local_port(newxprt);
return newport;
@@ -284,24 +286,95 @@ static int _svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
return -EPROTONOSUPPORT;
}
-int svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
- struct net *net, const int family,
- const unsigned short port, int flags)
+/**
+ * svc_xprt_create_from_sa - Add a new listener to @serv from socket address
+ * @serv: target RPC service
+ * @xprt_name: transport class name
+ * @net: network namespace
+ * @sap: socket address pointer
+ * @flags: SVC_SOCK flags
+ * @cred: credential to bind to this transport
+ *
+ * Return local xprt port on success or %-EPROTONOSUPPORT on failure
+ */
+int svc_xprt_create_from_sa(struct svc_serv *serv, const char *xprt_name,
+ struct net *net, struct sockaddr *sap,
+ int flags, const struct cred *cred)
{
+ size_t len;
int err;
- dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
- err = _svc_create_xprt(serv, xprt_name, net, family, port, flags);
+ switch (sap->sa_family) {
+ case AF_INET:
+ len = sizeof(struct sockaddr_in);
+ break;
+#if IS_ENABLED(CONFIG_IPV6)
+ case AF_INET6:
+ len = sizeof(struct sockaddr_in6);
+ break;
+#endif
+ default:
+ return -EAFNOSUPPORT;
+ }
+
+ err = _svc_xprt_create(serv, xprt_name, net, sap, len, flags, cred);
if (err == -EPROTONOSUPPORT) {
request_module("svc%s", xprt_name);
- err = _svc_create_xprt(serv, xprt_name, net, family, port, flags);
+ err = _svc_xprt_create(serv, xprt_name, net, sap, len, flags,
+ cred);
}
- if (err < 0)
- dprintk("svc: transport %s not found, err %d\n",
- xprt_name, -err);
+
return err;
}
-EXPORT_SYMBOL_GPL(svc_create_xprt);
+EXPORT_SYMBOL_GPL(svc_xprt_create_from_sa);
+
+/**
+ * svc_xprt_create - Add a new listener to @serv
+ * @serv: target RPC service
+ * @xprt_name: transport class name
+ * @net: network namespace
+ * @family: network address family
+ * @port: listener port
+ * @flags: SVC_SOCK flags
+ * @cred: credential to bind to this transport
+ *
+ * Return local xprt port on success or %-EPROTONOSUPPORT on failure
+ */
+int svc_xprt_create(struct svc_serv *serv, const char *xprt_name,
+ struct net *net, const int family,
+ const unsigned short port, int flags,
+ const struct cred *cred)
+{
+ struct sockaddr_in sin = {
+ .sin_family = AF_INET,
+ .sin_addr.s_addr = htonl(INADDR_ANY),
+ .sin_port = htons(port),
+ };
+#if IS_ENABLED(CONFIG_IPV6)
+ struct sockaddr_in6 sin6 = {
+ .sin6_family = AF_INET6,
+ .sin6_addr = IN6ADDR_ANY_INIT,
+ .sin6_port = htons(port),
+ };
+#endif
+ struct sockaddr *sap;
+
+ switch (family) {
+ case PF_INET:
+ sap = (struct sockaddr *)&sin;
+ break;
+#if IS_ENABLED(CONFIG_IPV6)
+ case PF_INET6:
+ sap = (struct sockaddr *)&sin6;
+ break;
+#endif
+ default:
+ return -EAFNOSUPPORT;
+ }
+
+ return svc_xprt_create_from_sa(serv, xprt_name, net, sap, flags, cred);
+}
+EXPORT_SYMBOL_GPL(svc_xprt_create);
/*
* Copy the local and remote xprt addresses to the rqstp structure
@@ -357,15 +430,32 @@ static void svc_xprt_release_slot(struct svc_rqst *rqstp)
struct svc_xprt *xprt = rqstp->rq_xprt;
if (test_and_clear_bit(RQ_DATA, &rqstp->rq_flags)) {
atomic_dec(&xprt->xpt_nr_rqsts);
+ smp_wmb(); /* See smp_rmb() in svc_xprt_ready() */
svc_xprt_enqueue(xprt);
}
}
-static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
+static bool svc_xprt_ready(struct svc_xprt *xprt)
{
- if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
+ unsigned long xpt_flags;
+
+ /*
+ * If another cpu has recently updated xpt_flags,
+ * sk_sock->flags, xpt_reserved, or xpt_nr_rqsts, we need to
+ * know about it; otherwise it's possible that both that cpu and
+ * this one could call svc_xprt_enqueue() without either
+ * svc_xprt_enqueue() recognizing that the conditions below
+ * are satisfied, and we could stall indefinitely:
+ */
+ smp_rmb();
+ xpt_flags = READ_ONCE(xprt->xpt_flags);
+
+ trace_svc_xprt_enqueue(xprt, xpt_flags);
+ if (xpt_flags & BIT(XPT_BUSY))
+ return false;
+ if (xpt_flags & (BIT(XPT_CONN) | BIT(XPT_CLOSE) | BIT(XPT_HANDSHAKE)))
return true;
- if (xprt->xpt_flags & ((1<<XPT_DATA)|(1<<XPT_DEFERRED))) {
+ if (xpt_flags & (BIT(XPT_DATA) | BIT(XPT_DEFERRED))) {
if (xprt->xpt_ops->xpo_has_wspace(xprt) &&
svc_xprt_slots_in_range(xprt))
return true;
@@ -375,13 +465,16 @@ static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
return false;
}
-void svc_xprt_do_enqueue(struct svc_xprt *xprt)
+/**
+ * svc_xprt_enqueue - Queue a transport on an idle nfsd thread
+ * @xprt: transport with data pending
+ *
+ */
+void svc_xprt_enqueue(struct svc_xprt *xprt)
{
struct svc_pool *pool;
- struct svc_rqst *rqstp = NULL;
- int cpu;
- if (!svc_xprt_has_something_to_do(xprt))
+ if (!svc_xprt_ready(xprt))
return;
/* Mark transport as busy. It will remain in this state until
@@ -392,45 +485,13 @@ void svc_xprt_do_enqueue(struct svc_xprt *xprt)
if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
return;
- cpu = get_cpu();
- pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
-
- atomic_long_inc(&pool->sp_stats.packets);
+ pool = svc_pool_for_cpu(xprt->xpt_server);
- spin_lock_bh(&pool->sp_lock);
- list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
- pool->sp_stats.sockets_queued++;
- spin_unlock_bh(&pool->sp_lock);
+ percpu_counter_inc(&pool->sp_sockets_queued);
+ xprt->xpt_qtime = ktime_get();
+ lwq_enqueue(&xprt->xpt_ready, &pool->sp_xprts);
- /* find a thread for this xprt */
- rcu_read_lock();
- list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
- if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags))
- continue;
- atomic_long_inc(&pool->sp_stats.threads_woken);
- rqstp->rq_qtime = ktime_get();
- wake_up_process(rqstp->rq_task);
- goto out_unlock;
- }
- set_bit(SP_CONGESTED, &pool->sp_flags);
- rqstp = NULL;
-out_unlock:
- rcu_read_unlock();
- put_cpu();
- trace_svc_xprt_do_enqueue(xprt, rqstp);
-}
-EXPORT_SYMBOL_GPL(svc_xprt_do_enqueue);
-
-/*
- * Queue up a transport with data pending. If there are idle nfsd
- * processes, wake 'em up.
- *
- */
-void svc_xprt_enqueue(struct svc_xprt *xprt)
-{
- if (test_bit(XPT_BUSY, &xprt->xpt_flags))
- return;
- xprt->xpt_server->sv_ops->svo_enqueue_xprt(xprt);
+ svc_pool_wake_idle_thread(pool);
}
EXPORT_SYMBOL_GPL(svc_xprt_enqueue);
@@ -441,18 +502,9 @@ static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
{
struct svc_xprt *xprt = NULL;
- if (list_empty(&pool->sp_sockets))
- goto out;
-
- spin_lock_bh(&pool->sp_lock);
- if (likely(!list_empty(&pool->sp_sockets))) {
- xprt = list_first_entry(&pool->sp_sockets,
- struct svc_xprt, xpt_ready);
- list_del_init(&xprt->xpt_ready);
+ xprt = lwq_dequeue(&pool->sp_xprts, struct svc_xprt, xpt_ready);
+ if (xprt)
svc_xprt_get(xprt);
- }
- spin_unlock_bh(&pool->sp_lock);
-out:
return xprt;
}
@@ -475,22 +527,32 @@ void svc_reserve(struct svc_rqst *rqstp, int space)
if (xprt && space < rqstp->rq_reserved) {
atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
rqstp->rq_reserved = space;
-
+ smp_wmb(); /* See smp_rmb() in svc_xprt_ready() */
svc_xprt_enqueue(xprt);
}
}
EXPORT_SYMBOL_GPL(svc_reserve);
+static void free_deferred(struct svc_xprt *xprt, struct svc_deferred_req *dr)
+{
+ if (!dr)
+ return;
+
+ xprt->xpt_ops->xpo_release_ctxt(xprt, dr->xprt_ctxt);
+ kfree(dr);
+}
+
static void svc_xprt_release(struct svc_rqst *rqstp)
{
struct svc_xprt *xprt = rqstp->rq_xprt;
- xprt->xpt_ops->xpo_release_rqst(rqstp);
+ xprt->xpt_ops->xpo_release_ctxt(xprt, rqstp->rq_xprt_ctxt);
+ rqstp->rq_xprt_ctxt = NULL;
- kfree(rqstp->rq_deferred);
+ free_deferred(xprt, rqstp->rq_deferred);
rqstp->rq_deferred = NULL;
- svc_free_res_pages(rqstp);
+ svc_rqst_release_pages(rqstp);
rqstp->rq_res.page_len = 0;
rqstp->rq_res.page_base = 0;
@@ -511,7 +573,10 @@ static void svc_xprt_release(struct svc_rqst *rqstp)
svc_xprt_put(xprt);
}
-/*
+/**
+ * svc_wake_up - Wake up a service thread for non-transport work
+ * @serv: RPC service
+ *
* Some svc_serv's will have occasional work to do, even when a xprt is not
* waiting to be serviced. This function is there to "kick" a task in one of
* those services so that it can wake up and do that work. Note that we only
@@ -520,27 +585,10 @@ static void svc_xprt_release(struct svc_rqst *rqstp)
*/
void svc_wake_up(struct svc_serv *serv)
{
- struct svc_rqst *rqstp;
- struct svc_pool *pool;
+ struct svc_pool *pool = &serv->sv_pools[0];
- pool = &serv->sv_pools[0];
-
- rcu_read_lock();
- list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
- /* skip any that aren't queued */
- if (test_bit(RQ_BUSY, &rqstp->rq_flags))
- continue;
- rcu_read_unlock();
- wake_up_process(rqstp->rq_task);
- trace_svc_wake_up(rqstp->rq_task->pid);
- return;
- }
- rcu_read_unlock();
-
- /* No free entries available */
set_bit(SP_TASK_PENDING, &pool->sp_flags);
- smp_wmb();
- trace_svc_wake_up(0);
+ svc_pool_wake_idle_thread(pool);
}
EXPORT_SYMBOL_GPL(svc_wake_up);
@@ -559,7 +607,8 @@ int svc_port_is_privileged(struct sockaddr *sin)
}
/*
- * Make sure that we don't have too many active connections. If we have,
+ * Make sure that we don't have too many connections that have not yet
+ * demonstrated that they have access to the NFS server. If we have,
* something must be dropped. It's not clear what will happen if we allow
* "too many" connections, but when dealing with network-facing software,
* we have to code defensively. Here we do that by imposing hard limits.
@@ -571,34 +620,26 @@ int svc_port_is_privileged(struct sockaddr *sin)
* The only somewhat efficient mechanism would be if drop old
* connections from the same IP first. But right now we don't even
* record the client IP in svc_sock.
- *
- * single-threaded services that expect a lot of clients will probably
- * need to set sv_maxconn to override the default value which is based
- * on the number of threads
*/
static void svc_check_conn_limits(struct svc_serv *serv)
{
- unsigned int limit = serv->sv_maxconn ? serv->sv_maxconn :
- (serv->sv_nrthreads+3) * 20;
-
- if (serv->sv_tmpcnt > limit) {
- struct svc_xprt *xprt = NULL;
+ if (serv->sv_tmpcnt > XPT_MAX_TMP_CONN) {
+ struct svc_xprt *xprt = NULL, *xprti;
spin_lock_bh(&serv->sv_lock);
if (!list_empty(&serv->sv_tempsocks)) {
- /* Try to help the admin */
- net_notice_ratelimited("%s: too many open connections, consider increasing the %s\n",
- serv->sv_name, serv->sv_maxconn ?
- "max number of connections" :
- "number of threads");
/*
* Always select the oldest connection. It's not fair,
- * but so is life
+ * but nor is life.
*/
- xprt = list_entry(serv->sv_tempsocks.prev,
- struct svc_xprt,
- xpt_list);
- set_bit(XPT_CLOSE, &xprt->xpt_flags);
- svc_xprt_get(xprt);
+ list_for_each_entry_reverse(xprti, &serv->sv_tempsocks,
+ xpt_list) {
+ if (!test_bit(XPT_PEER_VALID, &xprti->xpt_flags)) {
+ xprt = xprti;
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ svc_xprt_get(xprt);
+ break;
+ }
+ }
}
spin_unlock_bh(&serv->sv_lock);
@@ -609,39 +650,30 @@ static void svc_check_conn_limits(struct svc_serv *serv)
}
}
-static int svc_alloc_arg(struct svc_rqst *rqstp)
+static bool svc_alloc_arg(struct svc_rqst *rqstp)
{
- struct svc_serv *serv = rqstp->rq_server;
- struct xdr_buf *arg;
- int pages;
- int i;
+ struct xdr_buf *arg = &rqstp->rq_arg;
+ unsigned long pages, filled, ret;
+
+ pages = rqstp->rq_maxpages;
+ for (filled = 0; filled < pages; filled = ret) {
+ ret = alloc_pages_bulk(GFP_KERNEL, pages, rqstp->rq_pages);
+ if (ret > filled)
+ /* Made progress, don't sleep yet */
+ continue;
- /* now allocate needed pages. If we get a failure, sleep briefly */
- pages = (serv->sv_max_mesg + 2 * PAGE_SIZE) >> PAGE_SHIFT;
- if (pages > RPCSVC_MAXPAGES) {
- pr_warn_once("svc: warning: pages=%u > RPCSVC_MAXPAGES=%lu\n",
- pages, RPCSVC_MAXPAGES);
- /* use as many pages as possible */
- pages = RPCSVC_MAXPAGES;
- }
- for (i = 0; i < pages ; i++)
- while (rqstp->rq_pages[i] == NULL) {
- struct page *p = alloc_page(GFP_KERNEL);
- if (!p) {
- set_current_state(TASK_INTERRUPTIBLE);
- if (signalled() || kthread_should_stop()) {
- set_current_state(TASK_RUNNING);
- return -EINTR;
- }
- schedule_timeout(msecs_to_jiffies(500));
- }
- rqstp->rq_pages[i] = p;
+ set_current_state(TASK_IDLE);
+ if (svc_thread_should_stop(rqstp)) {
+ set_current_state(TASK_RUNNING);
+ return false;
}
- rqstp->rq_page_end = &rqstp->rq_pages[i];
- rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
+ trace_svc_alloc_arg_err(pages, ret);
+ memalloc_retry_wait(GFP_KERNEL);
+ }
+ rqstp->rq_page_end = &rqstp->rq_pages[pages];
+ rqstp->rq_pages[pages] = NULL; /* this might be seen in nfsd_splice_actor() */
/* Make arg->head point to first page and arg->pages point to rest */
- arg = &rqstp->rq_arg;
arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
arg->head[0].iov_len = PAGE_SIZE;
arg->pages = rqstp->rq_pages + 1;
@@ -650,84 +682,66 @@ static int svc_alloc_arg(struct svc_rqst *rqstp)
arg->page_len = (pages-2)*PAGE_SIZE;
arg->len = (pages-1)*PAGE_SIZE;
arg->tail[0].iov_len = 0;
- return 0;
+
+ rqstp->rq_xid = xdr_zero;
+ return true;
}
static bool
-rqst_should_sleep(struct svc_rqst *rqstp)
+svc_thread_should_sleep(struct svc_rqst *rqstp)
{
struct svc_pool *pool = rqstp->rq_pool;
/* did someone call svc_wake_up? */
- if (test_and_clear_bit(SP_TASK_PENDING, &pool->sp_flags))
+ if (test_bit(SP_TASK_PENDING, &pool->sp_flags))
return false;
/* was a socket queued? */
- if (!list_empty(&pool->sp_sockets))
+ if (!lwq_empty(&pool->sp_xprts))
return false;
/* are we shutting down? */
- if (signalled() || kthread_should_stop())
+ if (svc_thread_should_stop(rqstp))
return false;
- /* are we freezing? */
- if (freezing(current))
- return false;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ if (svc_is_backchannel(rqstp)) {
+ if (!lwq_empty(&rqstp->rq_server->sv_cb_list))
+ return false;
+ }
+#endif
return true;
}
-static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
+static void svc_thread_wait_for_work(struct svc_rqst *rqstp)
{
- struct svc_pool *pool = rqstp->rq_pool;
- long time_left = 0;
-
- /* rq_xprt should be clear on entry */
- WARN_ON_ONCE(rqstp->rq_xprt);
-
- rqstp->rq_xprt = svc_xprt_dequeue(pool);
- if (rqstp->rq_xprt)
- goto out_found;
-
- /*
- * We have to be able to interrupt this wait
- * to bring down the daemons ...
- */
- set_current_state(TASK_INTERRUPTIBLE);
- smp_mb__before_atomic();
- clear_bit(SP_CONGESTED, &pool->sp_flags);
- clear_bit(RQ_BUSY, &rqstp->rq_flags);
- smp_mb__after_atomic();
-
- if (likely(rqst_should_sleep(rqstp)))
- time_left = schedule_timeout(timeout);
- else
+ struct svc_pool *pool = rqstp->rq_pool;
+
+ if (svc_thread_should_sleep(rqstp)) {
+ set_current_state(TASK_IDLE | TASK_FREEZABLE);
+ llist_add(&rqstp->rq_idle, &pool->sp_idle_threads);
+ if (likely(svc_thread_should_sleep(rqstp)))
+ schedule();
+
+ while (!llist_del_first_this(&pool->sp_idle_threads,
+ &rqstp->rq_idle)) {
+ /* Work just became available. This thread can only
+ * handle it after removing rqstp from the idle
+ * list. If that attempt failed, some other thread
+ * must have queued itself after finding no
+ * work to do, so that thread has taken responsibly
+ * for this new work. This thread can safely sleep
+ * until woken again.
+ */
+ schedule();
+ set_current_state(TASK_IDLE | TASK_FREEZABLE);
+ }
__set_current_state(TASK_RUNNING);
-
+ } else {
+ cond_resched();
+ }
try_to_freeze();
-
- set_bit(RQ_BUSY, &rqstp->rq_flags);
- smp_mb__after_atomic();
- rqstp->rq_xprt = svc_xprt_dequeue(pool);
- if (rqstp->rq_xprt)
- goto out_found;
-
- if (!time_left)
- atomic_long_inc(&pool->sp_stats.threads_timedout);
-
- if (signalled() || kthread_should_stop())
- return ERR_PTR(-EINTR);
- return ERR_PTR(-EAGAIN);
-out_found:
- /* Normally we will wait up to 5 seconds for any required
- * cache information to be provided.
- */
- if (!test_bit(SP_CONGESTED, &pool->sp_flags))
- rqstp->rq_chandle.thread_wait = 5*HZ;
- else
- rqstp->rq_chandle.thread_wait = 1*HZ;
- trace_svc_xprt_dequeue(rqstp);
- return rqstp->rq_xprt;
}
static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt)
@@ -746,13 +760,12 @@ static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt
svc_xprt_received(newxpt);
}
-static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
+static void svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
{
struct svc_serv *serv = rqstp->rq_server;
int len = 0;
if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
- dprintk("svc_recv: found XPT_CLOSE\n");
if (test_and_clear_bit(XPT_KILL_TEMP, &xprt->xpt_flags))
xprt->xpt_ops->xpo_kill_temp_xprt(xprt);
svc_delete_xprt(xprt);
@@ -768,140 +781,139 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
__module_get(xprt->xpt_class->xcl_owner);
svc_check_conn_limits(xprt->xpt_server);
newxpt = xprt->xpt_ops->xpo_accept(xprt);
- if (newxpt)
+ if (newxpt) {
+ newxpt->xpt_cred = get_cred(xprt->xpt_cred);
svc_add_new_temp_xprt(serv, newxpt);
- else
+ trace_svc_xprt_accept(newxpt, serv->sv_name);
+ } else {
module_put(xprt->xpt_class->xcl_owner);
+ }
+ svc_xprt_received(xprt);
+ } else if (test_bit(XPT_HANDSHAKE, &xprt->xpt_flags)) {
+ xprt->xpt_ops->xpo_handshake(xprt);
+ svc_xprt_received(xprt);
} else if (svc_xprt_reserve_slot(rqstp, xprt)) {
/* XPT_DATA|XPT_DEFERRED case: */
- dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
- rqstp, rqstp->rq_pool->sp_id, xprt,
- kref_read(&xprt->xpt_ref));
rqstp->rq_deferred = svc_deferred_dequeue(xprt);
if (rqstp->rq_deferred)
len = svc_deferred_recv(rqstp);
else
len = xprt->xpt_ops->xpo_recvfrom(rqstp);
- rqstp->rq_stime = ktime_get();
rqstp->rq_reserved = serv->sv_max_mesg;
atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
- }
- /* clear XPT_BUSY: */
- svc_xprt_received(xprt);
+ if (len <= 0)
+ goto out;
+
+ trace_svc_xdr_recvfrom(&rqstp->rq_arg);
+
+ clear_bit(XPT_OLD, &xprt->xpt_flags);
+
+ rqstp->rq_chandle.defer = svc_defer;
+
+ if (serv->sv_stats)
+ serv->sv_stats->netcnt++;
+ percpu_counter_inc(&rqstp->rq_pool->sp_messages_arrived);
+ rqstp->rq_stime = ktime_get();
+ svc_process(rqstp);
+ } else
+ svc_xprt_received(xprt);
+
out:
- trace_svc_handle_xprt(xprt, len);
- return len;
+ rqstp->rq_res.len = 0;
+ svc_xprt_release(rqstp);
}
-/*
- * Receive the next request on any transport. This code is carefully
- * organised not to touch any cachelines in the shared svc_serv
- * structure, only cachelines in the local svc_pool.
- */
-int svc_recv(struct svc_rqst *rqstp, long timeout)
+static void svc_thread_wake_next(struct svc_rqst *rqstp)
{
- struct svc_xprt *xprt = NULL;
- struct svc_serv *serv = rqstp->rq_server;
- int len, err;
+ if (!svc_thread_should_sleep(rqstp))
+ /* More work pending after I dequeued some,
+ * wake another worker
+ */
+ svc_pool_wake_idle_thread(rqstp->rq_pool);
+}
- dprintk("svc: server %p waiting for data (to = %ld)\n",
- rqstp, timeout);
+/**
+ * svc_recv - Receive and process the next request on any transport
+ * @rqstp: an idle RPC service thread
+ *
+ * This code is carefully organised not to touch any cachelines in
+ * the shared svc_serv structure, only cachelines in the local
+ * svc_pool.
+ */
+void svc_recv(struct svc_rqst *rqstp)
+{
+ struct svc_pool *pool = rqstp->rq_pool;
- if (rqstp->rq_xprt)
- printk(KERN_ERR
- "svc_recv: service %p, transport not NULL!\n",
- rqstp);
+ if (!svc_alloc_arg(rqstp))
+ return;
- err = svc_alloc_arg(rqstp);
- if (err)
- goto out;
+ svc_thread_wait_for_work(rqstp);
- try_to_freeze();
- cond_resched();
- err = -EINTR;
- if (signalled() || kthread_should_stop())
- goto out;
+ clear_bit(SP_TASK_PENDING, &pool->sp_flags);
- xprt = svc_get_next_xprt(rqstp, timeout);
- if (IS_ERR(xprt)) {
- err = PTR_ERR(xprt);
- goto out;
+ if (svc_thread_should_stop(rqstp)) {
+ svc_thread_wake_next(rqstp);
+ return;
}
- len = svc_handle_xprt(rqstp, xprt);
+ rqstp->rq_xprt = svc_xprt_dequeue(pool);
+ if (rqstp->rq_xprt) {
+ struct svc_xprt *xprt = rqstp->rq_xprt;
- /* No data, incomplete (TCP) read, or accept() */
- err = -EAGAIN;
- if (len <= 0)
- goto out_release;
+ svc_thread_wake_next(rqstp);
+ /* Normally we will wait up to 5 seconds for any required
+ * cache information to be provided. When there are no
+ * idle threads, we reduce the wait time.
+ */
+ if (pool->sp_idle_threads.first)
+ rqstp->rq_chandle.thread_wait = 5 * HZ;
+ else
+ rqstp->rq_chandle.thread_wait = 1 * HZ;
- clear_bit(XPT_OLD, &xprt->xpt_flags);
+ trace_svc_xprt_dequeue(rqstp);
+ svc_handle_xprt(rqstp, xprt);
+ }
- xprt->xpt_ops->xpo_secure_port(rqstp);
- rqstp->rq_chandle.defer = svc_defer;
- rqstp->rq_xid = svc_getu32(&rqstp->rq_arg.head[0]);
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ if (svc_is_backchannel(rqstp)) {
+ struct svc_serv *serv = rqstp->rq_server;
+ struct rpc_rqst *req;
- if (serv->sv_stats)
- serv->sv_stats->netcnt++;
- trace_svc_recv(rqstp, len);
- return len;
-out_release:
- rqstp->rq_res.len = 0;
- svc_xprt_release(rqstp);
-out:
- return err;
+ req = lwq_dequeue(&serv->sv_cb_list,
+ struct rpc_rqst, rq_bc_list);
+ if (req) {
+ svc_thread_wake_next(rqstp);
+ svc_process_bc(req, rqstp);
+ }
+ }
+#endif
}
EXPORT_SYMBOL_GPL(svc_recv);
-/*
- * Drop request
- */
-void svc_drop(struct svc_rqst *rqstp)
-{
- trace_svc_drop(rqstp);
- dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
- svc_xprt_release(rqstp);
-}
-EXPORT_SYMBOL_GPL(svc_drop);
-
-/*
- * Return reply to client.
+/**
+ * svc_send - Return reply to client
+ * @rqstp: RPC transaction context
+ *
*/
-int svc_send(struct svc_rqst *rqstp)
+void svc_send(struct svc_rqst *rqstp)
{
struct svc_xprt *xprt;
- int len = -EFAULT;
struct xdr_buf *xb;
+ int status;
xprt = rqstp->rq_xprt;
- if (!xprt)
- goto out;
-
- /* release the receive skb before sending the reply */
- xprt->xpt_ops->xpo_release_rqst(rqstp);
/* calculate over-all length */
xb = &rqstp->rq_res;
xb->len = xb->head[0].iov_len +
xb->page_len +
xb->tail[0].iov_len;
-
- /* Grab mutex to serialize outgoing data. */
- mutex_lock(&xprt->xpt_mutex);
+ trace_svc_xdr_sendto(rqstp->rq_xid, xb);
trace_svc_stats_latency(rqstp);
- if (test_bit(XPT_DEAD, &xprt->xpt_flags)
- || test_bit(XPT_CLOSE, &xprt->xpt_flags))
- len = -ENOTCONN;
- else
- len = xprt->xpt_ops->xpo_sendto(rqstp);
- mutex_unlock(&xprt->xpt_mutex);
- trace_svc_send(rqstp, len);
- svc_xprt_release(rqstp);
- if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
- len = 0;
-out:
- return len;
+ status = xprt->xpt_ops->xpo_sendto(rqstp);
+
+ trace_svc_send(rqstp, status);
}
/*
@@ -910,7 +922,7 @@ out:
*/
static void svc_age_temp_xprts(struct timer_list *t)
{
- struct svc_serv *serv = from_timer(serv, t, sv_temptimer);
+ struct svc_serv *serv = timer_container_of(serv, t, sv_temptimer);
struct svc_xprt *xprt;
struct list_head *le, *next;
@@ -1002,29 +1014,49 @@ static void svc_delete_xprt(struct svc_xprt *xprt)
struct svc_serv *serv = xprt->xpt_server;
struct svc_deferred_req *dr;
- /* Only do this once */
+ /* unregister with rpcbind for when transport type is TCP or UDP.
+ */
+ if (test_bit(XPT_RPCB_UNREG, &xprt->xpt_flags)) {
+ struct svc_sock *svsk = container_of(xprt, struct svc_sock,
+ sk_xprt);
+ struct socket *sock = svsk->sk_sock;
+
+ if (svc_register(serv, xprt->xpt_net, sock->sk->sk_family,
+ sock->sk->sk_protocol, 0) < 0)
+ pr_warn("failed to unregister %s with rpcbind\n",
+ xprt->xpt_class->xcl_name);
+ }
+
if (test_and_set_bit(XPT_DEAD, &xprt->xpt_flags))
- BUG();
+ return;
- dprintk("svc: svc_delete_xprt(%p)\n", xprt);
+ trace_svc_xprt_detach(xprt);
xprt->xpt_ops->xpo_detach(xprt);
+ if (xprt->xpt_bc_xprt)
+ xprt->xpt_bc_xprt->ops->close(xprt->xpt_bc_xprt);
spin_lock_bh(&serv->sv_lock);
list_del_init(&xprt->xpt_list);
- WARN_ON_ONCE(!list_empty(&xprt->xpt_ready));
- if (test_bit(XPT_TEMP, &xprt->xpt_flags))
+ if (test_bit(XPT_TEMP, &xprt->xpt_flags) &&
+ !test_bit(XPT_PEER_VALID, &xprt->xpt_flags))
serv->sv_tmpcnt--;
spin_unlock_bh(&serv->sv_lock);
while ((dr = svc_deferred_dequeue(xprt)) != NULL)
- kfree(dr);
+ free_deferred(xprt, dr);
call_xpt_users(xprt);
svc_xprt_put(xprt);
}
-void svc_close_xprt(struct svc_xprt *xprt)
+/**
+ * svc_xprt_close - Close a client connection
+ * @xprt: transport to disconnect
+ *
+ */
+void svc_xprt_close(struct svc_xprt *xprt)
{
+ trace_svc_xprt_close(xprt);
set_bit(XPT_CLOSE, &xprt->xpt_flags);
if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
/* someone else will have to effect the close */
@@ -1037,14 +1069,14 @@ void svc_close_xprt(struct svc_xprt *xprt)
*/
svc_delete_xprt(xprt);
}
-EXPORT_SYMBOL_GPL(svc_close_xprt);
+EXPORT_SYMBOL_GPL(svc_xprt_close);
static int svc_close_list(struct svc_serv *serv, struct list_head *xprt_list, struct net *net)
{
struct svc_xprt *xprt;
int ret = 0;
- spin_lock(&serv->sv_lock);
+ spin_lock_bh(&serv->sv_lock);
list_for_each_entry(xprt, xprt_list, xpt_list) {
if (xprt->xpt_net != net)
continue;
@@ -1052,44 +1084,39 @@ static int svc_close_list(struct svc_serv *serv, struct list_head *xprt_list, st
set_bit(XPT_CLOSE, &xprt->xpt_flags);
svc_xprt_enqueue(xprt);
}
- spin_unlock(&serv->sv_lock);
+ spin_unlock_bh(&serv->sv_lock);
return ret;
}
-static struct svc_xprt *svc_dequeue_net(struct svc_serv *serv, struct net *net)
+static void svc_clean_up_xprts(struct svc_serv *serv, struct net *net)
{
- struct svc_pool *pool;
struct svc_xprt *xprt;
- struct svc_xprt *tmp;
int i;
for (i = 0; i < serv->sv_nrpools; i++) {
- pool = &serv->sv_pools[i];
-
- spin_lock_bh(&pool->sp_lock);
- list_for_each_entry_safe(xprt, tmp, &pool->sp_sockets, xpt_ready) {
- if (xprt->xpt_net != net)
- continue;
- list_del_init(&xprt->xpt_ready);
- spin_unlock_bh(&pool->sp_lock);
- return xprt;
+ struct svc_pool *pool = &serv->sv_pools[i];
+ struct llist_node *q, **t1, *t2;
+
+ q = lwq_dequeue_all(&pool->sp_xprts);
+ lwq_for_each_safe(xprt, t1, t2, &q, xpt_ready) {
+ if (xprt->xpt_net == net) {
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ svc_delete_xprt(xprt);
+ xprt = NULL;
+ }
}
- spin_unlock_bh(&pool->sp_lock);
- }
- return NULL;
-}
-
-static void svc_clean_up_xprts(struct svc_serv *serv, struct net *net)
-{
- struct svc_xprt *xprt;
- while ((xprt = svc_dequeue_net(serv, net))) {
- set_bit(XPT_CLOSE, &xprt->xpt_flags);
- svc_delete_xprt(xprt);
+ if (q)
+ lwq_enqueue_batch(q, &pool->sp_xprts);
}
}
-/*
+/**
+ * svc_xprt_destroy_all - Destroy transports associated with @serv
+ * @serv: RPC service to be shut down
+ * @net: target network namespace
+ * @unregister: true if it is OK to unregister the destroyed xprts
+ *
* Server threads may still be running (especially in the case where the
* service is still running in other network namespaces).
*
@@ -1101,7 +1128,8 @@ static void svc_clean_up_xprts(struct svc_serv *serv, struct net *net)
* threads, we may need to wait a little while and then check again to
* see if they're done.
*/
-void svc_close_net(struct svc_serv *serv, struct net *net)
+void svc_xprt_destroy_all(struct svc_serv *serv, struct net *net,
+ bool unregister)
{
int delay = 0;
@@ -1111,7 +1139,11 @@ void svc_close_net(struct svc_serv *serv, struct net *net)
svc_clean_up_xprts(serv, net);
msleep(delay++);
}
+
+ if (unregister)
+ svc_rpcb_cleanup(serv, net);
}
+EXPORT_SYMBOL_GPL(svc_xprt_destroy_all);
/*
* Handle defer and revisit of requests
@@ -1127,16 +1159,15 @@ static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
set_bit(XPT_DEFERRED, &xprt->xpt_flags);
if (too_many || test_bit(XPT_DEAD, &xprt->xpt_flags)) {
spin_unlock(&xprt->xpt_lock);
- dprintk("revisit canceled\n");
+ trace_svc_defer_drop(dr);
+ free_deferred(xprt, dr);
svc_xprt_put(xprt);
- trace_svc_drop_deferred(dr);
- kfree(dr);
return;
}
- dprintk("revisit queued\n");
dr->xprt = NULL;
list_add(&dr->handle.recent, &xprt->xpt_deferred);
spin_unlock(&xprt->xpt_lock);
+ trace_svc_defer_queue(dr);
svc_xprt_enqueue(xprt);
svc_xprt_put(xprt);
}
@@ -1175,44 +1206,50 @@ static struct cache_deferred_req *svc_defer(struct cache_req *req)
dr->addrlen = rqstp->rq_addrlen;
dr->daddr = rqstp->rq_daddr;
dr->argslen = rqstp->rq_arg.len >> 2;
- dr->xprt_hlen = rqstp->rq_xprt_hlen;
/* back up head to the start of the buffer and copy */
skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
dr->argslen << 2);
}
+ dr->xprt_ctxt = rqstp->rq_xprt_ctxt;
+ rqstp->rq_xprt_ctxt = NULL;
+ trace_svc_defer(rqstp);
svc_xprt_get(rqstp->rq_xprt);
dr->xprt = rqstp->rq_xprt;
set_bit(RQ_DROPME, &rqstp->rq_flags);
dr->handle.revisit = svc_revisit;
- trace_svc_defer(rqstp);
return &dr->handle;
}
/*
* recv data from a deferred request into an active one
*/
-static int svc_deferred_recv(struct svc_rqst *rqstp)
+static noinline int svc_deferred_recv(struct svc_rqst *rqstp)
{
struct svc_deferred_req *dr = rqstp->rq_deferred;
+ trace_svc_defer_recv(dr);
+
/* setup iov_base past transport header */
- rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2);
+ rqstp->rq_arg.head[0].iov_base = dr->args;
/* The iov_len does not include the transport header bytes */
- rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen;
+ rqstp->rq_arg.head[0].iov_len = dr->argslen << 2;
rqstp->rq_arg.page_len = 0;
/* The rq_arg.len includes the transport header bytes */
- rqstp->rq_arg.len = dr->argslen<<2;
+ rqstp->rq_arg.len = dr->argslen << 2;
rqstp->rq_prot = dr->prot;
memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
rqstp->rq_addrlen = dr->addrlen;
/* Save off transport header len in case we get deferred again */
- rqstp->rq_xprt_hlen = dr->xprt_hlen;
rqstp->rq_daddr = dr->daddr;
rqstp->rq_respages = rqstp->rq_pages;
- return (dr->argslen<<2) - dr->xprt_hlen;
+ rqstp->rq_xprt_ctxt = dr->xprt_ctxt;
+
+ dr->xprt_ctxt = NULL;
+ svc_xprt_received(rqstp->rq_xprt);
+ return dr->argslen << 2;
}
@@ -1228,7 +1265,6 @@ static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
struct svc_deferred_req,
handle.recent);
list_del_init(&dr->handle.recent);
- trace_svc_revisit_deferred(dr);
} else
clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
spin_unlock(&xprt->xpt_lock);
@@ -1236,6 +1272,40 @@ static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
}
/**
+ * svc_find_listener - find an RPC transport instance
+ * @serv: pointer to svc_serv to search
+ * @xcl_name: C string containing transport's class name
+ * @net: owner net pointer
+ * @sa: sockaddr containing address
+ *
+ * Return the transport instance pointer for the endpoint accepting
+ * connections/peer traffic from the specified transport class,
+ * and matching sockaddr.
+ */
+struct svc_xprt *svc_find_listener(struct svc_serv *serv, const char *xcl_name,
+ struct net *net, const struct sockaddr *sa)
+{
+ struct svc_xprt *xprt;
+ struct svc_xprt *found = NULL;
+
+ spin_lock_bh(&serv->sv_lock);
+ list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
+ if (xprt->xpt_net != net)
+ continue;
+ if (strcmp(xprt->xpt_class->xcl_name, xcl_name))
+ continue;
+ if (!rpc_cmp_addr_port(sa, (struct sockaddr *)&xprt->xpt_local))
+ continue;
+ found = xprt;
+ svc_xprt_get(xprt);
+ break;
+ }
+ spin_unlock_bh(&serv->sv_lock);
+ return found;
+}
+EXPORT_SYMBOL_GPL(svc_find_listener);
+
+/**
* svc_find_xprt - find an RPC transport instance
* @serv: pointer to svc_serv to search
* @xcl_name: C string containing transport's class name
@@ -1338,29 +1408,36 @@ int svc_xprt_names(struct svc_serv *serv, char *buf, const int buflen)
}
EXPORT_SYMBOL_GPL(svc_xprt_names);
-
/*----------------------------------------------------------------------------*/
static void *svc_pool_stats_start(struct seq_file *m, loff_t *pos)
{
unsigned int pidx = (unsigned int)*pos;
- struct svc_serv *serv = m->private;
+ struct svc_info *si = m->private;
dprintk("svc_pool_stats_start, *pidx=%u\n", pidx);
+ mutex_lock(si->mutex);
+
if (!pidx)
return SEQ_START_TOKEN;
- return (pidx > serv->sv_nrpools ? NULL : &serv->sv_pools[pidx-1]);
+ if (!si->serv)
+ return NULL;
+ return pidx > si->serv->sv_nrpools ? NULL
+ : &si->serv->sv_pools[pidx - 1];
}
static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos)
{
struct svc_pool *pool = p;
- struct svc_serv *serv = m->private;
+ struct svc_info *si = m->private;
+ struct svc_serv *serv = si->serv;
dprintk("svc_pool_stats_next, *pos=%llu\n", *pos);
- if (p == SEQ_START_TOKEN) {
+ if (!serv) {
+ pool = NULL;
+ } else if (p == SEQ_START_TOKEN) {
pool = &serv->sv_pools[0];
} else {
unsigned int pidx = (pool - &serv->sv_pools[0]);
@@ -1375,6 +1452,9 @@ static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos)
static void svc_pool_stats_stop(struct seq_file *m, void *p)
{
+ struct svc_info *si = m->private;
+
+ mutex_unlock(si->mutex);
}
static int svc_pool_stats_show(struct seq_file *m, void *p)
@@ -1386,12 +1466,11 @@ static int svc_pool_stats_show(struct seq_file *m, void *p)
return 0;
}
- seq_printf(m, "%u %lu %lu %lu %lu\n",
- pool->sp_id,
- (unsigned long)atomic_long_read(&pool->sp_stats.packets),
- pool->sp_stats.sockets_queued,
- (unsigned long)atomic_long_read(&pool->sp_stats.threads_woken),
- (unsigned long)atomic_long_read(&pool->sp_stats.threads_timedout));
+ seq_printf(m, "%u %llu %llu %llu 0\n",
+ pool->sp_id,
+ percpu_counter_sum_positive(&pool->sp_messages_arrived),
+ percpu_counter_sum_positive(&pool->sp_sockets_queued),
+ percpu_counter_sum_positive(&pool->sp_threads_woken));
return 0;
}
@@ -1403,14 +1482,18 @@ static const struct seq_operations svc_pool_stats_seq_ops = {
.show = svc_pool_stats_show,
};
-int svc_pool_stats_open(struct svc_serv *serv, struct file *file)
+int svc_pool_stats_open(struct svc_info *info, struct file *file)
{
+ struct seq_file *seq;
int err;
err = seq_open(file, &svc_pool_stats_seq_ops);
- if (!err)
- ((struct seq_file *) file->private_data)->private = serv;
- return err;
+ if (err)
+ return err;
+ seq = file->private_data;
+ seq->private = info;
+
+ return 0;
}
EXPORT_SYMBOL(svc_pool_stats_open);