return;
}
- rpc_exit(task, -ERESTARTSYS);
+ rpc_call_rpcerror(task, -ERESTARTSYS);
}
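
rpc_call_rpcerror() (defined earlier in clnt.c, outside this excerpt) is roughly rpc_exit() plus bookkeeping: it records the fatal error in tk_rpc_status so the RPC-level result is not lost if tk_status is later reused by the state machine, i.e. approximately:

	task->tk_rpc_status = status;
	rpc_exit(task, status);

The same substitution appears again below in rpc_decode_header()'s out_fail and out_err paths.
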
static int
req->rq_rbuffer,
req->rq_rcvsize);
+ req->rq_reply_bytes_recvd = 0;
req->rq_snd_buf.head[0].iov_len = 0;
xdr_init_encode(&xdr, &req->rq_snd_buf,
req->rq_snd_buf.head[0].iov_base, req);
if (!rpc_task_need_encode(task))
goto out;
dprint_status(task);
+ /* Dequeue task from the receive queue while we're encoding */
+ xprt_request_dequeue_xprt(task);
/* Encode here so that rpcsec_gss can use correct sequence number. */
rpc_xdr_encode(task);
/* Did the encode result in an error condition? */
static void
call_bind_status(struct rpc_task *task)
{
+ struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
int status = -EIO;
if (rpc_task_transmitted(task)) {
return;
}
- if (task->tk_status >= 0) {
- dprint_status(task);
+ dprint_status(task);
+ trace_rpc_bind_status(task);
+ if (task->tk_status >= 0)
+ goto out_next;
+ if (xprt_bound(xprt)) {
task->tk_status = 0;
- task->tk_action = call_connect;
- return;
+ goto out_next;
}
- trace_rpc_bind_status(task);
switch (task->tk_status) {
case -ENOMEM:
dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
task->tk_rebind_retry--;
rpc_delay(task, 3*HZ);
goto retry_timeout;
+ case -ENOBUFS:
+ rpc_delay(task, HZ >> 2);
+ goto retry_timeout;
case -EAGAIN:
goto retry_timeout;
case -ETIMEDOUT:
case -ENETDOWN:
case -EHOSTUNREACH:
case -ENETUNREACH:
- case -ENOBUFS:
case -EPIPE:
dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
task->tk_pid, task->tk_status);
rpc_call_rpcerror(task, status);
return;
-
+out_next:
+ task->tk_action = call_connect;
+ return;
retry_timeout:
task->tk_status = 0;
task->tk_action = call_bind;
static void
call_connect_status(struct rpc_task *task)
{
+ struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
struct rpc_clnt *clnt = task->tk_client;
int status = task->tk_status;
}
dprint_status(task);
-
trace_rpc_connect_status(task);
+
+ if (task->tk_status == 0) {
+ clnt->cl_stats->netreconn++;
+ goto out_next;
+ }
+ if (xprt_connected(xprt)) {
+ task->tk_status = 0;
+ goto out_next;
+ }
+
task->tk_status = 0;
switch (status) {
case -ECONNREFUSED:
case -ENETDOWN:
case -ENETUNREACH:
case -EHOSTUNREACH:
- case -EADDRINUSE:
- case -ENOBUFS:
case -EPIPE:
xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
task->tk_rqstp->rq_connect_cookie);
/* retry with existing socket, after a delay */
rpc_delay(task, 3*HZ);
/* fall through */
+ case -EADDRINUSE:
case -ENOTCONN:
case -EAGAIN:
case -ETIMEDOUT:
goto out_retry;
- case 0:
- clnt->cl_stats->netreconn++;
- task->tk_action = call_transmit;
- return;
+ case -ENOBUFS:
+ rpc_delay(task, HZ >> 2);
+ goto out_retry;
}
rpc_call_rpcerror(task, status);
return;
+out_next:
+ task->tk_action = call_transmit;
+ return;
out_retry:
/* Check for timeouts before looping back to call_bind */
task->tk_action = call_bind;
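
In both call_bind_status() and call_connect_status(), -ENOBUFS is now handled as a transient resource shortage: the task backs off for HZ >> 2 jiffies (about a quarter of a second, independent of CONFIG_HZ) rather than the 3-second delay reserved for unreachable peers, and the success paths funnel through a single out_next label instead of duplicating the "advance the state machine" assignment. A minimal sketch of that shared retry shape, using a hypothetical helper (retry_after() is illustrative, not kernel API):

	#include <linux/sunrpc/sched.h>

	/* Hypothetical: back off, clear the transient error, and loop the
	 * task back into the state machine at @next_action. */
	static void retry_after(struct rpc_task *task, unsigned long delay,
				void (*next_action)(struct rpc_task *))
	{
		if (delay)
			rpc_delay(task, delay);	/* HZ >> 2 or 3 * HZ above */
		task->tk_status = 0;
		task->tk_action = next_action;	/* call_bind or call_connect */
	}
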
case -ECONNABORTED:
case -ENOTCONN:
rpc_force_rebind(clnt);
- /* fall through */
+ break;
case -EADDRINUSE:
rpc_delay(task, 3*HZ);
/* fall through */
struct rpc_clnt *clnt = task->tk_client;
struct rpc_rqst *req = task->tk_rqstp;
struct xdr_stream xdr;
+ int err;
dprint_status(task);
* before it changed req->rq_reply_bytes_recvd.
*/
smp_rmb();
+
+ /*
+ * Did we ever call xprt_complete_rqst()? If not, we should assume
+ * the message is incomplete.
+ */
+ err = -EAGAIN;
+ if (!req->rq_reply_bytes_recvd)
+ goto out;
+
req->rq_rcv_buf.len = req->rq_private_buf.len;
/* Check that the softirq receive buffer is valid */
xdr_init_decode(&xdr, &req->rq_rcv_buf,
req->rq_rcv_buf.head[0].iov_base, req);
- switch (rpc_decode_header(task, &xdr)) {
+ err = rpc_decode_header(task, &xdr);
+ out:
+ switch (err) {
case 0:
task->tk_action = rpc_exit_task;
task->tk_status = rpcauth_unwrap_resp(task, &xdr);
return;
case -EAGAIN:
task->tk_status = 0;
- xdr_free_bvec(&req->rq_rcv_buf);
- req->rq_reply_bytes_recvd = 0;
- req->rq_rcv_buf.len = 0;
if (task->tk_client->cl_discrtry)
xprt_conditional_disconnect(req->rq_xprt,
req->rq_connect_cookie);
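
The new rq_reply_bytes_recvd test relies on an ordering contract with the receive path: xprt_complete_rqst() publishes the reply data before it sets rq_reply_bytes_recvd, and the smp_rmb() above pairs with that write barrier, so once call_decode() observes the field non-zero it may trust rq_private_buf.len. A compressed sketch of both sides, with the writer paraphrased from xprt_complete_rqst() (helper names are illustrative):

	#include <linux/sunrpc/xprt.h>
	#include <asm/barrier.h>

	/* Writer (receive path): store the reply length, then the flag. */
	static void sketch_reply_publish(struct rpc_rqst *req, int copied)
	{
		req->rq_private_buf.len = copied;
		smp_wmb();			/* order the store above ... */
		req->rq_reply_bytes_recvd = copied;
	}

	/* Reader, mirroring the check added to call_decode() above. */
	static bool sketch_reply_complete(struct rpc_rqst *req)
	{
		smp_rmb();			/* ... against this read side */
		if (!req->rq_reply_bytes_recvd)
			return false;		/* caller treats this as -EAGAIN */
		req->rq_rcv_buf.len = req->rq_private_buf.len;
		return true;
	}
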
return 0;
out_fail:
trace_rpc_bad_callhdr(task);
- rpc_exit(task, error);
+ rpc_call_rpcerror(task, error);
return error;
}
return -EAGAIN;
}
out_err:
- rpc_exit(task, error);
+ rpc_call_rpcerror(task, error);
return error;
out_unparsable:
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
+ static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
+ {
+ if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
+ __xprt_lock_write_next_cong(xprt);
+ }
+
/*
* Clear the congestion window wait flag and wake up the next
* entry on xprt->sending
spin_lock(&xprt->transport_lock);
xprt_clear_connected(xprt);
xprt_clear_write_space_locked(xprt);
+ xprt_clear_congestion_window_wait_locked(xprt);
xprt_wake_pending_tasks(xprt, -ENOTCONN);
spin_unlock(&xprt->transport_lock);
}
spin_unlock(&xprt->queue_lock);
}
+ /**
+ * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
+ * @task: pointer to rpc_task
+ *
+ * Remove a task from the transmit and receive queues, and ensure that
+ * it is not pinned by the receive work item.
+ */
+ void
+ xprt_request_dequeue_xprt(struct rpc_task *task)
+ {
+ struct rpc_rqst *req = task->tk_rqstp;
+ struct rpc_xprt *xprt = req->rq_xprt;
+
+ if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
+ test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
+ xprt_is_pinned_rqst(req)) {
+ spin_lock(&xprt->queue_lock);
+ xprt_request_dequeue_transmit_locked(task);
+ xprt_request_dequeue_receive_locked(task);
+ while (xprt_is_pinned_rqst(req)) {
+ set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
+ spin_unlock(&xprt->queue_lock);
+ xprt_wait_on_pinned_rqst(req);
+ spin_lock(&xprt->queue_lock);
+ clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
+ }
+ spin_unlock(&xprt->queue_lock);
+ }
+ }
+
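
xprt_request_dequeue_xprt() becomes the single helper used both by call_encode() (see the clnt.c hunk above) and by xprt_release() further down, replacing the static xprt_request_dequeue_all(). The pin-wait loop matters because transport receive paths copy reply data into rq_rcv_buf while holding only a pin rather than xprt->queue_lock, so the request's buffers must not be reused until the pin is dropped. A sketch of that receive-side counterpart, built on the existing xprt_lookup_rqst()/xprt_pin_rqst() helpers (the function name and the elided copy step are illustrative):

	#include <linux/sunrpc/xprt.h>

	static void sketch_receive_reply(struct rpc_xprt *xprt, __be32 xid,
					 int copied)
	{
		struct rpc_rqst *req;

		spin_lock(&xprt->queue_lock);
		req = xprt_lookup_rqst(xprt, xid);
		if (!req) {
			spin_unlock(&xprt->queue_lock);
			return;
		}
		xprt_pin_rqst(req);	/* keeps req stable without the lock */
		spin_unlock(&xprt->queue_lock);

		/* ... copy the reply into req->rq_rcv_buf here ... */

		spin_lock(&xprt->queue_lock);
		xprt_complete_rqst(req->rq_task, copied);
		xprt_unpin_rqst(req);	/* wakes an RPC_TASK_MSG_PIN_WAIT waiter */
		spin_unlock(&xprt->queue_lock);
	}
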
/**
* xprt_request_prepare - prepare an encoded request for transport
* @req: pointer to rpc_rqst
status = -EBADMSG;
goto out_dequeue;
}
- if (task->tk_ops->rpc_call_prepare_transmit) {
- task->tk_ops->rpc_call_prepare_transmit(task,
- task->tk_calldata);
- status = task->tk_status;
- if (status < 0)
- goto out_dequeue;
- }
if (RPC_SIGNALLED(task)) {
status = -ERESTARTSYS;
goto out_dequeue;
xprt_do_reserve(xprt, task);
}
- static void
- xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req)
- {
- struct rpc_xprt *xprt = req->rq_xprt;
-
- if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
- test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
- xprt_is_pinned_rqst(req)) {
- spin_lock(&xprt->queue_lock);
- xprt_request_dequeue_transmit_locked(task);
- xprt_request_dequeue_receive_locked(task);
- while (xprt_is_pinned_rqst(req)) {
- set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
- spin_unlock(&xprt->queue_lock);
- xprt_wait_on_pinned_rqst(req);
- spin_lock(&xprt->queue_lock);
- clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
- }
- spin_unlock(&xprt->queue_lock);
- }
- }
-
/**
* xprt_release - release an RPC request slot
* @task: task which is finished with the slot
}
xprt = req->rq_xprt;
- xprt_request_dequeue_all(task, req);
+ xprt_request_dequeue_xprt(task);
spin_lock(&xprt->transport_lock);
xprt->ops->release_xprt(xprt, task);
if (xprt->ops->release_request)
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
+ #include <linux/log2.h>
#include <asm-generic/barrier.h>
#include <asm/bitops.h>
* internal functions
*/
static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
+ static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
+ static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
gfp_t flags);
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req;
- struct rpcrdma_rep *rep;
- cancel_delayed_work_sync(&buf->rb_refresh_worker);
+ cancel_work_sync(&buf->rb_refresh_worker);
/* This is similar to rpcrdma_ep_destroy, but:
* - Don't cancel the connect worker.
/* The ULP is responsible for ensuring all DMA
* mappings and MRs are gone.
*/
- list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
- rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
+ rpcrdma_reps_destroy(buf);
list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
init_waitqueue_head(&ep->rep_connect_wait);
ep->rep_receive_count = 0;
- sendcq = ib_alloc_cq(ia->ri_id->device, NULL,
- ep->rep_attr.cap.max_send_wr + 1,
- ia->ri_id->device->num_comp_vectors > 1 ? 1 : 0,
- IB_POLL_WORKQUEUE);
+ sendcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
+ ep->rep_attr.cap.max_send_wr + 1,
+ IB_POLL_WORKQUEUE);
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
goto out1;
}
- recvcq = ib_alloc_cq(ia->ri_id->device, NULL,
- ep->rep_attr.cap.max_recv_wr + 1,
- 0, IB_POLL_WORKQUEUE);
+ recvcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
+ ep->rep_attr.cap.max_recv_wr + 1,
+ IB_POLL_WORKQUEUE);
if (IS_ERR(recvcq)) {
rc = PTR_ERR(recvcq);
goto out2;
* Unlike a normal reconnection, a fresh PD and a new set
* of MRs and buffers is needed.
*/
- static int
- rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
- struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+ static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
+ struct ib_qp_init_attr *qp_init_attr)
{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
int rc, err;
trace_xprtrdma_reinsert(r_xprt);
}
rc = -ENETUNREACH;
- err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+ err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr);
if (err) {
pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
goto out3;
return rc;
}
- static int
- rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
- struct rpcrdma_ia *ia)
+ static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
+ struct ib_qp_init_attr *qp_init_attr)
{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rdma_cm_id *id, *old;
int err, rc;
trace_xprtrdma_reconnect(r_xprt);
- rpcrdma_ep_disconnect(ep, ia);
+ rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
rc = -EHOSTUNREACH;
id = rpcrdma_create_id(r_xprt, ia);
goto out_destroy;
}
- err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
+ err = rdma_create_qp(id, ia->ri_pd, qp_init_attr);
if (err)
goto out_destroy;
struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
rx_ia);
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+ struct ib_qp_init_attr qp_init_attr;
int rc;
retry:
+ memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
switch (ep->rep_connected) {
case 0:
dprintk("RPC: %s: connecting...\n", __func__);
- rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+ rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
if (rc) {
rc = -ENETUNREACH;
goto out_noupdate;
}
break;
case -ENODEV:
- rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
+ rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr);
if (rc)
goto out_noupdate;
break;
default:
- rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
+ rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr);
if (rc)
goto out;
}
if (rc)
goto out;
+ if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
+ xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
if (ep->rep_connected <= 0) {
if (ep->rep_connected == -EAGAIN)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
unsigned int count;
- LIST_HEAD(free);
- LIST_HEAD(all);
for (count = 0; count < ia->ri_max_segs; count++) {
struct rpcrdma_mr *mr;
int rc;
- mr = kzalloc(sizeof(*mr), GFP_KERNEL);
+ mr = kzalloc(sizeof(*mr), GFP_NOFS);
if (!mr)
break;
mr->mr_xprt = r_xprt;
- list_add(&mr->mr_list, &free);
- list_add(&mr->mr_all, &all);
+ spin_lock(&buf->rb_lock);
+ list_add(&mr->mr_list, &buf->rb_mrs);
+ list_add(&mr->mr_all, &buf->rb_all_mrs);
+ spin_unlock(&buf->rb_lock);
}
- spin_lock(&buf->rb_mrlock);
- list_splice(&free, &buf->rb_mrs);
- list_splice(&all, &buf->rb_all);
r_xprt->rx_stats.mrs_allocated += count;
- spin_unlock(&buf->rb_mrlock);
trace_xprtrdma_createmrs(r_xprt, count);
}
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
- rb_refresh_worker.work);
+ rb_refresh_worker);
struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
rx_buf);
struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
+ size_t maxhdrsize;
req = kzalloc(sizeof(*req), flags);
if (req == NULL)
goto out1;
- rb = rpcrdma_regbuf_alloc(RPCRDMA_HDRBUF_SIZE, DMA_TO_DEVICE, flags);
+ /* Compute maximum header buffer size in bytes */
+ maxhdrsize = rpcrdma_fixed_maxsz + 3 +
+ r_xprt->rx_ia.ri_max_segs * rpcrdma_readchunk_maxsz;
+ maxhdrsize *= sizeof(__be32);
+ rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
+ DMA_TO_DEVICE, flags);
if (!rb)
goto out2;
req->rl_rdmabuf = rb;
if (!req->rl_recvbuf)
goto out4;
+ INIT_LIST_HEAD(&req->rl_free_mrs);
INIT_LIST_HEAD(&req->rl_registered);
spin_lock(&buffer->rb_lock);
list_add(&req->rl_all, &buffer->rb_allreqs);
return NULL;
}
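
The transport header buffer allocated just above is now sized from the negotiated segment count instead of the fixed RPCRDMA_HDRBUF_SIZE: fixed header words, plus three chunk-list discriminators, plus one Read chunk's worth of words per segment, converted to bytes and rounded up to a power of two for the regbuf allocator. A worked example, assuming ri_max_segs = 8 and, for illustration, rpcrdma_fixed_maxsz = 4 and rpcrdma_readchunk_maxsz = 6 32-bit words (the real enum values live in rpc_rdma.c and are not part of this excerpt):

	maxhdrsize = (4 + 3 + 8 * 6) * sizeof(__be32)
	           = 55 * 4 = 220 bytes
	__roundup_pow_of_two(220) = 256, so rl_rdmabuf gets a 256-byte regbuf.
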
+ static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
+ {
+ rpcrdma_regbuf_free(rep->rr_rdmabuf);
+ kfree(rep);
+ }
+
+ static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
+ {
+ struct llist_node *node;
+
+ /* Calls to llist_del_first are required to be serialized */
+ node = llist_del_first(&buf->rb_free_reps);
+ if (!node)
+ return NULL;
+ return llist_entry(node, struct rpcrdma_rep, rr_node);
+ }
+
+ static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
+ struct rpcrdma_rep *rep)
+ {
+ if (!rep->rr_temp)
+ llist_add(&rep->rr_node, &buf->rb_free_reps);
+ else
+ rpcrdma_rep_destroy(rep);
+ }
+
+ static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
+ {
+ struct rpcrdma_rep *rep;
+
+ while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
+ rpcrdma_rep_destroy(rep);
+ }
+
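
The rb_recv_bufs list and its lock-protected list operations are replaced by an llist of free reps: rpcrdma_rep_put() may be called concurrently from any context because llist_add() is lock-free, while llist_del_first() callers must serialize among themselves, which the two consumers shown in this series (rpcrdma_post_recvs() and rpcrdma_reps_destroy()) are expected to guarantee. A standalone sketch of the same producer/consumer split using only the generic <linux/llist.h> API (the struct and function names are illustrative):

	#include <linux/llist.h>

	struct cached_item {
		struct llist_node node;
		/* payload ... */
	};

	static LLIST_HEAD(free_items);

	/* Producer: safe from any context, concurrently with other adders. */
	static void item_put(struct cached_item *item)
	{
		llist_add(&item->node, &free_items);
	}

	/* Consumer: calls to llist_del_first() must be serialized. */
	static struct cached_item *item_get(void)
	{
		struct llist_node *node = llist_del_first(&free_items);

		return node ? llist_entry(node, struct cached_item, node) : NULL;
	}
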
/**
* rpcrdma_buffer_create - Create initial set of req/rep objects
* @r_xprt: transport instance to (re)initialize
buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
buf->rb_bc_srv_max_requests = 0;
- spin_lock_init(&buf->rb_mrlock);
spin_lock_init(&buf->rb_lock);
INIT_LIST_HEAD(&buf->rb_mrs);
- INIT_LIST_HEAD(&buf->rb_all);
- INIT_DELAYED_WORK(&buf->rb_refresh_worker,
- rpcrdma_mr_refresh_worker);
+ INIT_LIST_HEAD(&buf->rb_all_mrs);
+ INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
rpcrdma_mrs_create(r_xprt);
}
buf->rb_credits = 1;
- INIT_LIST_HEAD(&buf->rb_recv_bufs);
+ init_llist_head(&buf->rb_free_reps);
rc = rpcrdma_sendctxs_create(r_xprt);
if (rc)
return rc;
}
- static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
- {
- rpcrdma_regbuf_free(rep->rr_rdmabuf);
- kfree(rep);
- }
-
/**
* rpcrdma_req_destroy - Destroy an rpcrdma_req object
* @req: unused object to be destroyed
* This function assumes that the caller prevents concurrent device
* unload and transport tear-down.
*/
- void
- rpcrdma_req_destroy(struct rpcrdma_req *req)
+ void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
list_del(&req->rl_all);
+ while (!list_empty(&req->rl_free_mrs))
+ rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs));
+
rpcrdma_regbuf_free(req->rl_recvbuf);
rpcrdma_regbuf_free(req->rl_sendbuf);
rpcrdma_regbuf_free(req->rl_rdmabuf);
unsigned int count;
count = 0;
- spin_lock(&buf->rb_mrlock);
- while (!list_empty(&buf->rb_all)) {
- mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
+ spin_lock(&buf->rb_lock);
+ while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
+ struct rpcrdma_mr,
+ mr_all)) != NULL) {
list_del(&mr->mr_all);
-
- spin_unlock(&buf->rb_mrlock);
-
- /* Ensure MW is not on any rl_registered list */
- if (!list_empty(&mr->mr_list))
- list_del(&mr->mr_list);
+ spin_unlock(&buf->rb_lock);
frwr_release_mr(mr);
count++;
- spin_lock(&buf->rb_mrlock);
+ spin_lock(&buf->rb_lock);
}
- spin_unlock(&buf->rb_mrlock);
+ spin_unlock(&buf->rb_lock);
r_xprt->rx_stats.mrs_allocated = 0;
-
- dprintk("RPC: %s: released %u MRs\n", __func__, count);
}
/**
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
- cancel_delayed_work_sync(&buf->rb_refresh_worker);
+ cancel_work_sync(&buf->rb_refresh_worker);
rpcrdma_sendctxs_destroy(buf);
-
- while (!list_empty(&buf->rb_recv_bufs)) {
- struct rpcrdma_rep *rep;
-
- rep = list_first_entry(&buf->rb_recv_bufs,
- struct rpcrdma_rep, rr_list);
- list_del(&rep->rr_list);
- rpcrdma_rep_destroy(rep);
- }
+ rpcrdma_reps_destroy(buf);
while (!list_empty(&buf->rb_send_bufs)) {
struct rpcrdma_req *req;
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct rpcrdma_mr *mr = NULL;
-
- spin_lock(&buf->rb_mrlock);
- if (!list_empty(&buf->rb_mrs))
- mr = rpcrdma_mr_pop(&buf->rb_mrs);
- spin_unlock(&buf->rb_mrlock);
+ struct rpcrdma_mr *mr;
- if (!mr)
- goto out_nomrs;
+ spin_lock(&buf->rb_lock);
+ mr = rpcrdma_mr_pop(&buf->rb_mrs);
+ spin_unlock(&buf->rb_lock);
return mr;
-
- out_nomrs:
- trace_xprtrdma_nomrs(r_xprt);
- if (r_xprt->rx_ep.rep_connected != -ENODEV)
- schedule_delayed_work(&buf->rb_refresh_worker, 0);
-
- /* Allow the reply handler and refresh worker to run */
- cond_resched();
-
- return NULL;
- }
-
- static void
- __rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
- {
- spin_lock(&buf->rb_mrlock);
- rpcrdma_mr_push(mr, &buf->rb_mrs);
- spin_unlock(&buf->rb_mrlock);
- }
-
- /**
- * rpcrdma_mr_put - Release an rpcrdma_mr object
- * @mr: object to release
- *
- */
- void
- rpcrdma_mr_put(struct rpcrdma_mr *mr)
- {
- __rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
}
/**
- * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
- * @mr: object to release
+ * rpcrdma_mr_put - DMA unmap an MR and release it
+ * @mr: MR to release
*
*/
- void
- rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
+ void rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
mr->mr_sg, mr->mr_nents, mr->mr_dir);
mr->mr_dir = DMA_NONE;
}
- __rpcrdma_mr_put(&r_xprt->rx_buf, mr);
+
+ rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
+ }
+
+ static void rpcrdma_mr_free(struct rpcrdma_mr *mr)
+ {
+ struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+
+ mr->mr_req = NULL;
+ spin_lock(&buf->rb_lock);
+ rpcrdma_mr_push(mr, &buf->rb_mrs);
+ spin_unlock(&buf->rb_lock);
}
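
Together with rpcrdma_mr_get() above, this sets up a two-level MR pool. The consumer side of rl_free_mrs (taking a cached MR when marshaling a new request) lives in rpc_rdma.c and is not part of this excerpt; the rest of the cycle is visible in these hunks:

	/*
	 *  1. A request obtains an MR, presumably from its own rl_free_mrs
	 *     cache first, falling back to the buffer-wide rb_mrs pool via
	 *     rpcrdma_mr_get(); mr->mr_req ties the MR to that request.
	 *  2. While mapped, the MR sits on req->rl_registered.
	 *  3. After invalidation, rpcrdma_mr_put() DMA-unmaps the MR and
	 *     caches it on req->rl_free_mrs for reuse by the same request.
	 *  4. rpcrdma_req_destroy() drains rl_free_mrs through
	 *     rpcrdma_mr_free(), which clears mr_req and returns the MR to
	 *     rb_mrs under rb_lock; rpcrdma_mrs_destroy() releases it for good.
	 */
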
/**
*/
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
- struct rpcrdma_rep *rep = req->rl_reply;
-
+ if (req->rl_reply)
+ rpcrdma_rep_put(buffers, req->rl_reply);
req->rl_reply = NULL;
spin_lock(&buffers->rb_lock);
list_add(&req->rl_list, &buffers->rb_send_bufs);
- if (rep) {
- if (!rep->rr_temp) {
- list_add(&rep->rr_list, &buffers->rb_recv_bufs);
- rep = NULL;
- }
- }
spin_unlock(&buffers->rb_lock);
- if (rep)
- rpcrdma_rep_destroy(rep);
}
- /*
- * Put reply buffers back into pool when not attached to
- * request. This happens in error conditions.
+ /**
+ * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
+ * @rep: rep to release
+ *
+ * Used after error conditions.
*/
- void
- rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
+ void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
- struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
-
- if (!rep->rr_temp) {
- spin_lock(&buffers->rb_lock);
- list_add(&rep->rr_list, &buffers->rb_recv_bufs);
- spin_unlock(&buffers->rb_lock);
- } else {
- rpcrdma_rep_destroy(rep);
- }
+ rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
}
/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
count = 0;
needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
- if (ep->rep_receive_count > needed)
+ if (likely(ep->rep_receive_count > needed))
goto out;
needed -= ep->rep_receive_count;
if (!temp)
/* fast path: all needed reps can be found on the free list */
wr = NULL;
- spin_lock(&buf->rb_lock);
while (needed) {
- rep = list_first_entry_or_null(&buf->rb_recv_bufs,
- struct rpcrdma_rep, rr_list);
+ rep = rpcrdma_rep_get_locked(buf);
if (!rep)
- break;
-
- list_del(&rep->rr_list);
- rep->rr_recv_wr.next = wr;
- wr = &rep->rr_recv_wr;
- --needed;
- }
- spin_unlock(&buf->rb_lock);
-
- while (needed) {
- rep = rpcrdma_rep_create(r_xprt, temp);
+ rep = rpcrdma_rep_create(r_xprt, temp);
if (!rep)
break;
if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
goto release_wrs;
- trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
+ trace_xprtrdma_post_recv(rep);
++count;
}
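
The old two-pass loop (drain rb_recv_bufs under rb_lock, then allocate whatever is still missing) becomes a single loop that tries rpcrdma_rep_get_locked() and falls back to rpcrdma_rep_create(). Each rep's receive work request is still chained through rr_recv_wr.next and the whole chain is handed to the device in one posting call; those lines are context not shown in this excerpt. A generic, self-contained sketch of that batching pattern using the standard ib_post_recv() verb (the helper name is illustrative):

	#include <rdma/ib_verbs.h>

	/* Link @count receive WRs into one chain and post it with a single
	 * verb call; on failure, bad_wr reports where posting stopped. */
	static int sketch_post_recv_chain(struct ib_qp *qp,
					  struct ib_recv_wr *wrs,
					  unsigned int count)
	{
		const struct ib_recv_wr *bad_wr;
		struct ib_recv_wr *chain = NULL;
		unsigned int i;

		for (i = 0; i < count; i++) {
			wrs[i].next = chain;	/* push onto the chain head */
			chain = &wrs[i];
		}
		return chain ? ib_post_recv(qp, chain, &bad_wr) : 0;
	}
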