]> Git Repo - linux.git/blob - fs/dlm/lock.c
net: pcs: xpcs: add 1000BASE-X AN interrupt support
[linux.git] / fs / dlm / lock.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10
11 /* Central locking logic has four stages:
12
13    dlm_lock()
14    dlm_unlock()
15
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
56 #include <trace/events/dlm.h>
57
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     struct dlm_message *ms, bool local);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void toss_rsb(struct kref *kref);
93
94 /*
95  * Lock compatibilty matrix - thanks Steve
96  * UN = Unlocked state. Not really a state, used as a flag
97  * PD = Padding. Used to make the matrix a nice power of two in size
98  * Other states are the same as the VMS DLM.
99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
134
135 #define modes_compat(gr, rq) \
136         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
160
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166                dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168                (unsigned long long)lkb->lkb_recover_seq);
169 }
170
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174                "rlc %d name %s\n",
175                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177                r->res_name);
178 }
179
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182         struct dlm_lkb *lkb;
183
184         dlm_print_rsb(r);
185
186         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188         printk(KERN_ERR "rsb lookup list\n");
189         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190                 dlm_print_lkb(lkb);
191         printk(KERN_ERR "rsb grant queue:\n");
192         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193                 dlm_print_lkb(lkb);
194         printk(KERN_ERR "rsb convert queue:\n");
195         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196                 dlm_print_lkb(lkb);
197         printk(KERN_ERR "rsb wait queue:\n");
198         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199                 dlm_print_lkb(lkb);
200 }
201
202 /* Threads cannot use the lockspace while it's being recovered */
203
204 static inline void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206         down_read(&ls->ls_in_recovery);
207 }
208
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211         up_read(&ls->ls_in_recovery);
212 }
213
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216         return down_read_trylock(&ls->ls_in_recovery);
217 }
218
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231         return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236         return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247         return !!r->res_nodeid;
248 }
249
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252         return lkb->lkb_nodeid &&
253                !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287                test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294
295         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296
297         if (rv == -DLM_ECANCEL &&
298             test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299                 rv = -EDEADLK;
300
301         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306         queue_cast(r, lkb,
307                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312         if (is_master_copy(lkb)) {
313                 send_bast(r, lkb, rqmode);
314         } else {
315                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316         }
317 }
318
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322
323 /* This is only called to add a reference when the code already holds
324    a valid reference to the rsb, so there's no need for locking. */
325
326 static inline void hold_rsb(struct dlm_rsb *r)
327 {
328         kref_get(&r->res_ref);
329 }
330
331 void dlm_hold_rsb(struct dlm_rsb *r)
332 {
333         hold_rsb(r);
334 }
335
336 /* When all references to the rsb are gone it's transferred to
337    the tossed list for later disposal. */
338
339 static void put_rsb(struct dlm_rsb *r)
340 {
341         struct dlm_ls *ls = r->res_ls;
342         uint32_t bucket = r->res_bucket;
343         int rv;
344
345         rv = kref_put_lock(&r->res_ref, toss_rsb,
346                            &ls->ls_rsbtbl[bucket].lock);
347         if (rv)
348                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
349 }
350
351 void dlm_put_rsb(struct dlm_rsb *r)
352 {
353         put_rsb(r);
354 }
355
356 static int pre_rsb_struct(struct dlm_ls *ls)
357 {
358         struct dlm_rsb *r1, *r2;
359         int count = 0;
360
361         spin_lock(&ls->ls_new_rsb_spin);
362         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
363                 spin_unlock(&ls->ls_new_rsb_spin);
364                 return 0;
365         }
366         spin_unlock(&ls->ls_new_rsb_spin);
367
368         r1 = dlm_allocate_rsb(ls);
369         r2 = dlm_allocate_rsb(ls);
370
371         spin_lock(&ls->ls_new_rsb_spin);
372         if (r1) {
373                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
374                 ls->ls_new_rsb_count++;
375         }
376         if (r2) {
377                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
378                 ls->ls_new_rsb_count++;
379         }
380         count = ls->ls_new_rsb_count;
381         spin_unlock(&ls->ls_new_rsb_spin);
382
383         if (!count)
384                 return -ENOMEM;
385         return 0;
386 }
387
388 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
389    unlock any spinlocks, go back and call pre_rsb_struct again.
390    Otherwise, take an rsb off the list and return it. */
391
392 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
393                           struct dlm_rsb **r_ret)
394 {
395         struct dlm_rsb *r;
396         int count;
397
398         spin_lock(&ls->ls_new_rsb_spin);
399         if (list_empty(&ls->ls_new_rsb)) {
400                 count = ls->ls_new_rsb_count;
401                 spin_unlock(&ls->ls_new_rsb_spin);
402                 log_debug(ls, "find_rsb retry %d %d %s",
403                           count, dlm_config.ci_new_rsb_count,
404                           (const char *)name);
405                 return -EAGAIN;
406         }
407
408         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
409         list_del(&r->res_hashchain);
410         /* Convert the empty list_head to a NULL rb_node for tree usage: */
411         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
412         ls->ls_new_rsb_count--;
413         spin_unlock(&ls->ls_new_rsb_spin);
414
415         r->res_ls = ls;
416         r->res_length = len;
417         memcpy(r->res_name, name, len);
418         mutex_init(&r->res_mutex);
419
420         INIT_LIST_HEAD(&r->res_lookup);
421         INIT_LIST_HEAD(&r->res_grantqueue);
422         INIT_LIST_HEAD(&r->res_convertqueue);
423         INIT_LIST_HEAD(&r->res_waitqueue);
424         INIT_LIST_HEAD(&r->res_root_list);
425         INIT_LIST_HEAD(&r->res_recover_list);
426
427         *r_ret = r;
428         return 0;
429 }
430
431 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
432 {
433         char maxname[DLM_RESNAME_MAXLEN];
434
435         memset(maxname, 0, DLM_RESNAME_MAXLEN);
436         memcpy(maxname, name, nlen);
437         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
438 }
439
440 int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
441                         struct dlm_rsb **r_ret)
442 {
443         struct rb_node *node = tree->rb_node;
444         struct dlm_rsb *r;
445         int rc;
446
447         while (node) {
448                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
449                 rc = rsb_cmp(r, name, len);
450                 if (rc < 0)
451                         node = node->rb_left;
452                 else if (rc > 0)
453                         node = node->rb_right;
454                 else
455                         goto found;
456         }
457         *r_ret = NULL;
458         return -EBADR;
459
460  found:
461         *r_ret = r;
462         return 0;
463 }
464
465 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
466 {
467         struct rb_node **newn = &tree->rb_node;
468         struct rb_node *parent = NULL;
469         int rc;
470
471         while (*newn) {
472                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
473                                                res_hashnode);
474
475                 parent = *newn;
476                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
477                 if (rc < 0)
478                         newn = &parent->rb_left;
479                 else if (rc > 0)
480                         newn = &parent->rb_right;
481                 else {
482                         log_print("rsb_insert match");
483                         dlm_dump_rsb(rsb);
484                         dlm_dump_rsb(cur);
485                         return -EEXIST;
486                 }
487         }
488
489         rb_link_node(&rsb->res_hashnode, parent, newn);
490         rb_insert_color(&rsb->res_hashnode, tree);
491         return 0;
492 }
493
494 /*
495  * Find rsb in rsbtbl and potentially create/add one
496  *
497  * Delaying the release of rsb's has a similar benefit to applications keeping
498  * NL locks on an rsb, but without the guarantee that the cached master value
499  * will still be valid when the rsb is reused.  Apps aren't always smart enough
500  * to keep NL locks on an rsb that they may lock again shortly; this can lead
501  * to excessive master lookups and removals if we don't delay the release.
502  *
503  * Searching for an rsb means looking through both the normal list and toss
504  * list.  When found on the toss list the rsb is moved to the normal list with
505  * ref count of 1; when found on normal list the ref count is incremented.
506  *
507  * rsb's on the keep list are being used locally and refcounted.
508  * rsb's on the toss list are not being used locally, and are not refcounted.
509  *
510  * The toss list rsb's were either
511  * - previously used locally but not any more (were on keep list, then
512  *   moved to toss list when last refcount dropped)
513  * - created and put on toss list as a directory record for a lookup
514  *   (we are the dir node for the res, but are not using the res right now,
515  *   but some other node is)
516  *
517  * The purpose of find_rsb() is to return a refcounted rsb for local use.
518  * So, if the given rsb is on the toss list, it is moved to the keep list
519  * before being returned.
520  *
521  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
522  * more refcounts exist, so the rsb is moved from the keep list to the
523  * toss list.
524  *
525  * rsb's on both keep and toss lists are used for doing a name to master
526  * lookups.  rsb's that are in use locally (and being refcounted) are on
527  * the keep list, rsb's that are not in use locally (not refcounted) and
528  * only exist for name/master lookups are on the toss list.
529  *
530  * rsb's on the toss list who's dir_nodeid is not local can have stale
531  * name/master mappings.  So, remote requests on such rsb's can potentially
532  * return with an error, which means the mapping is stale and needs to
533  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
534  * first_lkid is to keep only a single outstanding request on an rsb
535  * while that rsb has a potentially stale master.)
536  */
537
538 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
539                         uint32_t hash, uint32_t b,
540                         int dir_nodeid, int from_nodeid,
541                         unsigned int flags, struct dlm_rsb **r_ret)
542 {
543         struct dlm_rsb *r = NULL;
544         int our_nodeid = dlm_our_nodeid();
545         int from_local = 0;
546         int from_other = 0;
547         int from_dir = 0;
548         int create = 0;
549         int error;
550
551         if (flags & R_RECEIVE_REQUEST) {
552                 if (from_nodeid == dir_nodeid)
553                         from_dir = 1;
554                 else
555                         from_other = 1;
556         } else if (flags & R_REQUEST) {
557                 from_local = 1;
558         }
559
560         /*
561          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
562          * from_nodeid has sent us a lock in dlm_recover_locks, believing
563          * we're the new master.  Our local recovery may not have set
564          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
565          * create the rsb; dlm_recover_process_copy() will handle EBADR
566          * by resending.
567          *
568          * If someone sends us a request, we are the dir node, and we do
569          * not find the rsb anywhere, then recreate it.  This happens if
570          * someone sends us a request after we have removed/freed an rsb
571          * from our toss list.  (They sent a request instead of lookup
572          * because they are using an rsb from their toss list.)
573          */
574
575         if (from_local || from_dir ||
576             (from_other && (dir_nodeid == our_nodeid))) {
577                 create = 1;
578         }
579
580  retry:
581         if (create) {
582                 error = pre_rsb_struct(ls);
583                 if (error < 0)
584                         goto out;
585         }
586
587         spin_lock(&ls->ls_rsbtbl[b].lock);
588
589         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
590         if (error)
591                 goto do_toss;
592         
593         /*
594          * rsb is active, so we can't check master_nodeid without lock_rsb.
595          */
596
597         kref_get(&r->res_ref);
598         goto out_unlock;
599
600
601  do_toss:
602         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
603         if (error)
604                 goto do_new;
605
606         /*
607          * rsb found inactive (master_nodeid may be out of date unless
608          * we are the dir_nodeid or were the master)  No other thread
609          * is using this rsb because it's on the toss list, so we can
610          * look at or update res_master_nodeid without lock_rsb.
611          */
612
613         if ((r->res_master_nodeid != our_nodeid) && from_other) {
614                 /* our rsb was not master, and another node (not the dir node)
615                    has sent us a request */
616                 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
617                           from_nodeid, r->res_master_nodeid, dir_nodeid,
618                           r->res_name);
619                 error = -ENOTBLK;
620                 goto out_unlock;
621         }
622
623         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
624                 /* don't think this should ever happen */
625                 log_error(ls, "find_rsb toss from_dir %d master %d",
626                           from_nodeid, r->res_master_nodeid);
627                 dlm_print_rsb(r);
628                 /* fix it and go on */
629                 r->res_master_nodeid = our_nodeid;
630                 r->res_nodeid = 0;
631                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
632                 r->res_first_lkid = 0;
633         }
634
635         if (from_local && (r->res_master_nodeid != our_nodeid)) {
636                 /* Because we have held no locks on this rsb,
637                    res_master_nodeid could have become stale. */
638                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
639                 r->res_first_lkid = 0;
640         }
641
642         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
643         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
644         goto out_unlock;
645
646
647  do_new:
648         /*
649          * rsb not found
650          */
651
652         if (error == -EBADR && !create)
653                 goto out_unlock;
654
655         error = get_rsb_struct(ls, name, len, &r);
656         if (error == -EAGAIN) {
657                 spin_unlock(&ls->ls_rsbtbl[b].lock);
658                 goto retry;
659         }
660         if (error)
661                 goto out_unlock;
662
663         r->res_hash = hash;
664         r->res_bucket = b;
665         r->res_dir_nodeid = dir_nodeid;
666         kref_init(&r->res_ref);
667
668         if (from_dir) {
669                 /* want to see how often this happens */
670                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
671                           from_nodeid, r->res_name);
672                 r->res_master_nodeid = our_nodeid;
673                 r->res_nodeid = 0;
674                 goto out_add;
675         }
676
677         if (from_other && (dir_nodeid != our_nodeid)) {
678                 /* should never happen */
679                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
680                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
681                 dlm_free_rsb(r);
682                 r = NULL;
683                 error = -ENOTBLK;
684                 goto out_unlock;
685         }
686
687         if (from_other) {
688                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
689                           from_nodeid, dir_nodeid, r->res_name);
690         }
691
692         if (dir_nodeid == our_nodeid) {
693                 /* When we are the dir nodeid, we can set the master
694                    node immediately */
695                 r->res_master_nodeid = our_nodeid;
696                 r->res_nodeid = 0;
697         } else {
698                 /* set_master will send_lookup to dir_nodeid */
699                 r->res_master_nodeid = 0;
700                 r->res_nodeid = -1;
701         }
702
703  out_add:
704         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
705  out_unlock:
706         spin_unlock(&ls->ls_rsbtbl[b].lock);
707  out:
708         *r_ret = r;
709         return error;
710 }
711
712 /* During recovery, other nodes can send us new MSTCPY locks (from
713    dlm_recover_locks) before we've made ourself master (in
714    dlm_recover_masters). */
715
716 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
717                           uint32_t hash, uint32_t b,
718                           int dir_nodeid, int from_nodeid,
719                           unsigned int flags, struct dlm_rsb **r_ret)
720 {
721         struct dlm_rsb *r = NULL;
722         int our_nodeid = dlm_our_nodeid();
723         int recover = (flags & R_RECEIVE_RECOVER);
724         int error;
725
726  retry:
727         error = pre_rsb_struct(ls);
728         if (error < 0)
729                 goto out;
730
731         spin_lock(&ls->ls_rsbtbl[b].lock);
732
733         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
734         if (error)
735                 goto do_toss;
736
737         /*
738          * rsb is active, so we can't check master_nodeid without lock_rsb.
739          */
740
741         kref_get(&r->res_ref);
742         goto out_unlock;
743
744
745  do_toss:
746         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
747         if (error)
748                 goto do_new;
749
750         /*
751          * rsb found inactive. No other thread is using this rsb because
752          * it's on the toss list, so we can look at or update
753          * res_master_nodeid without lock_rsb.
754          */
755
756         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
757                 /* our rsb is not master, and another node has sent us a
758                    request; this should never happen */
759                 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
760                           from_nodeid, r->res_master_nodeid, dir_nodeid);
761                 dlm_print_rsb(r);
762                 error = -ENOTBLK;
763                 goto out_unlock;
764         }
765
766         if (!recover && (r->res_master_nodeid != our_nodeid) &&
767             (dir_nodeid == our_nodeid)) {
768                 /* our rsb is not master, and we are dir; may as well fix it;
769                    this should never happen */
770                 log_error(ls, "find_rsb toss our %d master %d dir %d",
771                           our_nodeid, r->res_master_nodeid, dir_nodeid);
772                 dlm_print_rsb(r);
773                 r->res_master_nodeid = our_nodeid;
774                 r->res_nodeid = 0;
775         }
776
777         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
778         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
779         goto out_unlock;
780
781
782  do_new:
783         /*
784          * rsb not found
785          */
786
787         error = get_rsb_struct(ls, name, len, &r);
788         if (error == -EAGAIN) {
789                 spin_unlock(&ls->ls_rsbtbl[b].lock);
790                 goto retry;
791         }
792         if (error)
793                 goto out_unlock;
794
795         r->res_hash = hash;
796         r->res_bucket = b;
797         r->res_dir_nodeid = dir_nodeid;
798         r->res_master_nodeid = dir_nodeid;
799         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
800         kref_init(&r->res_ref);
801
802         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
803  out_unlock:
804         spin_unlock(&ls->ls_rsbtbl[b].lock);
805  out:
806         *r_ret = r;
807         return error;
808 }
809
810 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
811                     int from_nodeid, unsigned int flags,
812                     struct dlm_rsb **r_ret)
813 {
814         uint32_t hash, b;
815         int dir_nodeid;
816
817         if (len > DLM_RESNAME_MAXLEN)
818                 return -EINVAL;
819
820         hash = jhash(name, len, 0);
821         b = hash & (ls->ls_rsbtbl_size - 1);
822
823         dir_nodeid = dlm_hash2nodeid(ls, hash);
824
825         if (dlm_no_directory(ls))
826                 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
827                                       from_nodeid, flags, r_ret);
828         else
829                 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
830                                       from_nodeid, flags, r_ret);
831 }
832
833 /* we have received a request and found that res_master_nodeid != our_nodeid,
834    so we need to return an error or make ourself the master */
835
836 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
837                                   int from_nodeid)
838 {
839         if (dlm_no_directory(ls)) {
840                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
841                           from_nodeid, r->res_master_nodeid,
842                           r->res_dir_nodeid);
843                 dlm_print_rsb(r);
844                 return -ENOTBLK;
845         }
846
847         if (from_nodeid != r->res_dir_nodeid) {
848                 /* our rsb is not master, and another node (not the dir node)
849                    has sent us a request.  this is much more common when our
850                    master_nodeid is zero, so limit debug to non-zero.  */
851
852                 if (r->res_master_nodeid) {
853                         log_debug(ls, "validate master from_other %d master %d "
854                                   "dir %d first %x %s", from_nodeid,
855                                   r->res_master_nodeid, r->res_dir_nodeid,
856                                   r->res_first_lkid, r->res_name);
857                 }
858                 return -ENOTBLK;
859         } else {
860                 /* our rsb is not master, but the dir nodeid has sent us a
861                    request; this could happen with master 0 / res_nodeid -1 */
862
863                 if (r->res_master_nodeid) {
864                         log_error(ls, "validate master from_dir %d master %d "
865                                   "first %x %s",
866                                   from_nodeid, r->res_master_nodeid,
867                                   r->res_first_lkid, r->res_name);
868                 }
869
870                 r->res_master_nodeid = dlm_our_nodeid();
871                 r->res_nodeid = 0;
872                 return 0;
873         }
874 }
875
876 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
877                                 int from_nodeid, bool toss_list, unsigned int flags,
878                                 int *r_nodeid, int *result)
879 {
880         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
881         int from_master = (flags & DLM_LU_RECOVER_DIR);
882
883         if (r->res_dir_nodeid != our_nodeid) {
884                 /* should not happen, but may as well fix it and carry on */
885                 log_error(ls, "%s res_dir %d our %d %s", __func__,
886                           r->res_dir_nodeid, our_nodeid, r->res_name);
887                 r->res_dir_nodeid = our_nodeid;
888         }
889
890         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
891                 /* Recovery uses this function to set a new master when
892                  * the previous master failed.  Setting NEW_MASTER will
893                  * force dlm_recover_masters to call recover_master on this
894                  * rsb even though the res_nodeid is no longer removed.
895                  */
896
897                 r->res_master_nodeid = from_nodeid;
898                 r->res_nodeid = from_nodeid;
899                 rsb_set_flag(r, RSB_NEW_MASTER);
900
901                 if (toss_list) {
902                         /* I don't think we should ever find it on toss list. */
903                         log_error(ls, "%s fix_master on toss", __func__);
904                         dlm_dump_rsb(r);
905                 }
906         }
907
908         if (from_master && (r->res_master_nodeid != from_nodeid)) {
909                 /* this will happen if from_nodeid became master during
910                  * a previous recovery cycle, and we aborted the previous
911                  * cycle before recovering this master value
912                  */
913
914                 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
915                           __func__, from_nodeid, r->res_master_nodeid,
916                           r->res_nodeid, r->res_first_lkid, r->res_name);
917
918                 if (r->res_master_nodeid == our_nodeid) {
919                         log_error(ls, "from_master %d our_master", from_nodeid);
920                         dlm_dump_rsb(r);
921                         goto ret_assign;
922                 }
923
924                 r->res_master_nodeid = from_nodeid;
925                 r->res_nodeid = from_nodeid;
926                 rsb_set_flag(r, RSB_NEW_MASTER);
927         }
928
929         if (!r->res_master_nodeid) {
930                 /* this will happen if recovery happens while we're looking
931                  * up the master for this rsb
932                  */
933
934                 log_debug(ls, "%s master 0 to %d first %x %s", __func__,
935                           from_nodeid, r->res_first_lkid, r->res_name);
936                 r->res_master_nodeid = from_nodeid;
937                 r->res_nodeid = from_nodeid;
938         }
939
940         if (!from_master && !fix_master &&
941             (r->res_master_nodeid == from_nodeid)) {
942                 /* this can happen when the master sends remove, the dir node
943                  * finds the rsb on the keep list and ignores the remove,
944                  * and the former master sends a lookup
945                  */
946
947                 log_limit(ls, "%s from master %d flags %x first %x %s",
948                           __func__, from_nodeid, flags, r->res_first_lkid,
949                           r->res_name);
950         }
951
952  ret_assign:
953         *r_nodeid = r->res_master_nodeid;
954         if (result)
955                 *result = DLM_LU_MATCH;
956 }
957
958 /*
959  * We're the dir node for this res and another node wants to know the
960  * master nodeid.  During normal operation (non recovery) this is only
961  * called from receive_lookup(); master lookups when the local node is
962  * the dir node are done by find_rsb().
963  *
964  * normal operation, we are the dir node for a resource
965  * . _request_lock
966  * . set_master
967  * . send_lookup
968  * . receive_lookup
969  * . dlm_master_lookup flags 0
970  *
971  * recover directory, we are rebuilding dir for all resources
972  * . dlm_recover_directory
973  * . dlm_rcom_names
974  *   remote node sends back the rsb names it is master of and we are dir of
975  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
976  *   we either create new rsb setting remote node as master, or find existing
977  *   rsb and set master to be the remote node.
978  *
979  * recover masters, we are finding the new master for resources
980  * . dlm_recover_masters
981  * . recover_master
982  * . dlm_send_rcom_lookup
983  * . receive_rcom_lookup
984  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
985  */
986
987 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
988                       unsigned int flags, int *r_nodeid, int *result)
989 {
990         struct dlm_rsb *r = NULL;
991         uint32_t hash, b;
992         int our_nodeid = dlm_our_nodeid();
993         int dir_nodeid, error;
994
995         if (len > DLM_RESNAME_MAXLEN)
996                 return -EINVAL;
997
998         if (from_nodeid == our_nodeid) {
999                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1000                           our_nodeid, flags);
1001                 return -EINVAL;
1002         }
1003
1004         hash = jhash(name, len, 0);
1005         b = hash & (ls->ls_rsbtbl_size - 1);
1006
1007         dir_nodeid = dlm_hash2nodeid(ls, hash);
1008         if (dir_nodeid != our_nodeid) {
1009                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1010                           from_nodeid, dir_nodeid, our_nodeid, hash,
1011                           ls->ls_num_nodes);
1012                 *r_nodeid = -1;
1013                 return -EINVAL;
1014         }
1015
1016  retry:
1017         error = pre_rsb_struct(ls);
1018         if (error < 0)
1019                 return error;
1020
1021         spin_lock(&ls->ls_rsbtbl[b].lock);
1022         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1023         if (!error) {
1024                 /* because the rsb is active, we need to lock_rsb before
1025                  * checking/changing re_master_nodeid
1026                  */
1027
1028                 hold_rsb(r);
1029                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1030                 lock_rsb(r);
1031
1032                 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1033                                     flags, r_nodeid, result);
1034
1035                 /* the rsb was active */
1036                 unlock_rsb(r);
1037                 put_rsb(r);
1038
1039                 return 0;
1040         }
1041
1042         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1043         if (error)
1044                 goto not_found;
1045
1046         /* because the rsb is inactive (on toss list), it's not refcounted
1047          * and lock_rsb is not used, but is protected by the rsbtbl lock
1048          */
1049
1050         __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1051                             r_nodeid, result);
1052
1053         r->res_toss_time = jiffies;
1054         /* the rsb was inactive (on toss list) */
1055         spin_unlock(&ls->ls_rsbtbl[b].lock);
1056
1057         return 0;
1058
1059  not_found:
1060         error = get_rsb_struct(ls, name, len, &r);
1061         if (error == -EAGAIN) {
1062                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1063                 goto retry;
1064         }
1065         if (error)
1066                 goto out_unlock;
1067
1068         r->res_hash = hash;
1069         r->res_bucket = b;
1070         r->res_dir_nodeid = our_nodeid;
1071         r->res_master_nodeid = from_nodeid;
1072         r->res_nodeid = from_nodeid;
1073         kref_init(&r->res_ref);
1074         r->res_toss_time = jiffies;
1075
1076         error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1077         if (error) {
1078                 /* should never happen */
1079                 dlm_free_rsb(r);
1080                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1081                 goto retry;
1082         }
1083
1084         if (result)
1085                 *result = DLM_LU_ADD;
1086         *r_nodeid = from_nodeid;
1087  out_unlock:
1088         spin_unlock(&ls->ls_rsbtbl[b].lock);
1089         return error;
1090 }
1091
1092 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1093 {
1094         struct rb_node *n;
1095         struct dlm_rsb *r;
1096         int i;
1097
1098         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1099                 spin_lock(&ls->ls_rsbtbl[i].lock);
1100                 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1101                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1102                         if (r->res_hash == hash)
1103                                 dlm_dump_rsb(r);
1104                 }
1105                 spin_unlock(&ls->ls_rsbtbl[i].lock);
1106         }
1107 }
1108
1109 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1110 {
1111         struct dlm_rsb *r = NULL;
1112         uint32_t hash, b;
1113         int error;
1114
1115         hash = jhash(name, len, 0);
1116         b = hash & (ls->ls_rsbtbl_size - 1);
1117
1118         spin_lock(&ls->ls_rsbtbl[b].lock);
1119         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1120         if (!error)
1121                 goto out_dump;
1122
1123         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1124         if (error)
1125                 goto out;
1126  out_dump:
1127         dlm_dump_rsb(r);
1128  out:
1129         spin_unlock(&ls->ls_rsbtbl[b].lock);
1130 }
1131
1132 static void toss_rsb(struct kref *kref)
1133 {
1134         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1135         struct dlm_ls *ls = r->res_ls;
1136
1137         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1138         kref_init(&r->res_ref);
1139         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1140         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1141         r->res_toss_time = jiffies;
1142         set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
1143         if (r->res_lvbptr) {
1144                 dlm_free_lvb(r->res_lvbptr);
1145                 r->res_lvbptr = NULL;
1146         }
1147 }
1148
1149 /* See comment for unhold_lkb */
1150
1151 static void unhold_rsb(struct dlm_rsb *r)
1152 {
1153         int rv;
1154         rv = kref_put(&r->res_ref, toss_rsb);
1155         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1156 }
1157
1158 static void kill_rsb(struct kref *kref)
1159 {
1160         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1161
1162         /* All work is done after the return from kref_put() so we
1163            can release the write_lock before the remove and free. */
1164
1165         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1166         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1167         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1168         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1169         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1170         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1171 }
1172
1173 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1174    The rsb must exist as long as any lkb's for it do. */
1175
1176 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1177 {
1178         hold_rsb(r);
1179         lkb->lkb_resource = r;
1180 }
1181
1182 static void detach_lkb(struct dlm_lkb *lkb)
1183 {
1184         if (lkb->lkb_resource) {
1185                 put_rsb(lkb->lkb_resource);
1186                 lkb->lkb_resource = NULL;
1187         }
1188 }
1189
1190 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1191                        int start, int end)
1192 {
1193         struct dlm_lkb *lkb;
1194         int rv;
1195
1196         lkb = dlm_allocate_lkb(ls);
1197         if (!lkb)
1198                 return -ENOMEM;
1199
1200         lkb->lkb_last_bast_mode = -1;
1201         lkb->lkb_nodeid = -1;
1202         lkb->lkb_grmode = DLM_LOCK_IV;
1203         kref_init(&lkb->lkb_ref);
1204         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1205         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1206         INIT_LIST_HEAD(&lkb->lkb_cb_list);
1207         INIT_LIST_HEAD(&lkb->lkb_callbacks);
1208         spin_lock_init(&lkb->lkb_cb_lock);
1209         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1210
1211         idr_preload(GFP_NOFS);
1212         spin_lock(&ls->ls_lkbidr_spin);
1213         rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
1214         if (rv >= 0)
1215                 lkb->lkb_id = rv;
1216         spin_unlock(&ls->ls_lkbidr_spin);
1217         idr_preload_end();
1218
1219         if (rv < 0) {
1220                 log_error(ls, "create_lkb idr error %d", rv);
1221                 dlm_free_lkb(lkb);
1222                 return rv;
1223         }
1224
1225         *lkb_ret = lkb;
1226         return 0;
1227 }
1228
1229 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1230 {
1231         return _create_lkb(ls, lkb_ret, 1, 0);
1232 }
1233
1234 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1235 {
1236         struct dlm_lkb *lkb;
1237
1238         spin_lock(&ls->ls_lkbidr_spin);
1239         lkb = idr_find(&ls->ls_lkbidr, lkid);
1240         if (lkb)
1241                 kref_get(&lkb->lkb_ref);
1242         spin_unlock(&ls->ls_lkbidr_spin);
1243
1244         *lkb_ret = lkb;
1245         return lkb ? 0 : -ENOENT;
1246 }
1247
1248 static void kill_lkb(struct kref *kref)
1249 {
1250         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1251
1252         /* All work is done after the return from kref_put() so we
1253            can release the write_lock before the detach_lkb */
1254
1255         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1256 }
1257
1258 /* __put_lkb() is used when an lkb may not have an rsb attached to
1259    it so we need to provide the lockspace explicitly */
1260
1261 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1262 {
1263         uint32_t lkid = lkb->lkb_id;
1264         int rv;
1265
1266         rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
1267                            &ls->ls_lkbidr_spin);
1268         if (rv) {
1269                 idr_remove(&ls->ls_lkbidr, lkid);
1270                 spin_unlock(&ls->ls_lkbidr_spin);
1271
1272                 detach_lkb(lkb);
1273
1274                 /* for local/process lkbs, lvbptr points to caller's lksb */
1275                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1276                         dlm_free_lvb(lkb->lkb_lvbptr);
1277                 dlm_free_lkb(lkb);
1278         }
1279
1280         return rv;
1281 }
1282
1283 int dlm_put_lkb(struct dlm_lkb *lkb)
1284 {
1285         struct dlm_ls *ls;
1286
1287         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1288         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1289
1290         ls = lkb->lkb_resource->res_ls;
1291         return __put_lkb(ls, lkb);
1292 }
1293
1294 /* This is only called to add a reference when the code already holds
1295    a valid reference to the lkb, so there's no need for locking. */
1296
1297 static inline void hold_lkb(struct dlm_lkb *lkb)
1298 {
1299         kref_get(&lkb->lkb_ref);
1300 }
1301
1302 static void unhold_lkb_assert(struct kref *kref)
1303 {
1304         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1305
1306         DLM_ASSERT(false, dlm_print_lkb(lkb););
1307 }
1308
1309 /* This is called when we need to remove a reference and are certain
1310    it's not the last ref.  e.g. del_lkb is always called between a
1311    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1312    put_lkb would work fine, but would involve unnecessary locking */
1313
1314 static inline void unhold_lkb(struct dlm_lkb *lkb)
1315 {
1316         kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1317 }
1318
1319 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1320                             int mode)
1321 {
1322         struct dlm_lkb *lkb = NULL, *iter;
1323
1324         list_for_each_entry(iter, head, lkb_statequeue)
1325                 if (iter->lkb_rqmode < mode) {
1326                         lkb = iter;
1327                         list_add_tail(new, &iter->lkb_statequeue);
1328                         break;
1329                 }
1330
1331         if (!lkb)
1332                 list_add_tail(new, head);
1333 }
1334
1335 /* add/remove lkb to rsb's grant/convert/wait queue */
1336
1337 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1338 {
1339         kref_get(&lkb->lkb_ref);
1340
1341         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1342
1343         lkb->lkb_timestamp = ktime_get();
1344
1345         lkb->lkb_status = status;
1346
1347         switch (status) {
1348         case DLM_LKSTS_WAITING:
1349                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1350                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1351                 else
1352                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1353                 break;
1354         case DLM_LKSTS_GRANTED:
1355                 /* convention says granted locks kept in order of grmode */
1356                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1357                                 lkb->lkb_grmode);
1358                 break;
1359         case DLM_LKSTS_CONVERT:
1360                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1361                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1362                 else
1363                         list_add_tail(&lkb->lkb_statequeue,
1364                                       &r->res_convertqueue);
1365                 break;
1366         default:
1367                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1368         }
1369 }
1370
1371 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1372 {
1373         lkb->lkb_status = 0;
1374         list_del(&lkb->lkb_statequeue);
1375         unhold_lkb(lkb);
1376 }
1377
1378 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1379 {
1380         hold_lkb(lkb);
1381         del_lkb(r, lkb);
1382         add_lkb(r, lkb, sts);
1383         unhold_lkb(lkb);
1384 }
1385
1386 static int msg_reply_type(int mstype)
1387 {
1388         switch (mstype) {
1389         case DLM_MSG_REQUEST:
1390                 return DLM_MSG_REQUEST_REPLY;
1391         case DLM_MSG_CONVERT:
1392                 return DLM_MSG_CONVERT_REPLY;
1393         case DLM_MSG_UNLOCK:
1394                 return DLM_MSG_UNLOCK_REPLY;
1395         case DLM_MSG_CANCEL:
1396                 return DLM_MSG_CANCEL_REPLY;
1397         case DLM_MSG_LOOKUP:
1398                 return DLM_MSG_LOOKUP_REPLY;
1399         }
1400         return -1;
1401 }
1402
1403 /* add/remove lkb from global waiters list of lkb's waiting for
1404    a reply from a remote node */
1405
1406 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1407 {
1408         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1409         int error = 0;
1410         int wc;
1411
1412         mutex_lock(&ls->ls_waiters_mutex);
1413
1414         if (is_overlap_unlock(lkb) ||
1415             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1416                 error = -EINVAL;
1417                 goto out;
1418         }
1419
1420         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1421                 switch (mstype) {
1422                 case DLM_MSG_UNLOCK:
1423                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1424                         break;
1425                 case DLM_MSG_CANCEL:
1426                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1427                         break;
1428                 default:
1429                         error = -EBUSY;
1430                         goto out;
1431                 }
1432                 wc = atomic_inc_return(&lkb->lkb_wait_count);
1433                 hold_lkb(lkb);
1434
1435                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1436                           lkb->lkb_id, lkb->lkb_wait_type, mstype, wc,
1437                           dlm_iflags_val(lkb));
1438                 goto out;
1439         }
1440
1441         wc = atomic_fetch_inc(&lkb->lkb_wait_count);
1442         DLM_ASSERT(!wc, dlm_print_lkb(lkb); printk("wait_count %d\n", wc););
1443         lkb->lkb_wait_type = mstype;
1444         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1445         hold_lkb(lkb);
1446         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1447  out:
1448         if (error)
1449                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1450                           lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
1451                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1452         mutex_unlock(&ls->ls_waiters_mutex);
1453         return error;
1454 }
1455
1456 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1457    list as part of process_requestqueue (e.g. a lookup that has an optimized
1458    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1459    set RESEND and dlm_recover_waiters_post() */
1460
1461 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1462                                 struct dlm_message *ms)
1463 {
1464         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1465         int overlap_done = 0;
1466
1467         if (mstype == DLM_MSG_UNLOCK_REPLY &&
1468             test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1469                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1470                 overlap_done = 1;
1471                 goto out_del;
1472         }
1473
1474         if (mstype == DLM_MSG_CANCEL_REPLY &&
1475             test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1476                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1477                 overlap_done = 1;
1478                 goto out_del;
1479         }
1480
1481         /* Cancel state was preemptively cleared by a successful convert,
1482            see next comment, nothing to do. */
1483
1484         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1485             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1486                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1487                           lkb->lkb_id, lkb->lkb_wait_type);
1488                 return -1;
1489         }
1490
1491         /* Remove for the convert reply, and premptively remove for the
1492            cancel reply.  A convert has been granted while there's still
1493            an outstanding cancel on it (the cancel is moot and the result
1494            in the cancel reply should be 0).  We preempt the cancel reply
1495            because the app gets the convert result and then can follow up
1496            with another op, like convert.  This subsequent op would see the
1497            lingering state of the cancel and fail with -EBUSY. */
1498
1499         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1500             (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1501             test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1502                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1503                           lkb->lkb_id);
1504                 lkb->lkb_wait_type = 0;
1505                 atomic_dec(&lkb->lkb_wait_count);
1506                 unhold_lkb(lkb);
1507                 goto out_del;
1508         }
1509
1510         /* N.B. type of reply may not always correspond to type of original
1511            msg due to lookup->request optimization, verify others? */
1512
1513         if (lkb->lkb_wait_type) {
1514                 lkb->lkb_wait_type = 0;
1515                 goto out_del;
1516         }
1517
1518         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1519                   lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1520                   lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1521         return -1;
1522
1523  out_del:
1524         /* the force-unlock/cancel has completed and we haven't recvd a reply
1525            to the op that was in progress prior to the unlock/cancel; we
1526            give up on any reply to the earlier op.  FIXME: not sure when/how
1527            this would happen */
1528
1529         if (overlap_done && lkb->lkb_wait_type) {
1530                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1531                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1532                 atomic_dec(&lkb->lkb_wait_count);
1533                 unhold_lkb(lkb);
1534                 lkb->lkb_wait_type = 0;
1535         }
1536
1537         DLM_ASSERT(atomic_read(&lkb->lkb_wait_count), dlm_print_lkb(lkb););
1538
1539         clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1540         if (atomic_dec_and_test(&lkb->lkb_wait_count))
1541                 list_del_init(&lkb->lkb_wait_reply);
1542         unhold_lkb(lkb);
1543         return 0;
1544 }
1545
1546 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1547 {
1548         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1549         int error;
1550
1551         mutex_lock(&ls->ls_waiters_mutex);
1552         error = _remove_from_waiters(lkb, mstype, NULL);
1553         mutex_unlock(&ls->ls_waiters_mutex);
1554         return error;
1555 }
1556
1557 /* Handles situations where we might be processing a "fake" or "local" reply in
1558    which we can't try to take waiters_mutex again. */
1559
1560 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms,
1561                                   bool local)
1562 {
1563         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1564         int error;
1565
1566         if (!local)
1567                 mutex_lock(&ls->ls_waiters_mutex);
1568         error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1569         if (!local)
1570                 mutex_unlock(&ls->ls_waiters_mutex);
1571         return error;
1572 }
1573
1574 static void shrink_bucket(struct dlm_ls *ls, int b)
1575 {
1576         struct rb_node *n, *next;
1577         struct dlm_rsb *r;
1578         char *name;
1579         int our_nodeid = dlm_our_nodeid();
1580         int remote_count = 0;
1581         int need_shrink = 0;
1582         int i, len, rv;
1583
1584         memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1585
1586         spin_lock(&ls->ls_rsbtbl[b].lock);
1587
1588         if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
1589                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1590                 return;
1591         }
1592
1593         for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1594                 next = rb_next(n);
1595                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1596
1597                 /* If we're the directory record for this rsb, and
1598                    we're not the master of it, then we need to wait
1599                    for the master node to send us a dir remove for
1600                    before removing the dir record. */
1601
1602                 if (!dlm_no_directory(ls) &&
1603                     (r->res_master_nodeid != our_nodeid) &&
1604                     (dlm_dir_nodeid(r) == our_nodeid)) {
1605                         continue;
1606                 }
1607
1608                 need_shrink = 1;
1609
1610                 if (!time_after_eq(jiffies, r->res_toss_time +
1611                                    dlm_config.ci_toss_secs * HZ)) {
1612                         continue;
1613                 }
1614
1615                 if (!dlm_no_directory(ls) &&
1616                     (r->res_master_nodeid == our_nodeid) &&
1617                     (dlm_dir_nodeid(r) != our_nodeid)) {
1618
1619                         /* We're the master of this rsb but we're not
1620                            the directory record, so we need to tell the
1621                            dir node to remove the dir record. */
1622
1623                         ls->ls_remove_lens[remote_count] = r->res_length;
1624                         memcpy(ls->ls_remove_names[remote_count], r->res_name,
1625                                DLM_RESNAME_MAXLEN);
1626                         remote_count++;
1627
1628                         if (remote_count >= DLM_REMOVE_NAMES_MAX)
1629                                 break;
1630                         continue;
1631                 }
1632
1633                 if (!kref_put(&r->res_ref, kill_rsb)) {
1634                         log_error(ls, "tossed rsb in use %s", r->res_name);
1635                         continue;
1636                 }
1637
1638                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1639                 dlm_free_rsb(r);
1640         }
1641
1642         if (need_shrink)
1643                 set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
1644         else
1645                 clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
1646         spin_unlock(&ls->ls_rsbtbl[b].lock);
1647
1648         /*
1649          * While searching for rsb's to free, we found some that require
1650          * remote removal.  We leave them in place and find them again here
1651          * so there is a very small gap between removing them from the toss
1652          * list and sending the removal.  Keeping this gap small is
1653          * important to keep us (the master node) from being out of sync
1654          * with the remote dir node for very long.
1655          */
1656
1657         for (i = 0; i < remote_count; i++) {
1658                 name = ls->ls_remove_names[i];
1659                 len = ls->ls_remove_lens[i];
1660
1661                 spin_lock(&ls->ls_rsbtbl[b].lock);
1662                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1663                 if (rv) {
1664                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1665                         log_debug(ls, "remove_name not toss %s", name);
1666                         continue;
1667                 }
1668
1669                 if (r->res_master_nodeid != our_nodeid) {
1670                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1671                         log_debug(ls, "remove_name master %d dir %d our %d %s",
1672                                   r->res_master_nodeid, r->res_dir_nodeid,
1673                                   our_nodeid, name);
1674                         continue;
1675                 }
1676
1677                 if (r->res_dir_nodeid == our_nodeid) {
1678                         /* should never happen */
1679                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1680                         log_error(ls, "remove_name dir %d master %d our %d %s",
1681                                   r->res_dir_nodeid, r->res_master_nodeid,
1682                                   our_nodeid, name);
1683                         continue;
1684                 }
1685
1686                 if (!time_after_eq(jiffies, r->res_toss_time +
1687                                    dlm_config.ci_toss_secs * HZ)) {
1688                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1689                         log_debug(ls, "remove_name toss_time %lu now %lu %s",
1690                                   r->res_toss_time, jiffies, name);
1691                         continue;
1692                 }
1693
1694                 if (!kref_put(&r->res_ref, kill_rsb)) {
1695                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1696                         log_error(ls, "remove_name in use %s", name);
1697                         continue;
1698                 }
1699
1700                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1701                 send_remove(r);
1702                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1703
1704                 dlm_free_rsb(r);
1705         }
1706 }
1707
1708 void dlm_scan_rsbs(struct dlm_ls *ls)
1709 {
1710         int i;
1711
1712         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1713                 shrink_bucket(ls, i);
1714                 if (dlm_locking_stopped(ls))
1715                         break;
1716                 cond_resched();
1717         }
1718 }
1719
1720 /* lkb is master or local copy */
1721
1722 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1723 {
1724         int b, len = r->res_ls->ls_lvblen;
1725
1726         /* b=1 lvb returned to caller
1727            b=0 lvb written to rsb or invalidated
1728            b=-1 do nothing */
1729
1730         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1731
1732         if (b == 1) {
1733                 if (!lkb->lkb_lvbptr)
1734                         return;
1735
1736                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1737                         return;
1738
1739                 if (!r->res_lvbptr)
1740                         return;
1741
1742                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1743                 lkb->lkb_lvbseq = r->res_lvbseq;
1744
1745         } else if (b == 0) {
1746                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1747                         rsb_set_flag(r, RSB_VALNOTVALID);
1748                         return;
1749                 }
1750
1751                 if (!lkb->lkb_lvbptr)
1752                         return;
1753
1754                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1755                         return;
1756
1757                 if (!r->res_lvbptr)
1758                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1759
1760                 if (!r->res_lvbptr)
1761                         return;
1762
1763                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1764                 r->res_lvbseq++;
1765                 lkb->lkb_lvbseq = r->res_lvbseq;
1766                 rsb_clear_flag(r, RSB_VALNOTVALID);
1767         }
1768
1769         if (rsb_flag(r, RSB_VALNOTVALID))
1770                 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1771 }
1772
1773 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1774 {
1775         if (lkb->lkb_grmode < DLM_LOCK_PW)
1776                 return;
1777
1778         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1779                 rsb_set_flag(r, RSB_VALNOTVALID);
1780                 return;
1781         }
1782
1783         if (!lkb->lkb_lvbptr)
1784                 return;
1785
1786         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1787                 return;
1788
1789         if (!r->res_lvbptr)
1790                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1791
1792         if (!r->res_lvbptr)
1793                 return;
1794
1795         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1796         r->res_lvbseq++;
1797         rsb_clear_flag(r, RSB_VALNOTVALID);
1798 }
1799
1800 /* lkb is process copy (pc) */
1801
1802 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1803                             struct dlm_message *ms)
1804 {
1805         int b;
1806
1807         if (!lkb->lkb_lvbptr)
1808                 return;
1809
1810         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1811                 return;
1812
1813         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1814         if (b == 1) {
1815                 int len = receive_extralen(ms);
1816                 if (len > r->res_ls->ls_lvblen)
1817                         len = r->res_ls->ls_lvblen;
1818                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1819                 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1820         }
1821 }
1822
1823 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1824    remove_lock -- used for unlock, removes lkb from granted
1825    revert_lock -- used for cancel, moves lkb from convert to granted
1826    grant_lock  -- used for request and convert, adds lkb to granted or
1827                   moves lkb from convert or waiting to granted
1828
1829    Each of these is used for master or local copy lkb's.  There is
1830    also a _pc() variation used to make the corresponding change on
1831    a process copy (pc) lkb. */
1832
1833 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1834 {
1835         del_lkb(r, lkb);
1836         lkb->lkb_grmode = DLM_LOCK_IV;
1837         /* this unhold undoes the original ref from create_lkb()
1838            so this leads to the lkb being freed */
1839         unhold_lkb(lkb);
1840 }
1841
1842 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1843 {
1844         set_lvb_unlock(r, lkb);
1845         _remove_lock(r, lkb);
1846 }
1847
1848 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1849 {
1850         _remove_lock(r, lkb);
1851 }
1852
1853 /* returns: 0 did nothing
1854             1 moved lock to granted
1855            -1 removed lock */
1856
1857 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1858 {
1859         int rv = 0;
1860
1861         lkb->lkb_rqmode = DLM_LOCK_IV;
1862
1863         switch (lkb->lkb_status) {
1864         case DLM_LKSTS_GRANTED:
1865                 break;
1866         case DLM_LKSTS_CONVERT:
1867                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1868                 rv = 1;
1869                 break;
1870         case DLM_LKSTS_WAITING:
1871                 del_lkb(r, lkb);
1872                 lkb->lkb_grmode = DLM_LOCK_IV;
1873                 /* this unhold undoes the original ref from create_lkb()
1874                    so this leads to the lkb being freed */
1875                 unhold_lkb(lkb);
1876                 rv = -1;
1877                 break;
1878         default:
1879                 log_print("invalid status for revert %d", lkb->lkb_status);
1880         }
1881         return rv;
1882 }
1883
1884 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1885 {
1886         return revert_lock(r, lkb);
1887 }
1888
1889 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1890 {
1891         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1892                 lkb->lkb_grmode = lkb->lkb_rqmode;
1893                 if (lkb->lkb_status)
1894                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1895                 else
1896                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1897         }
1898
1899         lkb->lkb_rqmode = DLM_LOCK_IV;
1900         lkb->lkb_highbast = 0;
1901 }
1902
1903 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1904 {
1905         set_lvb_lock(r, lkb);
1906         _grant_lock(r, lkb);
1907 }
1908
1909 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1910                           struct dlm_message *ms)
1911 {
1912         set_lvb_lock_pc(r, lkb, ms);
1913         _grant_lock(r, lkb);
1914 }
1915
1916 /* called by grant_pending_locks() which means an async grant message must
1917    be sent to the requesting node in addition to granting the lock if the
1918    lkb belongs to a remote node. */
1919
1920 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1921 {
1922         grant_lock(r, lkb);
1923         if (is_master_copy(lkb))
1924                 send_grant(r, lkb);
1925         else
1926                 queue_cast(r, lkb, 0);
1927 }
1928
1929 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1930    change the granted/requested modes.  We're munging things accordingly in
1931    the process copy.
1932    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1933    conversion deadlock
1934    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1935    compatible with other granted locks */
1936
1937 static void munge_demoted(struct dlm_lkb *lkb)
1938 {
1939         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1940                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1941                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1942                 return;
1943         }
1944
1945         lkb->lkb_grmode = DLM_LOCK_NL;
1946 }
1947
1948 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1949 {
1950         if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
1951             ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
1952                 log_print("munge_altmode %x invalid reply type %d",
1953                           lkb->lkb_id, le32_to_cpu(ms->m_type));
1954                 return;
1955         }
1956
1957         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1958                 lkb->lkb_rqmode = DLM_LOCK_PR;
1959         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1960                 lkb->lkb_rqmode = DLM_LOCK_CW;
1961         else {
1962                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1963                 dlm_print_lkb(lkb);
1964         }
1965 }
1966
1967 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1968 {
1969         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1970                                            lkb_statequeue);
1971         if (lkb->lkb_id == first->lkb_id)
1972                 return 1;
1973
1974         return 0;
1975 }
1976
1977 /* Check if the given lkb conflicts with another lkb on the queue. */
1978
1979 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1980 {
1981         struct dlm_lkb *this;
1982
1983         list_for_each_entry(this, head, lkb_statequeue) {
1984                 if (this == lkb)
1985                         continue;
1986                 if (!modes_compat(this, lkb))
1987                         return 1;
1988         }
1989         return 0;
1990 }
1991
1992 /*
1993  * "A conversion deadlock arises with a pair of lock requests in the converting
1994  * queue for one resource.  The granted mode of each lock blocks the requested
1995  * mode of the other lock."
1996  *
1997  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1998  * convert queue from being granted, then deadlk/demote lkb.
1999  *
2000  * Example:
2001  * Granted Queue: empty
2002  * Convert Queue: NL->EX (first lock)
2003  *                PR->EX (second lock)
2004  *
2005  * The first lock can't be granted because of the granted mode of the second
2006  * lock and the second lock can't be granted because it's not first in the
2007  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2008  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2009  * flag set and return DEMOTED in the lksb flags.
2010  *
2011  * Originally, this function detected conv-deadlk in a more limited scope:
2012  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2013  * - if lkb1 was the first entry in the queue (not just earlier), and was
2014  *   blocked by the granted mode of lkb2, and there was nothing on the
2015  *   granted queue preventing lkb1 from being granted immediately, i.e.
2016  *   lkb2 was the only thing preventing lkb1 from being granted.
2017  *
2018  * That second condition meant we'd only say there was conv-deadlk if
2019  * resolving it (by demotion) would lead to the first lock on the convert
2020  * queue being granted right away.  It allowed conversion deadlocks to exist
2021  * between locks on the convert queue while they couldn't be granted anyway.
2022  *
2023  * Now, we detect and take action on conversion deadlocks immediately when
2024  * they're created, even if they may not be immediately consequential.  If
2025  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2026  * mode that would prevent lkb1's conversion from being granted, we do a
2027  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2028  * I think this means that the lkb_is_ahead condition below should always
2029  * be zero, i.e. there will never be conv-deadlk between two locks that are
2030  * both already on the convert queue.
2031  */
2032
2033 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2034 {
2035         struct dlm_lkb *lkb1;
2036         int lkb_is_ahead = 0;
2037
2038         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2039                 if (lkb1 == lkb2) {
2040                         lkb_is_ahead = 1;
2041                         continue;
2042                 }
2043
2044                 if (!lkb_is_ahead) {
2045                         if (!modes_compat(lkb2, lkb1))
2046                                 return 1;
2047                 } else {
2048                         if (!modes_compat(lkb2, lkb1) &&
2049                             !modes_compat(lkb1, lkb2))
2050                                 return 1;
2051                 }
2052         }
2053         return 0;
2054 }
2055
2056 /*
2057  * Return 1 if the lock can be granted, 0 otherwise.
2058  * Also detect and resolve conversion deadlocks.
2059  *
2060  * lkb is the lock to be granted
2061  *
2062  * now is 1 if the function is being called in the context of the
2063  * immediate request, it is 0 if called later, after the lock has been
2064  * queued.
2065  *
2066  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2067  * after recovery.
2068  *
2069  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2070  */
2071
2072 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2073                            int recover)
2074 {
2075         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2076
2077         /*
2078          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2079          * a new request for a NL mode lock being blocked.
2080          *
2081          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2082          * request, then it would be granted.  In essence, the use of this flag
2083          * tells the Lock Manager to expedite theis request by not considering
2084          * what may be in the CONVERTING or WAITING queues...  As of this
2085          * writing, the EXPEDITE flag can be used only with new requests for NL
2086          * mode locks.  This flag is not valid for conversion requests.
2087          *
2088          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2089          * conversion or used with a non-NL requested mode.  We also know an
2090          * EXPEDITE request is always granted immediately, so now must always
2091          * be 1.  The full condition to grant an expedite request: (now &&
2092          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2093          * therefore be shortened to just checking the flag.
2094          */
2095
2096         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2097                 return 1;
2098
2099         /*
2100          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2101          * added to the remaining conditions.
2102          */
2103
2104         if (queue_conflict(&r->res_grantqueue, lkb))
2105                 return 0;
2106
2107         /*
2108          * 6-3: By default, a conversion request is immediately granted if the
2109          * requested mode is compatible with the modes of all other granted
2110          * locks
2111          */
2112
2113         if (queue_conflict(&r->res_convertqueue, lkb))
2114                 return 0;
2115
2116         /*
2117          * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2118          * locks for a recovered rsb, on which lkb's have been rebuilt.
2119          * The lkb's may have been rebuilt on the queues in a different
2120          * order than they were in on the previous master.  So, granting
2121          * queued conversions in order after recovery doesn't make sense
2122          * since the order hasn't been preserved anyway.  The new order
2123          * could also have created a new "in place" conversion deadlock.
2124          * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2125          * After recovery, there would be no granted locks, and possibly
2126          * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2127          * recovery, grant conversions without considering order.
2128          */
2129
2130         if (conv && recover)
2131                 return 1;
2132
2133         /*
2134          * 6-5: But the default algorithm for deciding whether to grant or
2135          * queue conversion requests does not by itself guarantee that such
2136          * requests are serviced on a "first come first serve" basis.  This, in
2137          * turn, can lead to a phenomenon known as "indefinate postponement".
2138          *
2139          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2140          * the system service employed to request a lock conversion.  This flag
2141          * forces certain conversion requests to be queued, even if they are
2142          * compatible with the granted modes of other locks on the same
2143          * resource.  Thus, the use of this flag results in conversion requests
2144          * being ordered on a "first come first servce" basis.
2145          *
2146          * DCT: This condition is all about new conversions being able to occur
2147          * "in place" while the lock remains on the granted queue (assuming
2148          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2149          * doesn't _have_ to go onto the convert queue where it's processed in
2150          * order.  The "now" variable is necessary to distinguish converts
2151          * being received and processed for the first time now, because once a
2152          * convert is moved to the conversion queue the condition below applies
2153          * requiring fifo granting.
2154          */
2155
2156         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2157                 return 1;
2158
2159         /*
2160          * Even if the convert is compat with all granted locks,
2161          * QUECVT forces it behind other locks on the convert queue.
2162          */
2163
2164         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2165                 if (list_empty(&r->res_convertqueue))
2166                         return 1;
2167                 else
2168                         return 0;
2169         }
2170
2171         /*
2172          * The NOORDER flag is set to avoid the standard vms rules on grant
2173          * order.
2174          */
2175
2176         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2177                 return 1;
2178
2179         /*
2180          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2181          * granted until all other conversion requests ahead of it are granted
2182          * and/or canceled.
2183          */
2184
2185         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2186                 return 1;
2187
2188         /*
2189          * 6-4: By default, a new request is immediately granted only if all
2190          * three of the following conditions are satisfied when the request is
2191          * issued:
2192          * - The queue of ungranted conversion requests for the resource is
2193          *   empty.
2194          * - The queue of ungranted new requests for the resource is empty.
2195          * - The mode of the new request is compatible with the most
2196          *   restrictive mode of all granted locks on the resource.
2197          */
2198
2199         if (now && !conv && list_empty(&r->res_convertqueue) &&
2200             list_empty(&r->res_waitqueue))
2201                 return 1;
2202
2203         /*
2204          * 6-4: Once a lock request is in the queue of ungranted new requests,
2205          * it cannot be granted until the queue of ungranted conversion
2206          * requests is empty, all ungranted new requests ahead of it are
2207          * granted and/or canceled, and it is compatible with the granted mode
2208          * of the most restrictive lock granted on the resource.
2209          */
2210
2211         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2212             first_in_list(lkb, &r->res_waitqueue))
2213                 return 1;
2214
2215         return 0;
2216 }
2217
2218 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2219                           int recover, int *err)
2220 {
2221         int rv;
2222         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2223         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2224
2225         if (err)
2226                 *err = 0;
2227
2228         rv = _can_be_granted(r, lkb, now, recover);
2229         if (rv)
2230                 goto out;
2231
2232         /*
2233          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2234          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2235          * cancels one of the locks.
2236          */
2237
2238         if (is_convert && can_be_queued(lkb) &&
2239             conversion_deadlock_detect(r, lkb)) {
2240                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2241                         lkb->lkb_grmode = DLM_LOCK_NL;
2242                         set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2243                 } else if (err) {
2244                         *err = -EDEADLK;
2245                 } else {
2246                         log_print("can_be_granted deadlock %x now %d",
2247                                   lkb->lkb_id, now);
2248                         dlm_dump_rsb(r);
2249                 }
2250                 goto out;
2251         }
2252
2253         /*
2254          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2255          * to grant a request in a mode other than the normal rqmode.  It's a
2256          * simple way to provide a big optimization to applications that can
2257          * use them.
2258          */
2259
2260         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2261                 alt = DLM_LOCK_PR;
2262         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2263                 alt = DLM_LOCK_CW;
2264
2265         if (alt) {
2266                 lkb->lkb_rqmode = alt;
2267                 rv = _can_be_granted(r, lkb, now, 0);
2268                 if (rv)
2269                         set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2270                 else
2271                         lkb->lkb_rqmode = rqmode;
2272         }
2273  out:
2274         return rv;
2275 }
2276
2277 /* Returns the highest requested mode of all blocked conversions; sets
2278    cw if there's a blocked conversion to DLM_LOCK_CW. */
2279
2280 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2281                                  unsigned int *count)
2282 {
2283         struct dlm_lkb *lkb, *s;
2284         int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2285         int hi, demoted, quit, grant_restart, demote_restart;
2286         int deadlk;
2287
2288         quit = 0;
2289  restart:
2290         grant_restart = 0;
2291         demote_restart = 0;
2292         hi = DLM_LOCK_IV;
2293
2294         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2295                 demoted = is_demoted(lkb);
2296                 deadlk = 0;
2297
2298                 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2299                         grant_lock_pending(r, lkb);
2300                         grant_restart = 1;
2301                         if (count)
2302                                 (*count)++;
2303                         continue;
2304                 }
2305
2306                 if (!demoted && is_demoted(lkb)) {
2307                         log_print("WARN: pending demoted %x node %d %s",
2308                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2309                         demote_restart = 1;
2310                         continue;
2311                 }
2312
2313                 if (deadlk) {
2314                         /*
2315                          * If DLM_LKB_NODLKWT flag is set and conversion
2316                          * deadlock is detected, we request blocking AST and
2317                          * down (or cancel) conversion.
2318                          */
2319                         if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2320                                 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2321                                         queue_bast(r, lkb, lkb->lkb_rqmode);
2322                                         lkb->lkb_highbast = lkb->lkb_rqmode;
2323                                 }
2324                         } else {
2325                                 log_print("WARN: pending deadlock %x node %d %s",
2326                                           lkb->lkb_id, lkb->lkb_nodeid,
2327                                           r->res_name);
2328                                 dlm_dump_rsb(r);
2329                         }
2330                         continue;
2331                 }
2332
2333                 hi = max_t(int, lkb->lkb_rqmode, hi);
2334
2335                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2336                         *cw = 1;
2337         }
2338
2339         if (grant_restart)
2340                 goto restart;
2341         if (demote_restart && !quit) {
2342                 quit = 1;
2343                 goto restart;
2344         }
2345
2346         return max_t(int, high, hi);
2347 }
2348
2349 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2350                               unsigned int *count)
2351 {
2352         struct dlm_lkb *lkb, *s;
2353
2354         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2355                 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2356                         grant_lock_pending(r, lkb);
2357                         if (count)
2358                                 (*count)++;
2359                 } else {
2360                         high = max_t(int, lkb->lkb_rqmode, high);
2361                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2362                                 *cw = 1;
2363                 }
2364         }
2365
2366         return high;
2367 }
2368
2369 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2370    on either the convert or waiting queue.
2371    high is the largest rqmode of all locks blocked on the convert or
2372    waiting queue. */
2373
2374 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2375 {
2376         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2377                 if (gr->lkb_highbast < DLM_LOCK_EX)
2378                         return 1;
2379                 return 0;
2380         }
2381
2382         if (gr->lkb_highbast < high &&
2383             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2384                 return 1;
2385         return 0;
2386 }
2387
2388 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2389 {
2390         struct dlm_lkb *lkb, *s;
2391         int high = DLM_LOCK_IV;
2392         int cw = 0;
2393
2394         if (!is_master(r)) {
2395                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2396                 dlm_dump_rsb(r);
2397                 return;
2398         }
2399
2400         high = grant_pending_convert(r, high, &cw, count);
2401         high = grant_pending_wait(r, high, &cw, count);
2402
2403         if (high == DLM_LOCK_IV)
2404                 return;
2405
2406         /*
2407          * If there are locks left on the wait/convert queue then send blocking
2408          * ASTs to granted locks based on the largest requested mode (high)
2409          * found above.
2410          */
2411
2412         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2413                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2414                         if (cw && high == DLM_LOCK_PR &&
2415                             lkb->lkb_grmode == DLM_LOCK_PR)
2416                                 queue_bast(r, lkb, DLM_LOCK_CW);
2417                         else
2418                                 queue_bast(r, lkb, high);
2419                         lkb->lkb_highbast = high;
2420                 }
2421         }
2422 }
2423
2424 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2425 {
2426         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2427             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2428                 if (gr->lkb_highbast < DLM_LOCK_EX)
2429                         return 1;
2430                 return 0;
2431         }
2432
2433         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2434                 return 1;
2435         return 0;
2436 }
2437
2438 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2439                             struct dlm_lkb *lkb)
2440 {
2441         struct dlm_lkb *gr;
2442
2443         list_for_each_entry(gr, head, lkb_statequeue) {
2444                 /* skip self when sending basts to convertqueue */
2445                 if (gr == lkb)
2446                         continue;
2447                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2448                         queue_bast(r, gr, lkb->lkb_rqmode);
2449                         gr->lkb_highbast = lkb->lkb_rqmode;
2450                 }
2451         }
2452 }
2453
2454 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2455 {
2456         send_bast_queue(r, &r->res_grantqueue, lkb);
2457 }
2458
2459 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2460 {
2461         send_bast_queue(r, &r->res_grantqueue, lkb);
2462         send_bast_queue(r, &r->res_convertqueue, lkb);
2463 }
2464
2465 /* set_master(r, lkb) -- set the master nodeid of a resource
2466
2467    The purpose of this function is to set the nodeid field in the given
2468    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2469    known, it can just be copied to the lkb and the function will return
2470    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2471    before it can be copied to the lkb.
2472
2473    When the rsb nodeid is being looked up remotely, the initial lkb
2474    causing the lookup is kept on the ls_waiters list waiting for the
2475    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2476    on the rsb's res_lookup list until the master is verified.
2477
2478    Return values:
2479    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2480    1: the rsb master is not available and the lkb has been placed on
2481       a wait queue
2482 */
2483
2484 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2485 {
2486         int our_nodeid = dlm_our_nodeid();
2487
2488         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2489                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2490                 r->res_first_lkid = lkb->lkb_id;
2491                 lkb->lkb_nodeid = r->res_nodeid;
2492                 return 0;
2493         }
2494
2495         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2496                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2497                 return 1;
2498         }
2499
2500         if (r->res_master_nodeid == our_nodeid) {
2501                 lkb->lkb_nodeid = 0;
2502                 return 0;
2503         }
2504
2505         if (r->res_master_nodeid) {
2506                 lkb->lkb_nodeid = r->res_master_nodeid;
2507                 return 0;
2508         }
2509
2510         if (dlm_dir_nodeid(r) == our_nodeid) {
2511                 /* This is a somewhat unusual case; find_rsb will usually
2512                    have set res_master_nodeid when dir nodeid is local, but
2513                    there are cases where we become the dir node after we've
2514                    past find_rsb and go through _request_lock again.
2515                    confirm_master() or process_lookup_list() needs to be
2516                    called after this. */
2517                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2518                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2519                           r->res_name);
2520                 r->res_master_nodeid = our_nodeid;
2521                 r->res_nodeid = 0;
2522                 lkb->lkb_nodeid = 0;
2523                 return 0;
2524         }
2525
2526         r->res_first_lkid = lkb->lkb_id;
2527         send_lookup(r, lkb);
2528         return 1;
2529 }
2530
2531 static void process_lookup_list(struct dlm_rsb *r)
2532 {
2533         struct dlm_lkb *lkb, *safe;
2534
2535         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2536                 list_del_init(&lkb->lkb_rsb_lookup);
2537                 _request_lock(r, lkb);
2538                 schedule();
2539         }
2540 }
2541
2542 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2543
2544 static void confirm_master(struct dlm_rsb *r, int error)
2545 {
2546         struct dlm_lkb *lkb;
2547
2548         if (!r->res_first_lkid)
2549                 return;
2550
2551         switch (error) {
2552         case 0:
2553         case -EINPROGRESS:
2554                 r->res_first_lkid = 0;
2555                 process_lookup_list(r);
2556                 break;
2557
2558         case -EAGAIN:
2559         case -EBADR:
2560         case -ENOTBLK:
2561                 /* the remote request failed and won't be retried (it was
2562                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2563                    lkb the first_lkid */
2564
2565                 r->res_first_lkid = 0;
2566
2567                 if (!list_empty(&r->res_lookup)) {
2568                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2569                                          lkb_rsb_lookup);
2570                         list_del_init(&lkb->lkb_rsb_lookup);
2571                         r->res_first_lkid = lkb->lkb_id;
2572                         _request_lock(r, lkb);
2573                 }
2574                 break;
2575
2576         default:
2577                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2578         }
2579 }
2580
2581 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2582                          int namelen, void (*ast)(void *astparam),
2583                          void *astparam,
2584                          void (*bast)(void *astparam, int mode),
2585                          struct dlm_args *args)
2586 {
2587         int rv = -EINVAL;
2588
2589         /* check for invalid arg usage */
2590
2591         if (mode < 0 || mode > DLM_LOCK_EX)
2592                 goto out;
2593
2594         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2595                 goto out;
2596
2597         if (flags & DLM_LKF_CANCEL)
2598                 goto out;
2599
2600         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2601                 goto out;
2602
2603         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2604                 goto out;
2605
2606         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2607                 goto out;
2608
2609         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2610                 goto out;
2611
2612         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2613                 goto out;
2614
2615         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2616                 goto out;
2617
2618         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2619                 goto out;
2620
2621         if (!ast || !lksb)
2622                 goto out;
2623
2624         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2625                 goto out;
2626
2627         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2628                 goto out;
2629
2630         /* these args will be copied to the lkb in validate_lock_args,
2631            it cannot be done now because when converting locks, fields in
2632            an active lkb cannot be modified before locking the rsb */
2633
2634         args->flags = flags;
2635         args->astfn = ast;
2636         args->astparam = astparam;
2637         args->bastfn = bast;
2638         args->mode = mode;
2639         args->lksb = lksb;
2640         rv = 0;
2641  out:
2642         return rv;
2643 }
2644
2645 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2646 {
2647         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2648                       DLM_LKF_FORCEUNLOCK))
2649                 return -EINVAL;
2650
2651         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2652                 return -EINVAL;
2653
2654         args->flags = flags;
2655         args->astparam = astarg;
2656         return 0;
2657 }
2658
2659 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2660                               struct dlm_args *args)
2661 {
2662         int rv = -EBUSY;
2663
2664         if (args->flags & DLM_LKF_CONVERT) {
2665                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2666                         goto out;
2667
2668                 /* lock not allowed if there's any op in progress */
2669                 if (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count))
2670                         goto out;
2671
2672                 if (is_overlap(lkb))
2673                         goto out;
2674
2675                 rv = -EINVAL;
2676                 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2677                         goto out;
2678
2679                 if (args->flags & DLM_LKF_QUECVT &&
2680                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2681                         goto out;
2682         }
2683
2684         lkb->lkb_exflags = args->flags;
2685         dlm_set_sbflags_val(lkb, 0);
2686         lkb->lkb_astfn = args->astfn;
2687         lkb->lkb_astparam = args->astparam;
2688         lkb->lkb_bastfn = args->bastfn;
2689         lkb->lkb_rqmode = args->mode;
2690         lkb->lkb_lksb = args->lksb;
2691         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2692         lkb->lkb_ownpid = (int) current->pid;
2693         rv = 0;
2694  out:
2695         switch (rv) {
2696         case 0:
2697                 break;
2698         case -EINVAL:
2699                 /* annoy the user because dlm usage is wrong */
2700                 WARN_ON(1);
2701                 log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
2702                           rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2703                           lkb->lkb_status, lkb->lkb_wait_type,
2704                           lkb->lkb_resource->res_name);
2705                 break;
2706         default:
2707                 log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
2708                           rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2709                           lkb->lkb_status, lkb->lkb_wait_type,
2710                           lkb->lkb_resource->res_name);
2711                 break;
2712         }
2713
2714         return rv;
2715 }
2716
2717 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2718    for success */
2719
2720 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2721    because there may be a lookup in progress and it's valid to do
2722    cancel/unlockf on it */
2723
2724 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2725 {
2726         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2727         int rv = -EBUSY;
2728
2729         /* normal unlock not allowed if there's any op in progress */
2730         if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2731             (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count)))
2732                 goto out;
2733
2734         /* an lkb may be waiting for an rsb lookup to complete where the
2735            lookup was initiated by another lock */
2736
2737         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2738                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2739                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2740                         list_del_init(&lkb->lkb_rsb_lookup);
2741                         queue_cast(lkb->lkb_resource, lkb,
2742                                    args->flags & DLM_LKF_CANCEL ?
2743                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2744                         unhold_lkb(lkb); /* undoes create_lkb() */
2745                 }
2746                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2747                 goto out;
2748         }
2749
2750         rv = -EINVAL;
2751         if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2752                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2753                 dlm_print_lkb(lkb);
2754                 goto out;
2755         }
2756
2757         /* an lkb may still exist even though the lock is EOL'ed due to a
2758          * cancel, unlock or failed noqueue request; an app can't use these
2759          * locks; return same error as if the lkid had not been found at all
2760          */
2761
2762         if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2763                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2764                 rv = -ENOENT;
2765                 goto out;
2766         }
2767
2768         /* cancel not allowed with another cancel/unlock in progress */
2769
2770         if (args->flags & DLM_LKF_CANCEL) {
2771                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2772                         goto out;
2773
2774                 if (is_overlap(lkb))
2775                         goto out;
2776
2777                 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2778                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2779                         rv = -EBUSY;
2780                         goto out;
2781                 }
2782
2783                 /* there's nothing to cancel */
2784                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2785                     !lkb->lkb_wait_type) {
2786                         rv = -EBUSY;
2787                         goto out;
2788                 }
2789
2790                 switch (lkb->lkb_wait_type) {
2791                 case DLM_MSG_LOOKUP:
2792                 case DLM_MSG_REQUEST:
2793                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2794                         rv = -EBUSY;
2795                         goto out;
2796                 case DLM_MSG_UNLOCK:
2797                 case DLM_MSG_CANCEL:
2798                         goto out;
2799                 }
2800                 /* add_to_waiters() will set OVERLAP_CANCEL */
2801                 goto out_ok;
2802         }
2803
2804         /* do we need to allow a force-unlock if there's a normal unlock
2805            already in progress?  in what conditions could the normal unlock
2806            fail such that we'd want to send a force-unlock to be sure? */
2807
2808         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2809                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2810                         goto out;
2811
2812                 if (is_overlap_unlock(lkb))
2813                         goto out;
2814
2815                 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2816                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2817                         rv = -EBUSY;
2818                         goto out;
2819                 }
2820
2821                 switch (lkb->lkb_wait_type) {
2822                 case DLM_MSG_LOOKUP:
2823                 case DLM_MSG_REQUEST:
2824                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2825                         rv = -EBUSY;
2826                         goto out;
2827                 case DLM_MSG_UNLOCK:
2828                         goto out;
2829                 }
2830                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2831         }
2832
2833  out_ok:
2834         /* an overlapping op shouldn't blow away exflags from other op */
2835         lkb->lkb_exflags |= args->flags;
2836         dlm_set_sbflags_val(lkb, 0);
2837         lkb->lkb_astparam = args->astparam;
2838         rv = 0;
2839  out:
2840         switch (rv) {
2841         case 0:
2842                 break;
2843         case -EINVAL:
2844                 /* annoy the user because dlm usage is wrong */
2845                 WARN_ON(1);
2846                 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
2847                           lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
2848                           args->flags, lkb->lkb_wait_type,
2849                           lkb->lkb_resource->res_name);
2850                 break;
2851         default:
2852                 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
2853                           lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
2854                           args->flags, lkb->lkb_wait_type,
2855                           lkb->lkb_resource->res_name);
2856                 break;
2857         }
2858
2859         return rv;
2860 }
2861
2862 /*
2863  * Four stage 4 varieties:
2864  * do_request(), do_convert(), do_unlock(), do_cancel()
2865  * These are called on the master node for the given lock and
2866  * from the central locking logic.
2867  */
2868
2869 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2870 {
2871         int error = 0;
2872
2873         if (can_be_granted(r, lkb, 1, 0, NULL)) {
2874                 grant_lock(r, lkb);
2875                 queue_cast(r, lkb, 0);
2876                 goto out;
2877         }
2878
2879         if (can_be_queued(lkb)) {
2880                 error = -EINPROGRESS;
2881                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2882                 goto out;
2883         }
2884
2885         error = -EAGAIN;
2886         queue_cast(r, lkb, -EAGAIN);
2887  out:
2888         return error;
2889 }
2890
2891 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2892                                int error)
2893 {
2894         switch (error) {
2895         case -EAGAIN:
2896                 if (force_blocking_asts(lkb))
2897                         send_blocking_asts_all(r, lkb);
2898                 break;
2899         case -EINPROGRESS:
2900                 send_blocking_asts(r, lkb);
2901                 break;
2902         }
2903 }
2904
2905 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2906 {
2907         int error = 0;
2908         int deadlk = 0;
2909
2910         /* changing an existing lock may allow others to be granted */
2911
2912         if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
2913                 grant_lock(r, lkb);
2914                 queue_cast(r, lkb, 0);
2915                 goto out;
2916         }
2917
2918         /* can_be_granted() detected that this lock would block in a conversion
2919            deadlock, so we leave it on the granted queue and return EDEADLK in
2920            the ast for the convert. */
2921
2922         if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2923                 /* it's left on the granted queue */
2924                 revert_lock(r, lkb);
2925                 queue_cast(r, lkb, -EDEADLK);
2926                 error = -EDEADLK;
2927                 goto out;
2928         }
2929
2930         /* is_demoted() means the can_be_granted() above set the grmode
2931            to NL, and left us on the granted queue.  This auto-demotion
2932            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2933            now grantable.  We have to try to grant other converting locks
2934            before we try again to grant this one. */
2935
2936         if (is_demoted(lkb)) {
2937                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
2938                 if (_can_be_granted(r, lkb, 1, 0)) {
2939                         grant_lock(r, lkb);
2940                         queue_cast(r, lkb, 0);
2941                         goto out;
2942                 }
2943                 /* else fall through and move to convert queue */
2944         }
2945
2946         if (can_be_queued(lkb)) {
2947                 error = -EINPROGRESS;
2948                 del_lkb(r, lkb);
2949                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2950                 goto out;
2951         }
2952
2953         error = -EAGAIN;
2954         queue_cast(r, lkb, -EAGAIN);
2955  out:
2956         return error;
2957 }
2958
2959 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2960                                int error)
2961 {
2962         switch (error) {
2963         case 0:
2964                 grant_pending_locks(r, NULL);
2965                 /* grant_pending_locks also sends basts */
2966                 break;
2967         case -EAGAIN:
2968                 if (force_blocking_asts(lkb))
2969                         send_blocking_asts_all(r, lkb);
2970                 break;
2971         case -EINPROGRESS:
2972                 send_blocking_asts(r, lkb);
2973                 break;
2974         }
2975 }
2976
2977 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2978 {
2979         remove_lock(r, lkb);
2980         queue_cast(r, lkb, -DLM_EUNLOCK);
2981         return -DLM_EUNLOCK;
2982 }
2983
2984 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2985                               int error)
2986 {
2987         grant_pending_locks(r, NULL);
2988 }
2989
2990 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2991
2992 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2993 {
2994         int error;
2995
2996         error = revert_lock(r, lkb);
2997         if (error) {
2998                 queue_cast(r, lkb, -DLM_ECANCEL);
2999                 return -DLM_ECANCEL;
3000         }
3001         return 0;
3002 }
3003
3004 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3005                               int error)
3006 {
3007         if (error)
3008                 grant_pending_locks(r, NULL);
3009 }
3010
3011 /*
3012  * Four stage 3 varieties:
3013  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3014  */
3015
3016 /* add a new lkb to a possibly new rsb, called by requesting process */
3017
3018 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3019 {
3020         int error;
3021
3022         /* set_master: sets lkb nodeid from r */
3023
3024         error = set_master(r, lkb);
3025         if (error < 0)
3026                 goto out;
3027         if (error) {
3028                 error = 0;
3029                 goto out;
3030         }
3031
3032         if (is_remote(r)) {
3033                 /* receive_request() calls do_request() on remote node */
3034                 error = send_request(r, lkb);
3035         } else {
3036                 error = do_request(r, lkb);
3037                 /* for remote locks the request_reply is sent
3038                    between do_request and do_request_effects */
3039                 do_request_effects(r, lkb, error);
3040         }
3041  out:
3042         return error;
3043 }
3044
3045 /* change some property of an existing lkb, e.g. mode */
3046
3047 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3048 {
3049         int error;
3050
3051         if (is_remote(r)) {
3052                 /* receive_convert() calls do_convert() on remote node */
3053                 error = send_convert(r, lkb);
3054         } else {
3055                 error = do_convert(r, lkb);
3056                 /* for remote locks the convert_reply is sent
3057                    between do_convert and do_convert_effects */
3058                 do_convert_effects(r, lkb, error);
3059         }
3060
3061         return error;
3062 }
3063
3064 /* remove an existing lkb from the granted queue */
3065
3066 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3067 {
3068         int error;
3069
3070         if (is_remote(r)) {
3071                 /* receive_unlock() calls do_unlock() on remote node */
3072                 error = send_unlock(r, lkb);
3073         } else {
3074                 error = do_unlock(r, lkb);
3075                 /* for remote locks the unlock_reply is sent
3076                    between do_unlock and do_unlock_effects */
3077                 do_unlock_effects(r, lkb, error);
3078         }
3079
3080         return error;
3081 }
3082
3083 /* remove an existing lkb from the convert or wait queue */
3084
3085 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3086 {
3087         int error;
3088
3089         if (is_remote(r)) {
3090                 /* receive_cancel() calls do_cancel() on remote node */
3091                 error = send_cancel(r, lkb);
3092         } else {
3093                 error = do_cancel(r, lkb);
3094                 /* for remote locks the cancel_reply is sent
3095                    between do_cancel and do_cancel_effects */
3096                 do_cancel_effects(r, lkb, error);
3097         }
3098
3099         return error;
3100 }
3101
3102 /*
3103  * Four stage 2 varieties:
3104  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3105  */
3106
3107 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3108                         const void *name, int len,
3109                         struct dlm_args *args)
3110 {
3111         struct dlm_rsb *r;
3112         int error;
3113
3114         error = validate_lock_args(ls, lkb, args);
3115         if (error)
3116                 return error;
3117
3118         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3119         if (error)
3120                 return error;
3121
3122         lock_rsb(r);
3123
3124         attach_lkb(r, lkb);
3125         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3126
3127         error = _request_lock(r, lkb);
3128
3129         unlock_rsb(r);
3130         put_rsb(r);
3131         return error;
3132 }
3133
3134 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3135                         struct dlm_args *args)
3136 {
3137         struct dlm_rsb *r;
3138         int error;
3139
3140         r = lkb->lkb_resource;
3141
3142         hold_rsb(r);
3143         lock_rsb(r);
3144
3145         error = validate_lock_args(ls, lkb, args);
3146         if (error)
3147                 goto out;
3148
3149         error = _convert_lock(r, lkb);
3150  out:
3151         unlock_rsb(r);
3152         put_rsb(r);
3153         return error;
3154 }
3155
3156 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3157                        struct dlm_args *args)
3158 {
3159         struct dlm_rsb *r;
3160         int error;
3161
3162         r = lkb->lkb_resource;
3163
3164         hold_rsb(r);
3165         lock_rsb(r);
3166
3167         error = validate_unlock_args(lkb, args);
3168         if (error)
3169                 goto out;
3170
3171         error = _unlock_lock(r, lkb);
3172  out:
3173         unlock_rsb(r);
3174         put_rsb(r);
3175         return error;
3176 }
3177
3178 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3179                        struct dlm_args *args)
3180 {
3181         struct dlm_rsb *r;
3182         int error;
3183
3184         r = lkb->lkb_resource;
3185
3186         hold_rsb(r);
3187         lock_rsb(r);
3188
3189         error = validate_unlock_args(lkb, args);
3190         if (error)
3191                 goto out;
3192
3193         error = _cancel_lock(r, lkb);
3194  out:
3195         unlock_rsb(r);
3196         put_rsb(r);
3197         return error;
3198 }
3199
3200 /*
3201  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3202  */
3203
3204 int dlm_lock(dlm_lockspace_t *lockspace,
3205              int mode,
3206              struct dlm_lksb *lksb,
3207              uint32_t flags,
3208              const void *name,
3209              unsigned int namelen,
3210              uint32_t parent_lkid,
3211              void (*ast) (void *astarg),
3212              void *astarg,
3213              void (*bast) (void *astarg, int mode))
3214 {
3215         struct dlm_ls *ls;
3216         struct dlm_lkb *lkb;
3217         struct dlm_args args;
3218         int error, convert = flags & DLM_LKF_CONVERT;
3219
3220         ls = dlm_find_lockspace_local(lockspace);
3221         if (!ls)
3222                 return -EINVAL;
3223
3224         dlm_lock_recovery(ls);
3225
3226         if (convert)
3227                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3228         else
3229                 error = create_lkb(ls, &lkb);
3230
3231         if (error)
3232                 goto out;
3233
3234         trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3235
3236         error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3237                               &args);
3238         if (error)
3239                 goto out_put;
3240
3241         if (convert)
3242                 error = convert_lock(ls, lkb, &args);
3243         else
3244                 error = request_lock(ls, lkb, name, namelen, &args);
3245
3246         if (error == -EINPROGRESS)
3247                 error = 0;
3248  out_put:
3249         trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3250
3251         if (convert || error)
3252                 __put_lkb(ls, lkb);
3253         if (error == -EAGAIN || error == -EDEADLK)
3254                 error = 0;
3255  out:
3256         dlm_unlock_recovery(ls);
3257         dlm_put_lockspace(ls);
3258         return error;
3259 }
3260
3261 int dlm_unlock(dlm_lockspace_t *lockspace,
3262                uint32_t lkid,
3263                uint32_t flags,
3264                struct dlm_lksb *lksb,
3265                void *astarg)
3266 {
3267         struct dlm_ls *ls;
3268         struct dlm_lkb *lkb;
3269         struct dlm_args args;
3270         int error;
3271
3272         ls = dlm_find_lockspace_local(lockspace);
3273         if (!ls)
3274                 return -EINVAL;
3275
3276         dlm_lock_recovery(ls);
3277
3278         error = find_lkb(ls, lkid, &lkb);
3279         if (error)
3280                 goto out;
3281
3282         trace_dlm_unlock_start(ls, lkb, flags);
3283
3284         error = set_unlock_args(flags, astarg, &args);
3285         if (error)
3286                 goto out_put;
3287
3288         if (flags & DLM_LKF_CANCEL)
3289                 error = cancel_lock(ls, lkb, &args);
3290         else
3291                 error = unlock_lock(ls, lkb, &args);
3292
3293         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3294                 error = 0;
3295         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3296                 error = 0;
3297  out_put:
3298         trace_dlm_unlock_end(ls, lkb, flags, error);
3299
3300         dlm_put_lkb(lkb);
3301  out:
3302         dlm_unlock_recovery(ls);
3303         dlm_put_lockspace(ls);
3304         return error;
3305 }
3306
3307 /*
3308  * send/receive routines for remote operations and replies
3309  *
3310  * send_args
3311  * send_common
3312  * send_request                 receive_request
3313  * send_convert                 receive_convert
3314  * send_unlock                  receive_unlock
3315  * send_cancel                  receive_cancel
3316  * send_grant                   receive_grant
3317  * send_bast                    receive_bast
3318  * send_lookup                  receive_lookup
3319  * send_remove                  receive_remove
3320  *
3321  *                              send_common_reply
3322  * receive_request_reply        send_request_reply
3323  * receive_convert_reply        send_convert_reply
3324  * receive_unlock_reply         send_unlock_reply
3325  * receive_cancel_reply         send_cancel_reply
3326  * receive_lookup_reply         send_lookup_reply
3327  */
3328
3329 static int _create_message(struct dlm_ls *ls, int mb_len,
3330                            int to_nodeid, int mstype,
3331                            struct dlm_message **ms_ret,
3332                            struct dlm_mhandle **mh_ret,
3333                            gfp_t allocation)
3334 {
3335         struct dlm_message *ms;
3336         struct dlm_mhandle *mh;
3337         char *mb;
3338
3339         /* get_buffer gives us a message handle (mh) that we need to
3340            pass into midcomms_commit and a message buffer (mb) that we
3341            write our data into */
3342
3343         mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, allocation, &mb);
3344         if (!mh)
3345                 return -ENOBUFS;
3346
3347         ms = (struct dlm_message *) mb;
3348
3349         ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3350         ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3351         ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3352         ms->m_header.h_length = cpu_to_le16(mb_len);
3353         ms->m_header.h_cmd = DLM_MSG;
3354
3355         ms->m_type = cpu_to_le32(mstype);
3356
3357         *mh_ret = mh;
3358         *ms_ret = ms;
3359         return 0;
3360 }
3361
3362 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3363                           int to_nodeid, int mstype,
3364                           struct dlm_message **ms_ret,
3365                           struct dlm_mhandle **mh_ret,
3366                           gfp_t allocation)
3367 {
3368         int mb_len = sizeof(struct dlm_message);
3369
3370         switch (mstype) {
3371         case DLM_MSG_REQUEST:
3372         case DLM_MSG_LOOKUP:
3373         case DLM_MSG_REMOVE:
3374                 mb_len += r->res_length;
3375                 break;
3376         case DLM_MSG_CONVERT:
3377         case DLM_MSG_UNLOCK:
3378         case DLM_MSG_REQUEST_REPLY:
3379         case DLM_MSG_CONVERT_REPLY:
3380         case DLM_MSG_GRANT:
3381                 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3382                         mb_len += r->res_ls->ls_lvblen;
3383                 break;
3384         }
3385
3386         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3387                                ms_ret, mh_ret, allocation);
3388 }
3389
3390 /* further lowcomms enhancements or alternate implementations may make
3391    the return value from this function useful at some point */
3392
3393 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3394                         const void *name, int namelen)
3395 {
3396         dlm_midcomms_commit_mhandle(mh, name, namelen);
3397         return 0;
3398 }
3399
3400 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3401                       struct dlm_message *ms)
3402 {
3403         ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3404         ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3405         ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3406         ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3407         ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3408         ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3409         ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3410         ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3411         ms->m_status   = cpu_to_le32(lkb->lkb_status);
3412         ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3413         ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3414         ms->m_hash     = cpu_to_le32(r->res_hash);
3415
3416         /* m_result and m_bastmode are set from function args,
3417            not from lkb fields */
3418
3419         if (lkb->lkb_bastfn)
3420                 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3421         if (lkb->lkb_astfn)
3422                 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3423
3424         /* compare with switch in create_message; send_remove() doesn't
3425            use send_args() */
3426
3427         switch (ms->m_type) {
3428         case cpu_to_le32(DLM_MSG_REQUEST):
3429         case cpu_to_le32(DLM_MSG_LOOKUP):
3430                 memcpy(ms->m_extra, r->res_name, r->res_length);
3431                 break;
3432         case cpu_to_le32(DLM_MSG_CONVERT):
3433         case cpu_to_le32(DLM_MSG_UNLOCK):
3434         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3435         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3436         case cpu_to_le32(DLM_MSG_GRANT):
3437                 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3438                         break;
3439                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3440                 break;
3441         }
3442 }
3443
3444 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3445 {
3446         struct dlm_message *ms;
3447         struct dlm_mhandle *mh;
3448         int to_nodeid, error;
3449
3450         to_nodeid = r->res_nodeid;
3451
3452         error = add_to_waiters(lkb, mstype, to_nodeid);
3453         if (error)
3454                 return error;
3455
3456         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
3457         if (error)
3458                 goto fail;
3459
3460         send_args(r, lkb, ms);
3461
3462         error = send_message(mh, ms, r->res_name, r->res_length);
3463         if (error)
3464                 goto fail;
3465         return 0;
3466
3467  fail:
3468         remove_from_waiters(lkb, msg_reply_type(mstype));
3469         return error;
3470 }
3471
3472 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3473 {
3474         return send_common(r, lkb, DLM_MSG_REQUEST);
3475 }
3476
3477 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3478 {
3479         int error;
3480
3481         error = send_common(r, lkb, DLM_MSG_CONVERT);
3482
3483         /* down conversions go without a reply from the master */
3484         if (!error && down_conversion(lkb)) {
3485                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3486                 r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3487                 r->res_ls->ls_local_ms.m_result = 0;
3488                 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3489         }
3490
3491         return error;
3492 }
3493
3494 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3495    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3496    that the master is still correct. */
3497
3498 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3499 {
3500         return send_common(r, lkb, DLM_MSG_UNLOCK);
3501 }
3502
3503 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3504 {
3505         return send_common(r, lkb, DLM_MSG_CANCEL);
3506 }
3507
3508 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3509 {
3510         struct dlm_message *ms;
3511         struct dlm_mhandle *mh;
3512         int to_nodeid, error;
3513
3514         to_nodeid = lkb->lkb_nodeid;
3515
3516         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh,
3517                                GFP_NOFS);
3518         if (error)
3519                 goto out;
3520
3521         send_args(r, lkb, ms);
3522
3523         ms->m_result = 0;
3524
3525         error = send_message(mh, ms, r->res_name, r->res_length);
3526  out:
3527         return error;
3528 }
3529
3530 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3531 {
3532         struct dlm_message *ms;
3533         struct dlm_mhandle *mh;
3534         int to_nodeid, error;
3535
3536         to_nodeid = lkb->lkb_nodeid;
3537
3538         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh,
3539                                GFP_NOFS);
3540         if (error)
3541                 goto out;
3542
3543         send_args(r, lkb, ms);
3544
3545         ms->m_bastmode = cpu_to_le32(mode);
3546
3547         error = send_message(mh, ms, r->res_name, r->res_length);
3548  out:
3549         return error;
3550 }
3551
3552 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3553 {
3554         struct dlm_message *ms;
3555         struct dlm_mhandle *mh;
3556         int to_nodeid, error;
3557
3558         to_nodeid = dlm_dir_nodeid(r);
3559
3560         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3561         if (error)
3562                 return error;
3563
3564         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh,
3565                                GFP_NOFS);
3566         if (error)
3567                 goto fail;
3568
3569         send_args(r, lkb, ms);
3570
3571         error = send_message(mh, ms, r->res_name, r->res_length);
3572         if (error)
3573                 goto fail;
3574         return 0;
3575
3576  fail:
3577         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3578         return error;
3579 }
3580
3581 static int send_remove(struct dlm_rsb *r)
3582 {
3583         struct dlm_message *ms;
3584         struct dlm_mhandle *mh;
3585         int to_nodeid, error;
3586
3587         to_nodeid = dlm_dir_nodeid(r);
3588
3589         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh,
3590                                GFP_ATOMIC);
3591         if (error)
3592                 goto out;
3593
3594         memcpy(ms->m_extra, r->res_name, r->res_length);
3595         ms->m_hash = cpu_to_le32(r->res_hash);
3596
3597         error = send_message(mh, ms, r->res_name, r->res_length);
3598  out:
3599         return error;
3600 }
3601
3602 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3603                              int mstype, int rv)
3604 {
3605         struct dlm_message *ms;
3606         struct dlm_mhandle *mh;
3607         int to_nodeid, error;
3608
3609         to_nodeid = lkb->lkb_nodeid;
3610
3611         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
3612         if (error)
3613                 goto out;
3614
3615         send_args(r, lkb, ms);
3616
3617         ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3618
3619         error = send_message(mh, ms, r->res_name, r->res_length);
3620  out:
3621         return error;
3622 }
3623
3624 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3625 {
3626         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3627 }
3628
3629 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3630 {
3631         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3632 }
3633
3634 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3635 {
3636         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3637 }
3638
3639 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3640 {
3641         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3642 }
3643
3644 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3645                              int ret_nodeid, int rv)
3646 {
3647         struct dlm_rsb *r = &ls->ls_local_rsb;
3648         struct dlm_message *ms;
3649         struct dlm_mhandle *mh;
3650         int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3651
3652         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh,
3653                                GFP_NOFS);
3654         if (error)
3655                 goto out;
3656
3657         ms->m_lkid = ms_in->m_lkid;
3658         ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3659         ms->m_nodeid = cpu_to_le32(ret_nodeid);
3660
3661         error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3662  out:
3663         return error;
3664 }
3665
3666 /* which args we save from a received message depends heavily on the type
3667    of message, unlike the send side where we can safely send everything about
3668    the lkb for any type of message */
3669
3670 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3671 {
3672         lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3673         dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3674         dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3675 }
3676
3677 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
3678                                 bool local)
3679 {
3680         if (local)
3681                 return;
3682
3683         dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3684         dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3685 }
3686
3687 static int receive_extralen(struct dlm_message *ms)
3688 {
3689         return (le16_to_cpu(ms->m_header.h_length) -
3690                 sizeof(struct dlm_message));
3691 }
3692
3693 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3694                        struct dlm_message *ms)
3695 {
3696         int len;
3697
3698         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3699                 if (!lkb->lkb_lvbptr)
3700                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3701                 if (!lkb->lkb_lvbptr)
3702                         return -ENOMEM;
3703                 len = receive_extralen(ms);
3704                 if (len > ls->ls_lvblen)
3705                         len = ls->ls_lvblen;
3706                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3707         }
3708         return 0;
3709 }
3710
3711 static void fake_bastfn(void *astparam, int mode)
3712 {
3713         log_print("fake_bastfn should not be called");
3714 }
3715
3716 static void fake_astfn(void *astparam)
3717 {
3718         log_print("fake_astfn should not be called");
3719 }
3720
3721 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3722                                 struct dlm_message *ms)
3723 {
3724         lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3725         lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3726         lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3727         lkb->lkb_grmode = DLM_LOCK_IV;
3728         lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3729
3730         lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3731         lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3732
3733         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3734                 /* lkb was just created so there won't be an lvb yet */
3735                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3736                 if (!lkb->lkb_lvbptr)
3737                         return -ENOMEM;
3738         }
3739
3740         return 0;
3741 }
3742
3743 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3744                                 struct dlm_message *ms)
3745 {
3746         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3747                 return -EBUSY;
3748
3749         if (receive_lvb(ls, lkb, ms))
3750                 return -ENOMEM;
3751
3752         lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3753         lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3754
3755         return 0;
3756 }
3757
3758 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3759                                struct dlm_message *ms)
3760 {
3761         if (receive_lvb(ls, lkb, ms))
3762                 return -ENOMEM;
3763         return 0;
3764 }
3765
3766 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3767    uses to send a reply and that the remote end uses to process the reply. */
3768
3769 static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3770 {
3771         struct dlm_lkb *lkb = &ls->ls_local_lkb;
3772         lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3773         lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3774 }
3775
3776 /* This is called after the rsb is locked so that we can safely inspect
3777    fields in the lkb. */
3778
3779 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3780 {
3781         int from = le32_to_cpu(ms->m_header.h_nodeid);
3782         int error = 0;
3783
3784         /* currently mixing of user/kernel locks are not supported */
3785         if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3786             !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3787                 log_error(lkb->lkb_resource->res_ls,
3788                           "got user dlm message for a kernel lock");
3789                 error = -EINVAL;
3790                 goto out;
3791         }
3792
3793         switch (ms->m_type) {
3794         case cpu_to_le32(DLM_MSG_CONVERT):
3795         case cpu_to_le32(DLM_MSG_UNLOCK):
3796         case cpu_to_le32(DLM_MSG_CANCEL):
3797                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3798                         error = -EINVAL;
3799                 break;
3800
3801         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3802         case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3803         case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3804         case cpu_to_le32(DLM_MSG_GRANT):
3805         case cpu_to_le32(DLM_MSG_BAST):
3806                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3807                         error = -EINVAL;
3808                 break;
3809
3810         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3811                 if (!is_process_copy(lkb))
3812                         error = -EINVAL;
3813                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3814                         error = -EINVAL;
3815                 break;
3816
3817         default:
3818                 error = -EINVAL;
3819         }
3820
3821 out:
3822         if (error)
3823                 log_error(lkb->lkb_resource->res_ls,
3824                           "ignore invalid message %d from %d %x %x %x %d",
3825                           le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3826                           lkb->lkb_remid, dlm_iflags_val(lkb),
3827                           lkb->lkb_nodeid);
3828         return error;
3829 }
3830
3831 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3832 {
3833         struct dlm_lkb *lkb;
3834         struct dlm_rsb *r;
3835         int from_nodeid;
3836         int error, namelen = 0;
3837
3838         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3839
3840         error = create_lkb(ls, &lkb);
3841         if (error)
3842                 goto fail;
3843
3844         receive_flags(lkb, ms);
3845         set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
3846         error = receive_request_args(ls, lkb, ms);
3847         if (error) {
3848                 __put_lkb(ls, lkb);
3849                 goto fail;
3850         }
3851
3852         /* The dir node is the authority on whether we are the master
3853            for this rsb or not, so if the master sends us a request, we should
3854            recreate the rsb if we've destroyed it.   This race happens when we
3855            send a remove message to the dir node at the same time that the dir
3856            node sends us a request for the rsb. */
3857
3858         namelen = receive_extralen(ms);
3859
3860         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
3861                          R_RECEIVE_REQUEST, &r);
3862         if (error) {
3863                 __put_lkb(ls, lkb);
3864                 goto fail;
3865         }
3866
3867         lock_rsb(r);
3868
3869         if (r->res_master_nodeid != dlm_our_nodeid()) {
3870                 error = validate_master_nodeid(ls, r, from_nodeid);
3871                 if (error) {
3872                         unlock_rsb(r);
3873                         put_rsb(r);
3874                         __put_lkb(ls, lkb);
3875                         goto fail;
3876                 }
3877         }
3878
3879         attach_lkb(r, lkb);
3880         error = do_request(r, lkb);
3881         send_request_reply(r, lkb, error);
3882         do_request_effects(r, lkb, error);
3883
3884         unlock_rsb(r);
3885         put_rsb(r);
3886
3887         if (error == -EINPROGRESS)
3888                 error = 0;
3889         if (error)
3890                 dlm_put_lkb(lkb);
3891         return 0;
3892
3893  fail:
3894         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
3895            and do this receive_request again from process_lookup_list once
3896            we get the lookup reply.  This would avoid a many repeated
3897            ENOTBLK request failures when the lookup reply designating us
3898            as master is delayed. */
3899
3900         if (error != -ENOTBLK) {
3901                 log_limit(ls, "receive_request %x from %d %d",
3902                           le32_to_cpu(ms->m_lkid), from_nodeid, error);
3903         }
3904
3905         setup_local_lkb(ls, ms);
3906         send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3907         return error;
3908 }
3909
3910 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3911 {
3912         struct dlm_lkb *lkb;
3913         struct dlm_rsb *r;
3914         int error, reply = 1;
3915
3916         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3917         if (error)
3918                 goto fail;
3919
3920         if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
3921                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
3922                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
3923                           (unsigned long long)lkb->lkb_recover_seq,
3924                           le32_to_cpu(ms->m_header.h_nodeid),
3925                           le32_to_cpu(ms->m_lkid));
3926                 error = -ENOENT;
3927                 dlm_put_lkb(lkb);
3928                 goto fail;
3929         }
3930
3931         r = lkb->lkb_resource;
3932
3933         hold_rsb(r);
3934         lock_rsb(r);
3935
3936         error = validate_message(lkb, ms);
3937         if (error)
3938                 goto out;
3939
3940         receive_flags(lkb, ms);
3941
3942         error = receive_convert_args(ls, lkb, ms);
3943         if (error) {
3944                 send_convert_reply(r, lkb, error);
3945                 goto out;
3946         }
3947
3948         reply = !down_conversion(lkb);
3949
3950         error = do_convert(r, lkb);
3951         if (reply)
3952                 send_convert_reply(r, lkb, error);
3953         do_convert_effects(r, lkb, error);
3954  out:
3955         unlock_rsb(r);
3956         put_rsb(r);
3957         dlm_put_lkb(lkb);
3958         return 0;
3959
3960  fail:
3961         setup_local_lkb(ls, ms);
3962         send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3963         return error;
3964 }
3965
3966 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3967 {
3968         struct dlm_lkb *lkb;
3969         struct dlm_rsb *r;
3970         int error;
3971
3972         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3973         if (error)
3974                 goto fail;
3975
3976         if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
3977                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
3978                           lkb->lkb_id, lkb->lkb_remid,
3979                           le32_to_cpu(ms->m_header.h_nodeid),
3980                           le32_to_cpu(ms->m_lkid));
3981                 error = -ENOENT;
3982                 dlm_put_lkb(lkb);
3983                 goto fail;
3984         }
3985
3986         r = lkb->lkb_resource;
3987
3988         hold_rsb(r);
3989         lock_rsb(r);
3990
3991         error = validate_message(lkb, ms);
3992         if (error)
3993                 goto out;
3994
3995         receive_flags(lkb, ms);
3996
3997         error = receive_unlock_args(ls, lkb, ms);
3998         if (error) {
3999                 send_unlock_reply(r, lkb, error);
4000                 goto out;
4001         }
4002
4003         error = do_unlock(r, lkb);
4004         send_unlock_reply(r, lkb, error);
4005         do_unlock_effects(r, lkb, error);
4006  out:
4007         unlock_rsb(r);
4008         put_rsb(r);
4009         dlm_put_lkb(lkb);
4010         return 0;
4011
4012  fail:
4013         setup_local_lkb(ls, ms);
4014         send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4015         return error;
4016 }
4017
4018 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4019 {
4020         struct dlm_lkb *lkb;
4021         struct dlm_rsb *r;
4022         int error;
4023
4024         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4025         if (error)
4026                 goto fail;
4027
4028         receive_flags(lkb, ms);
4029
4030         r = lkb->lkb_resource;
4031
4032         hold_rsb(r);
4033         lock_rsb(r);
4034
4035         error = validate_message(lkb, ms);
4036         if (error)
4037                 goto out;
4038
4039         error = do_cancel(r, lkb);
4040         send_cancel_reply(r, lkb, error);
4041         do_cancel_effects(r, lkb, error);
4042  out:
4043         unlock_rsb(r);
4044         put_rsb(r);
4045         dlm_put_lkb(lkb);
4046         return 0;
4047
4048  fail:
4049         setup_local_lkb(ls, ms);
4050         send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4051         return error;
4052 }
4053
4054 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4055 {
4056         struct dlm_lkb *lkb;
4057         struct dlm_rsb *r;
4058         int error;
4059
4060         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4061         if (error)
4062                 return error;
4063
4064         r = lkb->lkb_resource;
4065
4066         hold_rsb(r);
4067         lock_rsb(r);
4068
4069         error = validate_message(lkb, ms);
4070         if (error)
4071                 goto out;
4072
4073         receive_flags_reply(lkb, ms, false);
4074         if (is_altmode(lkb))
4075                 munge_altmode(lkb, ms);
4076         grant_lock_pc(r, lkb, ms);
4077         queue_cast(r, lkb, 0);
4078  out:
4079         unlock_rsb(r);
4080         put_rsb(r);
4081         dlm_put_lkb(lkb);
4082         return 0;
4083 }
4084
4085 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4086 {
4087         struct dlm_lkb *lkb;
4088         struct dlm_rsb *r;
4089         int error;
4090
4091         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4092         if (error)
4093                 return error;
4094
4095         r = lkb->lkb_resource;
4096
4097         hold_rsb(r);
4098         lock_rsb(r);
4099
4100         error = validate_message(lkb, ms);
4101         if (error)
4102                 goto out;
4103
4104         queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4105         lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4106  out:
4107         unlock_rsb(r);
4108         put_rsb(r);
4109         dlm_put_lkb(lkb);
4110         return 0;
4111 }
4112
4113 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4114 {
4115         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4116
4117         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4118         our_nodeid = dlm_our_nodeid();
4119
4120         len = receive_extralen(ms);
4121
4122         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4123                                   &ret_nodeid, NULL);
4124
4125         /* Optimization: we're master so treat lookup as a request */
4126         if (!error && ret_nodeid == our_nodeid) {
4127                 receive_request(ls, ms);
4128                 return;
4129         }
4130         send_lookup_reply(ls, ms, ret_nodeid, error);
4131 }
4132
4133 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4134 {
4135         char name[DLM_RESNAME_MAXLEN+1];
4136         struct dlm_rsb *r;
4137         uint32_t hash, b;
4138         int rv, len, dir_nodeid, from_nodeid;
4139
4140         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4141
4142         len = receive_extralen(ms);
4143
4144         if (len > DLM_RESNAME_MAXLEN) {
4145                 log_error(ls, "receive_remove from %d bad len %d",
4146                           from_nodeid, len);
4147                 return;
4148         }
4149
4150         dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4151         if (dir_nodeid != dlm_our_nodeid()) {
4152                 log_error(ls, "receive_remove from %d bad nodeid %d",
4153                           from_nodeid, dir_nodeid);
4154                 return;
4155         }
4156
4157         /* Look for name on rsbtbl.toss, if it's there, kill it.
4158            If it's on rsbtbl.keep, it's being used, and we should ignore this
4159            message.  This is an expected race between the dir node sending a
4160            request to the master node at the same time as the master node sends
4161            a remove to the dir node.  The resolution to that race is for the
4162            dir node to ignore the remove message, and the master node to
4163            recreate the master rsb when it gets a request from the dir node for
4164            an rsb it doesn't have. */
4165
4166         memset(name, 0, sizeof(name));
4167         memcpy(name, ms->m_extra, len);
4168
4169         hash = jhash(name, len, 0);
4170         b = hash & (ls->ls_rsbtbl_size - 1);
4171
4172         spin_lock(&ls->ls_rsbtbl[b].lock);
4173
4174         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4175         if (rv) {
4176                 /* verify the rsb is on keep list per comment above */
4177                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4178                 if (rv) {
4179                         /* should not happen */
4180                         log_error(ls, "receive_remove from %d not found %s",
4181                                   from_nodeid, name);
4182                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4183                         return;
4184                 }
4185                 if (r->res_master_nodeid != from_nodeid) {
4186                         /* should not happen */
4187                         log_error(ls, "receive_remove keep from %d master %d",
4188                                   from_nodeid, r->res_master_nodeid);
4189                         dlm_print_rsb(r);
4190                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4191                         return;
4192                 }
4193
4194                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4195                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4196                           name);
4197                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4198                 return;
4199         }
4200
4201         if (r->res_master_nodeid != from_nodeid) {
4202                 log_error(ls, "receive_remove toss from %d master %d",
4203                           from_nodeid, r->res_master_nodeid);
4204                 dlm_print_rsb(r);
4205                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4206                 return;
4207         }
4208
4209         if (kref_put(&r->res_ref, kill_rsb)) {
4210                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4211                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4212                 dlm_free_rsb(r);
4213         } else {
4214                 log_error(ls, "receive_remove from %d rsb ref error",
4215                           from_nodeid);
4216                 dlm_print_rsb(r);
4217                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4218         }
4219 }
4220
4221 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4222 {
4223         do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4224 }
4225
4226 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4227 {
4228         struct dlm_lkb *lkb;
4229         struct dlm_rsb *r;
4230         int error, mstype, result;
4231         int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4232
4233         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4234         if (error)
4235                 return error;
4236
4237         r = lkb->lkb_resource;
4238         hold_rsb(r);
4239         lock_rsb(r);
4240
4241         error = validate_message(lkb, ms);
4242         if (error)
4243                 goto out;
4244
4245         mstype = lkb->lkb_wait_type;
4246         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4247         if (error) {
4248                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4249                           lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4250                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4251                 dlm_dump_rsb(r);
4252                 goto out;
4253         }
4254
4255         /* Optimization: the dir node was also the master, so it took our
4256            lookup as a request and sent request reply instead of lookup reply */
4257         if (mstype == DLM_MSG_LOOKUP) {
4258                 r->res_master_nodeid = from_nodeid;
4259                 r->res_nodeid = from_nodeid;
4260                 lkb->lkb_nodeid = from_nodeid;
4261         }
4262
4263         /* this is the value returned from do_request() on the master */
4264         result = from_dlm_errno(le32_to_cpu(ms->m_result));
4265
4266         switch (result) {
4267         case -EAGAIN:
4268                 /* request would block (be queued) on remote master */
4269                 queue_cast(r, lkb, -EAGAIN);
4270                 confirm_master(r, -EAGAIN);
4271                 unhold_lkb(lkb); /* undoes create_lkb() */
4272                 break;
4273
4274         case -EINPROGRESS:
4275         case 0:
4276                 /* request was queued or granted on remote master */
4277                 receive_flags_reply(lkb, ms, false);
4278                 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4279                 if (is_altmode(lkb))
4280                         munge_altmode(lkb, ms);
4281                 if (result) {
4282                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4283                 } else {
4284                         grant_lock_pc(r, lkb, ms);
4285                         queue_cast(r, lkb, 0);
4286                 }
4287                 confirm_master(r, result);
4288                 break;
4289
4290         case -EBADR:
4291         case -ENOTBLK:
4292                 /* find_rsb failed to find rsb or rsb wasn't master */
4293                 log_limit(ls, "receive_request_reply %x from %d %d "
4294                           "master %d dir %d first %x %s", lkb->lkb_id,
4295                           from_nodeid, result, r->res_master_nodeid,
4296                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4297
4298                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4299                     r->res_master_nodeid != dlm_our_nodeid()) {
4300                         /* cause _request_lock->set_master->send_lookup */
4301                         r->res_master_nodeid = 0;
4302                         r->res_nodeid = -1;
4303                         lkb->lkb_nodeid = -1;
4304                 }
4305
4306                 if (is_overlap(lkb)) {
4307                         /* we'll ignore error in cancel/unlock reply */
4308                         queue_cast_overlap(r, lkb);
4309                         confirm_master(r, result);
4310                         unhold_lkb(lkb); /* undoes create_lkb() */
4311                 } else {
4312                         _request_lock(r, lkb);
4313
4314                         if (r->res_master_nodeid == dlm_our_nodeid())
4315                                 confirm_master(r, 0);
4316                 }
4317                 break;
4318
4319         default:
4320                 log_error(ls, "receive_request_reply %x error %d",
4321                           lkb->lkb_id, result);
4322         }
4323
4324         if ((result == 0 || result == -EINPROGRESS) &&
4325             test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4326                 log_debug(ls, "receive_request_reply %x result %d unlock",
4327                           lkb->lkb_id, result);
4328                 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4329                 send_unlock(r, lkb);
4330         } else if ((result == -EINPROGRESS) &&
4331                    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4332                                       &lkb->lkb_iflags)) {
4333                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4334                 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4335                 send_cancel(r, lkb);
4336         } else {
4337                 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4338                 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4339         }
4340  out:
4341         unlock_rsb(r);
4342         put_rsb(r);
4343         dlm_put_lkb(lkb);
4344         return 0;
4345 }
4346
4347 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4348                                     struct dlm_message *ms, bool local)
4349 {
4350         /* this is the value returned from do_convert() on the master */
4351         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4352         case -EAGAIN:
4353                 /* convert would block (be queued) on remote master */
4354                 queue_cast(r, lkb, -EAGAIN);
4355                 break;
4356
4357         case -EDEADLK:
4358                 receive_flags_reply(lkb, ms, local);
4359                 revert_lock_pc(r, lkb);
4360                 queue_cast(r, lkb, -EDEADLK);
4361                 break;
4362
4363         case -EINPROGRESS:
4364                 /* convert was queued on remote master */
4365                 receive_flags_reply(lkb, ms, local);
4366                 if (is_demoted(lkb))
4367                         munge_demoted(lkb);
4368                 del_lkb(r, lkb);
4369                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4370                 break;
4371
4372         case 0:
4373                 /* convert was granted on remote master */
4374                 receive_flags_reply(lkb, ms, local);
4375                 if (is_demoted(lkb))
4376                         munge_demoted(lkb);
4377                 grant_lock_pc(r, lkb, ms);
4378                 queue_cast(r, lkb, 0);
4379                 break;
4380
4381         default:
4382                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4383                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4384                           le32_to_cpu(ms->m_lkid),
4385                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4386                 dlm_print_rsb(r);
4387                 dlm_print_lkb(lkb);
4388         }
4389 }
4390
4391 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
4392                                    bool local)
4393 {
4394         struct dlm_rsb *r = lkb->lkb_resource;
4395         int error;
4396
4397         hold_rsb(r);
4398         lock_rsb(r);
4399
4400         error = validate_message(lkb, ms);
4401         if (error)
4402                 goto out;
4403
4404         /* local reply can happen with waiters_mutex held */
4405         error = remove_from_waiters_ms(lkb, ms, local);
4406         if (error)
4407                 goto out;
4408
4409         __receive_convert_reply(r, lkb, ms, local);
4410  out:
4411         unlock_rsb(r);
4412         put_rsb(r);
4413 }
4414
4415 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4416 {
4417         struct dlm_lkb *lkb;
4418         int error;
4419
4420         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4421         if (error)
4422                 return error;
4423
4424         _receive_convert_reply(lkb, ms, false);
4425         dlm_put_lkb(lkb);
4426         return 0;
4427 }
4428
4429 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
4430                                   bool local)
4431 {
4432         struct dlm_rsb *r = lkb->lkb_resource;
4433         int error;
4434
4435         hold_rsb(r);
4436         lock_rsb(r);
4437
4438         error = validate_message(lkb, ms);
4439         if (error)
4440                 goto out;
4441
4442         /* local reply can happen with waiters_mutex held */
4443         error = remove_from_waiters_ms(lkb, ms, local);
4444         if (error)
4445                 goto out;
4446
4447         /* this is the value returned from do_unlock() on the master */
4448
4449         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4450         case -DLM_EUNLOCK:
4451                 receive_flags_reply(lkb, ms, local);
4452                 remove_lock_pc(r, lkb);
4453                 queue_cast(r, lkb, -DLM_EUNLOCK);
4454                 break;
4455         case -ENOENT:
4456                 break;
4457         default:
4458                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4459                           lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4460         }
4461  out:
4462         unlock_rsb(r);
4463         put_rsb(r);
4464 }
4465
4466 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4467 {
4468         struct dlm_lkb *lkb;
4469         int error;
4470
4471         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4472         if (error)
4473                 return error;
4474
4475         _receive_unlock_reply(lkb, ms, false);
4476         dlm_put_lkb(lkb);
4477         return 0;
4478 }
4479
4480 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
4481                                   bool local)
4482 {
4483         struct dlm_rsb *r = lkb->lkb_resource;
4484         int error;
4485
4486         hold_rsb(r);
4487         lock_rsb(r);
4488
4489         error = validate_message(lkb, ms);
4490         if (error)
4491                 goto out;
4492
4493         /* local reply can happen with waiters_mutex held */
4494         error = remove_from_waiters_ms(lkb, ms, local);
4495         if (error)
4496                 goto out;
4497
4498         /* this is the value returned from do_cancel() on the master */
4499
4500         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4501         case -DLM_ECANCEL:
4502                 receive_flags_reply(lkb, ms, local);
4503                 revert_lock_pc(r, lkb);
4504                 queue_cast(r, lkb, -DLM_ECANCEL);
4505                 break;
4506         case 0:
4507                 break;
4508         default:
4509                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4510                           lkb->lkb_id,
4511                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4512         }
4513  out:
4514         unlock_rsb(r);
4515         put_rsb(r);
4516 }
4517
4518 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4519 {
4520         struct dlm_lkb *lkb;
4521         int error;
4522
4523         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4524         if (error)
4525                 return error;
4526
4527         _receive_cancel_reply(lkb, ms, false);
4528         dlm_put_lkb(lkb);
4529         return 0;
4530 }
4531
4532 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4533 {
4534         struct dlm_lkb *lkb;
4535         struct dlm_rsb *r;
4536         int error, ret_nodeid;
4537         int do_lookup_list = 0;
4538
4539         error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4540         if (error) {
4541                 log_error(ls, "%s no lkid %x", __func__,
4542                           le32_to_cpu(ms->m_lkid));
4543                 return;
4544         }
4545
4546         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4547            FIXME: will a non-zero error ever be returned? */
4548
4549         r = lkb->lkb_resource;
4550         hold_rsb(r);
4551         lock_rsb(r);
4552
4553         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4554         if (error)
4555                 goto out;
4556
4557         ret_nodeid = le32_to_cpu(ms->m_nodeid);
4558
4559         /* We sometimes receive a request from the dir node for this
4560            rsb before we've received the dir node's loookup_reply for it.
4561            The request from the dir node implies we're the master, so we set
4562            ourself as master in receive_request_reply, and verify here that
4563            we are indeed the master. */
4564
4565         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4566                 /* This should never happen */
4567                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4568                           "master %d dir %d our %d first %x %s",
4569                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4570                           ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4571                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4572         }
4573
4574         if (ret_nodeid == dlm_our_nodeid()) {
4575                 r->res_master_nodeid = ret_nodeid;
4576                 r->res_nodeid = 0;
4577                 do_lookup_list = 1;
4578                 r->res_first_lkid = 0;
4579         } else if (ret_nodeid == -1) {
4580                 /* the remote node doesn't believe it's the dir node */
4581                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4582                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4583                 r->res_master_nodeid = 0;
4584                 r->res_nodeid = -1;
4585                 lkb->lkb_nodeid = -1;
4586         } else {
4587                 /* set_master() will set lkb_nodeid from r */
4588                 r->res_master_nodeid = ret_nodeid;
4589                 r->res_nodeid = ret_nodeid;
4590         }
4591
4592         if (is_overlap(lkb)) {
4593                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4594                           lkb->lkb_id, dlm_iflags_val(lkb));
4595                 queue_cast_overlap(r, lkb);
4596                 unhold_lkb(lkb); /* undoes create_lkb() */
4597                 goto out_list;
4598         }
4599
4600         _request_lock(r, lkb);
4601
4602  out_list:
4603         if (do_lookup_list)
4604                 process_lookup_list(r);
4605  out:
4606         unlock_rsb(r);
4607         put_rsb(r);
4608         dlm_put_lkb(lkb);
4609 }
4610
4611 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4612                              uint32_t saved_seq)
4613 {
4614         int error = 0, noent = 0;
4615
4616         if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4617                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4618                           le32_to_cpu(ms->m_type),
4619                           le32_to_cpu(ms->m_header.h_nodeid),
4620                           le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4621                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4622                 return;
4623         }
4624
4625         switch (ms->m_type) {
4626
4627         /* messages sent to a master node */
4628
4629         case cpu_to_le32(DLM_MSG_REQUEST):
4630                 error = receive_request(ls, ms);
4631                 break;
4632
4633         case cpu_to_le32(DLM_MSG_CONVERT):
4634                 error = receive_convert(ls, ms);
4635                 break;
4636
4637         case cpu_to_le32(DLM_MSG_UNLOCK):
4638                 error = receive_unlock(ls, ms);
4639                 break;
4640
4641         case cpu_to_le32(DLM_MSG_CANCEL):
4642                 noent = 1;
4643                 error = receive_cancel(ls, ms);
4644                 break;
4645
4646         /* messages sent from a master node (replies to above) */
4647
4648         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4649                 error = receive_request_reply(ls, ms);
4650                 break;
4651
4652         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4653                 error = receive_convert_reply(ls, ms);
4654                 break;
4655
4656         case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4657                 error = receive_unlock_reply(ls, ms);
4658                 break;
4659
4660         case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4661                 error = receive_cancel_reply(ls, ms);
4662                 break;
4663
4664         /* messages sent from a master node (only two types of async msg) */
4665
4666         case cpu_to_le32(DLM_MSG_GRANT):
4667                 noent = 1;
4668                 error = receive_grant(ls, ms);
4669                 break;
4670
4671         case cpu_to_le32(DLM_MSG_BAST):
4672                 noent = 1;
4673                 error = receive_bast(ls, ms);
4674                 break;
4675
4676         /* messages sent to a dir node */
4677
4678         case cpu_to_le32(DLM_MSG_LOOKUP):
4679                 receive_lookup(ls, ms);
4680                 break;
4681
4682         case cpu_to_le32(DLM_MSG_REMOVE):
4683                 receive_remove(ls, ms);
4684                 break;
4685
4686         /* messages sent from a dir node (remove has no reply) */
4687
4688         case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4689                 receive_lookup_reply(ls, ms);
4690                 break;
4691
4692         /* other messages */
4693
4694         case cpu_to_le32(DLM_MSG_PURGE):
4695                 receive_purge(ls, ms);
4696                 break;
4697
4698         default:
4699                 log_error(ls, "unknown message type %d",
4700                           le32_to_cpu(ms->m_type));
4701         }
4702
4703         /*
4704          * When checking for ENOENT, we're checking the result of
4705          * find_lkb(m_remid):
4706          *
4707          * The lock id referenced in the message wasn't found.  This may
4708          * happen in normal usage for the async messages and cancel, so
4709          * only use log_debug for them.
4710          *
4711          * Some errors are expected and normal.
4712          */
4713
4714         if (error == -ENOENT && noent) {
4715                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4716                           le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4717                           le32_to_cpu(ms->m_header.h_nodeid),
4718                           le32_to_cpu(ms->m_lkid), saved_seq);
4719         } else if (error == -ENOENT) {
4720                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4721                           le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4722                           le32_to_cpu(ms->m_header.h_nodeid),
4723                           le32_to_cpu(ms->m_lkid), saved_seq);
4724
4725                 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4726                         dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4727         }
4728
4729         if (error == -EINVAL) {
4730                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4731                           "saved_seq %u",
4732                           le32_to_cpu(ms->m_type),
4733                           le32_to_cpu(ms->m_header.h_nodeid),
4734                           le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4735                           saved_seq);
4736         }
4737 }
4738
4739 /* If the lockspace is in recovery mode (locking stopped), then normal
4740    messages are saved on the requestqueue for processing after recovery is
4741    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4742    messages off the requestqueue before we process new ones. This occurs right
4743    after recovery completes when we transition from saving all messages on
4744    requestqueue, to processing all the saved messages, to processing new
4745    messages as they arrive. */
4746
4747 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4748                                 int nodeid)
4749 {
4750         if (dlm_locking_stopped(ls)) {
4751                 /* If we were a member of this lockspace, left, and rejoined,
4752                    other nodes may still be sending us messages from the
4753                    lockspace generation before we left. */
4754                 if (WARN_ON_ONCE(!ls->ls_generation)) {
4755                         log_limit(ls, "receive %d from %d ignore old gen",
4756                                   le32_to_cpu(ms->m_type), nodeid);
4757                         return;
4758                 }
4759
4760                 dlm_add_requestqueue(ls, nodeid, ms);
4761         } else {
4762                 dlm_wait_requestqueue(ls);
4763                 _receive_message(ls, ms, 0);
4764         }
4765 }
4766
4767 /* This is called by dlm_recoverd to process messages that were saved on
4768    the requestqueue. */
4769
4770 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
4771                                uint32_t saved_seq)
4772 {
4773         _receive_message(ls, ms, saved_seq);
4774 }
4775
4776 /* This is called by the midcomms layer when something is received for
4777    the lockspace.  It could be either a MSG (normal message sent as part of
4778    standard locking activity) or an RCOM (recovery message sent as part of
4779    lockspace recovery). */
4780
4781 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4782 {
4783         struct dlm_header *hd = &p->header;
4784         struct dlm_ls *ls;
4785         int type = 0;
4786
4787         switch (hd->h_cmd) {
4788         case DLM_MSG:
4789                 type = le32_to_cpu(p->message.m_type);
4790                 break;
4791         case DLM_RCOM:
4792                 type = le32_to_cpu(p->rcom.rc_type);
4793                 break;
4794         default:
4795                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4796                 return;
4797         }
4798
4799         if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4800                 log_print("invalid h_nodeid %d from %d lockspace %x",
4801                           le32_to_cpu(hd->h_nodeid), nodeid,
4802                           le32_to_cpu(hd->u.h_lockspace));
4803                 return;
4804         }
4805
4806         ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4807         if (!ls) {
4808                 if (dlm_config.ci_log_debug) {
4809                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4810                                 "%u from %d cmd %d type %d\n",
4811                                 le32_to_cpu(hd->u.h_lockspace), nodeid,
4812                                 hd->h_cmd, type);
4813                 }
4814
4815                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4816                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4817                 return;
4818         }
4819
4820         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4821            be inactive (in this ls) before transitioning to recovery mode */
4822
4823         down_read(&ls->ls_recv_active);
4824         if (hd->h_cmd == DLM_MSG)
4825                 dlm_receive_message(ls, &p->message, nodeid);
4826         else if (hd->h_cmd == DLM_RCOM)
4827                 dlm_receive_rcom(ls, &p->rcom, nodeid);
4828         else
4829                 log_error(ls, "invalid h_cmd %d from %d lockspace %x",
4830                           hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
4831         up_read(&ls->ls_recv_active);
4832
4833         dlm_put_lockspace(ls);
4834 }
4835
4836 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4837                                    struct dlm_message *ms_local)
4838 {
4839         if (middle_conversion(lkb)) {
4840                 hold_lkb(lkb);
4841                 memset(ms_local, 0, sizeof(struct dlm_message));
4842                 ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
4843                 ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
4844                 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4845                 _receive_convert_reply(lkb, ms_local, true);
4846
4847                 /* Same special case as in receive_rcom_lock_args() */
4848                 lkb->lkb_grmode = DLM_LOCK_IV;
4849                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4850                 unhold_lkb(lkb);
4851
4852         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4853                 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4854         }
4855
4856         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4857            conversions are async; there's no reply from the remote master */
4858 }
4859
4860 /* A waiting lkb needs recovery if the master node has failed, or
4861    the master node is changing (only when no directory is used) */
4862
4863 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4864                                  int dir_nodeid)
4865 {
4866         if (dlm_no_directory(ls))
4867                 return 1;
4868
4869         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4870                 return 1;
4871
4872         return 0;
4873 }
4874
4875 /* Recovery for locks that are waiting for replies from nodes that are now
4876    gone.  We can just complete unlocks and cancels by faking a reply from the
4877    dead node.  Requests and up-conversions we flag to be resent after
4878    recovery.  Down-conversions can just be completed with a fake reply like
4879    unlocks.  Conversions between PR and CW need special attention. */
4880
4881 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4882 {
4883         struct dlm_lkb *lkb, *safe;
4884         struct dlm_message *ms_local;
4885         int wait_type, local_unlock_result, local_cancel_result;
4886         int dir_nodeid;
4887
4888         ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
4889         if (!ms_local)
4890                 return;
4891
4892         mutex_lock(&ls->ls_waiters_mutex);
4893
4894         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4895
4896                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
4897
4898                 /* exclude debug messages about unlocks because there can be so
4899                    many and they aren't very interesting */
4900
4901                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4902                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4903                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
4904                                   lkb->lkb_id,
4905                                   lkb->lkb_remid,
4906                                   lkb->lkb_wait_type,
4907                                   lkb->lkb_resource->res_nodeid,
4908                                   lkb->lkb_nodeid,
4909                                   lkb->lkb_wait_nodeid,
4910                                   dir_nodeid);
4911                 }
4912
4913                 /* all outstanding lookups, regardless of destination  will be
4914                    resent after recovery is done */
4915
4916                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4917                         set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4918                         continue;
4919                 }
4920
4921                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
4922                         continue;
4923
4924                 wait_type = lkb->lkb_wait_type;
4925                 local_unlock_result = -DLM_EUNLOCK;
4926                 local_cancel_result = -DLM_ECANCEL;
4927
4928                 /* Main reply may have been received leaving a zero wait_type,
4929                    but a reply for the overlapping op may not have been
4930                    received.  In that case we need to fake the appropriate
4931                    reply for the overlap op. */
4932
4933                 if (!wait_type) {
4934                         if (is_overlap_cancel(lkb)) {
4935                                 wait_type = DLM_MSG_CANCEL;
4936                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4937                                         local_cancel_result = 0;
4938                         }
4939                         if (is_overlap_unlock(lkb)) {
4940                                 wait_type = DLM_MSG_UNLOCK;
4941                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4942                                         local_unlock_result = -ENOENT;
4943                         }
4944
4945                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4946                                   lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
4947                                   local_cancel_result, local_unlock_result);
4948                 }
4949
4950                 switch (wait_type) {
4951
4952                 case DLM_MSG_REQUEST:
4953                         set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4954                         break;
4955
4956                 case DLM_MSG_CONVERT:
4957                         recover_convert_waiter(ls, lkb, ms_local);
4958                         break;
4959
4960                 case DLM_MSG_UNLOCK:
4961                         hold_lkb(lkb);
4962                         memset(ms_local, 0, sizeof(struct dlm_message));
4963                         ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
4964                         ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
4965                         ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4966                         _receive_unlock_reply(lkb, ms_local, true);
4967                         dlm_put_lkb(lkb);
4968                         break;
4969
4970                 case DLM_MSG_CANCEL:
4971                         hold_lkb(lkb);
4972                         memset(ms_local, 0, sizeof(struct dlm_message));
4973                         ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
4974                         ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
4975                         ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4976                         _receive_cancel_reply(lkb, ms_local, true);
4977                         dlm_put_lkb(lkb);
4978                         break;
4979
4980                 default:
4981                         log_error(ls, "invalid lkb wait_type %d %d",
4982                                   lkb->lkb_wait_type, wait_type);
4983                 }
4984                 schedule();
4985         }
4986         mutex_unlock(&ls->ls_waiters_mutex);
4987         kfree(ms_local);
4988 }
4989
4990 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4991 {
4992         struct dlm_lkb *lkb = NULL, *iter;
4993
4994         mutex_lock(&ls->ls_waiters_mutex);
4995         list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
4996                 if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
4997                         hold_lkb(iter);
4998                         lkb = iter;
4999                         break;
5000                 }
5001         }
5002         mutex_unlock(&ls->ls_waiters_mutex);
5003
5004         return lkb;
5005 }
5006
5007 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5008    master or dir-node for r.  Processing the lkb may result in it being placed
5009    back on waiters. */
5010
5011 /* We do this after normal locking has been enabled and any saved messages
5012    (in requestqueue) have been processed.  We should be confident that at
5013    this point we won't get or process a reply to any of these waiting
5014    operations.  But, new ops may be coming in on the rsbs/locks here from
5015    userspace or remotely. */
5016
5017 /* there may have been an overlap unlock/cancel prior to recovery or after
5018    recovery.  if before, the lkb may still have a pos wait_count; if after, the
5019    overlap flag would just have been set and nothing new sent.  we can be
5020    confident here than any replies to either the initial op or overlap ops
5021    prior to recovery have been received. */
5022
5023 int dlm_recover_waiters_post(struct dlm_ls *ls)
5024 {
5025         struct dlm_lkb *lkb;
5026         struct dlm_rsb *r;
5027         int error = 0, mstype, err, oc, ou;
5028
5029         while (1) {
5030                 if (dlm_locking_stopped(ls)) {
5031                         log_debug(ls, "recover_waiters_post aborted");
5032                         error = -EINTR;
5033                         break;
5034                 }
5035
5036                 lkb = find_resend_waiter(ls);
5037                 if (!lkb)
5038                         break;
5039
5040                 r = lkb->lkb_resource;
5041                 hold_rsb(r);
5042                 lock_rsb(r);
5043
5044                 mstype = lkb->lkb_wait_type;
5045                 oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5046                                         &lkb->lkb_iflags);
5047                 ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5048                                         &lkb->lkb_iflags);
5049                 err = 0;
5050
5051                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5052                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5053                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5054                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5055                           dlm_dir_nodeid(r), oc, ou);
5056
5057                 /* At this point we assume that we won't get a reply to any
5058                    previous op or overlap op on this lock.  First, do a big
5059                    remove_from_waiters() for all previous ops. */
5060
5061                 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5062                 lkb->lkb_wait_type = 0;
5063                 /* drop all wait_count references we still
5064                  * hold a reference for this iteration.
5065                  */
5066                 while (!atomic_dec_and_test(&lkb->lkb_wait_count))
5067                         unhold_lkb(lkb);
5068
5069                 mutex_lock(&ls->ls_waiters_mutex);
5070                 list_del_init(&lkb->lkb_wait_reply);
5071                 mutex_unlock(&ls->ls_waiters_mutex);
5072
5073                 if (oc || ou) {
5074                         /* do an unlock or cancel instead of resending */
5075                         switch (mstype) {
5076                         case DLM_MSG_LOOKUP:
5077                         case DLM_MSG_REQUEST:
5078                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5079                                                         -DLM_ECANCEL);
5080                                 unhold_lkb(lkb); /* undoes create_lkb() */
5081                                 break;
5082                         case DLM_MSG_CONVERT:
5083                                 if (oc) {
5084                                         queue_cast(r, lkb, -DLM_ECANCEL);
5085                                 } else {
5086                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5087                                         _unlock_lock(r, lkb);
5088                                 }
5089                                 break;
5090                         default:
5091                                 err = 1;
5092                         }
5093                 } else {
5094                         switch (mstype) {
5095                         case DLM_MSG_LOOKUP:
5096                         case DLM_MSG_REQUEST:
5097                                 _request_lock(r, lkb);
5098                                 if (is_master(r))
5099                                         confirm_master(r, 0);
5100                                 break;
5101                         case DLM_MSG_CONVERT:
5102                                 _convert_lock(r, lkb);
5103                                 break;
5104                         default:
5105                                 err = 1;
5106                         }
5107                 }
5108
5109                 if (err) {
5110                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5111                                   "dir_nodeid %d overlap %d %d",
5112                                   lkb->lkb_id, mstype, r->res_nodeid,
5113                                   dlm_dir_nodeid(r), oc, ou);
5114                 }
5115                 unlock_rsb(r);
5116                 put_rsb(r);
5117                 dlm_put_lkb(lkb);
5118         }
5119
5120         return error;
5121 }
5122
5123 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5124                               struct list_head *list)
5125 {
5126         struct dlm_lkb *lkb, *safe;
5127
5128         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5129                 if (!is_master_copy(lkb))
5130                         continue;
5131
5132                 /* don't purge lkbs we've added in recover_master_copy for
5133                    the current recovery seq */
5134
5135                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5136                         continue;
5137
5138                 del_lkb(r, lkb);
5139
5140                 /* this put should free the lkb */
5141                 if (!dlm_put_lkb(lkb))
5142                         log_error(ls, "purged mstcpy lkb not released");
5143         }
5144 }
5145
5146 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5147 {
5148         struct dlm_ls *ls = r->res_ls;
5149
5150         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5151         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5152         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5153 }
5154
5155 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5156                             struct list_head *list,
5157                             int nodeid_gone, unsigned int *count)
5158 {
5159         struct dlm_lkb *lkb, *safe;
5160
5161         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5162                 if (!is_master_copy(lkb))
5163                         continue;
5164
5165                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5166                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5167
5168                         /* tell recover_lvb to invalidate the lvb
5169                            because a node holding EX/PW failed */
5170                         if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5171                             (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5172                                 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5173                         }
5174
5175                         del_lkb(r, lkb);
5176
5177                         /* this put should free the lkb */
5178                         if (!dlm_put_lkb(lkb))
5179                                 log_error(ls, "purged dead lkb not released");
5180
5181                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5182
5183                         (*count)++;
5184                 }
5185         }
5186 }
5187
5188 /* Get rid of locks held by nodes that are gone. */
5189
5190 void dlm_recover_purge(struct dlm_ls *ls)
5191 {
5192         struct dlm_rsb *r;
5193         struct dlm_member *memb;
5194         int nodes_count = 0;
5195         int nodeid_gone = 0;
5196         unsigned int lkb_count = 0;
5197
5198         /* cache one removed nodeid to optimize the common
5199            case of a single node removed */
5200
5201         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5202                 nodes_count++;
5203                 nodeid_gone = memb->nodeid;
5204         }
5205
5206         if (!nodes_count)
5207                 return;
5208
5209         down_write(&ls->ls_root_sem);
5210         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5211                 hold_rsb(r);
5212                 lock_rsb(r);
5213                 if (is_master(r)) {
5214                         purge_dead_list(ls, r, &r->res_grantqueue,
5215                                         nodeid_gone, &lkb_count);
5216                         purge_dead_list(ls, r, &r->res_convertqueue,
5217                                         nodeid_gone, &lkb_count);
5218                         purge_dead_list(ls, r, &r->res_waitqueue,
5219                                         nodeid_gone, &lkb_count);
5220                 }
5221                 unlock_rsb(r);
5222                 unhold_rsb(r);
5223                 cond_resched();
5224         }
5225         up_write(&ls->ls_root_sem);
5226
5227         if (lkb_count)
5228                 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5229                           lkb_count, nodes_count);
5230 }
5231
5232 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5233 {
5234         struct rb_node *n;
5235         struct dlm_rsb *r;
5236
5237         spin_lock(&ls->ls_rsbtbl[bucket].lock);
5238         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5239                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5240
5241                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5242                         continue;
5243                 if (!is_master(r)) {
5244                         rsb_clear_flag(r, RSB_RECOVER_GRANT);
5245                         continue;
5246                 }
5247                 hold_rsb(r);
5248                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5249                 return r;
5250         }
5251         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5252         return NULL;
5253 }
5254
5255 /*
5256  * Attempt to grant locks on resources that we are the master of.
5257  * Locks may have become grantable during recovery because locks
5258  * from departed nodes have been purged (or not rebuilt), allowing
5259  * previously blocked locks to now be granted.  The subset of rsb's
5260  * we are interested in are those with lkb's on either the convert or
5261  * waiting queues.
5262  *
5263  * Simplest would be to go through each master rsb and check for non-empty
5264  * convert or waiting queues, and attempt to grant on those rsbs.
5265  * Checking the queues requires lock_rsb, though, for which we'd need
5266  * to release the rsbtbl lock.  This would make iterating through all
5267  * rsb's very inefficient.  So, we rely on earlier recovery routines
5268  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5269  * locks for.
5270  */
5271
5272 void dlm_recover_grant(struct dlm_ls *ls)
5273 {
5274         struct dlm_rsb *r;
5275         int bucket = 0;
5276         unsigned int count = 0;
5277         unsigned int rsb_count = 0;
5278         unsigned int lkb_count = 0;
5279
5280         while (1) {
5281                 r = find_grant_rsb(ls, bucket);
5282                 if (!r) {
5283                         if (bucket == ls->ls_rsbtbl_size - 1)
5284                                 break;
5285                         bucket++;
5286                         continue;
5287                 }
5288                 rsb_count++;
5289                 count = 0;
5290                 lock_rsb(r);
5291                 /* the RECOVER_GRANT flag is checked in the grant path */
5292                 grant_pending_locks(r, &count);
5293                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5294                 lkb_count += count;
5295                 confirm_master(r, 0);
5296                 unlock_rsb(r);
5297                 put_rsb(r);
5298                 cond_resched();
5299         }
5300
5301         if (lkb_count)
5302                 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5303                           lkb_count, rsb_count);
5304 }
5305
5306 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5307                                          uint32_t remid)
5308 {
5309         struct dlm_lkb *lkb;
5310
5311         list_for_each_entry(lkb, head, lkb_statequeue) {
5312                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5313                         return lkb;
5314         }
5315         return NULL;
5316 }
5317
5318 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5319                                     uint32_t remid)
5320 {
5321         struct dlm_lkb *lkb;
5322
5323         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5324         if (lkb)
5325                 return lkb;
5326         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5327         if (lkb)
5328                 return lkb;
5329         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5330         if (lkb)
5331                 return lkb;
5332         return NULL;
5333 }
5334
5335 /* needs at least dlm_rcom + rcom_lock */
5336 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5337                                   struct dlm_rsb *r, struct dlm_rcom *rc)
5338 {
5339         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5340
5341         lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5342         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5343         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5344         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5345         dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5346         set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5347         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5348         lkb->lkb_rqmode = rl->rl_rqmode;
5349         lkb->lkb_grmode = rl->rl_grmode;
5350         /* don't set lkb_status because add_lkb wants to itself */
5351
5352         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5353         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5354
5355         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5356                 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5357                         sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5358                 if (lvblen > ls->ls_lvblen)
5359                         return -EINVAL;
5360                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5361                 if (!lkb->lkb_lvbptr)
5362                         return -ENOMEM;
5363                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5364         }
5365
5366         /* Conversions between PR and CW (middle modes) need special handling.
5367            The real granted mode of these converting locks cannot be determined
5368            until all locks have been rebuilt on the rsb (recover_conversion) */
5369
5370         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5371             middle_conversion(lkb)) {
5372                 rl->rl_status = DLM_LKSTS_CONVERT;
5373                 lkb->lkb_grmode = DLM_LOCK_IV;
5374                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5375         }
5376
5377         return 0;
5378 }
5379
5380 /* This lkb may have been recovered in a previous aborted recovery so we need
5381    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5382    If so we just send back a standard reply.  If not, we create a new lkb with
5383    the given values and send back our lkid.  We send back our lkid by sending
5384    back the rcom_lock struct we got but with the remid field filled in. */
5385
5386 /* needs at least dlm_rcom + rcom_lock */
5387 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5388 {
5389         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5390         struct dlm_rsb *r;
5391         struct dlm_lkb *lkb;
5392         uint32_t remid = 0;
5393         int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5394         int error;
5395
5396         if (rl->rl_parent_lkid) {
5397                 error = -EOPNOTSUPP;
5398                 goto out;
5399         }
5400
5401         remid = le32_to_cpu(rl->rl_lkid);
5402
5403         /* In general we expect the rsb returned to be R_MASTER, but we don't
5404            have to require it.  Recovery of masters on one node can overlap
5405            recovery of locks on another node, so one node can send us MSTCPY
5406            locks before we've made ourselves master of this rsb.  We can still
5407            add new MSTCPY locks that we receive here without any harm; when
5408            we make ourselves master, dlm_recover_masters() won't touch the
5409            MSTCPY locks we've received early. */
5410
5411         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5412                          from_nodeid, R_RECEIVE_RECOVER, &r);
5413         if (error)
5414                 goto out;
5415
5416         lock_rsb(r);
5417
5418         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5419                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5420                           from_nodeid, remid);
5421                 error = -EBADR;
5422                 goto out_unlock;
5423         }
5424
5425         lkb = search_remid(r, from_nodeid, remid);
5426         if (lkb) {
5427                 error = -EEXIST;
5428                 goto out_remid;
5429         }
5430
5431         error = create_lkb(ls, &lkb);
5432         if (error)
5433                 goto out_unlock;
5434
5435         error = receive_rcom_lock_args(ls, lkb, r, rc);
5436         if (error) {
5437                 __put_lkb(ls, lkb);
5438                 goto out_unlock;
5439         }
5440
5441         attach_lkb(r, lkb);
5442         add_lkb(r, lkb, rl->rl_status);
5443         ls->ls_recover_locks_in++;
5444
5445         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5446                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5447
5448  out_remid:
5449         /* this is the new value returned to the lock holder for
5450            saving in its process-copy lkb */
5451         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5452
5453         lkb->lkb_recover_seq = ls->ls_recover_seq;
5454
5455  out_unlock:
5456         unlock_rsb(r);
5457         put_rsb(r);
5458  out:
5459         if (error && error != -EEXIST)
5460                 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5461                           from_nodeid, remid, error);
5462         rl->rl_result = cpu_to_le32(error);
5463         return error;
5464 }
5465
5466 /* needs at least dlm_rcom + rcom_lock */
5467 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5468 {
5469         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5470         struct dlm_rsb *r;
5471         struct dlm_lkb *lkb;
5472         uint32_t lkid, remid;
5473         int error, result;
5474
5475         lkid = le32_to_cpu(rl->rl_lkid);
5476         remid = le32_to_cpu(rl->rl_remid);
5477         result = le32_to_cpu(rl->rl_result);
5478
5479         error = find_lkb(ls, lkid, &lkb);
5480         if (error) {
5481                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5482                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5483                           result);
5484                 return error;
5485         }
5486
5487         r = lkb->lkb_resource;
5488         hold_rsb(r);
5489         lock_rsb(r);
5490
5491         if (!is_process_copy(lkb)) {
5492                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5493                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5494                           result);
5495                 dlm_dump_rsb(r);
5496                 unlock_rsb(r);
5497                 put_rsb(r);
5498                 dlm_put_lkb(lkb);
5499                 return -EINVAL;
5500         }
5501
5502         switch (result) {
5503         case -EBADR:
5504                 /* There's a chance the new master received our lock before
5505                    dlm_recover_master_reply(), this wouldn't happen if we did
5506                    a barrier between recover_masters and recover_locks. */
5507
5508                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5509                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5510                           result);
5511         
5512                 dlm_send_rcom_lock(r, lkb);
5513                 goto out;
5514         case -EEXIST:
5515         case 0:
5516                 lkb->lkb_remid = remid;
5517                 break;
5518         default:
5519                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5520                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5521                           result);
5522         }
5523
5524         /* an ack for dlm_recover_locks() which waits for replies from
5525            all the locks it sends to new masters */
5526         dlm_recovered_lock(r);
5527  out:
5528         unlock_rsb(r);
5529         put_rsb(r);
5530         dlm_put_lkb(lkb);
5531
5532         return 0;
5533 }
5534
5535 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5536                      int mode, uint32_t flags, void *name, unsigned int namelen)
5537 {
5538         struct dlm_lkb *lkb;
5539         struct dlm_args args;
5540         bool do_put = true;
5541         int error;
5542
5543         dlm_lock_recovery(ls);
5544
5545         error = create_lkb(ls, &lkb);
5546         if (error) {
5547                 kfree(ua);
5548                 goto out;
5549         }
5550
5551         trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5552
5553         if (flags & DLM_LKF_VALBLK) {
5554                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5555                 if (!ua->lksb.sb_lvbptr) {
5556                         kfree(ua);
5557                         error = -ENOMEM;
5558                         goto out_put;
5559                 }
5560         }
5561         error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5562                               fake_bastfn, &args);
5563         if (error) {
5564                 kfree(ua->lksb.sb_lvbptr);
5565                 ua->lksb.sb_lvbptr = NULL;
5566                 kfree(ua);
5567                 goto out_put;
5568         }
5569
5570         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5571            When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5572            lock and that lkb_astparam is the dlm_user_args structure. */
5573         set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5574         error = request_lock(ls, lkb, name, namelen, &args);
5575
5576         switch (error) {
5577         case 0:
5578                 break;
5579         case -EINPROGRESS:
5580                 error = 0;
5581                 break;
5582         case -EAGAIN:
5583                 error = 0;
5584                 fallthrough;
5585         default:
5586                 goto out_put;
5587         }
5588
5589         /* add this new lkb to the per-process list of locks */
5590         spin_lock(&ua->proc->locks_spin);
5591         hold_lkb(lkb);
5592         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5593         spin_unlock(&ua->proc->locks_spin);
5594         do_put = false;
5595  out_put:
5596         trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5597         if (do_put)
5598                 __put_lkb(ls, lkb);
5599  out:
5600         dlm_unlock_recovery(ls);
5601         return error;
5602 }
5603
5604 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5605                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5606 {
5607         struct dlm_lkb *lkb;
5608         struct dlm_args args;
5609         struct dlm_user_args *ua;
5610         int error;
5611
5612         dlm_lock_recovery(ls);
5613
5614         error = find_lkb(ls, lkid, &lkb);
5615         if (error)
5616                 goto out;
5617
5618         trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5619
5620         /* user can change the params on its lock when it converts it, or
5621            add an lvb that didn't exist before */
5622
5623         ua = lkb->lkb_ua;
5624
5625         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5626                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5627                 if (!ua->lksb.sb_lvbptr) {
5628                         error = -ENOMEM;
5629                         goto out_put;
5630                 }
5631         }
5632         if (lvb_in && ua->lksb.sb_lvbptr)
5633                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5634
5635         ua->xid = ua_tmp->xid;
5636         ua->castparam = ua_tmp->castparam;
5637         ua->castaddr = ua_tmp->castaddr;
5638         ua->bastparam = ua_tmp->bastparam;
5639         ua->bastaddr = ua_tmp->bastaddr;
5640         ua->user_lksb = ua_tmp->user_lksb;
5641
5642         error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5643                               fake_bastfn, &args);
5644         if (error)
5645                 goto out_put;
5646
5647         error = convert_lock(ls, lkb, &args);
5648
5649         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5650                 error = 0;
5651  out_put:
5652         trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5653         dlm_put_lkb(lkb);
5654  out:
5655         dlm_unlock_recovery(ls);
5656         kfree(ua_tmp);
5657         return error;
5658 }
5659
5660 /*
5661  * The caller asks for an orphan lock on a given resource with a given mode.
5662  * If a matching lock exists, it's moved to the owner's list of locks and
5663  * the lkid is returned.
5664  */
5665
5666 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5667                      int mode, uint32_t flags, void *name, unsigned int namelen,
5668                      uint32_t *lkid)
5669 {
5670         struct dlm_lkb *lkb = NULL, *iter;
5671         struct dlm_user_args *ua;
5672         int found_other_mode = 0;
5673         int rv = 0;
5674
5675         mutex_lock(&ls->ls_orphans_mutex);
5676         list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5677                 if (iter->lkb_resource->res_length != namelen)
5678                         continue;
5679                 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5680                         continue;
5681                 if (iter->lkb_grmode != mode) {
5682                         found_other_mode = 1;
5683                         continue;
5684                 }
5685
5686                 lkb = iter;
5687                 list_del_init(&iter->lkb_ownqueue);
5688                 clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5689                 *lkid = iter->lkb_id;
5690                 break;
5691         }
5692         mutex_unlock(&ls->ls_orphans_mutex);
5693
5694         if (!lkb && found_other_mode) {
5695                 rv = -EAGAIN;
5696                 goto out;
5697         }
5698
5699         if (!lkb) {
5700                 rv = -ENOENT;
5701                 goto out;
5702         }
5703
5704         lkb->lkb_exflags = flags;
5705         lkb->lkb_ownpid = (int) current->pid;
5706
5707         ua = lkb->lkb_ua;
5708
5709         ua->proc = ua_tmp->proc;
5710         ua->xid = ua_tmp->xid;
5711         ua->castparam = ua_tmp->castparam;
5712         ua->castaddr = ua_tmp->castaddr;
5713         ua->bastparam = ua_tmp->bastparam;
5714         ua->bastaddr = ua_tmp->bastaddr;
5715         ua->user_lksb = ua_tmp->user_lksb;
5716
5717         /*
5718          * The lkb reference from the ls_orphans list was not
5719          * removed above, and is now considered the reference
5720          * for the proc locks list.
5721          */
5722
5723         spin_lock(&ua->proc->locks_spin);
5724         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5725         spin_unlock(&ua->proc->locks_spin);
5726  out:
5727         kfree(ua_tmp);
5728         return rv;
5729 }
5730
5731 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5732                     uint32_t flags, uint32_t lkid, char *lvb_in)
5733 {
5734         struct dlm_lkb *lkb;
5735         struct dlm_args args;
5736         struct dlm_user_args *ua;
5737         int error;
5738
5739         dlm_lock_recovery(ls);
5740
5741         error = find_lkb(ls, lkid, &lkb);
5742         if (error)
5743                 goto out;
5744
5745         trace_dlm_unlock_start(ls, lkb, flags);
5746
5747         ua = lkb->lkb_ua;
5748
5749         if (lvb_in && ua->lksb.sb_lvbptr)
5750                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5751         if (ua_tmp->castparam)
5752                 ua->castparam = ua_tmp->castparam;
5753         ua->user_lksb = ua_tmp->user_lksb;
5754
5755         error = set_unlock_args(flags, ua, &args);
5756         if (error)
5757                 goto out_put;
5758
5759         error = unlock_lock(ls, lkb, &args);
5760
5761         if (error == -DLM_EUNLOCK)
5762                 error = 0;
5763         /* from validate_unlock_args() */
5764         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5765                 error = 0;
5766         if (error)
5767                 goto out_put;
5768
5769         spin_lock(&ua->proc->locks_spin);
5770         /* dlm_user_add_cb() may have already taken lkb off the proc list */
5771         if (!list_empty(&lkb->lkb_ownqueue))
5772                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5773         spin_unlock(&ua->proc->locks_spin);
5774  out_put:
5775         trace_dlm_unlock_end(ls, lkb, flags, error);
5776         dlm_put_lkb(lkb);
5777  out:
5778         dlm_unlock_recovery(ls);
5779         kfree(ua_tmp);
5780         return error;
5781 }
5782
5783 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5784                     uint32_t flags, uint32_t lkid)
5785 {
5786         struct dlm_lkb *lkb;
5787         struct dlm_args args;
5788         struct dlm_user_args *ua;
5789         int error;
5790
5791         dlm_lock_recovery(ls);
5792
5793         error = find_lkb(ls, lkid, &lkb);
5794         if (error)
5795                 goto out;
5796
5797         trace_dlm_unlock_start(ls, lkb, flags);
5798
5799         ua = lkb->lkb_ua;
5800         if (ua_tmp->castparam)
5801                 ua->castparam = ua_tmp->castparam;
5802         ua->user_lksb = ua_tmp->user_lksb;
5803
5804         error = set_unlock_args(flags, ua, &args);
5805         if (error)
5806                 goto out_put;
5807
5808         error = cancel_lock(ls, lkb, &args);
5809
5810         if (error == -DLM_ECANCEL)
5811                 error = 0;
5812         /* from validate_unlock_args() */
5813         if (error == -EBUSY)
5814                 error = 0;
5815  out_put:
5816         trace_dlm_unlock_end(ls, lkb, flags, error);
5817         dlm_put_lkb(lkb);
5818  out:
5819         dlm_unlock_recovery(ls);
5820         kfree(ua_tmp);
5821         return error;
5822 }
5823
5824 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5825 {
5826         struct dlm_lkb *lkb;
5827         struct dlm_args args;
5828         struct dlm_user_args *ua;
5829         struct dlm_rsb *r;
5830         int error;
5831
5832         dlm_lock_recovery(ls);
5833
5834         error = find_lkb(ls, lkid, &lkb);
5835         if (error)
5836                 goto out;
5837
5838         trace_dlm_unlock_start(ls, lkb, flags);
5839
5840         ua = lkb->lkb_ua;
5841
5842         error = set_unlock_args(flags, ua, &args);
5843         if (error)
5844                 goto out_put;
5845
5846         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
5847
5848         r = lkb->lkb_resource;
5849         hold_rsb(r);
5850         lock_rsb(r);
5851
5852         error = validate_unlock_args(lkb, &args);
5853         if (error)
5854                 goto out_r;
5855         set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
5856
5857         error = _cancel_lock(r, lkb);
5858  out_r:
5859         unlock_rsb(r);
5860         put_rsb(r);
5861
5862         if (error == -DLM_ECANCEL)
5863                 error = 0;
5864         /* from validate_unlock_args() */
5865         if (error == -EBUSY)
5866                 error = 0;
5867  out_put:
5868         trace_dlm_unlock_end(ls, lkb, flags, error);
5869         dlm_put_lkb(lkb);
5870  out:
5871         dlm_unlock_recovery(ls);
5872         return error;
5873 }
5874
5875 /* lkb's that are removed from the waiters list by revert are just left on the
5876    orphans list with the granted orphan locks, to be freed by purge */
5877
5878 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5879 {
5880         struct dlm_args args;
5881         int error;
5882
5883         hold_lkb(lkb); /* reference for the ls_orphans list */
5884         mutex_lock(&ls->ls_orphans_mutex);
5885         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5886         mutex_unlock(&ls->ls_orphans_mutex);
5887
5888         set_unlock_args(0, lkb->lkb_ua, &args);
5889
5890         error = cancel_lock(ls, lkb, &args);
5891         if (error == -DLM_ECANCEL)
5892                 error = 0;
5893         return error;
5894 }
5895
5896 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
5897    granted.  Regardless of what rsb queue the lock is on, it's removed and
5898    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
5899    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
5900
5901 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5902 {
5903         struct dlm_args args;
5904         int error;
5905
5906         set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
5907                         lkb->lkb_ua, &args);
5908
5909         error = unlock_lock(ls, lkb, &args);
5910         if (error == -DLM_EUNLOCK)
5911                 error = 0;
5912         return error;
5913 }
5914
5915 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
5916    (which does lock_rsb) due to deadlock with receiving a message that does
5917    lock_rsb followed by dlm_user_add_cb() */
5918
5919 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5920                                      struct dlm_user_proc *proc)
5921 {
5922         struct dlm_lkb *lkb = NULL;
5923
5924         spin_lock(&ls->ls_clear_proc_locks);
5925         if (list_empty(&proc->locks))
5926                 goto out;
5927
5928         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5929         list_del_init(&lkb->lkb_ownqueue);
5930
5931         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5932                 set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
5933         else
5934                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
5935  out:
5936         spin_unlock(&ls->ls_clear_proc_locks);
5937         return lkb;
5938 }
5939
5940 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
5941    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5942    which we clear here. */
5943
5944 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
5945    list, and no more device_writes should add lkb's to proc->locks list; so we
5946    shouldn't need to take asts_spin or locks_spin here.  this assumes that
5947    device reads/writes/closes are serialized -- FIXME: we may need to serialize
5948    them ourself. */
5949
5950 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5951 {
5952         struct dlm_lkb *lkb, *safe;
5953
5954         dlm_lock_recovery(ls);
5955
5956         while (1) {
5957                 lkb = del_proc_lock(ls, proc);
5958                 if (!lkb)
5959                         break;
5960                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5961                         orphan_proc_lock(ls, lkb);
5962                 else
5963                         unlock_proc_lock(ls, lkb);
5964
5965                 /* this removes the reference for the proc->locks list
5966                    added by dlm_user_request, it may result in the lkb
5967                    being freed */
5968
5969                 dlm_put_lkb(lkb);
5970         }
5971
5972         spin_lock(&ls->ls_clear_proc_locks);
5973
5974         /* in-progress unlocks */
5975         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5976                 list_del_init(&lkb->lkb_ownqueue);
5977                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
5978                 dlm_put_lkb(lkb);
5979         }
5980
5981         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5982                 dlm_purge_lkb_callbacks(lkb);
5983                 list_del_init(&lkb->lkb_cb_list);
5984                 dlm_put_lkb(lkb);
5985         }
5986
5987         spin_unlock(&ls->ls_clear_proc_locks);
5988         dlm_unlock_recovery(ls);
5989 }
5990
5991 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5992 {
5993         struct dlm_lkb *lkb, *safe;
5994
5995         while (1) {
5996                 lkb = NULL;
5997                 spin_lock(&proc->locks_spin);
5998                 if (!list_empty(&proc->locks)) {
5999                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
6000                                          lkb_ownqueue);
6001                         list_del_init(&lkb->lkb_ownqueue);
6002                 }
6003                 spin_unlock(&proc->locks_spin);
6004
6005                 if (!lkb)
6006                         break;
6007
6008                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6009                 unlock_proc_lock(ls, lkb);
6010                 dlm_put_lkb(lkb); /* ref from proc->locks list */
6011         }
6012
6013         spin_lock(&proc->locks_spin);
6014         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6015                 list_del_init(&lkb->lkb_ownqueue);
6016                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6017                 dlm_put_lkb(lkb);
6018         }
6019         spin_unlock(&proc->locks_spin);
6020
6021         spin_lock(&proc->asts_spin);
6022         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6023                 dlm_purge_lkb_callbacks(lkb);
6024                 list_del_init(&lkb->lkb_cb_list);
6025                 dlm_put_lkb(lkb);
6026         }
6027         spin_unlock(&proc->asts_spin);
6028 }
6029
6030 /* pid of 0 means purge all orphans */
6031
6032 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6033 {
6034         struct dlm_lkb *lkb, *safe;
6035
6036         mutex_lock(&ls->ls_orphans_mutex);
6037         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6038                 if (pid && lkb->lkb_ownpid != pid)
6039                         continue;
6040                 unlock_proc_lock(ls, lkb);
6041                 list_del_init(&lkb->lkb_ownqueue);
6042                 dlm_put_lkb(lkb);
6043         }
6044         mutex_unlock(&ls->ls_orphans_mutex);
6045 }
6046
6047 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6048 {
6049         struct dlm_message *ms;
6050         struct dlm_mhandle *mh;
6051         int error;
6052
6053         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6054                                 DLM_MSG_PURGE, &ms, &mh, GFP_NOFS);
6055         if (error)
6056                 return error;
6057         ms->m_nodeid = cpu_to_le32(nodeid);
6058         ms->m_pid = cpu_to_le32(pid);
6059
6060         return send_message(mh, ms, NULL, 0);
6061 }
6062
6063 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6064                    int nodeid, int pid)
6065 {
6066         int error = 0;
6067
6068         if (nodeid && (nodeid != dlm_our_nodeid())) {
6069                 error = send_purge(ls, nodeid, pid);
6070         } else {
6071                 dlm_lock_recovery(ls);
6072                 if (pid == current->pid)
6073                         purge_proc_locks(ls, proc);
6074                 else
6075                         do_purge(ls, nodeid, pid);
6076                 dlm_unlock_recovery(ls);
6077         }
6078         return error;
6079 }
6080
6081 /* debug functionality */
6082 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6083                       int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6084 {
6085         struct dlm_lksb *lksb;
6086         struct dlm_lkb *lkb;
6087         struct dlm_rsb *r;
6088         int error;
6089
6090         /* we currently can't set a valid user lock */
6091         if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6092                 return -EOPNOTSUPP;
6093
6094         lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6095         if (!lksb)
6096                 return -ENOMEM;
6097
6098         error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6099         if (error) {
6100                 kfree(lksb);
6101                 return error;
6102         }
6103
6104         dlm_set_dflags_val(lkb, lkb_dflags);
6105         lkb->lkb_nodeid = lkb_nodeid;
6106         lkb->lkb_lksb = lksb;
6107         /* user specific pointer, just don't have it NULL for kernel locks */
6108         if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6109                 lkb->lkb_astparam = (void *)0xDEADBEEF;
6110
6111         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6112         if (error) {
6113                 kfree(lksb);
6114                 __put_lkb(ls, lkb);
6115                 return error;
6116         }
6117
6118         lock_rsb(r);
6119         attach_lkb(r, lkb);
6120         add_lkb(r, lkb, lkb_status);
6121         unlock_rsb(r);
6122         put_rsb(r);
6123
6124         return 0;
6125 }
6126
6127 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6128                                  int mstype, int to_nodeid)
6129 {
6130         struct dlm_lkb *lkb;
6131         int error;
6132
6133         error = find_lkb(ls, lkb_id, &lkb);
6134         if (error)
6135                 return error;
6136
6137         error = add_to_waiters(lkb, mstype, to_nodeid);
6138         dlm_put_lkb(lkb);
6139         return error;
6140 }
6141
This page took 0.374142 seconds and 4 git commands to generate.