Merge tag 'nfs-for-5.4-1' of git://git.linux-nfs.org/projects/anna/linux-nfs
author     Linus Torvalds <[email protected]>
           Thu, 26 Sep 2019 19:20:14 +0000 (12:20 -0700)
committer  Linus Torvalds <[email protected]>
           Thu, 26 Sep 2019 19:20:14 +0000 (12:20 -0700)
Pull NFS client updates from Anna Schumaker:
 "Stable bugfixes:
   - Dequeue the request from the receive queue while we're re-encoding
     # v4.20+
   - Fix buffer handling of GSS MIC without slack # 5.1

  Features:
   - Increase xprtrdma maximum transport header and slot table sizes
   - Add support for nfs4_call_sync() calls using a custom
     rpc_task_struct
   - Optimize the default readahead size
   - Enable pNFS filelayout LAYOUTGET on OPEN

  Other bugfixes and cleanups:
   - Fix possible null-pointer dereferences and memory leaks
   - Various NFS over RDMA cleanups
   - Various NFS over RDMA comment updates
   - Don't receive TCP data into a reset request buffer
   - Don't try to parse incomplete RPC messages
   - Fix congestion window race with disconnect
   - Clean up pNFS return-on-close error handling
   - Fixes for NFS4ERR_OLD_STATEID handling"

* tag 'nfs-for-5.4-1' of git://git.linux-nfs.org/projects/anna/linux-nfs: (53 commits)
  pNFS/filelayout: enable LAYOUTGET on OPEN
  NFS: Optimise the default readahead size
  NFSv4: Handle NFS4ERR_OLD_STATEID in LOCKU
  NFSv4: Handle NFS4ERR_OLD_STATEID in CLOSE/OPEN_DOWNGRADE
  NFSv4: Fix OPEN_DOWNGRADE error handling
  pNFS: Handle NFS4ERR_OLD_STATEID on layoutreturn by bumping the state seqid
  NFSv4: Add a helper to increment stateid seqids
  NFSv4: Handle RPC level errors in LAYOUTRETURN
  NFSv4: Handle NFS4ERR_DELAY correctly in return-on-close
  NFSv4: Clean up pNFS return-on-close error handling
  pNFS: Ensure we do clear the return-on-close layout stateid on fatal errors
  NFS: remove unused check for negative dentry
  NFSv3: use nfs_add_or_obtain() to create and reference inodes
  NFS: Refactor nfs_instantiate() for dentry referencing callers
  SUNRPC: Fix congestion window race with disconnect
  SUNRPC: Don't try to parse incomplete RPC messages
  SUNRPC: Rename xdr_buf_read_netobj to xdr_buf_read_mic
  SUNRPC: Fix buffer handling of GSS MIC without slack
  SUNRPC: RPC level errors should always set task->tk_rpc_status
  SUNRPC: Don't receive TCP data into a request buffer that has been reset
  ...

fs/nfs/dir.c
fs/nfs/internal.h
fs/nfs/super.c
include/linux/sunrpc/sched.h
net/sunrpc/clnt.c
net/sunrpc/xprt.c
net/sunrpc/xprtrdma/verbs.c

diff --combined fs/nfs/dir.c
index 0adfd88401108029c2fb065db3fd8307580aa427,dd8b218785bebb686673b37c513ea7843a1faf6a..e180033e35cf1d0fc89fc69c442c1d4a8cab84cd
@@@ -1487,7 -1487,7 +1487,7 @@@ static int nfs_finish_open(struct nfs_o
        if (S_ISREG(file->f_path.dentry->d_inode->i_mode))
                nfs_file_set_open_context(file, ctx);
        else
 -              err = -ESTALE;
 +              err = -EOPENSTALE;
  out:
        return err;
  }
@@@ -1669,10 -1669,8 +1669,8 @@@ static int nfs4_lookup_revalidate(struc
  
  #endif /* CONFIG_NFSV4 */
  
- /*
-  * Code common to create, mkdir, and mknod.
-  */
- int nfs_instantiate(struct dentry *dentry, struct nfs_fh *fhandle,
+ struct dentry *
+ nfs_add_or_obtain(struct dentry *dentry, struct nfs_fh *fhandle,
                                struct nfs_fattr *fattr,
                                struct nfs4_label *label)
  {
        struct inode *dir = d_inode(parent);
        struct inode *inode;
        struct dentry *d;
-       int error = -EACCES;
+       int error;
  
        d_drop(dentry);
  
-       /* We may have been initialized further down */
-       if (d_really_is_positive(dentry))
-               goto out;
        if (fhandle->size == 0) {
                error = NFS_PROTO(dir)->lookup(dir, &dentry->d_name, fhandle, fattr, NULL);
                if (error)
        }
        inode = nfs_fhget(dentry->d_sb, fhandle, fattr, label);
        d = d_splice_alias(inode, dentry);
-       if (IS_ERR(d)) {
-               error = PTR_ERR(d);
-               goto out_error;
-       }
-       dput(d);
  out:
        dput(parent);
-       return 0;
+       return d;
  out_error:
        nfs_mark_for_revalidate(dir);
-       dput(parent);
-       return error;
+       d = ERR_PTR(error);
+       goto out;
+ }
+ EXPORT_SYMBOL_GPL(nfs_add_or_obtain);
+ /*
+  * Code common to create, mkdir, and mknod.
+  */
+ int nfs_instantiate(struct dentry *dentry, struct nfs_fh *fhandle,
+                               struct nfs_fattr *fattr,
+                               struct nfs4_label *label)
+ {
+       struct dentry *d;
+       d = nfs_add_or_obtain(dentry, fhandle, fattr, label);
+       if (IS_ERR(d))
+               return PTR_ERR(d);
+       /* Callers don't care */
+       dput(d);
+       return 0;
  }
  EXPORT_SYMBOL_GPL(nfs_instantiate);
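nfs_instantiate() used to both splice the dentry and discard the result; the split above lets callers that need the resulting dentry obtain it directly. A hedged sketch of the calling convention for nfs_add_or_obtain() (the caller shown is illustrative, not a specific function from this series):

	struct dentry *d;

	d = nfs_add_or_obtain(dentry, fhandle, fattr, label);
	if (IS_ERR(d))
		return PTR_ERR(d);
	/* As with d_splice_alias(): NULL means @dentry itself was
	 * instantiated; otherwise @d is a referenced alias and the
	 * caller must dput() it when done. */
	if (d)
		dentry = d;

This is what lets the NFSv3 create path in the shortlog keep a reference to the inode it just created instead of immediately dropping it.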
  
diff --combined fs/nfs/internal.h
index e64f810223be6b961c865e92b45afa40800a45c2,4b946e6a052f3ec3fe9e0af78f3a1877e17a6050..447a3c17fa8e6c6b87c56f35253b4bedf4ebc702
@@@ -16,14 -16,6 +16,6 @@@ extern const struct export_operations n
  
  struct nfs_string;
  
- /* Maximum number of readahead requests
-  * FIXME: this should really be a sysctl so that users may tune it to suit
-  *        their needs. People that do NFS over a slow network, might for
-  *        instance want to reduce it to something closer to 1 for improved
-  *        interactive response.
-  */
- #define NFS_MAX_READAHEAD     (RPC_DEF_SLOT_TABLE - 1)
  static inline void nfs_attr_check_mountpoint(struct super_block *parent, struct nfs_fattr *fattr)
  {
        if (!nfs_fsid_equal(&NFS_SB(parent)->fsid, &fattr->fsid))
@@@ -775,13 -767,3 +767,13 @@@ static inline bool nfs_error_is_fatal(i
        }
  }
  
 +static inline bool nfs_error_is_fatal_on_server(int err)
 +{
 +      switch (err) {
 +      case 0:
 +      case -ERESTARTSYS:
 +      case -EINTR:
 +              return false;
 +      }
 +      return nfs_error_is_fatal(err);
 +}
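The new helper deliberately treats 0, -ERESTARTSYS and -EINTR as non-fatal: those are local conditions, not errors the server returned. A hypothetical call pattern (the error-propagation call shown is an assumption for illustration, not a call site from this diff):

	/* Only poison the open context for errors the server could
	 * actually have produced; local interruptions retry cleanly.
	 * (nfs_context_set_write_error() used here illustratively.) */
	if (nfs_error_is_fatal_on_server(error))
		nfs_context_set_write_error(ctx, error);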
diff --combined fs/nfs/super.c
index 19a76cfa8b1fcf9c2cd976c43ba8823110115562,c96194e286924e3956e8b10ffb8ea3fa205a3f2a..a84df7d63403249e9d8dec97272dd7cbfd35a209
@@@ -2382,15 -2382,6 +2382,15 @@@ void nfs_fill_super(struct super_block 
                sb->s_flags |= SB_POSIXACL;
                sb->s_time_gran = 1;
                sb->s_export_op = &nfs_export_ops;
 +      } else
 +              sb->s_time_gran = 1000;
 +
 +      if (server->nfs_client->rpc_ops->version != 4) {
 +              sb->s_time_min = 0;
 +              sb->s_time_max = U32_MAX;
 +      } else {
 +              sb->s_time_min = S64_MIN;
 +              sb->s_time_max = S64_MAX;
        }
  
        nfs_initialise_sb(sb);
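The new bounds follow the wire formats: NFSv2/v3 timestamps are unsigned 32-bit seconds with microsecond resolution (hence the 1000 ns granularity), while NFSv4 carries signed 64-bit seconds at nanosecond resolution. A trivial userspace check of the ranges being set (values assumed from those wire formats; U32_MAX seconds lands in early 2106, so v2/v3 superblocks now clamp rather than wrap):

	#include <stdint.h>
	#include <stdio.h>

	int main(void)
	{
		/* NFSv2/v3: s_time_min = 0, s_time_max = U32_MAX */
		printf("v2/v3: 0 .. %llu, granularity 1000 ns\n",
		       (unsigned long long)UINT32_MAX);
		/* NFSv4: full signed 64-bit range */
		printf("v4: %lld .. %lld, granularity 1 ns\n",
		       (long long)INT64_MIN, (long long)INT64_MAX);
		return 0;
	}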
@@@ -2411,6 -2402,7 +2411,6 @@@ static void nfs_clone_super(struct supe
        sb->s_maxbytes = old_sb->s_maxbytes;
        sb->s_xattr = old_sb->s_xattr;
        sb->s_op = old_sb->s_op;
 -      sb->s_time_gran = 1;
        sb->s_export_op = old_sb->s_export_op;
  
        if (server->nfs_client->rpc_ops->version != 2) {
                 * so ourselves when necessary.
                 */
                sb->s_flags |= SB_POSIXACL;
 +              sb->s_time_gran = 1;
 +      } else
 +              sb->s_time_gran = 1000;
 +
 +      if (server->nfs_client->rpc_ops->version != 4) {
 +              sb->s_time_min = 0;
 +              sb->s_time_max = U32_MAX;
 +      } else {
 +              sb->s_time_min = S64_MIN;
 +              sb->s_time_max = S64_MAX;
        }
  
        nfs_initialise_sb(sb);
@@@ -2645,6 -2627,13 +2645,13 @@@ int nfs_clone_sb_security(struct super_
  }
  EXPORT_SYMBOL_GPL(nfs_clone_sb_security);
  
+ static void nfs_set_readahead(struct backing_dev_info *bdi,
+                             unsigned long iomax_pages)
+ {
+       bdi->ra_pages = VM_READAHEAD_PAGES;
+       bdi->io_pages = iomax_pages;
+ }
  struct dentry *nfs_fs_mount_common(struct nfs_server *server,
                                   int flags, const char *dev_name,
                                   struct nfs_mount_info *mount_info,
                        mntroot = ERR_PTR(error);
                        goto error_splat_super;
                }
-               s->s_bdi->ra_pages = server->rpages * NFS_MAX_READAHEAD;
+               nfs_set_readahead(s->s_bdi, server->rpages);
                server->super = s;
        }
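With NFS_MAX_READAHEAD gone, readahead no longer scales with the rsize-derived rpages; it is pinned at the VM default, and the per-server limit is exported through io_pages instead. A quick userspace sketch of the before/after arithmetic (constants copied from include/linux/sunrpc/xprt.h and include/linux/mm.h as of this series; the 1 MB rsize is an assumption):

	#include <stdio.h>

	#define PAGE_SZ			4096UL
	#define RPC_DEF_SLOT_TABLE	16UL	/* include/linux/sunrpc/xprt.h */
	#define VM_READAHEAD_PAGES	(128UL * 1024 / PAGE_SZ)

	int main(void)
	{
		unsigned long rpages = 1024UL * 1024 / PAGE_SZ; /* rsize = 1 MB */
		unsigned long old_ra = rpages * (RPC_DEF_SLOT_TABLE - 1);

		printf("old ra_pages: %lu (%lu KB)\n",
		       old_ra, old_ra * PAGE_SZ / 1024);
		printf("new ra_pages: %lu (%lu KB)\n",
		       VM_READAHEAD_PAGES, VM_READAHEAD_PAGES * PAGE_SZ / 1024);
		return 0;
	}

For a 1 MB rsize this drops the default from 15 MB of readahead to the generic 128 KB.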
  
diff --combined include/linux/sunrpc/sched.h
index 27536b961552c21217b9ccad96a0ecec71243342,d1283bddd218b1956bf0e886af8364c6930f761f..a6ef35184ef1338d8f20cbeda95c648c1496b42b
@@@ -98,6 -98,7 +98,6 @@@ typedef void                  (*rpc_action)(struct rpc
  
  struct rpc_call_ops {
        void (*rpc_call_prepare)(struct rpc_task *, void *);
 -      void (*rpc_call_prepare_transmit)(struct rpc_task *, void *);
        void (*rpc_call_done)(struct rpc_task *, void *);
        void (*rpc_count_stats)(struct rpc_task *, void *);
        void (*rpc_release)(void *);
@@@ -242,9 -243,6 +242,6 @@@ void               rpc_sleep_on_priority_timeout(str
  void          rpc_sleep_on_priority(struct rpc_wait_queue *,
                                        struct rpc_task *,
                                        int priority);
- void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq,
-               struct rpc_wait_queue *queue,
-               struct rpc_task *task);
  void          rpc_wake_up_queued_task(struct rpc_wait_queue *,
                                        struct rpc_task *);
  void          rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *,
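With ->rpc_call_prepare_transmit removed above, an rpc_call_ops instance supplies at most prepare, done, count_stats and release callbacks. A minimal hedged sketch of filling one in (the demo_* names are hypothetical; rpc_call_start() is the existing SUNRPC helper for proceeding from prepare to encode):

	static void demo_prepare(struct rpc_task *task, void *calldata)
	{
		rpc_call_start(task);
	}

	static void demo_done(struct rpc_task *task, void *calldata)
	{
		/* task->tk_status carries the RPC-level result here */
	}

	static const struct rpc_call_ops demo_ops = {
		.rpc_call_prepare = demo_prepare,
		.rpc_call_done	  = demo_done,
	};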
diff --combined net/sunrpc/clnt.c
index a07b516e503a0a8252d0f3876f38df65b812da0d,8b622ceb1158050de933e1f1e2e326414763de9f..f7f78566be463ef229ffccc04b140525cf948043
@@@ -1837,7 -1837,7 +1837,7 @@@ call_allocate(struct rpc_task *task
                return;
        }
  
-       rpc_exit(task, -ERESTARTSYS);
+       rpc_call_rpcerror(task, -ERESTARTSYS);
  }
  
  static int
@@@ -1862,6 -1862,7 +1862,7 @@@ rpc_xdr_encode(struct rpc_task *task
                     req->rq_rbuffer,
                     req->rq_rcvsize);
  
+       req->rq_reply_bytes_recvd = 0;
        req->rq_snd_buf.head[0].iov_len = 0;
        xdr_init_encode(&xdr, &req->rq_snd_buf,
                        req->rq_snd_buf.head[0].iov_base, req);
@@@ -1881,6 -1882,8 +1882,8 @@@ call_encode(struct rpc_task *task
        if (!rpc_task_need_encode(task))
                goto out;
        dprint_status(task);
+       /* Dequeue task from the receive queue while we're encoding */
+       xprt_request_dequeue_xprt(task);
        /* Encode here so that rpcsec_gss can use correct sequence number. */
        rpc_xdr_encode(task);
        /* Did the encode result in an error condition? */
@@@ -1970,7 -1973,6 +1973,7 @@@ call_bind(struct rpc_task *task
  static void
  call_bind_status(struct rpc_task *task)
  {
 +      struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
        int status = -EIO;
  
        if (rpc_task_transmitted(task)) {
                return;
        }
  
 -      if (task->tk_status >= 0) {
 -              dprint_status(task);
 +      dprint_status(task);
 +      trace_rpc_bind_status(task);
 +      if (task->tk_status >= 0)
 +              goto out_next;
 +      if (xprt_bound(xprt)) {
                task->tk_status = 0;
 -              task->tk_action = call_connect;
 -              return;
 +              goto out_next;
        }
  
 -      trace_rpc_bind_status(task);
        switch (task->tk_status) {
        case -ENOMEM:
                dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
                task->tk_rebind_retry--;
                rpc_delay(task, 3*HZ);
                goto retry_timeout;
 +      case -ENOBUFS:
 +              rpc_delay(task, HZ >> 2);
 +              goto retry_timeout;
        case -EAGAIN:
                goto retry_timeout;
        case -ETIMEDOUT:
        case -ENETDOWN:
        case -EHOSTUNREACH:
        case -ENETUNREACH:
 -      case -ENOBUFS:
        case -EPIPE:
                dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
                                task->tk_pid, task->tk_status);
  
        rpc_call_rpcerror(task, status);
        return;
 -
 +out_next:
 +      task->tk_action = call_connect;
 +      return;
  retry_timeout:
        task->tk_status = 0;
        task->tk_action = call_bind;
@@@ -2096,7 -2093,6 +2099,7 @@@ call_connect(struct rpc_task *task
  static void
  call_connect_status(struct rpc_task *task)
  {
 +      struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
        struct rpc_clnt *clnt = task->tk_client;
        int status = task->tk_status;
  
        }
  
        dprint_status(task);
 -
        trace_rpc_connect_status(task);
 +
 +      if (task->tk_status == 0) {
 +              clnt->cl_stats->netreconn++;
 +              goto out_next;
 +      }
 +      if (xprt_connected(xprt)) {
 +              task->tk_status = 0;
 +              goto out_next;
 +      }
 +
        task->tk_status = 0;
        switch (status) {
        case -ECONNREFUSED:
        case -ENETDOWN:
        case -ENETUNREACH:
        case -EHOSTUNREACH:
 -      case -EADDRINUSE:
 -      case -ENOBUFS:
        case -EPIPE:
                xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
                                            task->tk_rqstp->rq_connect_cookie);
                /* retry with existing socket, after a delay */
                rpc_delay(task, 3*HZ);
                /* fall through */
 +      case -EADDRINUSE:
        case -ENOTCONN:
        case -EAGAIN:
        case -ETIMEDOUT:
                goto out_retry;
 -      case 0:
 -              clnt->cl_stats->netreconn++;
 -              task->tk_action = call_transmit;
 -              return;
 +      case -ENOBUFS:
 +              rpc_delay(task, HZ >> 2);
 +              goto out_retry;
        }
        rpc_call_rpcerror(task, status);
        return;
 +out_next:
 +      task->tk_action = call_transmit;
 +      return;
  out_retry:
        /* Check for timeouts before looping back to call_bind */
        task->tk_action = call_bind;
@@@ -2382,7 -2368,7 +2385,7 @@@ call_status(struct rpc_task *task
        case -ECONNABORTED:
        case -ENOTCONN:
                rpc_force_rebind(clnt);
 -              /* fall through */
 +              break;
        case -EADDRINUSE:
                rpc_delay(task, 3*HZ);
                /* fall through */
@@@ -2479,6 -2465,7 +2482,7 @@@ call_decode(struct rpc_task *task
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        struct xdr_stream xdr;
+       int err;
  
        dprint_status(task);
  
         * before it changed req->rq_reply_bytes_recvd.
         */
        smp_rmb();
+       /*
+        * Did we ever call xprt_complete_rqst()? If not, we should assume
+        * the message is incomplete.
+        */
+       err = -EAGAIN;
+       if (!req->rq_reply_bytes_recvd)
+               goto out;
        req->rq_rcv_buf.len = req->rq_private_buf.len;
  
        /* Check that the softirq receive buffer is valid */
  
        xdr_init_decode(&xdr, &req->rq_rcv_buf,
                        req->rq_rcv_buf.head[0].iov_base, req);
-       switch (rpc_decode_header(task, &xdr)) {
+       err = rpc_decode_header(task, &xdr);
+ out:
+       switch (err) {
        case 0:
                task->tk_action = rpc_exit_task;
                task->tk_status = rpcauth_unwrap_resp(task, &xdr);
                return;
        case -EAGAIN:
                task->tk_status = 0;
-               xdr_free_bvec(&req->rq_rcv_buf);
-               req->rq_reply_bytes_recvd = 0;
-               req->rq_rcv_buf.len = 0;
                if (task->tk_client->cl_discrtry)
                        xprt_conditional_disconnect(req->rq_xprt,
                                                    req->rq_connect_cookie);
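The new rq_reply_bytes_recvd check works because of publication ordering on the receive side: xprt_complete_rqst() fills the receive buffer before updating the byte count, and the smp_rmb() here orders the reads to match, so a non-zero count implies a fully visible buffer, while zero means the message is incomplete and the task retries (-EAGAIN). A self-contained C11 analogue of that pairing (names and the fixed buffer are illustrative, not the kernel's):

	#include <stdatomic.h>
	#include <string.h>

	struct reply {
		char buf[512];
		_Atomic size_t bytes_recvd;	/* zero until buf is complete */
	};

	/* Receive side, like xprt_complete_rqst(): publish data, then count. */
	void complete_rqst(struct reply *r, const char *data, size_t len)
	{
		memcpy(r->buf, data, len);
		atomic_store_explicit(&r->bytes_recvd, len, memory_order_release);
	}

	/* Decode side, like call_decode(): read the count, then the data. */
	long try_decode(struct reply *r)
	{
		size_t len = atomic_load_explicit(&r->bytes_recvd,
						  memory_order_acquire);
		if (!len)
			return -11;	/* -EAGAIN: incomplete, retry later */
		/* r->buf[0 .. len) is guaranteed visible here */
		return (long)len;
	}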
@@@ -2561,7 -2556,7 +2573,7 @@@ rpc_encode_header(struct rpc_task *task
        return 0;
  out_fail:
        trace_rpc_bad_callhdr(task);
-       rpc_exit(task, error);
+       rpc_call_rpcerror(task, error);
        return error;
  }
  
@@@ -2628,7 -2623,7 +2640,7 @@@ out_garbage
                return -EAGAIN;
        }
  out_err:
-       rpc_exit(task, error);
+       rpc_call_rpcerror(task, error);
        return error;
  
  out_unparsable:
diff --combined net/sunrpc/xprt.c
index 2e71f5455c6cc95d9182b6a3f36b458f5d20a951,83ec4edd2f91a410a411037216ece18a405a4aca..8a45b3ccc31343aa089e9bed8837247256ecd61d
@@@ -456,6 -456,12 +456,12 @@@ void xprt_release_rqst_cong(struct rpc_
  }
  EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
  
+ static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
+ {
+       if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
+               __xprt_lock_write_next_cong(xprt);
+ }
  /*
   * Clear the congestion window wait flag and wake up the next
   * entry on xprt->sending
@@@ -671,6 -677,7 +677,7 @@@ void xprt_disconnect_done(struct rpc_xp
        spin_lock(&xprt->transport_lock);
        xprt_clear_connected(xprt);
        xprt_clear_write_space_locked(xprt);
+       xprt_clear_congestion_window_wait_locked(xprt);
        xprt_wake_pending_tasks(xprt, -ENOTCONN);
        spin_unlock(&xprt->transport_lock);
  }
@@@ -1323,6 -1330,36 +1330,36 @@@ xprt_request_dequeue_transmit(struct rp
        spin_unlock(&xprt->queue_lock);
  }
  
+ /**
+  * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
+  * @task: pointer to rpc_task
+  *
+  * Remove a task from the transmit and receive queues, and ensure that
+  * it is not pinned by the receive work item.
+  */
+ void
+ xprt_request_dequeue_xprt(struct rpc_task *task)
+ {
+       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = req->rq_xprt;
+       if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
+           test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
+           xprt_is_pinned_rqst(req)) {
+               spin_lock(&xprt->queue_lock);
+               xprt_request_dequeue_transmit_locked(task);
+               xprt_request_dequeue_receive_locked(task);
+               while (xprt_is_pinned_rqst(req)) {
+                       set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
+                       spin_unlock(&xprt->queue_lock);
+                       xprt_wait_on_pinned_rqst(req);
+                       spin_lock(&xprt->queue_lock);
+                       clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
+               }
+               spin_unlock(&xprt->queue_lock);
+       }
+ }
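The pin-wait loop has to drop queue_lock before sleeping so the receive path can make progress and release the pin, then retake the lock and re-check. A generic userspace rendering of that drop-lock-and-wait shape using a condition variable (the kernel actually waits on the RPC_TASK_MSG_PIN_WAIT bit; the names below are illustrative):

	#include <pthread.h>

	struct rqst {
		pthread_mutex_t queue_lock;
		pthread_cond_t	unpinned;
		int		pin_count;
	};

	void dequeue_xprt(struct rqst *req)
	{
		pthread_mutex_lock(&req->queue_lock);
		/* ... remove req from transmit and receive queues ... */
		while (req->pin_count > 0)
			/* atomically drops queue_lock while sleeping, as the
			 * kernel unlocks around xprt_wait_on_pinned_rqst() */
			pthread_cond_wait(&req->unpinned, &req->queue_lock);
		pthread_mutex_unlock(&req->queue_lock);
	}

	void unpin_rqst(struct rqst *req)	/* receive-worker side */
	{
		pthread_mutex_lock(&req->queue_lock);
		if (--req->pin_count == 0)
			pthread_cond_broadcast(&req->unpinned);
		pthread_mutex_unlock(&req->queue_lock);
	}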
  /**
   * xprt_request_prepare - prepare an encoded request for transport
   * @req: pointer to rpc_rqst
@@@ -1408,6 -1445,13 +1445,6 @@@ xprt_request_transmit(struct rpc_rqst *
                        status = -EBADMSG;
                        goto out_dequeue;
                }
 -              if (task->tk_ops->rpc_call_prepare_transmit) {
 -                      task->tk_ops->rpc_call_prepare_transmit(task,
 -                                      task->tk_calldata);
 -                      status = task->tk_status;
 -                      if (status < 0)
 -                              goto out_dequeue;
 -              }
                if (RPC_SIGNALLED(task)) {
                        status = -ERESTARTSYS;
                        goto out_dequeue;
@@@ -1747,28 -1791,6 +1784,6 @@@ void xprt_retry_reserve(struct rpc_tas
        xprt_do_reserve(xprt, task);
  }
  
- static void
- xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req)
- {
-       struct rpc_xprt *xprt = req->rq_xprt;
-       if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
-           test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
-           xprt_is_pinned_rqst(req)) {
-               spin_lock(&xprt->queue_lock);
-               xprt_request_dequeue_transmit_locked(task);
-               xprt_request_dequeue_receive_locked(task);
-               while (xprt_is_pinned_rqst(req)) {
-                       set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
-                       spin_unlock(&xprt->queue_lock);
-                       xprt_wait_on_pinned_rqst(req);
-                       spin_lock(&xprt->queue_lock);
-                       clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
-               }
-               spin_unlock(&xprt->queue_lock);
-       }
- }
  /**
   * xprt_release - release an RPC request slot
   * @task: task which is finished with the slot
@@@ -1788,7 -1810,7 +1803,7 @@@ void xprt_release(struct rpc_task *task
        }
  
        xprt = req->rq_xprt;
-       xprt_request_dequeue_all(task, req);
+       xprt_request_dequeue_xprt(task);
        spin_lock(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        if (xprt->ops->release_request)
diff --combined net/sunrpc/xprtrdma/verbs.c
index b10aa16557f00d104bae3b757e83adccbfe68228,796945751e666d98d6fa2a3797f37a0e0fc8f9cf..3a907537e2cf31cf471d7cb92dfbe85ebcc9d706
@@@ -53,6 -53,7 +53,7 @@@
  #include <linux/slab.h>
  #include <linux/sunrpc/addr.h>
  #include <linux/sunrpc/svc_rdma.h>
+ #include <linux/log2.h>
  
  #include <asm-generic/barrier.h>
  #include <asm/bitops.h>
   * internal functions
   */
  static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
+ static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
  static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
  static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
+ static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
  static struct rpcrdma_regbuf *
  rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
                     gfp_t flags);
@@@ -405,9 -408,8 +408,8 @@@ rpcrdma_ia_remove(struct rpcrdma_ia *ia
        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_req *req;
-       struct rpcrdma_rep *rep;
  
-       cancel_delayed_work_sync(&buf->rb_refresh_worker);
+       cancel_work_sync(&buf->rb_refresh_worker);
  
        /* This is similar to rpcrdma_ep_destroy, but:
         * - Don't cancel the connect worker.
        /* The ULP is responsible for ensuring all DMA
         * mappings and MRs are gone.
         */
-       list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
-               rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
+       rpcrdma_reps_destroy(buf);
        list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
                rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
                rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
@@@ -521,17 -522,18 +522,17 @@@ int rpcrdma_ep_create(struct rpcrdma_xp
        init_waitqueue_head(&ep->rep_connect_wait);
        ep->rep_receive_count = 0;
  
 -      sendcq = ib_alloc_cq(ia->ri_id->device, NULL,
 -                           ep->rep_attr.cap.max_send_wr + 1,
 -                           ia->ri_id->device->num_comp_vectors > 1 ? 1 : 0,
 -                           IB_POLL_WORKQUEUE);
 +      sendcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
 +                               ep->rep_attr.cap.max_send_wr + 1,
 +                               IB_POLL_WORKQUEUE);
        if (IS_ERR(sendcq)) {
                rc = PTR_ERR(sendcq);
                goto out1;
        }
  
 -      recvcq = ib_alloc_cq(ia->ri_id->device, NULL,
 -                           ep->rep_attr.cap.max_recv_wr + 1,
 -                           0, IB_POLL_WORKQUEUE);
 +      recvcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
 +                               ep->rep_attr.cap.max_recv_wr + 1,
 +                               IB_POLL_WORKQUEUE);
        if (IS_ERR(recvcq)) {
                rc = PTR_ERR(recvcq);
                goto out2;
@@@ -604,10 -606,10 +605,10 @@@ void rpcrdma_ep_destroy(struct rpcrdma_
   * Unlike a normal reconnection, a fresh PD and a new set
   * of MRs and buffers is needed.
   */
- static int
- rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
-                        struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+ static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
+                                   struct ib_qp_init_attr *qp_init_attr)
  {
+       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        int rc, err;
  
        trace_xprtrdma_reinsert(r_xprt);
        }
  
        rc = -ENETUNREACH;
-       err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+       err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr);
        if (err) {
                pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
                goto out3;
@@@ -641,16 -643,16 +642,16 @@@ out1
        return rc;
  }
  
- static int
- rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
-                    struct rpcrdma_ia *ia)
+ static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
+                               struct ib_qp_init_attr *qp_init_attr)
  {
+       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rdma_cm_id *id, *old;
        int err, rc;
  
        trace_xprtrdma_reconnect(r_xprt);
  
-       rpcrdma_ep_disconnect(ep, ia);
+       rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
  
        rc = -EHOSTUNREACH;
        id = rpcrdma_create_id(r_xprt, ia);
                goto out_destroy;
        }
  
-       err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
+       err = rdma_create_qp(id, ia->ri_pd, qp_init_attr);
        if (err)
                goto out_destroy;
  
@@@ -697,25 -699,27 +698,27 @@@ rpcrdma_ep_connect(struct rpcrdma_ep *e
        struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
                                                   rx_ia);
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+       struct ib_qp_init_attr qp_init_attr;
        int rc;
  
  retry:
+       memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
        switch (ep->rep_connected) {
        case 0:
                dprintk("RPC:       %s: connecting...\n", __func__);
-               rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+               rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
                if (rc) {
                        rc = -ENETUNREACH;
                        goto out_noupdate;
                }
                break;
        case -ENODEV:
-               rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
+               rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr);
                if (rc)
                        goto out_noupdate;
                break;
        default:
-               rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
+               rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr);
                if (rc)
                        goto out;
        }
        if (rc)
                goto out;
  
+       if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
+               xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
        wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
        if (ep->rep_connected <= 0) {
                if (ep->rep_connected == -EAGAIN)
@@@ -942,14 -948,12 +947,12 @@@ rpcrdma_mrs_create(struct rpcrdma_xprt 
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        unsigned int count;
-       LIST_HEAD(free);
-       LIST_HEAD(all);
  
        for (count = 0; count < ia->ri_max_segs; count++) {
                struct rpcrdma_mr *mr;
                int rc;
  
-               mr = kzalloc(sizeof(*mr), GFP_KERNEL);
+               mr = kzalloc(sizeof(*mr), GFP_NOFS);
                if (!mr)
                        break;
  
  
                mr->mr_xprt = r_xprt;
  
-               list_add(&mr->mr_list, &free);
-               list_add(&mr->mr_all, &all);
+               spin_lock(&buf->rb_lock);
+               list_add(&mr->mr_list, &buf->rb_mrs);
+               list_add(&mr->mr_all, &buf->rb_all_mrs);
+               spin_unlock(&buf->rb_lock);
        }
  
-       spin_lock(&buf->rb_mrlock);
-       list_splice(&free, &buf->rb_mrs);
-       list_splice(&all, &buf->rb_all);
        r_xprt->rx_stats.mrs_allocated += count;
-       spin_unlock(&buf->rb_mrlock);
        trace_xprtrdma_createmrs(r_xprt, count);
  }
  
@@@ -977,7 -979,7 +978,7 @@@ static voi
  rpcrdma_mr_refresh_worker(struct work_struct *work)
  {
        struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
-                                                 rb_refresh_worker.work);
+                                                 rb_refresh_worker);
        struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
                                                   rx_buf);
  
@@@ -999,12 -1001,18 +1000,18 @@@ struct rpcrdma_req *rpcrdma_req_create(
        struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
        struct rpcrdma_regbuf *rb;
        struct rpcrdma_req *req;
+       size_t maxhdrsize;
  
        req = kzalloc(sizeof(*req), flags);
        if (req == NULL)
                goto out1;
  
-       rb = rpcrdma_regbuf_alloc(RPCRDMA_HDRBUF_SIZE, DMA_TO_DEVICE, flags);
+       /* Compute maximum header buffer size in bytes */
+       maxhdrsize = rpcrdma_fixed_maxsz + 3 +
+                    r_xprt->rx_ia.ri_max_segs * rpcrdma_readchunk_maxsz;
+       maxhdrsize *= sizeof(__be32);
+       rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
+                                 DMA_TO_DEVICE, flags);
        if (!rb)
                goto out2;
        req->rl_rdmabuf = rb;
        if (!req->rl_recvbuf)
                goto out4;
  
+       INIT_LIST_HEAD(&req->rl_free_mrs);
        INIT_LIST_HEAD(&req->rl_registered);
        spin_lock(&buffer->rb_lock);
        list_add(&req->rl_all, &buffer->rb_allreqs);
        return NULL;
  }
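RPCRDMA_HDRBUF_SIZE was a fixed guess; the replacement sizes the buffer from the actual worst case: the fixed header words, three chunk-list discriminators, and one read chunk per supported segment, converted from 32-bit XDR words to bytes and rounded up to a power of two for the allocator. A userspace sketch of the arithmetic (the *_MAXSZ word counts and max_segs value are placeholders, not the kernel's definitions in rpc_rdma.h):

	#include <stdint.h>
	#include <stdio.h>

	#define FIXED_MAXSZ	4	/* placeholder for rpcrdma_fixed_maxsz */
	#define READCHUNK_MAXSZ	6	/* placeholder for rpcrdma_readchunk_maxsz */

	static size_t roundup_pow_of_two(size_t n)
	{
		size_t p = 1;

		while (p < n)
			p <<= 1;
		return p;
	}

	int main(void)
	{
		unsigned int max_segs = 8;	/* stands in for ri_max_segs */
		size_t words = FIXED_MAXSZ + 3 + max_segs * READCHUNK_MAXSZ;
		size_t bytes = words * sizeof(uint32_t);

		printf("maxhdrsize: %zu words, alloc %zu bytes\n",
		       words, roundup_pow_of_two(bytes));
		return 0;
	}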
  
+ static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
+ {
+       rpcrdma_regbuf_free(rep->rr_rdmabuf);
+       kfree(rep);
+ }
+ static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
+ {
+       struct llist_node *node;
+       /* Calls to llist_del_first are required to be serialized */
+       node = llist_del_first(&buf->rb_free_reps);
+       if (!node)
+               return NULL;
+       return llist_entry(node, struct rpcrdma_rep, rr_node);
+ }
+ static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
+                           struct rpcrdma_rep *rep)
+ {
+       if (!rep->rr_temp)
+               llist_add(&rep->rr_node, &buf->rb_free_reps);
+       else
+               rpcrdma_rep_destroy(rep);
+ }
+ static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
+ {
+       struct rpcrdma_rep *rep;
+       while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
+               rpcrdma_rep_destroy(rep);
+ }
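The rep free list switches from a spinlocked list_head to an llist: rpcrdma_rep_put() may push from any context with no lock, while llist_del_first() tolerates concurrent pushes but requires its callers to be serialized against each other, which is why the helper is named _locked and carries the comment above. A self-contained C11 analogue of that discipline (a Treiber-style stack; the kernel's <linux/llist.h> is the real implementation):

	#include <stdatomic.h>
	#include <stddef.h>

	struct lnode {
		struct lnode *next;
	};

	struct lhead {
		_Atomic(struct lnode *) first;
	};

	/* Lock-free push: safe for any number of concurrent producers. */
	void lpush(struct lhead *h, struct lnode *n)
	{
		struct lnode *old = atomic_load_explicit(&h->first,
							 memory_order_relaxed);
		do {
			n->next = old;
		} while (!atomic_compare_exchange_weak_explicit(&h->first,
				&old, n, memory_order_release,
				memory_order_relaxed));
	}

	/* Pop: like llist_del_first(), only safe when consumers are
	 * serialized (a single consumer sidesteps the ABA problem). */
	struct lnode *lpop(struct lhead *h)
	{
		struct lnode *n = atomic_load_explicit(&h->first,
						       memory_order_acquire);
		while (n && !atomic_compare_exchange_weak_explicit(&h->first,
				&n, n->next, memory_order_acquire,
				memory_order_relaxed))
			;
		return n;
	}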
  /**
   * rpcrdma_buffer_create - Create initial set of req/rep objects
   * @r_xprt: transport instance to (re)initialize
@@@ -1078,12 -1121,10 +1120,10 @@@ int rpcrdma_buffer_create(struct rpcrdm
  
        buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
        buf->rb_bc_srv_max_requests = 0;
-       spin_lock_init(&buf->rb_mrlock);
        spin_lock_init(&buf->rb_lock);
        INIT_LIST_HEAD(&buf->rb_mrs);
-       INIT_LIST_HEAD(&buf->rb_all);
-       INIT_DELAYED_WORK(&buf->rb_refresh_worker,
-                         rpcrdma_mr_refresh_worker);
+       INIT_LIST_HEAD(&buf->rb_all_mrs);
+       INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
  
        rpcrdma_mrs_create(r_xprt);
  
        }
  
        buf->rb_credits = 1;
-       INIT_LIST_HEAD(&buf->rb_recv_bufs);
+       init_llist_head(&buf->rb_free_reps);
  
        rc = rpcrdma_sendctxs_create(r_xprt);
        if (rc)
@@@ -1114,12 -1155,6 +1154,6 @@@ out
        return rc;
  }
  
- static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
- {
-       rpcrdma_regbuf_free(rep->rr_rdmabuf);
-       kfree(rep);
- }
  /**
   * rpcrdma_req_destroy - Destroy an rpcrdma_req object
   * @req: unused object to be destroyed
   * This function assumes that the caller prevents concurrent device
   * unload and transport tear-down.
   */
- void
- rpcrdma_req_destroy(struct rpcrdma_req *req)
+ void rpcrdma_req_destroy(struct rpcrdma_req *req)
  {
        list_del(&req->rl_all);
  
+       while (!list_empty(&req->rl_free_mrs))
+               rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs));
        rpcrdma_regbuf_free(req->rl_recvbuf);
        rpcrdma_regbuf_free(req->rl_sendbuf);
        rpcrdma_regbuf_free(req->rl_rdmabuf);
@@@ -1147,25 -1184,19 +1183,19 @@@ rpcrdma_mrs_destroy(struct rpcrdma_buff
        unsigned int count;
  
        count = 0;
-       spin_lock(&buf->rb_mrlock);
-       while (!list_empty(&buf->rb_all)) {
-               mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
+       spin_lock(&buf->rb_lock);
+       while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
+                                             struct rpcrdma_mr,
+                                             mr_all)) != NULL) {
                list_del(&mr->mr_all);
-               spin_unlock(&buf->rb_mrlock);
-               /* Ensure MW is not on any rl_registered list */
-               if (!list_empty(&mr->mr_list))
-                       list_del(&mr->mr_list);
+               spin_unlock(&buf->rb_lock);
  
                frwr_release_mr(mr);
                count++;
-               spin_lock(&buf->rb_mrlock);
+               spin_lock(&buf->rb_lock);
        }
-       spin_unlock(&buf->rb_mrlock);
+       spin_unlock(&buf->rb_lock);
        r_xprt->rx_stats.mrs_allocated = 0;
-       dprintk("RPC:       %s: released %u MRs\n", __func__, count);
  }
  
  /**
  void
  rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
  {
-       cancel_delayed_work_sync(&buf->rb_refresh_worker);
+       cancel_work_sync(&buf->rb_refresh_worker);
  
        rpcrdma_sendctxs_destroy(buf);
-       while (!list_empty(&buf->rb_recv_bufs)) {
-               struct rpcrdma_rep *rep;
-               rep = list_first_entry(&buf->rb_recv_bufs,
-                                      struct rpcrdma_rep, rr_list);
-               list_del(&rep->rr_list);
-               rpcrdma_rep_destroy(rep);
-       }
+       rpcrdma_reps_destroy(buf);
  
        while (!list_empty(&buf->rb_send_bufs)) {
                struct rpcrdma_req *req;
@@@ -1215,54 -1238,20 +1237,20 @@@ struct rpcrdma_mr 
  rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
  {
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-       struct rpcrdma_mr *mr = NULL;
-       spin_lock(&buf->rb_mrlock);
-       if (!list_empty(&buf->rb_mrs))
-               mr = rpcrdma_mr_pop(&buf->rb_mrs);
-       spin_unlock(&buf->rb_mrlock);
+       struct rpcrdma_mr *mr;
  
-       if (!mr)
-               goto out_nomrs;
+       spin_lock(&buf->rb_lock);
+       mr = rpcrdma_mr_pop(&buf->rb_mrs);
+       spin_unlock(&buf->rb_lock);
        return mr;
- out_nomrs:
-       trace_xprtrdma_nomrs(r_xprt);
-       if (r_xprt->rx_ep.rep_connected != -ENODEV)
-               schedule_delayed_work(&buf->rb_refresh_worker, 0);
-       /* Allow the reply handler and refresh worker to run */
-       cond_resched();
-       return NULL;
- }
- static void
- __rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
- {
-       spin_lock(&buf->rb_mrlock);
-       rpcrdma_mr_push(mr, &buf->rb_mrs);
-       spin_unlock(&buf->rb_mrlock);
- }
- /**
-  * rpcrdma_mr_put - Release an rpcrdma_mr object
-  * @mr: object to release
-  *
-  */
- void
- rpcrdma_mr_put(struct rpcrdma_mr *mr)
- {
-       __rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
  }
  
  /**
-  * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
-  * @mr: object to release
+  * rpcrdma_mr_put - DMA unmap an MR and release it
+  * @mr: MR to release
   *
   */
- void
- rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
+ void rpcrdma_mr_put(struct rpcrdma_mr *mr)
  {
        struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
  
                                mr->mr_sg, mr->mr_nents, mr->mr_dir);
                mr->mr_dir = DMA_NONE;
        }
-       __rpcrdma_mr_put(&r_xprt->rx_buf, mr);
+       rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
+ }
+ static void rpcrdma_mr_free(struct rpcrdma_mr *mr)
+ {
+       struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       mr->mr_req = NULL;
+       spin_lock(&buf->rb_lock);
+       rpcrdma_mr_push(mr, &buf->rb_mrs);
+       spin_unlock(&buf->rb_lock);
  }
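MR recycling is now two-tier: the hot path in rpcrdma_mr_put() hands the MR back to its owning request's rl_free_mrs list with no locking, and only rpcrdma_mr_free(), run from rpcrdma_req_destroy(), takes rb_lock to return MRs to the shared rb_mrs pool. A condensed kernel-style sketch of that shape (the demo_* types are simplified stand-ins for rpcrdma_mr, rpcrdma_req and rpcrdma_buffer):

	struct demo_buf { spinlock_t lock; struct list_head pool; };
	struct demo_req { struct list_head free_mrs; };
	struct demo_mr	{ struct list_head list; struct demo_req *req; };

	/* Fast path: the MR is owned by one request, so no lock. */
	static void demo_mr_put(struct demo_mr *mr)
	{
		list_add(&mr->list, &mr->req->free_mrs);
	}

	/* Slow path, at request teardown: back to the shared pool. */
	static void demo_mr_free(struct demo_mr *mr, struct demo_buf *buf)
	{
		mr->req = NULL;
		spin_lock(&buf->lock);
		list_add(&mr->list, &buf->pool);
		spin_unlock(&buf->lock);
	}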
  
  /**
@@@ -1303,39 -1304,24 +1303,24 @@@ rpcrdma_buffer_get(struct rpcrdma_buffe
   */
  void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
  {
-       struct rpcrdma_rep *rep = req->rl_reply;
+       if (req->rl_reply)
+               rpcrdma_rep_put(buffers, req->rl_reply);
        req->rl_reply = NULL;
  
        spin_lock(&buffers->rb_lock);
        list_add(&req->rl_list, &buffers->rb_send_bufs);
-       if (rep) {
-               if (!rep->rr_temp) {
-                       list_add(&rep->rr_list, &buffers->rb_recv_bufs);
-                       rep = NULL;
-               }
-       }
        spin_unlock(&buffers->rb_lock);
-       if (rep)
-               rpcrdma_rep_destroy(rep);
  }
  
- /*
-  * Put reply buffers back into pool when not attached to
-  * request. This happens in error conditions.
+ /**
+  * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
+  * @rep: rep to release
+  *
+  * Used after error conditions.
   */
- void
- rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
+ void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
  {
-       struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
-       if (!rep->rr_temp) {
-               spin_lock(&buffers->rb_lock);
-               list_add(&rep->rr_list, &buffers->rb_recv_bufs);
-               spin_unlock(&buffers->rb_lock);
-       } else {
-               rpcrdma_rep_destroy(rep);
-       }
+       rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
  }
  
  /* Returns a pointer to a rpcrdma_regbuf object, or NULL.
@@@ -1483,7 -1469,7 +1468,7 @@@ rpcrdma_post_recvs(struct rpcrdma_xprt 
        count = 0;
  
        needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
-       if (ep->rep_receive_count > needed)
+       if (likely(ep->rep_receive_count > needed))
                goto out;
        needed -= ep->rep_receive_count;
        if (!temp)
  
        /* fast path: all needed reps can be found on the free list */
        wr = NULL;
-       spin_lock(&buf->rb_lock);
        while (needed) {
-               rep = list_first_entry_or_null(&buf->rb_recv_bufs,
-                                              struct rpcrdma_rep, rr_list);
+               rep = rpcrdma_rep_get_locked(buf);
                if (!rep)
-                       break;
-               list_del(&rep->rr_list);
-               rep->rr_recv_wr.next = wr;
-               wr = &rep->rr_recv_wr;
-               --needed;
-       }
-       spin_unlock(&buf->rb_lock);
-       while (needed) {
-               rep = rpcrdma_rep_create(r_xprt, temp);
+                       rep = rpcrdma_rep_create(r_xprt, temp);
                if (!rep)
                        break;
  
                if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
                        goto release_wrs;
  
-               trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
+               trace_xprtrdma_post_recv(rep);
                ++count;
        }
  