]> Git Repo - linux.git/blob - fs/nfs/localio.c
Linux 6.14-rc3
[linux.git] / fs / nfs / localio.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * NFS client support for local clients to bypass network stack
4  *
5  * Copyright (C) 2014 Weston Andros Adamson <[email protected]>
6  * Copyright (C) 2019 Trond Myklebust <[email protected]>
7  * Copyright (C) 2024 Mike Snitzer <[email protected]>
8  * Copyright (C) 2024 NeilBrown <[email protected]>
9  */
10
11 #include <linux/module.h>
12 #include <linux/errno.h>
13 #include <linux/vfs.h>
14 #include <linux/file.h>
15 #include <linux/inet.h>
16 #include <linux/sunrpc/addr.h>
17 #include <linux/inetdevice.h>
18 #include <net/addrconf.h>
19 #include <linux/nfs_common.h>
20 #include <linux/nfslocalio.h>
21 #include <linux/bvec.h>
22
23 #include <linux/nfs.h>
24 #include <linux/nfs_fs.h>
25 #include <linux/nfs_xdr.h>
26
27 #include "internal.h"
28 #include "pnfs.h"
29 #include "nfstrace.h"
30
31 #define NFSDBG_FACILITY         NFSDBG_VFS
32
33 struct nfs_local_kiocb {
34         struct kiocb            kiocb;
35         struct bio_vec          *bvec;
36         struct nfs_pgio_header  *hdr;
37         struct work_struct      work;
38         void (*aio_complete_work)(struct work_struct *);
39         struct nfsd_file        *localio;
40 };
41
42 struct nfs_local_fsync_ctx {
43         struct nfsd_file        *localio;
44         struct nfs_commit_data  *data;
45         struct work_struct      work;
46         struct completion       *done;
47 };
48
49 static bool localio_enabled __read_mostly = true;
50 module_param(localio_enabled, bool, 0644);
51
52 static bool localio_O_DIRECT_semantics __read_mostly = false;
53 module_param(localio_O_DIRECT_semantics, bool, 0644);
54 MODULE_PARM_DESC(localio_O_DIRECT_semantics,
55                  "LOCALIO will use O_DIRECT semantics to filesystem.");
56
57 static inline bool nfs_client_is_local(const struct nfs_client *clp)
58 {
59         return !!rcu_access_pointer(clp->cl_uuid.net);
60 }
61
62 bool nfs_server_is_local(const struct nfs_client *clp)
63 {
64         return nfs_client_is_local(clp) && localio_enabled;
65 }
66 EXPORT_SYMBOL_GPL(nfs_server_is_local);
67
68 /*
69  * UUID_IS_LOCAL XDR functions
70  */
71
72 static void localio_xdr_enc_uuidargs(struct rpc_rqst *req,
73                                      struct xdr_stream *xdr,
74                                      const void *data)
75 {
76         const u8 *uuid = data;
77
78         encode_opaque_fixed(xdr, uuid, UUID_SIZE);
79 }
80
81 static int localio_xdr_dec_uuidres(struct rpc_rqst *req,
82                                    struct xdr_stream *xdr,
83                                    void *result)
84 {
85         /* void return */
86         return 0;
87 }
88
89 static const struct rpc_procinfo nfs_localio_procedures[] = {
90         [LOCALIOPROC_UUID_IS_LOCAL] = {
91                 .p_proc = LOCALIOPROC_UUID_IS_LOCAL,
92                 .p_encode = localio_xdr_enc_uuidargs,
93                 .p_decode = localio_xdr_dec_uuidres,
94                 .p_arglen = XDR_QUADLEN(UUID_SIZE),
95                 .p_replen = 0,
96                 .p_statidx = LOCALIOPROC_UUID_IS_LOCAL,
97                 .p_name = "UUID_IS_LOCAL",
98         },
99 };
100
101 static unsigned int nfs_localio_counts[ARRAY_SIZE(nfs_localio_procedures)];
102 static const struct rpc_version nfslocalio_version1 = {
103         .number                 = 1,
104         .nrprocs                = ARRAY_SIZE(nfs_localio_procedures),
105         .procs                  = nfs_localio_procedures,
106         .counts                 = nfs_localio_counts,
107 };
108
109 static const struct rpc_version *nfslocalio_version[] = {
110        [1]                      = &nfslocalio_version1,
111 };
112
113 extern const struct rpc_program nfslocalio_program;
114 static struct rpc_stat          nfslocalio_rpcstat = { &nfslocalio_program };
115
116 const struct rpc_program nfslocalio_program = {
117         .name                   = "nfslocalio",
118         .number                 = NFS_LOCALIO_PROGRAM,
119         .nrvers                 = ARRAY_SIZE(nfslocalio_version),
120         .version                = nfslocalio_version,
121         .stats                  = &nfslocalio_rpcstat,
122 };
123
124 /*
125  * nfs_init_localioclient - Initialise an NFS localio client connection
126  */
127 static struct rpc_clnt *nfs_init_localioclient(struct nfs_client *clp)
128 {
129         struct rpc_clnt *rpcclient_localio;
130
131         rpcclient_localio = rpc_bind_new_program(clp->cl_rpcclient,
132                                                  &nfslocalio_program, 1);
133
134         dprintk_rcu("%s: server (%s) %s NFS LOCALIO.\n",
135                 __func__, rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR),
136                 (IS_ERR(rpcclient_localio) ? "does not support" : "supports"));
137
138         return rpcclient_localio;
139 }
140
141 static bool nfs_server_uuid_is_local(struct nfs_client *clp)
142 {
143         u8 uuid[UUID_SIZE];
144         struct rpc_message msg = {
145                 .rpc_argp = &uuid,
146         };
147         struct rpc_clnt *rpcclient_localio;
148         int status;
149
150         rpcclient_localio = nfs_init_localioclient(clp);
151         if (IS_ERR(rpcclient_localio))
152                 return false;
153
154         export_uuid(uuid, &clp->cl_uuid.uuid);
155
156         msg.rpc_proc = &nfs_localio_procedures[LOCALIOPROC_UUID_IS_LOCAL];
157         status = rpc_call_sync(rpcclient_localio, &msg, 0);
158         dprintk("%s: NFS reply UUID_IS_LOCAL: status=%d\n",
159                 __func__, status);
160         rpc_shutdown_client(rpcclient_localio);
161
162         /* Server is only local if it initialized required struct members */
163         if (status || !rcu_access_pointer(clp->cl_uuid.net) || !clp->cl_uuid.dom)
164                 return false;
165
166         return true;
167 }
168
169 /*
170  * nfs_local_probe - probe local i/o support for an nfs_server and nfs_client
171  * - called after alloc_client and init_client (so cl_rpcclient exists)
172  * - this function is idempotent, it can be called for old or new clients
173  */
174 void nfs_local_probe(struct nfs_client *clp)
175 {
176         /* Disallow localio if disabled via sysfs or AUTH_SYS isn't used */
177         if (!localio_enabled ||
178             clp->cl_rpcclient->cl_auth->au_flavor != RPC_AUTH_UNIX) {
179                 nfs_localio_disable_client(clp);
180                 return;
181         }
182
183         if (nfs_client_is_local(clp)) {
184                 /* If already enabled, disable and re-enable */
185                 nfs_localio_disable_client(clp);
186         }
187
188         if (!nfs_uuid_begin(&clp->cl_uuid))
189                 return;
190         if (nfs_server_uuid_is_local(clp))
191                 nfs_localio_enable_client(clp);
192         nfs_uuid_end(&clp->cl_uuid);
193 }
194 EXPORT_SYMBOL_GPL(nfs_local_probe);
195
196 void nfs_local_probe_async_work(struct work_struct *work)
197 {
198         struct nfs_client *clp =
199                 container_of(work, struct nfs_client, cl_local_probe_work);
200
201         nfs_local_probe(clp);
202 }
203
204 void nfs_local_probe_async(struct nfs_client *clp)
205 {
206         queue_work(nfsiod_workqueue, &clp->cl_local_probe_work);
207 }
208 EXPORT_SYMBOL_GPL(nfs_local_probe_async);
209
210 static inline struct nfsd_file *nfs_local_file_get(struct nfsd_file *nf)
211 {
212         return nfs_to->nfsd_file_get(nf);
213 }
214
215 static inline void nfs_local_file_put(struct nfsd_file *nf)
216 {
217         nfs_to->nfsd_file_put(nf);
218 }
219
220 /*
221  * __nfs_local_open_fh - open a local filehandle in terms of nfsd_file.
222  *
223  * Returns a pointer to a struct nfsd_file or ERR_PTR.
224  * Caller must release returned nfsd_file with nfs_to_nfsd_file_put_local().
225  */
226 static struct nfsd_file *
227 __nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
228                     struct nfs_fh *fh, struct nfs_file_localio *nfl,
229                     const fmode_t mode)
230 {
231         struct nfsd_file *localio;
232
233         localio = nfs_open_local_fh(&clp->cl_uuid, clp->cl_rpcclient,
234                                     cred, fh, nfl, mode);
235         if (IS_ERR(localio)) {
236                 int status = PTR_ERR(localio);
237                 trace_nfs_local_open_fh(fh, mode, status);
238                 switch (status) {
239                 case -ENOMEM:
240                 case -ENXIO:
241                 case -ENOENT:
242                         /* Revalidate localio, will disable if unsupported */
243                         nfs_local_probe(clp);
244                 }
245         }
246         return localio;
247 }
248
249 /*
250  * nfs_local_open_fh - open a local filehandle in terms of nfsd_file.
251  * First checking if the open nfsd_file is already cached, otherwise
252  * must __nfs_local_open_fh and insert the nfsd_file in nfs_file_localio.
253  *
254  * Returns a pointer to a struct nfsd_file or NULL.
255  */
256 struct nfsd_file *
257 nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
258                   struct nfs_fh *fh, struct nfs_file_localio *nfl,
259                   const fmode_t mode)
260 {
261         struct nfsd_file *nf, *new, __rcu **pnf;
262
263         if (!nfs_server_is_local(clp))
264                 return NULL;
265         if (mode & ~(FMODE_READ | FMODE_WRITE))
266                 return NULL;
267
268         if (mode & FMODE_WRITE)
269                 pnf = &nfl->rw_file;
270         else
271                 pnf = &nfl->ro_file;
272
273         new = NULL;
274         rcu_read_lock();
275         nf = rcu_dereference(*pnf);
276         if (!nf) {
277                 rcu_read_unlock();
278                 new = __nfs_local_open_fh(clp, cred, fh, nfl, mode);
279                 if (IS_ERR(new))
280                         return NULL;
281                 /* try to swap in the pointer */
282                 spin_lock(&clp->cl_uuid.lock);
283                 nf = rcu_dereference_protected(*pnf, 1);
284                 if (!nf) {
285                         nf = new;
286                         new = NULL;
287                         rcu_assign_pointer(*pnf, nf);
288                 }
289                 spin_unlock(&clp->cl_uuid.lock);
290                 rcu_read_lock();
291         }
292         nf = nfs_local_file_get(nf);
293         rcu_read_unlock();
294         if (new)
295                 nfs_to_nfsd_file_put_local(new);
296         return nf;
297 }
298 EXPORT_SYMBOL_GPL(nfs_local_open_fh);
299
300 static struct bio_vec *
301 nfs_bvec_alloc_and_import_pagevec(struct page **pagevec,
302                 unsigned int npages, gfp_t flags)
303 {
304         struct bio_vec *bvec, *p;
305
306         bvec = kmalloc_array(npages, sizeof(*bvec), flags);
307         if (bvec != NULL) {
308                 for (p = bvec; npages > 0; p++, pagevec++, npages--) {
309                         p->bv_page = *pagevec;
310                         p->bv_len = PAGE_SIZE;
311                         p->bv_offset = 0;
312                 }
313         }
314         return bvec;
315 }
316
317 static void
318 nfs_local_iocb_free(struct nfs_local_kiocb *iocb)
319 {
320         kfree(iocb->bvec);
321         kfree(iocb);
322 }
323
324 static struct nfs_local_kiocb *
325 nfs_local_iocb_alloc(struct nfs_pgio_header *hdr,
326                      struct file *file, gfp_t flags)
327 {
328         struct nfs_local_kiocb *iocb;
329
330         iocb = kmalloc(sizeof(*iocb), flags);
331         if (iocb == NULL)
332                 return NULL;
333         iocb->bvec = nfs_bvec_alloc_and_import_pagevec(hdr->page_array.pagevec,
334                         hdr->page_array.npages, flags);
335         if (iocb->bvec == NULL) {
336                 kfree(iocb);
337                 return NULL;
338         }
339
340         if (localio_O_DIRECT_semantics &&
341             test_bit(NFS_IOHDR_ODIRECT, &hdr->flags)) {
342                 iocb->kiocb.ki_filp = file;
343                 iocb->kiocb.ki_flags = IOCB_DIRECT;
344         } else
345                 init_sync_kiocb(&iocb->kiocb, file);
346
347         iocb->kiocb.ki_pos = hdr->args.offset;
348         iocb->hdr = hdr;
349         iocb->kiocb.ki_flags &= ~IOCB_APPEND;
350         iocb->aio_complete_work = NULL;
351
352         return iocb;
353 }
354
355 static void
356 nfs_local_iter_init(struct iov_iter *i, struct nfs_local_kiocb *iocb, int dir)
357 {
358         struct nfs_pgio_header *hdr = iocb->hdr;
359
360         iov_iter_bvec(i, dir, iocb->bvec, hdr->page_array.npages,
361                       hdr->args.count + hdr->args.pgbase);
362         if (hdr->args.pgbase != 0)
363                 iov_iter_advance(i, hdr->args.pgbase);
364 }
365
366 static void
367 nfs_local_hdr_release(struct nfs_pgio_header *hdr,
368                 const struct rpc_call_ops *call_ops)
369 {
370         call_ops->rpc_call_done(&hdr->task, hdr);
371         call_ops->rpc_release(hdr);
372 }
373
374 static void
375 nfs_local_pgio_init(struct nfs_pgio_header *hdr,
376                 const struct rpc_call_ops *call_ops)
377 {
378         hdr->task.tk_ops = call_ops;
379         if (!hdr->task.tk_start)
380                 hdr->task.tk_start = ktime_get();
381 }
382
383 static void
384 nfs_local_pgio_done(struct nfs_pgio_header *hdr, long status)
385 {
386         if (status >= 0) {
387                 hdr->res.count = status;
388                 hdr->res.op_status = NFS4_OK;
389                 hdr->task.tk_status = 0;
390         } else {
391                 hdr->res.op_status = nfs_localio_errno_to_nfs4_stat(status);
392                 hdr->task.tk_status = status;
393         }
394 }
395
396 static void
397 nfs_local_pgio_release(struct nfs_local_kiocb *iocb)
398 {
399         struct nfs_pgio_header *hdr = iocb->hdr;
400
401         nfs_local_file_put(iocb->localio);
402         nfs_local_iocb_free(iocb);
403         nfs_local_hdr_release(hdr, hdr->task.tk_ops);
404 }
405
406 /*
407  * Complete the I/O from iocb->kiocb.ki_complete()
408  *
409  * Note that this function can be called from a bottom half context,
410  * hence we need to queue the rpc_call_done() etc to a workqueue
411  */
412 static inline void nfs_local_pgio_aio_complete(struct nfs_local_kiocb *iocb)
413 {
414         INIT_WORK(&iocb->work, iocb->aio_complete_work);
415         queue_work(nfsiod_workqueue, &iocb->work);
416 }
417
418 static void
419 nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
420 {
421         struct nfs_pgio_header *hdr = iocb->hdr;
422         struct file *filp = iocb->kiocb.ki_filp;
423
424         nfs_local_pgio_done(hdr, status);
425
426         /*
427          * Must clear replen otherwise NFSv3 data corruption will occur
428          * if/when switching from LOCALIO back to using normal RPC.
429          */
430         hdr->res.replen = 0;
431
432         if (hdr->res.count != hdr->args.count ||
433             hdr->args.offset + hdr->res.count >= i_size_read(file_inode(filp)))
434                 hdr->res.eof = true;
435
436         dprintk("%s: read %ld bytes eof %d.\n", __func__,
437                         status > 0 ? status : 0, hdr->res.eof);
438 }
439
440 static void nfs_local_read_aio_complete_work(struct work_struct *work)
441 {
442         struct nfs_local_kiocb *iocb =
443                 container_of(work, struct nfs_local_kiocb, work);
444
445         nfs_local_pgio_release(iocb);
446 }
447
448 static void nfs_local_read_aio_complete(struct kiocb *kiocb, long ret)
449 {
450         struct nfs_local_kiocb *iocb =
451                 container_of(kiocb, struct nfs_local_kiocb, kiocb);
452
453         nfs_local_read_done(iocb, ret);
454         nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_read_aio_complete_work */
455 }
456
457 static void nfs_local_call_read(struct work_struct *work)
458 {
459         struct nfs_local_kiocb *iocb =
460                 container_of(work, struct nfs_local_kiocb, work);
461         struct file *filp = iocb->kiocb.ki_filp;
462         const struct cred *save_cred;
463         struct iov_iter iter;
464         ssize_t status;
465
466         save_cred = override_creds(filp->f_cred);
467
468         nfs_local_iter_init(&iter, iocb, READ);
469
470         status = filp->f_op->read_iter(&iocb->kiocb, &iter);
471         if (status != -EIOCBQUEUED) {
472                 nfs_local_read_done(iocb, status);
473                 nfs_local_pgio_release(iocb);
474         }
475
476         revert_creds(save_cred);
477 }
478
479 static int
480 nfs_do_local_read(struct nfs_pgio_header *hdr,
481                   struct nfsd_file *localio,
482                   const struct rpc_call_ops *call_ops)
483 {
484         struct nfs_local_kiocb *iocb;
485         struct file *file = nfs_to->nfsd_file_file(localio);
486
487         /* Don't support filesystems without read_iter */
488         if (!file->f_op->read_iter)
489                 return -EAGAIN;
490
491         dprintk("%s: vfs_read count=%u pos=%llu\n",
492                 __func__, hdr->args.count, hdr->args.offset);
493
494         iocb = nfs_local_iocb_alloc(hdr, file, GFP_KERNEL);
495         if (iocb == NULL)
496                 return -ENOMEM;
497         iocb->localio = localio;
498
499         nfs_local_pgio_init(hdr, call_ops);
500         hdr->res.eof = false;
501
502         if (iocb->kiocb.ki_flags & IOCB_DIRECT) {
503                 iocb->kiocb.ki_complete = nfs_local_read_aio_complete;
504                 iocb->aio_complete_work = nfs_local_read_aio_complete_work;
505         }
506
507         INIT_WORK(&iocb->work, nfs_local_call_read);
508         queue_work(nfslocaliod_workqueue, &iocb->work);
509
510         return 0;
511 }
512
513 static void
514 nfs_copy_boot_verifier(struct nfs_write_verifier *verifier, struct inode *inode)
515 {
516         struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
517         u32 *verf = (u32 *)verifier->data;
518         int seq = 0;
519
520         do {
521                 read_seqbegin_or_lock(&clp->cl_boot_lock, &seq);
522                 verf[0] = (u32)clp->cl_nfssvc_boot.tv_sec;
523                 verf[1] = (u32)clp->cl_nfssvc_boot.tv_nsec;
524         } while (need_seqretry(&clp->cl_boot_lock, seq));
525         done_seqretry(&clp->cl_boot_lock, seq);
526 }
527
528 static void
529 nfs_reset_boot_verifier(struct inode *inode)
530 {
531         struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
532
533         write_seqlock(&clp->cl_boot_lock);
534         ktime_get_real_ts64(&clp->cl_nfssvc_boot);
535         write_sequnlock(&clp->cl_boot_lock);
536 }
537
538 static void
539 nfs_set_local_verifier(struct inode *inode,
540                 struct nfs_writeverf *verf,
541                 enum nfs3_stable_how how)
542 {
543         nfs_copy_boot_verifier(&verf->verifier, inode);
544         verf->committed = how;
545 }
546
547 /* Factored out from fs/nfsd/vfs.h:fh_getattr() */
548 static int __vfs_getattr(struct path *p, struct kstat *stat, int version)
549 {
550         u32 request_mask = STATX_BASIC_STATS;
551
552         if (version == 4)
553                 request_mask |= (STATX_BTIME | STATX_CHANGE_COOKIE);
554         return vfs_getattr(p, stat, request_mask, AT_STATX_SYNC_AS_STAT);
555 }
556
557 /* Copied from fs/nfsd/nfsfh.c:nfsd4_change_attribute() */
558 static u64 __nfsd4_change_attribute(const struct kstat *stat,
559                                     const struct inode *inode)
560 {
561         u64 chattr;
562
563         if (stat->result_mask & STATX_CHANGE_COOKIE) {
564                 chattr = stat->change_cookie;
565                 if (S_ISREG(inode->i_mode) &&
566                     !(stat->attributes & STATX_ATTR_CHANGE_MONOTONIC)) {
567                         chattr += (u64)stat->ctime.tv_sec << 30;
568                         chattr += stat->ctime.tv_nsec;
569                 }
570         } else {
571                 chattr = time_to_chattr(&stat->ctime);
572         }
573         return chattr;
574 }
575
576 static void nfs_local_vfs_getattr(struct nfs_local_kiocb *iocb)
577 {
578         struct kstat stat;
579         struct file *filp = iocb->kiocb.ki_filp;
580         struct nfs_pgio_header *hdr = iocb->hdr;
581         struct nfs_fattr *fattr = hdr->res.fattr;
582         int version = NFS_PROTO(hdr->inode)->version;
583
584         if (unlikely(!fattr) || __vfs_getattr(&filp->f_path, &stat, version))
585                 return;
586
587         fattr->valid = (NFS_ATTR_FATTR_FILEID |
588                         NFS_ATTR_FATTR_CHANGE |
589                         NFS_ATTR_FATTR_SIZE |
590                         NFS_ATTR_FATTR_ATIME |
591                         NFS_ATTR_FATTR_MTIME |
592                         NFS_ATTR_FATTR_CTIME |
593                         NFS_ATTR_FATTR_SPACE_USED);
594
595         fattr->fileid = stat.ino;
596         fattr->size = stat.size;
597         fattr->atime = stat.atime;
598         fattr->mtime = stat.mtime;
599         fattr->ctime = stat.ctime;
600         if (version == 4) {
601                 fattr->change_attr =
602                         __nfsd4_change_attribute(&stat, file_inode(filp));
603         } else
604                 fattr->change_attr = nfs_timespec_to_change_attr(&fattr->ctime);
605         fattr->du.nfs3.used = stat.blocks << 9;
606 }
607
608 static void
609 nfs_local_write_done(struct nfs_local_kiocb *iocb, long status)
610 {
611         struct nfs_pgio_header *hdr = iocb->hdr;
612         struct inode *inode = hdr->inode;
613
614         dprintk("%s: wrote %ld bytes.\n", __func__, status > 0 ? status : 0);
615
616         /* Handle short writes as if they are ENOSPC */
617         if (status > 0 && status < hdr->args.count) {
618                 hdr->mds_offset += status;
619                 hdr->args.offset += status;
620                 hdr->args.pgbase += status;
621                 hdr->args.count -= status;
622                 nfs_set_pgio_error(hdr, -ENOSPC, hdr->args.offset);
623                 status = -ENOSPC;
624         }
625         if (status < 0)
626                 nfs_reset_boot_verifier(inode);
627
628         nfs_local_pgio_done(hdr, status);
629 }
630
631 static void nfs_local_write_aio_complete_work(struct work_struct *work)
632 {
633         struct nfs_local_kiocb *iocb =
634                 container_of(work, struct nfs_local_kiocb, work);
635
636         nfs_local_vfs_getattr(iocb);
637         nfs_local_pgio_release(iocb);
638 }
639
640 static void nfs_local_write_aio_complete(struct kiocb *kiocb, long ret)
641 {
642         struct nfs_local_kiocb *iocb =
643                 container_of(kiocb, struct nfs_local_kiocb, kiocb);
644
645         nfs_local_write_done(iocb, ret);
646         nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_write_aio_complete_work */
647 }
648
649 static void nfs_local_call_write(struct work_struct *work)
650 {
651         struct nfs_local_kiocb *iocb =
652                 container_of(work, struct nfs_local_kiocb, work);
653         struct file *filp = iocb->kiocb.ki_filp;
654         unsigned long old_flags = current->flags;
655         const struct cred *save_cred;
656         struct iov_iter iter;
657         ssize_t status;
658
659         current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
660         save_cred = override_creds(filp->f_cred);
661
662         nfs_local_iter_init(&iter, iocb, WRITE);
663
664         file_start_write(filp);
665         status = filp->f_op->write_iter(&iocb->kiocb, &iter);
666         file_end_write(filp);
667         if (status != -EIOCBQUEUED) {
668                 nfs_local_write_done(iocb, status);
669                 nfs_local_vfs_getattr(iocb);
670                 nfs_local_pgio_release(iocb);
671         }
672
673         revert_creds(save_cred);
674         current->flags = old_flags;
675 }
676
677 static int
678 nfs_do_local_write(struct nfs_pgio_header *hdr,
679                    struct nfsd_file *localio,
680                    const struct rpc_call_ops *call_ops)
681 {
682         struct nfs_local_kiocb *iocb;
683         struct file *file = nfs_to->nfsd_file_file(localio);
684
685         /* Don't support filesystems without write_iter */
686         if (!file->f_op->write_iter)
687                 return -EAGAIN;
688
689         dprintk("%s: vfs_write count=%u pos=%llu %s\n",
690                 __func__, hdr->args.count, hdr->args.offset,
691                 (hdr->args.stable == NFS_UNSTABLE) ?  "unstable" : "stable");
692
693         iocb = nfs_local_iocb_alloc(hdr, file, GFP_NOIO);
694         if (iocb == NULL)
695                 return -ENOMEM;
696         iocb->localio = localio;
697
698         switch (hdr->args.stable) {
699         default:
700                 break;
701         case NFS_DATA_SYNC:
702                 iocb->kiocb.ki_flags |= IOCB_DSYNC;
703                 break;
704         case NFS_FILE_SYNC:
705                 iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC;
706         }
707
708         nfs_local_pgio_init(hdr, call_ops);
709
710         nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable);
711
712         if (iocb->kiocb.ki_flags & IOCB_DIRECT) {
713                 iocb->kiocb.ki_complete = nfs_local_write_aio_complete;
714                 iocb->aio_complete_work = nfs_local_write_aio_complete_work;
715         }
716
717         INIT_WORK(&iocb->work, nfs_local_call_write);
718         queue_work(nfslocaliod_workqueue, &iocb->work);
719
720         return 0;
721 }
722
723 int nfs_local_doio(struct nfs_client *clp, struct nfsd_file *localio,
724                    struct nfs_pgio_header *hdr,
725                    const struct rpc_call_ops *call_ops)
726 {
727         int status = 0;
728
729         if (!hdr->args.count)
730                 return 0;
731
732         switch (hdr->rw_mode) {
733         case FMODE_READ:
734                 status = nfs_do_local_read(hdr, localio, call_ops);
735                 break;
736         case FMODE_WRITE:
737                 status = nfs_do_local_write(hdr, localio, call_ops);
738                 break;
739         default:
740                 dprintk("%s: invalid mode: %d\n", __func__,
741                         hdr->rw_mode);
742                 status = -EINVAL;
743         }
744
745         if (status != 0) {
746                 if (status == -EAGAIN)
747                         nfs_localio_disable_client(clp);
748                 nfs_local_file_put(localio);
749                 hdr->task.tk_status = status;
750                 nfs_local_hdr_release(hdr, call_ops);
751         }
752         return status;
753 }
754
755 static void
756 nfs_local_init_commit(struct nfs_commit_data *data,
757                 const struct rpc_call_ops *call_ops)
758 {
759         data->task.tk_ops = call_ops;
760 }
761
762 static int
763 nfs_local_run_commit(struct file *filp, struct nfs_commit_data *data)
764 {
765         loff_t start = data->args.offset;
766         loff_t end = LLONG_MAX;
767
768         if (data->args.count > 0) {
769                 end = start + data->args.count - 1;
770                 if (end < start)
771                         end = LLONG_MAX;
772         }
773
774         dprintk("%s: commit %llu - %llu\n", __func__, start, end);
775         return vfs_fsync_range(filp, start, end, 0);
776 }
777
778 static void
779 nfs_local_commit_done(struct nfs_commit_data *data, int status)
780 {
781         if (status >= 0) {
782                 nfs_set_local_verifier(data->inode,
783                                 data->res.verf,
784                                 NFS_FILE_SYNC);
785                 data->res.op_status = NFS4_OK;
786                 data->task.tk_status = 0;
787         } else {
788                 nfs_reset_boot_verifier(data->inode);
789                 data->res.op_status = nfs_localio_errno_to_nfs4_stat(status);
790                 data->task.tk_status = status;
791         }
792 }
793
794 static void
795 nfs_local_release_commit_data(struct nfsd_file *localio,
796                 struct nfs_commit_data *data,
797                 const struct rpc_call_ops *call_ops)
798 {
799         nfs_local_file_put(localio);
800         call_ops->rpc_call_done(&data->task, data);
801         call_ops->rpc_release(data);
802 }
803
804 static void
805 nfs_local_fsync_ctx_free(struct nfs_local_fsync_ctx *ctx)
806 {
807         nfs_local_release_commit_data(ctx->localio, ctx->data,
808                                       ctx->data->task.tk_ops);
809         kfree(ctx);
810 }
811
812 static void
813 nfs_local_fsync_work(struct work_struct *work)
814 {
815         struct nfs_local_fsync_ctx *ctx;
816         int status;
817
818         ctx = container_of(work, struct nfs_local_fsync_ctx, work);
819
820         status = nfs_local_run_commit(nfs_to->nfsd_file_file(ctx->localio),
821                                       ctx->data);
822         nfs_local_commit_done(ctx->data, status);
823         if (ctx->done != NULL)
824                 complete(ctx->done);
825         nfs_local_fsync_ctx_free(ctx);
826 }
827
828 static struct nfs_local_fsync_ctx *
829 nfs_local_fsync_ctx_alloc(struct nfs_commit_data *data,
830                           struct nfsd_file *localio, gfp_t flags)
831 {
832         struct nfs_local_fsync_ctx *ctx = kmalloc(sizeof(*ctx), flags);
833
834         if (ctx != NULL) {
835                 ctx->localio = localio;
836                 ctx->data = data;
837                 INIT_WORK(&ctx->work, nfs_local_fsync_work);
838                 ctx->done = NULL;
839         }
840         return ctx;
841 }
842
843 int nfs_local_commit(struct nfsd_file *localio,
844                      struct nfs_commit_data *data,
845                      const struct rpc_call_ops *call_ops, int how)
846 {
847         struct nfs_local_fsync_ctx *ctx;
848
849         ctx = nfs_local_fsync_ctx_alloc(data, localio, GFP_KERNEL);
850         if (!ctx) {
851                 nfs_local_commit_done(data, -ENOMEM);
852                 nfs_local_release_commit_data(localio, data, call_ops);
853                 return -ENOMEM;
854         }
855
856         nfs_local_init_commit(data, call_ops);
857
858         if (how & FLUSH_SYNC) {
859                 DECLARE_COMPLETION_ONSTACK(done);
860                 ctx->done = &done;
861                 queue_work(nfsiod_workqueue, &ctx->work);
862                 wait_for_completion(&done);
863         } else
864                 queue_work(nfsiod_workqueue, &ctx->work);
865
866         return 0;
867 }
This page took 0.075746 seconds and 4 git commands to generate.