Git Repo - linux.git/blobdiff - net/sunrpc/xprt.c
SUNRPC: Add a label for RPC calls that require allocation on receive
index a8db2e3f89044460741fe41dcb6dcadfd7b2d669..7333874c65954509845c1990bbcffb725b6ab80b 100644
@@ -68,8 +68,6 @@
 static void     xprt_init(struct rpc_xprt *xprt, struct net *net);
 static __be32  xprt_alloc_xid(struct rpc_xprt *xprt);
 static void    xprt_connect_status(struct rpc_task *task);
-static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
-static void     __xprt_put_cong(struct rpc_xprt *, struct rpc_rqst *);
 static void     xprt_destroy(struct rpc_xprt *xprt);
 
 static DEFINE_SPINLOCK(xprt_list_lock);
@@ -171,6 +169,17 @@ out:
 }
 EXPORT_SYMBOL_GPL(xprt_load_transport);
 
+static void xprt_clear_locked(struct rpc_xprt *xprt)
+{
+       xprt->snd_task = NULL;
+       if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
+               smp_mb__before_atomic();
+               clear_bit(XPRT_LOCKED, &xprt->state);
+               smp_mb__after_atomic();
+       } else
+               queue_work(xprtiod_workqueue, &xprt->task_cleanup);
+}
+
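The XPRT_LOCKED bit behaves as a hand-rolled lock: test_and_set_bit() orders the acquire side, but clear_bit() alone is unordered, hence the explicit smp_mb__before_atomic()/smp_mb__after_atomic() pair around the release above. A minimal userspace sketch of the same bit-lock pattern, using C11 atomics rather than the kernel bitops (names here are illustrative, not kernel API):

    #include <stdatomic.h>
    #include <stdbool.h>

    #define LOCKED_BIT 0

    /* Acquire: fetch_or with acquire ordering mirrors test_and_set_bit(),
     * which implies a full barrier when it succeeds. */
    static bool bitlock_try_acquire(atomic_ulong *state)
    {
        unsigned long old = atomic_fetch_or_explicit(state, 1UL << LOCKED_BIT,
                                                     memory_order_acquire);
        return !(old & (1UL << LOCKED_BIT));
    }

    /* Release: release ordering makes prior stores (e.g. snd_task = NULL)
     * visible before the bit reads as clear, standing in for the
     * smp_mb__before_atomic() + clear_bit() pairing. */
    static void bitlock_release(atomic_ulong *state)
    {
        atomic_fetch_and_explicit(state, ~(1UL << LOCKED_BIT),
                                  memory_order_release);
    }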
 /**
  * xprt_reserve_xprt - serialize write access to transports
  * @task: task that is requesting access to the transport
@@ -183,44 +192,53 @@ EXPORT_SYMBOL_GPL(xprt_load_transport);
 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        struct rpc_rqst *req = task->tk_rqstp;
-       int priority;
 
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                goto out_sleep;
        }
+       if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
+               goto out_unlock;
        xprt->snd_task = task;
-       if (req != NULL)
-               req->rq_ntrans++;
 
        return 1;
 
+out_unlock:
+       xprt_clear_locked(xprt);
 out_sleep:
        dprintk("RPC: %5u failed to lock transport %p\n",
                        task->tk_pid, xprt);
-       task->tk_timeout = 0;
+       task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0;
        task->tk_status = -EAGAIN;
-       if (req == NULL)
-               priority = RPC_PRIORITY_LOW;
-       else if (!req->rq_ntrans)
-               priority = RPC_PRIORITY_NORMAL;
-       else
-               priority = RPC_PRIORITY_HIGH;
-       rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
+       rpc_sleep_on(&xprt->sending, task, NULL);
        return 0;
 }
 EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
 
-static void xprt_clear_locked(struct rpc_xprt *xprt)
+static bool
+xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
 {
-       xprt->snd_task = NULL;
-       if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
-               smp_mb__before_atomic();
-               clear_bit(XPRT_LOCKED, &xprt->state);
-               smp_mb__after_atomic();
-       } else
-               queue_work(xprtiod_workqueue, &xprt->task_cleanup);
+       return test_bit(XPRT_CWND_WAIT, &xprt->state);
+}
+
+static void
+xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
+{
+       if (!list_empty(&xprt->xmit_queue)) {
+               /* Peek at head of queue to see if it can make progress */
+               if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
+                                       rq_xmit)->rq_cong)
+                       return;
+       }
+       set_bit(XPRT_CWND_WAIT, &xprt->state);
+}
+
+static void
+xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
+{
+       if (!RPCXPRT_CONGESTED(xprt))
+               clear_bit(XPRT_CWND_WAIT, &xprt->state);
 }
 
 /*
@@ -230,11 +248,11 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
  * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
  * integrated into the decision of whether a request is allowed to be
  * woken up and given access to the transport.
+ * Note that the lock is only granted if we know there are free slots.
  */
 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        struct rpc_rqst *req = task->tk_rqstp;
-       int priority;
 
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
@@ -245,25 +263,19 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
                xprt->snd_task = task;
                return 1;
        }
-       if (__xprt_get_cong(xprt, task)) {
+       if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
+               goto out_unlock;
+       if (!xprt_need_congestion_window_wait(xprt)) {
                xprt->snd_task = task;
-               req->rq_ntrans++;
                return 1;
        }
+out_unlock:
        xprt_clear_locked(xprt);
 out_sleep:
-       if (req)
-               __xprt_put_cong(xprt, req);
        dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
-       task->tk_timeout = 0;
+       task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0;
        task->tk_status = -EAGAIN;
-       if (req == NULL)
-               priority = RPC_PRIORITY_LOW;
-       else if (!req->rq_ntrans)
-               priority = RPC_PRIORITY_NORMAL;
-       else
-               priority = RPC_PRIORITY_HIGH;
-       rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
+       rpc_sleep_on(&xprt->sending, task, NULL);
        return 0;
 }
 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
@@ -272,6 +284,8 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        int retval;
 
+       if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
+               return 1;
        spin_lock_bh(&xprt->transport_lock);
        retval = xprt->ops->reserve_xprt(xprt, task);
        spin_unlock_bh(&xprt->transport_lock);
@@ -281,12 +295,8 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
 static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
 {
        struct rpc_xprt *xprt = data;
-       struct rpc_rqst *req;
 
-       req = task->tk_rqstp;
        xprt->snd_task = task;
-       if (req)
-               req->rq_ntrans++;
        return true;
 }
 
@@ -294,53 +304,30 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
 {
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
-
+       if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
+               goto out_unlock;
        if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
                                __xprt_lock_write_func, xprt))
                return;
+out_unlock:
        xprt_clear_locked(xprt);
 }
 
-static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data)
-{
-       struct rpc_xprt *xprt = data;
-       struct rpc_rqst *req;
-
-       req = task->tk_rqstp;
-       if (req == NULL) {
-               xprt->snd_task = task;
-               return true;
-       }
-       if (__xprt_get_cong(xprt, task)) {
-               xprt->snd_task = task;
-               req->rq_ntrans++;
-               return true;
-       }
-       return false;
-}
-
 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 {
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
-       if (RPCXPRT_CONGESTED(xprt))
+       if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
+               goto out_unlock;
+       if (xprt_need_congestion_window_wait(xprt))
                goto out_unlock;
        if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
-                               __xprt_lock_write_cong_func, xprt))
+                               __xprt_lock_write_func, xprt))
                return;
 out_unlock:
        xprt_clear_locked(xprt);
 }
 
-static void xprt_task_clear_bytes_sent(struct rpc_task *task)
-{
-       if (task != NULL) {
-               struct rpc_rqst *req = task->tk_rqstp;
-               if (req != NULL)
-                       req->rq_bytes_sent = 0;
-       }
-}
-
 /**
  * xprt_release_xprt - allow other requests to use a transport
  * @xprt: transport with other tasks potentially waiting
@@ -351,7 +338,6 @@ static void xprt_task_clear_bytes_sent(struct rpc_task *task)
 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        if (xprt->snd_task == task) {
-               xprt_task_clear_bytes_sent(task);
                xprt_clear_locked(xprt);
                __xprt_lock_write_next(xprt);
        }
@@ -369,7 +355,6 @@ EXPORT_SYMBOL_GPL(xprt_release_xprt);
 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        if (xprt->snd_task == task) {
-               xprt_task_clear_bytes_sent(task);
                xprt_clear_locked(xprt);
                __xprt_lock_write_next_cong(xprt);
        }
@@ -378,6 +363,8 @@ EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
 
 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
 {
+       if (xprt->snd_task != task)
+               return;
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        spin_unlock_bh(&xprt->transport_lock);
@@ -388,16 +375,16 @@ static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *ta
  * overflowed. Put the task to sleep if this is the case.
  */
 static int
-__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
+__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
 {
-       struct rpc_rqst *req = task->tk_rqstp;
-
        if (req->rq_cong)
                return 1;
        dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
-                       task->tk_pid, xprt->cong, xprt->cwnd);
-       if (RPCXPRT_CONGESTED(xprt))
+                       req->rq_task->tk_pid, xprt->cong, xprt->cwnd);
+       if (RPCXPRT_CONGESTED(xprt)) {
+               xprt_set_congestion_window_wait(xprt);
                return 0;
+       }
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
@@ -414,9 +401,31 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
+       xprt_test_and_clear_congestion_window_wait(xprt);
        __xprt_lock_write_next_cong(xprt);
 }
 
+/**
+ * xprt_request_get_cong - Request congestion control credits
+ * @xprt: pointer to transport
+ * @req: pointer to RPC request
+ *
+ * Useful for transports that require congestion control.
+ */
+bool
+xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
+       bool ret = false;
+
+       if (req->rq_cong)
+               return true;
+       spin_lock_bh(&xprt->transport_lock);
+       ret = __xprt_get_cong(xprt, req) != 0;
+       spin_unlock_bh(&xprt->transport_lock);
+       return ret;
+}
+EXPORT_SYMBOL_GPL(xprt_request_get_cong);
+
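xprt_request_get_cong() exposes the credit accounting to transports that need it (e.g. RDMA): each request in flight holds one RPC_CWNDSCALE worth of xprt->cong until __xprt_put_cong() returns it. A compact sketch of the scheme, with the kernel's fixed-point window scaling reduced to plain counters (CWNDSCALE is an assumed illustrative constant, not the kernel definition):

    #include <stdbool.h>

    #define CWNDSCALE 256UL   /* one credit; illustrative value */

    struct cong_window {
        unsigned long cong;   /* credits currently charged */
        unsigned long cwnd;   /* current window size */
    };

    /* Mirrors __xprt_get_cong(): grant a credit unless congested. */
    static bool cong_get(struct cong_window *w, bool *rq_cong)
    {
        if (*rq_cong)
            return true;          /* request already holds a credit */
        if (w->cong >= w->cwnd)   /* RPCXPRT_CONGESTED() analogue */
            return false;
        *rq_cong = true;
        w->cong += CWNDSCALE;
        return true;
    }

    /* Mirrors __xprt_put_cong(): return the credit on completion. */
    static void cong_put(struct cong_window *w, bool *rq_cong)
    {
        if (!*rq_cong)
            return;
        *rq_cong = false;
        w->cong -= CWNDSCALE;
    }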
 /**
  * xprt_release_rqst_cong - housekeeping when request is complete
  * @task: RPC request that recently completed
@@ -431,6 +440,20 @@ void xprt_release_rqst_cong(struct rpc_task *task)
 }
 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
 
+/*
+ * Clear the congestion window wait flag and wake up the next
+ * entry on xprt->sending
+ */
+static void
+xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
+{
+       if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
+               spin_lock_bh(&xprt->transport_lock);
+               __xprt_lock_write_next_cong(xprt);
+               spin_unlock_bh(&xprt->transport_lock);
+       }
+}
+
 /**
  * xprt_adjust_cwnd - adjust transport congestion window
  * @xprt: pointer to xprt
@@ -488,39 +511,46 @@ EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
 
 /**
  * xprt_wait_for_buffer_space - wait for transport output buffer to clear
- * @task: task to be put to sleep
- * @action: function pointer to be executed after wait
+ * @xprt: transport
  *
  * Note that we only set the timer for the case of RPC_IS_SOFT(), since
  * we don't in general want to force a socket disconnection due to
  * an incomplete RPC call transmission.
  */
-void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)
+void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
 {
-       struct rpc_rqst *req = task->tk_rqstp;
-       struct rpc_xprt *xprt = req->rq_xprt;
-
-       task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0;
-       rpc_sleep_on(&xprt->pending, task, action);
+       set_bit(XPRT_WRITE_SPACE, &xprt->state);
 }
 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
 
+static bool
+xprt_clear_write_space_locked(struct rpc_xprt *xprt)
+{
+       if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
+               __xprt_lock_write_next(xprt);
+               dprintk("RPC:       write space: waking waiting task on "
+                               "xprt %p\n", xprt);
+               return true;
+       }
+       return false;
+}
+
 /**
  * xprt_write_space - wake the task waiting for transport output buffer space
  * @xprt: transport with waiting tasks
  *
  * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
  */
-void xprt_write_space(struct rpc_xprt *xprt)
+bool xprt_write_space(struct rpc_xprt *xprt)
 {
+       bool ret;
+
+       if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
+               return false;
        spin_lock_bh(&xprt->transport_lock);
-       if (xprt->snd_task) {
-               dprintk("RPC:       write space: waking waiting task on "
-                               "xprt %p\n", xprt);
-               rpc_wake_up_queued_task_on_wq(xprtiod_workqueue,
-                               &xprt->pending, xprt->snd_task);
-       }
+       ret = xprt_clear_write_space_locked(xprt);
        spin_unlock_bh(&xprt->transport_lock);
+       return ret;
 }
 EXPORT_SYMBOL_GPL(xprt_write_space);
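The reworked scheme turns write-space handling into an edge-triggered flag: a sender that fills the socket buffer sets XPRT_WRITE_SPACE, and the socket's write-space callback only takes the transport lock when the flag is actually set. A sketch of the flag protocol (illustrative names, C11 atomics in place of the kernel bitops):

    #include <stdatomic.h>
    #include <stdbool.h>

    static atomic_bool write_space_wait;

    /* Sender side: record that we ran out of socket buffer space. */
    static void wait_for_buffer_space(void)
    {
        atomic_store(&write_space_wait, true);
    }

    /* Callback side: cheap lockless test first, then test-and-clear;
     * returns true if a waiting writer should be woken. */
    static bool write_space(void)
    {
        if (!atomic_load(&write_space_wait))
            return false;
        return atomic_exchange(&write_space_wait, false);
    }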
 
@@ -631,6 +661,7 @@ void xprt_disconnect_done(struct rpc_xprt *xprt)
        dprintk("RPC:       disconnected transport %p\n", xprt);
        spin_lock_bh(&xprt->transport_lock);
        xprt_clear_connected(xprt);
+       xprt_clear_write_space_locked(xprt);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
        spin_unlock_bh(&xprt->transport_lock);
 }
@@ -654,6 +685,22 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
 }
 EXPORT_SYMBOL_GPL(xprt_force_disconnect);
 
+static unsigned int
+xprt_connect_cookie(struct rpc_xprt *xprt)
+{
+       return READ_ONCE(xprt->connect_cookie);
+}
+
+static bool
+xprt_request_retransmit_after_disconnect(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = req->rq_xprt;
+
+       return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
+               !xprt_connected(xprt);
+}
+
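xprt->connect_cookie is bumped on every (re)connection, so comparing the cookie a request recorded at transmit time against the transport's current value answers whether the request was sent on the connection that is still up. The test in miniature (hypothetical struct; the real fields live in struct rpc_xprt and struct rpc_rqst):

    #include <stdbool.h>
    #include <stdint.h>

    struct xprt_state {
        uint32_t connect_cookie;  /* incremented on each (re)connect */
        bool     connected;
    };

    /* Mirrors xprt_request_retransmit_after_disconnect(). */
    static bool needs_retransmit(const struct xprt_state *x, uint32_t rq_cookie)
    {
        return rq_cookie != x->connect_cookie || !x->connected;
    }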
 /**
  * xprt_conditional_disconnect - force a transport to disconnect
  * @xprt: transport to disconnect
@@ -692,7 +739,7 @@ static void
 xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
        __must_hold(&xprt->transport_lock)
 {
-       if (list_empty(&xprt->recv) && xprt_has_timer(xprt))
+       if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
                mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
 }
 
@@ -702,7 +749,7 @@ xprt_init_autodisconnect(struct timer_list *t)
        struct rpc_xprt *xprt = from_timer(xprt, t, timer);
 
        spin_lock(&xprt->transport_lock);
-       if (!list_empty(&xprt->recv))
+       if (!RB_EMPTY_ROOT(&xprt->recv_queue))
                goto out_abort;
        /* Reset xprt->last_used to avoid connect/autodisconnect cycling */
        xprt->last_used = jiffies;
@@ -726,7 +773,6 @@ bool xprt_lock_connect(struct rpc_xprt *xprt,
                goto out;
        if (xprt->snd_task != task)
                goto out;
-       xprt_task_clear_bytes_sent(task);
        xprt->snd_task = cookie;
        ret = true;
 out:
@@ -772,7 +818,6 @@ void xprt_connect(struct rpc_task *task)
                xprt->ops->close(xprt);
 
        if (!xprt_connected(xprt)) {
-               task->tk_rqstp->rq_bytes_sent = 0;
                task->tk_timeout = task->tk_rqstp->rq_timeout;
                task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
                rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
@@ -821,23 +866,92 @@ static void xprt_connect_status(struct rpc_task *task)
        }
 }
 
+enum xprt_xid_rb_cmp {
+       XID_RB_EQUAL,
+       XID_RB_LEFT,
+       XID_RB_RIGHT,
+};
+static enum xprt_xid_rb_cmp
+xprt_xid_cmp(__be32 xid1, __be32 xid2)
+{
+       if (xid1 == xid2)
+               return XID_RB_EQUAL;
+       if ((__force u32)xid1 < (__force u32)xid2)
+               return XID_RB_LEFT;
+       return XID_RB_RIGHT;
+}
+
+static struct rpc_rqst *
+xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
+{
+       struct rb_node *n = xprt->recv_queue.rb_node;
+       struct rpc_rqst *req;
+
+       while (n != NULL) {
+               req = rb_entry(n, struct rpc_rqst, rq_recv);
+               switch (xprt_xid_cmp(xid, req->rq_xid)) {
+               case XID_RB_LEFT:
+                       n = n->rb_left;
+                       break;
+               case XID_RB_RIGHT:
+                       n = n->rb_right;
+                       break;
+               case XID_RB_EQUAL:
+                       return req;
+               }
+       }
+       return NULL;
+}
+
+static void
+xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
+{
+       struct rb_node **p = &xprt->recv_queue.rb_node;
+       struct rb_node *n = NULL;
+       struct rpc_rqst *req;
+
+       while (*p != NULL) {
+               n = *p;
+               req = rb_entry(n, struct rpc_rqst, rq_recv);
+               switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
+               case XID_RB_LEFT:
+                       p = &n->rb_left;
+                       break;
+               case XID_RB_RIGHT:
+                       p = &n->rb_right;
+                       break;
+               case XID_RB_EQUAL:
+                       WARN_ON_ONCE(new != req);
+                       return;
+               }
+       }
+       rb_link_node(&new->rq_recv, n, p);
+       rb_insert_color(&new->rq_recv, &xprt->recv_queue);
+}
+
+static void
+xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
+       rb_erase(&req->rq_recv, &xprt->recv_queue);
+}
+
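Replacing the linear xprt->recv list with an XID-keyed tree makes reply matching O(log n) instead of O(n) in the number of outstanding requests. The shape of the lookup is easiest to see in a toy unbalanced BST; the kernel gets the same three-way-compare structure plus rebalancing from rb_node/rb_insert_color (sketch only, not the rbtree API):

    #include <stdint.h>
    #include <stddef.h>

    struct rqst_node {
        uint32_t xid;
        struct rqst_node *left, *right;
    };

    /* Mirrors xprt_request_rb_find(): descend by three-way XID compare. */
    static struct rqst_node *xid_find(struct rqst_node *root, uint32_t xid)
    {
        while (root != NULL) {
            if (xid == root->xid)
                return root;                        /* XID_RB_EQUAL */
            root = (xid < root->xid) ? root->left   /* XID_RB_LEFT */
                                     : root->right; /* XID_RB_RIGHT */
        }
        return NULL;
    }

    /* Mirrors xprt_request_rb_insert(), minus the rb_insert_color() step. */
    static void xid_insert(struct rqst_node **link, struct rqst_node *new)
    {
        while (*link != NULL) {
            if (new->xid == (*link)->xid)
                return;                    /* duplicate: WARN_ON_ONCE upstream */
            link = (new->xid < (*link)->xid) ? &(*link)->left
                                             : &(*link)->right;
        }
        new->left = new->right = NULL;
        *link = new;
    }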
 /**
  * xprt_lookup_rqst - find an RPC request corresponding to an XID
  * @xprt: transport on which the original request was transmitted
  * @xid: RPC XID of incoming reply
  *
- * Caller holds xprt->recv_lock.
+ * Caller holds xprt->queue_lock.
  */
 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
 {
        struct rpc_rqst *entry;
 
-       list_for_each_entry(entry, &xprt->recv, rq_list)
-               if (entry->rq_xid == xid) {
-                       trace_xprt_lookup_rqst(xprt, xid, 0);
-                       entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
-                       return entry;
-               }
+       entry = xprt_request_rb_find(xprt, xid);
+       if (entry != NULL) {
+               trace_xprt_lookup_rqst(xprt, xid, 0);
+               entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
+               return entry;
+       }
 
        dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
                        ntohl(xid));
@@ -847,16 +961,22 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
 }
 EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
 
+static bool
+xprt_is_pinned_rqst(struct rpc_rqst *req)
+{
+       return atomic_read(&req->rq_pin) != 0;
+}
+
 /**
  * xprt_pin_rqst - Pin a request on the transport receive list
  * @req: Request to pin
  *
  * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
- * so should be holding the xprt transport lock.
+ * so should be holding xprt->queue_lock.
  */
 void xprt_pin_rqst(struct rpc_rqst *req)
 {
-       set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
+       atomic_inc(&req->rq_pin);
 }
 EXPORT_SYMBOL_GPL(xprt_pin_rqst);
 
@@ -864,38 +984,87 @@ EXPORT_SYMBOL_GPL(xprt_pin_rqst);
  * xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to unpin
  *
- * Caller should be holding the xprt transport lock.
+ * Caller should be holding xprt->queue_lock.
  */
 void xprt_unpin_rqst(struct rpc_rqst *req)
 {
-       struct rpc_task *task = req->rq_task;
-
-       clear_bit(RPC_TASK_MSG_RECV, &task->tk_runstate);
-       if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
-               wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
+       if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
+               atomic_dec(&req->rq_pin);
+               return;
+       }
+       if (atomic_dec_and_test(&req->rq_pin))
+               wake_up_var(&req->rq_pin);
 }
 EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
 
 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
-__must_hold(&req->rq_xprt->recv_lock)
 {
-       struct rpc_task *task = req->rq_task;
+       wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
+}
 
-       if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
-               spin_unlock(&req->rq_xprt->recv_lock);
-               set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
-               wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV,
-                               TASK_UNINTERRUPTIBLE);
-               clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
-               spin_lock(&req->rq_xprt->recv_lock);
-       }
+static bool
+xprt_request_data_received(struct rpc_task *task)
+{
+       return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
+               READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
+}
+
+static bool
+xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
+{
+       return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
+               READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
+}
+
+/**
+ * xprt_request_enqueue_receive - Add a request to the receive queue
+ * @task: RPC task
+ *
+ */
+void
+xprt_request_enqueue_receive(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = req->rq_xprt;
+
+       if (!xprt_request_need_enqueue_receive(task, req))
+               return;
+       spin_lock(&xprt->queue_lock);
+
+       /* Update the softirq receive buffer */
+       memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
+                       sizeof(req->rq_private_buf));
+
+       /* Add request to the receive list */
+       xprt_request_rb_insert(xprt, req);
+       set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
+       spin_unlock(&xprt->queue_lock);
+
+       xprt_reset_majortimeo(req);
+       /* Turn off autodisconnect */
+       del_singleshot_timer_sync(&xprt->timer);
+}
+
+/**
+ * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
+ * @task: RPC task
+ *
+ * Caller must hold xprt->queue_lock.
+ */
+static void
+xprt_request_dequeue_receive_locked(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+
+       if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
+               xprt_request_rb_remove(req->rq_xprt, req);
 }
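Pinning moves from a single task-state bit to a reference count: receive paths take a pin while copying data with the queue lock dropped, and teardown in xprt_wait_on_pinned_rqst() blocks until the count drains. A pthread sketch of the same count-and-wait pairing, with a condition variable standing in for wait_var_event()/wake_up_var() (illustrative names):

    #include <pthread.h>
    #include <stdatomic.h>

    static pthread_mutex_t pin_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  pin_cv   = PTHREAD_COND_INITIALIZER;

    struct pinned_rqst {
        atomic_int rq_pin;
    };

    static void rqst_pin(struct pinned_rqst *req)
    {
        atomic_fetch_add(&req->rq_pin, 1);
    }

    static void rqst_unpin(struct pinned_rqst *req)
    {
        if (atomic_fetch_sub(&req->rq_pin, 1) == 1) {   /* last pin dropped */
            pthread_mutex_lock(&pin_lock);
            pthread_cond_broadcast(&pin_cv);
            pthread_mutex_unlock(&pin_lock);
        }
    }

    /* Teardown side: mirrors xprt_wait_on_pinned_rqst(). */
    static void rqst_wait_unpinned(struct pinned_rqst *req)
    {
        pthread_mutex_lock(&pin_lock);
        while (atomic_load(&req->rq_pin) != 0)
            pthread_cond_wait(&pin_cv, &pin_lock);
        pthread_mutex_unlock(&pin_lock);
    }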
 
 /**
  * xprt_update_rtt - Update RPC RTT statistics
  * @task: RPC request that recently completed
  *
- * Caller holds xprt->recv_lock.
+ * Caller holds xprt->queue_lock.
  */
 void xprt_update_rtt(struct rpc_task *task)
 {
@@ -917,7 +1086,7 @@ EXPORT_SYMBOL_GPL(xprt_update_rtt);
  * @task: RPC request that recently completed
  * @copied: actual number of bytes received from the transport
  *
- * Caller holds xprt->recv_lock.
+ * Caller holds xprt->queue_lock.
  */
 void xprt_complete_rqst(struct rpc_task *task, int copied)
 {
@@ -930,12 +1099,12 @@ void xprt_complete_rqst(struct rpc_task *task, int copied)
 
        xprt->stat.recvs++;
 
-       list_del_init(&req->rq_list);
        req->rq_private_buf.len = copied;
        /* Ensure all writes are done before we update */
        /* req->rq_reply_bytes_recvd */
        smp_wmb();
        req->rq_reply_bytes_recvd = copied;
+       xprt_request_dequeue_receive_locked(task);
        rpc_wake_up_queued_task(&xprt->pending, task);
 }
 EXPORT_SYMBOL_GPL(xprt_complete_rqst);
@@ -956,6 +1125,156 @@ static void xprt_timer(struct rpc_task *task)
                task->tk_status = 0;
 }
 
+/**
+ * xprt_request_wait_receive - wait for the reply to an RPC request
+ * @task: RPC task about to send a request
+ *
+ */
+void xprt_request_wait_receive(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = req->rq_xprt;
+
+       if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
+               return;
+       /*
+        * Sleep on the pending queue if we're expecting a reply.
+        * The spinlock ensures atomicity between the test of
+        * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
+        */
+       spin_lock(&xprt->queue_lock);
+       if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
+               xprt->ops->set_retrans_timeout(task);
+               rpc_sleep_on(&xprt->pending, task, xprt_timer);
+               /*
+                * Send an extra queue wakeup call if the
+                * connection was dropped in case the call to
+                * rpc_sleep_on() raced.
+                */
+               if (xprt_request_retransmit_after_disconnect(task))
+                       rpc_wake_up_queued_task_set_status(&xprt->pending,
+                                       task, -ENOTCONN);
+       }
+       spin_unlock(&xprt->queue_lock);
+}
+
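xprt_request_wait_receive() uses the classic double-checked sleep: a lockless test_bit() fast path, a re-test under xprt->queue_lock so the enqueue onto xprt->pending is atomic with the condition check, and an extra wakeup in case the connection dropped while racing with rpc_sleep_on(). The pattern in miniature (pthread sketch, hypothetical names; the unlocked fast-path read mirrors the kernel's lockless test_bit()):

    #include <pthread.h>
    #include <stdbool.h>

    struct recv_wait {
        pthread_mutex_t lock;    /* plays the role of xprt->queue_lock */
        pthread_cond_t  cv;      /* plays the role of xprt->pending */
        bool            need_recv;
    };

    static void wait_receive(struct recv_wait *w)
    {
        if (!w->need_recv)              /* lockless fast path */
            return;
        pthread_mutex_lock(&w->lock);
        while (w->need_recv)            /* re-test atomically with sleeping */
            pthread_cond_wait(&w->cv, &w->lock);
        pthread_mutex_unlock(&w->lock);
    }

    static void reply_received(struct recv_wait *w)
    {
        pthread_mutex_lock(&w->lock);
        w->need_recv = false;
        pthread_cond_broadcast(&w->cv);
        pthread_mutex_unlock(&w->lock);
    }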
+static bool
+xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
+{
+       return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
+}
+
+/**
+ * xprt_request_enqueue_transmit - queue a task for transmission
+ * @task: pointer to rpc_task
+ *
+ * Add a task to the transmission queue.
+ */
+void
+xprt_request_enqueue_transmit(struct rpc_task *task)
+{
+       struct rpc_rqst *pos, *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = req->rq_xprt;
+
+       if (xprt_request_need_enqueue_transmit(task, req)) {
+               spin_lock(&xprt->queue_lock);
+               /*
+                * Requests that carry congestion control credits are added
+                * to the head of the list to avoid starvation issues.
+                */
+               if (req->rq_cong) {
+                       xprt_clear_congestion_window_wait(xprt);
+                       list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
+                               if (pos->rq_cong)
+                                       continue;
+                               /* Note: req is added _before_ pos */
+                               list_add_tail(&req->rq_xmit, &pos->rq_xmit);
+                               INIT_LIST_HEAD(&req->rq_xmit2);
+                               goto out;
+                       }
+               } else if (RPC_IS_SWAPPER(task)) {
+                       list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
+                               if (pos->rq_cong || pos->rq_bytes_sent)
+                                       continue;
+                               if (RPC_IS_SWAPPER(pos->rq_task))
+                                       continue;
+                               /* Note: req is added _before_ pos */
+                               list_add_tail(&req->rq_xmit, &pos->rq_xmit);
+                               INIT_LIST_HEAD(&req->rq_xmit2);
+                               goto out;
+                       }
+               } else {
+                       list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
+                               if (pos->rq_task->tk_owner != task->tk_owner)
+                                       continue;
+                               list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
+                               INIT_LIST_HEAD(&req->rq_xmit);
+                               goto out;
+                       }
+               }
+               list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
+               INIT_LIST_HEAD(&req->rq_xmit2);
+out:
+               set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
+               spin_unlock(&xprt->queue_lock);
+       }
+}
+
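The xmit_queue is really two-level: rq_xmit links one request per group onto the main queue, while rq_xmit2 chains further same-owner requests behind the group head, which is what lets the dequeue path promote the next group member in O(1). A sketch of the owner-grouping enqueue policy with plain next pointers in place of list_head (illustrative types; the congestion and swapper orderings above are omitted):

    #include <stdint.h>
    #include <stddef.h>

    struct txq_rqst {
        uint32_t owner;            /* tk_owner analogue */
        struct txq_rqst *xmit;     /* next group head on the main queue */
        struct txq_rqst *xmit2;    /* next same-owner request in the group */
    };

    static void xmit_enqueue(struct txq_rqst **queue, struct txq_rqst *req)
    {
        struct txq_rqst **tail;

        req->xmit = NULL;
        req->xmit2 = NULL;
        for (struct txq_rqst *pos = *queue; pos; pos = pos->xmit) {
            if (pos->owner != req->owner)
                continue;
            tail = &pos->xmit2;        /* join the existing owner group */
            while (*tail)
                tail = &(*tail)->xmit2;
            *tail = req;
            return;
        }
        tail = queue;                  /* no group: new head at queue tail */
        while (*tail)
            tail = &(*tail)->xmit;
        *tail = req;
    }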
+/**
+ * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
+ * @task: pointer to rpc_task
+ *
+ * Remove a task from the transmission queue
+ * Caller must hold xprt->queue_lock
+ */
+static void
+xprt_request_dequeue_transmit_locked(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+
+       if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+               return;
+       if (!list_empty(&req->rq_xmit)) {
+               list_del(&req->rq_xmit);
+               if (!list_empty(&req->rq_xmit2)) {
+                       struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
+                                       struct rpc_rqst, rq_xmit2);
+                       list_del(&req->rq_xmit2);
+                       list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
+               }
+       } else
+               list_del(&req->rq_xmit2);
+}
+
+/**
+ * xprt_request_dequeue_transmit - remove a task from the transmission queue
+ * @task: pointer to rpc_task
+ *
+ * Remove a task from the transmission queue
+ */
+static void
+xprt_request_dequeue_transmit(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = req->rq_xprt;
+
+       spin_lock(&xprt->queue_lock);
+       xprt_request_dequeue_transmit_locked(task);
+       spin_unlock(&xprt->queue_lock);
+}
+
+/**
+ * xprt_request_need_retransmit - Test if a task needs retransmission
+ * @task: pointer to rpc_task
+ *
+ * Test for whether a connection breakage requires the task to retransmit
+ */
+bool
+xprt_request_need_retransmit(struct rpc_task *task)
+{
+       return xprt_request_retransmit_after_disconnect(task);
+}
+
 /**
  * xprt_prepare_transmit - reserve the transport before sending a request
  * @task: RPC task about to send a request
@@ -965,32 +1284,18 @@ bool xprt_prepare_transmit(struct rpc_task *task)
 {
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
-       bool ret = false;
 
        dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);
 
-       spin_lock_bh(&xprt->transport_lock);
-       if (!req->rq_bytes_sent) {
-               if (req->rq_reply_bytes_recvd) {
-                       task->tk_status = req->rq_reply_bytes_recvd;
-                       goto out_unlock;
-               }
-               if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT)
-                   && xprt_connected(xprt)
-                   && req->rq_connect_cookie == xprt->connect_cookie) {
-                       xprt->ops->set_retrans_timeout(task);
-                       rpc_sleep_on(&xprt->pending, task, xprt_timer);
-                       goto out_unlock;
-               }
-       }
-       if (!xprt->ops->reserve_xprt(xprt, task)) {
-               task->tk_status = -EAGAIN;
-               goto out_unlock;
+       if (!xprt_lock_write(xprt, task)) {
+               /* Race breaker: someone may have transmitted us */
+               if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+                       rpc_wake_up_queued_task_set_status(&xprt->sending,
+                                       task, 0);
+               return false;
        }
-       ret = true;
-out_unlock:
-       spin_unlock_bh(&xprt->transport_lock);
-       return ret;
+       return true;
 }
 
 void xprt_end_transmit(struct rpc_task *task)
@@ -999,54 +1304,62 @@ void xprt_end_transmit(struct rpc_task *task)
 }
 
 /**
- * xprt_transmit - send an RPC request on a transport
- * @task: controlling RPC task
+ * xprt_request_transmit - send an RPC request on a transport
+ * @req: pointer to request to transmit
+ * @snd_task: RPC task that owns the transport lock
  *
- * We have to copy the iovec because sendmsg fiddles with its contents.
+ * This performs the transmission of a single request.
+ * Note that if the request is not the same as snd_task, then it
+ * does need to be pinned.
+ * Returns '0' on success.
  */
-void xprt_transmit(struct rpc_task *task)
+static int
+xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
 {
-       struct rpc_rqst *req = task->tk_rqstp;
-       struct rpc_xprt *xprt = req->rq_xprt;
+       struct rpc_xprt *xprt = req->rq_xprt;
+       struct rpc_task *task = req->rq_task;
        unsigned int connect_cookie;
+       int is_retrans = RPC_WAS_SENT(task);
        int status;
 
        dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
 
-       if (!req->rq_reply_bytes_recvd) {
-               if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
-                       /*
-                        * Add to the list only if we're expecting a reply
-                        */
-                       /* Update the softirq receive buffer */
-                       memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
-                                       sizeof(req->rq_private_buf));
-                       /* Add request to the receive list */
-                       spin_lock(&xprt->recv_lock);
-                       list_add_tail(&req->rq_list, &xprt->recv);
-                       spin_unlock(&xprt->recv_lock);
-                       xprt_reset_majortimeo(req);
-                       /* Turn off autodisconnect */
-                       del_singleshot_timer_sync(&xprt->timer);
+       if (!req->rq_bytes_sent) {
+               if (xprt_request_data_received(task)) {
+                       status = 0;
+                       goto out_dequeue;
                }
-       } else if (!req->rq_bytes_sent)
-               return;
+               /* Verify that our message lies in the RPCSEC_GSS window */
+               if (rpcauth_xmit_need_reencode(task)) {
+                       status = -EBADMSG;
+                       goto out_dequeue;
+               }
+       }
+
+       /*
+        * Update req->rq_ntrans before transmitting to avoid races with
+        * xprt_update_rtt(), which needs to know that it is recording a
+        * reply to the first transmission.
+        */
+       req->rq_ntrans++;
 
        connect_cookie = xprt->connect_cookie;
-       status = xprt->ops->send_request(task);
+       status = xprt->ops->send_request(req);
        trace_xprt_transmit(xprt, req->rq_xid, status);
        if (status != 0) {
-               task->tk_status = status;
-               return;
+               req->rq_ntrans--;
+               return status;
        }
+
+       if (is_retrans)
+               task->tk_client->cl_stats->rpcretrans++;
+
        xprt_inject_disconnect(xprt);
 
        dprintk("RPC: %5u xmit complete\n", task->tk_pid);
        task->tk_flags |= RPC_TASK_SENT;
        spin_lock_bh(&xprt->transport_lock);
 
-       xprt->ops->set_retrans_timeout(task);
-
        xprt->stat.sends++;
        xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
        xprt->stat.bklog_u += xprt->backlog.qlen;
@@ -1055,25 +1368,49 @@ void xprt_transmit(struct rpc_task *task)
        spin_unlock_bh(&xprt->transport_lock);
 
        req->rq_connect_cookie = connect_cookie;
-       if (rpc_reply_expected(task) && !READ_ONCE(req->rq_reply_bytes_recvd)) {
-               /*
-                * Sleep on the pending queue if we're expecting a reply.
-                * The spinlock ensures atomicity between the test of
-                * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
-                */
-               spin_lock(&xprt->recv_lock);
-               if (!req->rq_reply_bytes_recvd) {
-                       rpc_sleep_on(&xprt->pending, task, xprt_timer);
-                       /*
-                        * Send an extra queue wakeup call if the
-                        * connection was dropped in case the call to
-                        * rpc_sleep_on() raced.
-                        */
-                       if (!xprt_connected(xprt))
-                               xprt_wake_pending_tasks(xprt, -ENOTCONN);
-               }
-               spin_unlock(&xprt->recv_lock);
+out_dequeue:
+       xprt_request_dequeue_transmit(task);
+       rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
+       return status;
+}
+
+/**
+ * xprt_transmit - send an RPC request on a transport
+ * @task: controlling RPC task
+ *
+ * Attempts to drain the transmit queue. On exit, either the transport
+ * signalled an error that needs to be handled before transmission can
+ * resume, or @task finished transmitting, and detected that it already
+ * received a reply.
+ */
+void
+xprt_transmit(struct rpc_task *task)
+{
+       struct rpc_rqst *next, *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = req->rq_xprt;
+       int status;
+
+       spin_lock(&xprt->queue_lock);
+       while (!list_empty(&xprt->xmit_queue)) {
+               next = list_first_entry(&xprt->xmit_queue,
+                               struct rpc_rqst, rq_xmit);
+               xprt_pin_rqst(next);
+               spin_unlock(&xprt->queue_lock);
+               status = xprt_request_transmit(next, task);
+               if (status == -EBADMSG && next != req)
+                       status = 0;
+               cond_resched();
+               spin_lock(&xprt->queue_lock);
+               xprt_unpin_rqst(next);
+               if (status == 0) {
+                       if (!xprt_request_data_received(task) ||
+                           test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+                               continue;
+               } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+                       task->tk_status = status;
+               break;
        }
+       spin_unlock(&xprt->queue_lock);
 }
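The drain loop in xprt_transmit() shows the pin-and-drop-lock idiom: pick the queue head under xprt->queue_lock, pin it, release the lock for the (possibly sleeping) send, then retake the lock to unpin and re-evaluate. In outline (pthread sketch; rqst_pin()/rqst_unpin() and transmit_one() are assumed stand-ins, not the kernel functions):

    #include <pthread.h>

    struct txq_rqst;                                 /* opaque for the sketch */
    extern void rqst_pin(struct txq_rqst *req);
    extern void rqst_unpin(struct txq_rqst *req);
    extern int  transmit_one(struct txq_rqst *req);  /* may sleep */

    struct txq {
        pthread_mutex_t lock;
        struct txq_rqst *head;
    };

    static void drain(struct txq *q)
    {
        pthread_mutex_lock(&q->lock);
        while (q->head != NULL) {
            struct txq_rqst *next = q->head;
            rqst_pin(next);                  /* keep it alive unlocked */
            pthread_mutex_unlock(&q->lock);
            int status = transmit_one(next); /* queue may mutate meanwhile */
            pthread_mutex_lock(&q->lock);
            rqst_unpin(next);
            if (status != 0)
                break;                       /* leave the error to the caller */
        }
        pthread_mutex_unlock(&q->lock);
    }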
 
 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
@@ -1170,20 +1507,6 @@ out_init_req:
 }
 EXPORT_SYMBOL_GPL(xprt_alloc_slot);
 
-void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
-{
-       /* Note: grabbing the xprt_lock_write() ensures that we throttle
-        * new slot allocation if the transport is congested (i.e. when
-        * reconnecting a stream transport or when out of socket write
-        * buffer space).
-        */
-       if (xprt_lock_write(xprt, task)) {
-               xprt_alloc_slot(xprt, task);
-               xprt_release_write(xprt, task);
-       }
-}
-EXPORT_SYMBOL_GPL(xprt_lock_and_alloc_slot);
-
 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
 {
        spin_lock(&xprt->reserve_lock);
@@ -1250,6 +1573,60 @@ void xprt_free(struct rpc_xprt *xprt)
 }
 EXPORT_SYMBOL_GPL(xprt_free);
 
+static void
+xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
+{
+       req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
+}
+
+static __be32
+xprt_alloc_xid(struct rpc_xprt *xprt)
+{
+       __be32 xid;
+
+       spin_lock(&xprt->reserve_lock);
+       xid = (__force __be32)xprt->xid++;
+       spin_unlock(&xprt->reserve_lock);
+       return xid;
+}
+
+static void
+xprt_init_xid(struct rpc_xprt *xprt)
+{
+       xprt->xid = prandom_u32();
+}
+
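XIDs are seeded randomly at transport creation and then simply incremented per request; the random start avoids matching replies a server may still hold for a previous incarnation of the client. Reduced to a sketch (rand() in place of prandom_u32(); the kernel serializes the increment under reserve_lock):

    #include <stdint.h>
    #include <stdlib.h>

    static uint32_t xid_counter;

    static void xid_init(void)
    {
        xid_counter = (uint32_t)rand();   /* prandom_u32() analogue */
    }

    static uint32_t xid_alloc(void)
    {
        return xid_counter++;             /* caller provides locking */
    }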
+static void
+xprt_request_init(struct rpc_task *task)
+{
+       struct rpc_xprt *xprt = task->tk_xprt;
+       struct rpc_rqst *req = task->tk_rqstp;
+
+       req->rq_timeout = task->tk_client->cl_timeout->to_initval;
+       req->rq_task    = task;
+       req->rq_xprt    = xprt;
+       req->rq_buffer  = NULL;
+       req->rq_xid     = xprt_alloc_xid(xprt);
+       xprt_init_connect_cookie(req, xprt);
+       req->rq_bytes_sent = 0;
+       req->rq_snd_buf.len = 0;
+       req->rq_snd_buf.buflen = 0;
+       req->rq_rcv_buf.len = 0;
+       req->rq_rcv_buf.buflen = 0;
+       req->rq_release_snd_buf = NULL;
+       xprt_reset_majortimeo(req);
+       dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
+                       req, ntohl(req->rq_xid));
+}
+
+static void
+xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+       xprt->ops->alloc_slot(xprt, task);
+       if (task->tk_rqstp != NULL)
+               xprt_request_init(task);
+}
+
 /**
  * xprt_reserve - allocate an RPC request slot
  * @task: RPC task requesting a slot allocation
@@ -1269,7 +1646,7 @@ void xprt_reserve(struct rpc_task *task)
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (!xprt_throttle_congested(xprt, task))
-               xprt->ops->alloc_slot(xprt, task);
+               xprt_do_reserve(xprt, task);
 }
 
 /**
@@ -1291,45 +1668,29 @@ void xprt_retry_reserve(struct rpc_task *task)
 
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
-       xprt->ops->alloc_slot(xprt, task);
+       xprt_do_reserve(xprt, task);
 }
 
-static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
-{
-       __be32 xid;
-
-       spin_lock(&xprt->reserve_lock);
-       xid = (__force __be32)xprt->xid++;
-       spin_unlock(&xprt->reserve_lock);
-       return xid;
-}
-
-static inline void xprt_init_xid(struct rpc_xprt *xprt)
-{
-       xprt->xid = prandom_u32();
-}
-
-void xprt_request_init(struct rpc_task *task)
+static void
+xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req)
 {
-       struct rpc_xprt *xprt = task->tk_xprt;
-       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = req->rq_xprt;
 
-       INIT_LIST_HEAD(&req->rq_list);
-       req->rq_timeout = task->tk_client->cl_timeout->to_initval;
-       req->rq_task    = task;
-       req->rq_xprt    = xprt;
-       req->rq_buffer  = NULL;
-       req->rq_xid     = xprt_alloc_xid(xprt);
-       req->rq_connect_cookie = xprt->connect_cookie - 1;
-       req->rq_bytes_sent = 0;
-       req->rq_snd_buf.len = 0;
-       req->rq_snd_buf.buflen = 0;
-       req->rq_rcv_buf.len = 0;
-       req->rq_rcv_buf.buflen = 0;
-       req->rq_release_snd_buf = NULL;
-       xprt_reset_majortimeo(req);
-       dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
-                       req, ntohl(req->rq_xid));
+       if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
+           test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
+           xprt_is_pinned_rqst(req)) {
+               spin_lock(&xprt->queue_lock);
+               xprt_request_dequeue_transmit_locked(task);
+               xprt_request_dequeue_receive_locked(task);
+               while (xprt_is_pinned_rqst(req)) {
+                       set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
+                       spin_unlock(&xprt->queue_lock);
+                       xprt_wait_on_pinned_rqst(req);
+                       spin_lock(&xprt->queue_lock);
+                       clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
+               }
+               spin_unlock(&xprt->queue_lock);
+       }
 }
 
 /**
@@ -1345,8 +1706,7 @@ void xprt_release(struct rpc_task *task)
        if (req == NULL) {
                if (task->tk_client) {
                        xprt = task->tk_xprt;
-                       if (xprt->snd_task == task)
-                               xprt_release_write(xprt, task);
+                       xprt_release_write(xprt, task);
                }
                return;
        }
@@ -1356,12 +1716,7 @@ void xprt_release(struct rpc_task *task)
                task->tk_ops->rpc_count_stats(task, task->tk_calldata);
        else if (task->tk_client)
                rpc_count_iostats(task, task->tk_client->cl_metrics);
-       spin_lock(&xprt->recv_lock);
-       if (!list_empty(&req->rq_list)) {
-               list_del_init(&req->rq_list);
-               xprt_wait_on_pinned_rqst(req);
-       }
-       spin_unlock(&xprt->recv_lock);
+       xprt_request_dequeue_all(task, req);
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        if (xprt->ops->release_request)
@@ -1385,16 +1740,36 @@ void xprt_release(struct rpc_task *task)
                xprt_free_bc_request(req);
 }
 
+#ifdef CONFIG_SUNRPC_BACKCHANNEL
+void
+xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
+{
+       struct xdr_buf *xbufp = &req->rq_snd_buf;
+
+       task->tk_rqstp = req;
+       req->rq_task = task;
+       xprt_init_connect_cookie(req, req->rq_xprt);
+       /*
+        * Set up the xdr_buf length.
+        * This also indicates that the buffer is XDR encoded already.
+        */
+       xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
+               xbufp->tail[0].iov_len;
+       req->rq_bytes_sent = 0;
+}
+#endif
+
 static void xprt_init(struct rpc_xprt *xprt, struct net *net)
 {
        kref_init(&xprt->kref);
 
        spin_lock_init(&xprt->transport_lock);
        spin_lock_init(&xprt->reserve_lock);
-       spin_lock_init(&xprt->recv_lock);
+       spin_lock_init(&xprt->queue_lock);
 
        INIT_LIST_HEAD(&xprt->free);
-       INIT_LIST_HEAD(&xprt->recv);
+       xprt->recv_queue = RB_ROOT;
+       INIT_LIST_HEAD(&xprt->xmit_queue);
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
        spin_lock_init(&xprt->bc_pa_lock);
        INIT_LIST_HEAD(&xprt->bc_pa_list);
@@ -1407,7 +1782,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
 
        rpc_init_wait_queue(&xprt->binding, "xprt_binding");
        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
-       rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending");
+       rpc_init_wait_queue(&xprt->sending, "xprt_sending");
        rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
 
        xprt_init_xid(xprt);