]> Git Repo - J-linux.git/blob - fs/dlm/lock.c
Merge tag 'vfs-6.13-rc7.fixes' of git://git.kernel.org/pub/scm/linux/kernel/git/vfs/vfs
[J-linux.git] / fs / dlm / lock.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10
11 /* Central locking logic has four stages:
12
13    dlm_lock()
14    dlm_unlock()
15
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
56 #include <trace/events/dlm.h>
57
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     const struct dlm_message *ms, bool local);
90 static int receive_extralen(const struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void deactivate_rsb(struct kref *kref);
93
94 /*
95  * Lock compatibilty matrix - thanks Steve
96  * UN = Unlocked state. Not really a state, used as a flag
97  * PD = Padding. Used to make the matrix a nice power of two in size
98  * Other states are the same as the VMS DLM.
99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
134
135 #define modes_compat(gr, rq) \
136         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
160
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166                dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168                (unsigned long long)lkb->lkb_recover_seq);
169 }
170
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174                "rlc %d name %s\n",
175                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177                r->res_name);
178 }
179
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182         struct dlm_lkb *lkb;
183
184         dlm_print_rsb(r);
185
186         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188         printk(KERN_ERR "rsb lookup list\n");
189         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190                 dlm_print_lkb(lkb);
191         printk(KERN_ERR "rsb grant queue:\n");
192         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193                 dlm_print_lkb(lkb);
194         printk(KERN_ERR "rsb convert queue:\n");
195         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196                 dlm_print_lkb(lkb);
197         printk(KERN_ERR "rsb wait queue:\n");
198         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199                 dlm_print_lkb(lkb);
200 }
201
202 /* Threads cannot use the lockspace while it's being recovered */
203
204 void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206         down_read(&ls->ls_in_recovery);
207 }
208
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211         up_read(&ls->ls_in_recovery);
212 }
213
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216         return down_read_trylock(&ls->ls_in_recovery);
217 }
218
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231         return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236         return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247         return !!r->res_nodeid;
248 }
249
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252         return lkb->lkb_nodeid &&
253                !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287                test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294
295         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296
297         if (rv == -DLM_ECANCEL &&
298             test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299                 rv = -EDEADLK;
300
301         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306         queue_cast(r, lkb,
307                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312         if (is_master_copy(lkb)) {
313                 send_bast(r, lkb, rqmode);
314         } else {
315                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316         }
317 }
318
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322
323 static inline unsigned long rsb_toss_jiffies(void)
324 {
325         return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326 }
327
328 /* This is only called to add a reference when the code already holds
329    a valid reference to the rsb, so there's no need for locking. */
330
331 static inline void hold_rsb(struct dlm_rsb *r)
332 {
333         /* inactive rsbs are not ref counted */
334         WARN_ON(rsb_flag(r, RSB_INACTIVE));
335         kref_get(&r->res_ref);
336 }
337
338 void dlm_hold_rsb(struct dlm_rsb *r)
339 {
340         hold_rsb(r);
341 }
342
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(lock)
347 {
348         if (refcount_dec_not_one(r))
349                 return false;
350
351         write_lock_bh(lock);
352         if (!refcount_dec_and_test(r)) {
353                 write_unlock_bh(lock);
354                 return false;
355         }
356
357         return true;
358 }
359
360 /* TODO move this to include/linux/kref.h */
361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362                                              void (*release)(struct kref *kref),
363                                              rwlock_t *lock)
364 {
365         if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366                 release(kref);
367                 return 1;
368         }
369
370         return 0;
371 }
372
373 static void put_rsb(struct dlm_rsb *r)
374 {
375         struct dlm_ls *ls = r->res_ls;
376         int rv;
377
378         rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
379                                         &ls->ls_rsbtbl_lock);
380         if (rv)
381                 write_unlock_bh(&ls->ls_rsbtbl_lock);
382 }
383
384 void dlm_put_rsb(struct dlm_rsb *r)
385 {
386         put_rsb(r);
387 }
388
389 /* connected with timer_delete_sync() in dlm_ls_stop() to stop
390  * new timers when recovery is triggered and don't run them
391  * again until a resume_scan_timer() tries it again.
392  */
393 static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
394 {
395         if (!dlm_locking_stopped(ls))
396                 mod_timer(&ls->ls_scan_timer, jiffies);
397 }
398
399 /* This function tries to resume the timer callback if a rsb
400  * is on the scan list and no timer is pending. It might that
401  * the first entry is on currently executed as timer callback
402  * but we don't care if a timer queued up again and does
403  * nothing. Should be a rare case.
404  */
405 void resume_scan_timer(struct dlm_ls *ls)
406 {
407         struct dlm_rsb *r;
408
409         spin_lock_bh(&ls->ls_scan_lock);
410         r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
411                                      res_scan_list);
412         if (r && !timer_pending(&ls->ls_scan_timer))
413                 enable_scan_timer(ls, r->res_toss_time);
414         spin_unlock_bh(&ls->ls_scan_lock);
415 }
416
417 /* ls_rsbtbl_lock must be held */
418
419 static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
420 {
421         struct dlm_rsb *first;
422
423         /* active rsbs should never be on the scan list */
424         WARN_ON(!rsb_flag(r, RSB_INACTIVE));
425
426         spin_lock_bh(&ls->ls_scan_lock);
427         r->res_toss_time = 0;
428
429         /* if the rsb is not queued do nothing */
430         if (list_empty(&r->res_scan_list))
431                 goto out;
432
433         /* get the first element before delete */
434         first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
435                                  res_scan_list);
436         list_del_init(&r->res_scan_list);
437         /* check if the first element was the rsb we deleted */
438         if (first == r) {
439                 /* try to get the new first element, if the list
440                  * is empty now try to delete the timer, if we are
441                  * too late we don't care.
442                  *
443                  * if the list isn't empty and a new first element got
444                  * in place, set the new timer expire time.
445                  */
446                 first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
447                                                  res_scan_list);
448                 if (!first)
449                         timer_delete(&ls->ls_scan_timer);
450                 else
451                         enable_scan_timer(ls, first->res_toss_time);
452         }
453
454 out:
455         spin_unlock_bh(&ls->ls_scan_lock);
456 }
457
458 static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
459 {
460         int our_nodeid = dlm_our_nodeid();
461         struct dlm_rsb *first;
462
463         /* A dir record for a remote master rsb should never be on the scan list. */
464         WARN_ON(!dlm_no_directory(ls) &&
465                 (r->res_master_nodeid != our_nodeid) &&
466                 (dlm_dir_nodeid(r) == our_nodeid));
467
468         /* An active rsb should never be on the scan list. */
469         WARN_ON(!rsb_flag(r, RSB_INACTIVE));
470
471         /* An rsb should not already be on the scan list. */
472         WARN_ON(!list_empty(&r->res_scan_list));
473
474         spin_lock_bh(&ls->ls_scan_lock);
475         /* set the new rsb absolute expire time in the rsb */
476         r->res_toss_time = rsb_toss_jiffies();
477         if (list_empty(&ls->ls_scan_list)) {
478                 /* if the queue is empty add the element and it's
479                  * our new expire time
480                  */
481                 list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
482                 enable_scan_timer(ls, r->res_toss_time);
483         } else {
484                 /* try to get the maybe new first element and then add
485                  * to this rsb with the oldest expire time to the end
486                  * of the queue. If the list was empty before this
487                  * rsb expire time is our next expiration if it wasn't
488                  * the now new first elemet is our new expiration time
489                  */
490                 first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
491                                                  res_scan_list);
492                 list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
493                 if (!first)
494                         enable_scan_timer(ls, r->res_toss_time);
495                 else
496                         enable_scan_timer(ls, first->res_toss_time);
497         }
498         spin_unlock_bh(&ls->ls_scan_lock);
499 }
500
501 /* if we hit contention we do in 250 ms a retry to trylock.
502  * if there is any other mod_timer in between we don't care
503  * about that it expires earlier again this is only for the
504  * unlikely case nothing happened in this time.
505  */
506 #define DLM_TOSS_TIMER_RETRY    (jiffies + msecs_to_jiffies(250))
507
508 /* Called by lockspace scan_timer to free unused rsb's. */
509
510 void dlm_rsb_scan(struct timer_list *timer)
511 {
512         struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
513         int our_nodeid = dlm_our_nodeid();
514         struct dlm_rsb *r;
515         int rv;
516
517         while (1) {
518                 /* interrupting point to leave iteration when
519                  * recovery waits for timer_delete_sync(), recovery
520                  * will take care to delete everything in scan list.
521                  */
522                 if (dlm_locking_stopped(ls))
523                         break;
524
525                 rv = spin_trylock(&ls->ls_scan_lock);
526                 if (!rv) {
527                         /* rearm again try timer */
528                         enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
529                         break;
530                 }
531
532                 r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
533                                              res_scan_list);
534                 if (!r) {
535                         /* the next add_scan will enable the timer again */
536                         spin_unlock(&ls->ls_scan_lock);
537                         break;
538                 }
539
540                 /*
541                  * If the first rsb is not yet expired, then stop because the
542                  * list is sorted with nearest expiration first.
543                  */
544                 if (time_before(jiffies, r->res_toss_time)) {
545                         /* rearm with the next rsb to expire in the future */
546                         enable_scan_timer(ls, r->res_toss_time);
547                         spin_unlock(&ls->ls_scan_lock);
548                         break;
549                 }
550
551                 /* in find_rsb_dir/nodir there is a reverse order of this
552                  * lock, however this is only a trylock if we hit some
553                  * possible contention we try it again.
554                  */
555                 rv = write_trylock(&ls->ls_rsbtbl_lock);
556                 if (!rv) {
557                         spin_unlock(&ls->ls_scan_lock);
558                         /* rearm again try timer */
559                         enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
560                         break;
561                 }
562
563                 list_del(&r->res_slow_list);
564                 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
565                                        dlm_rhash_rsb_params);
566                 rsb_clear_flag(r, RSB_HASHED);
567
568                 /* ls_rsbtbl_lock is not needed when calling send_remove() */
569                 write_unlock(&ls->ls_rsbtbl_lock);
570
571                 list_del_init(&r->res_scan_list);
572                 spin_unlock(&ls->ls_scan_lock);
573
574                 /* An rsb that is a dir record for a remote master rsb
575                  * cannot be removed, and should not have a timer enabled.
576                  */
577                 WARN_ON(!dlm_no_directory(ls) &&
578                         (r->res_master_nodeid != our_nodeid) &&
579                         (dlm_dir_nodeid(r) == our_nodeid));
580
581                 /* We're the master of this rsb but we're not
582                  * the directory record, so we need to tell the
583                  * dir node to remove the dir record
584                  */
585                 if (!dlm_no_directory(ls) &&
586                     (r->res_master_nodeid == our_nodeid) &&
587                     (dlm_dir_nodeid(r) != our_nodeid))
588                         send_remove(r);
589
590                 free_inactive_rsb(r);
591         }
592 }
593
594 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
595    unlock any spinlocks, go back and call pre_rsb_struct again.
596    Otherwise, take an rsb off the list and return it. */
597
598 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
599                           struct dlm_rsb **r_ret)
600 {
601         struct dlm_rsb *r;
602
603         r = dlm_allocate_rsb();
604         if (!r)
605                 return -ENOMEM;
606
607         r->res_ls = ls;
608         r->res_length = len;
609         memcpy(r->res_name, name, len);
610         spin_lock_init(&r->res_lock);
611
612         INIT_LIST_HEAD(&r->res_lookup);
613         INIT_LIST_HEAD(&r->res_grantqueue);
614         INIT_LIST_HEAD(&r->res_convertqueue);
615         INIT_LIST_HEAD(&r->res_waitqueue);
616         INIT_LIST_HEAD(&r->res_root_list);
617         INIT_LIST_HEAD(&r->res_scan_list);
618         INIT_LIST_HEAD(&r->res_recover_list);
619         INIT_LIST_HEAD(&r->res_masters_list);
620
621         *r_ret = r;
622         return 0;
623 }
624
625 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
626                         struct dlm_rsb **r_ret)
627 {
628         char key[DLM_RESNAME_MAXLEN] = {};
629
630         memcpy(key, name, len);
631         *r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
632         if (*r_ret)
633                 return 0;
634
635         return -EBADR;
636 }
637
638 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
639 {
640         int rv;
641
642         rv = rhashtable_insert_fast(rhash, &rsb->res_node,
643                                     dlm_rhash_rsb_params);
644         if (!rv)
645                 rsb_set_flag(rsb, RSB_HASHED);
646
647         return rv;
648 }
649
650 /*
651  * Find rsb in rsbtbl and potentially create/add one
652  *
653  * Delaying the release of rsb's has a similar benefit to applications keeping
654  * NL locks on an rsb, but without the guarantee that the cached master value
655  * will still be valid when the rsb is reused.  Apps aren't always smart enough
656  * to keep NL locks on an rsb that they may lock again shortly; this can lead
657  * to excessive master lookups and removals if we don't delay the release.
658  *
659  * Searching for an rsb means looking through both the normal list and toss
660  * list.  When found on the toss list the rsb is moved to the normal list with
661  * ref count of 1; when found on normal list the ref count is incremented.
662  *
663  * rsb's on the keep list are being used locally and refcounted.
664  * rsb's on the toss list are not being used locally, and are not refcounted.
665  *
666  * The toss list rsb's were either
667  * - previously used locally but not any more (were on keep list, then
668  *   moved to toss list when last refcount dropped)
669  * - created and put on toss list as a directory record for a lookup
670  *   (we are the dir node for the res, but are not using the res right now,
671  *   but some other node is)
672  *
673  * The purpose of find_rsb() is to return a refcounted rsb for local use.
674  * So, if the given rsb is on the toss list, it is moved to the keep list
675  * before being returned.
676  *
677  * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
678  * more refcounts exist, so the rsb is moved from the keep list to the
679  * toss list.
680  *
681  * rsb's on both keep and toss lists are used for doing a name to master
682  * lookups.  rsb's that are in use locally (and being refcounted) are on
683  * the keep list, rsb's that are not in use locally (not refcounted) and
684  * only exist for name/master lookups are on the toss list.
685  *
686  * rsb's on the toss list who's dir_nodeid is not local can have stale
687  * name/master mappings.  So, remote requests on such rsb's can potentially
688  * return with an error, which means the mapping is stale and needs to
689  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
690  * first_lkid is to keep only a single outstanding request on an rsb
691  * while that rsb has a potentially stale master.)
692  */
693
694 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
695                         uint32_t hash, int dir_nodeid, int from_nodeid,
696                         unsigned int flags, struct dlm_rsb **r_ret)
697 {
698         struct dlm_rsb *r = NULL;
699         int our_nodeid = dlm_our_nodeid();
700         int from_local = 0;
701         int from_other = 0;
702         int from_dir = 0;
703         int create = 0;
704         int error;
705
706         if (flags & R_RECEIVE_REQUEST) {
707                 if (from_nodeid == dir_nodeid)
708                         from_dir = 1;
709                 else
710                         from_other = 1;
711         } else if (flags & R_REQUEST) {
712                 from_local = 1;
713         }
714
715         /*
716          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
717          * from_nodeid has sent us a lock in dlm_recover_locks, believing
718          * we're the new master.  Our local recovery may not have set
719          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
720          * create the rsb; dlm_recover_process_copy() will handle EBADR
721          * by resending.
722          *
723          * If someone sends us a request, we are the dir node, and we do
724          * not find the rsb anywhere, then recreate it.  This happens if
725          * someone sends us a request after we have removed/freed an rsb.
726          * (They sent a request instead of lookup because they are using
727          * an rsb taken from their scan list.)
728          */
729
730         if (from_local || from_dir ||
731             (from_other && (dir_nodeid == our_nodeid))) {
732                 create = 1;
733         }
734
735  retry:
736         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
737         if (error)
738                 goto do_new;
739
740         /* check if the rsb is active under read lock - likely path */
741         read_lock_bh(&ls->ls_rsbtbl_lock);
742         if (!rsb_flag(r, RSB_HASHED)) {
743                 read_unlock_bh(&ls->ls_rsbtbl_lock);
744                 goto do_new;
745         }
746         
747         /*
748          * rsb is active, so we can't check master_nodeid without lock_rsb.
749          */
750
751         if (rsb_flag(r, RSB_INACTIVE)) {
752                 read_unlock_bh(&ls->ls_rsbtbl_lock);
753                 goto do_inactive;
754         }
755
756         kref_get(&r->res_ref);
757         read_unlock_bh(&ls->ls_rsbtbl_lock);
758         goto out;
759
760
761  do_inactive:
762         write_lock_bh(&ls->ls_rsbtbl_lock);
763
764         /*
765          * The expectation here is that the rsb will have HASHED and
766          * INACTIVE flags set, and that the rsb can be moved from
767          * inactive back to active again.  However, between releasing
768          * the read lock and acquiring the write lock, this rsb could
769          * have been removed from rsbtbl, and had HASHED cleared, to
770          * be freed.  To deal with this case, we would normally need
771          * to repeat dlm_search_rsb_tree while holding the write lock,
772          * but rcu allows us to simply check the HASHED flag, because
773          * the rcu read lock means the rsb will not be freed yet.
774          * If the HASHED flag is not set, then the rsb is being freed,
775          * so we add a new rsb struct.  If the HASHED flag is set,
776          * and INACTIVE is not set, it means another thread has
777          * made the rsb active, as we're expecting to do here, and
778          * we just repeat the lookup (this will be very unlikely.)
779          */
780         if (rsb_flag(r, RSB_HASHED)) {
781                 if (!rsb_flag(r, RSB_INACTIVE)) {
782                         write_unlock_bh(&ls->ls_rsbtbl_lock);
783                         goto retry;
784                 }
785         } else {
786                 write_unlock_bh(&ls->ls_rsbtbl_lock);
787                 goto do_new;
788         }
789
790         /*
791          * rsb found inactive (master_nodeid may be out of date unless
792          * we are the dir_nodeid or were the master)  No other thread
793          * is using this rsb because it's inactive, so we can
794          * look at or update res_master_nodeid without lock_rsb.
795          */
796
797         if ((r->res_master_nodeid != our_nodeid) && from_other) {
798                 /* our rsb was not master, and another node (not the dir node)
799                    has sent us a request */
800                 log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
801                           from_nodeid, r->res_master_nodeid, dir_nodeid,
802                           r->res_name);
803                 write_unlock_bh(&ls->ls_rsbtbl_lock);
804                 error = -ENOTBLK;
805                 goto out;
806         }
807
808         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
809                 /* don't think this should ever happen */
810                 log_error(ls, "find_rsb inactive from_dir %d master %d",
811                           from_nodeid, r->res_master_nodeid);
812                 dlm_print_rsb(r);
813                 /* fix it and go on */
814                 r->res_master_nodeid = our_nodeid;
815                 r->res_nodeid = 0;
816                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
817                 r->res_first_lkid = 0;
818         }
819
820         if (from_local && (r->res_master_nodeid != our_nodeid)) {
821                 /* Because we have held no locks on this rsb,
822                    res_master_nodeid could have become stale. */
823                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
824                 r->res_first_lkid = 0;
825         }
826
827         /* A dir record will not be on the scan list. */
828         if (r->res_dir_nodeid != our_nodeid)
829                 del_scan(ls, r);
830         list_move(&r->res_slow_list, &ls->ls_slow_active);
831         rsb_clear_flag(r, RSB_INACTIVE);
832         kref_init(&r->res_ref); /* ref is now used in active state */
833         write_unlock_bh(&ls->ls_rsbtbl_lock);
834
835         goto out;
836
837
838  do_new:
839         /*
840          * rsb not found
841          */
842
843         if (error == -EBADR && !create)
844                 goto out;
845
846         error = get_rsb_struct(ls, name, len, &r);
847         if (WARN_ON_ONCE(error))
848                 goto out;
849
850         r->res_hash = hash;
851         r->res_dir_nodeid = dir_nodeid;
852         kref_init(&r->res_ref);
853
854         if (from_dir) {
855                 /* want to see how often this happens */
856                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
857                           from_nodeid, r->res_name);
858                 r->res_master_nodeid = our_nodeid;
859                 r->res_nodeid = 0;
860                 goto out_add;
861         }
862
863         if (from_other && (dir_nodeid != our_nodeid)) {
864                 /* should never happen */
865                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
866                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
867                 dlm_free_rsb(r);
868                 r = NULL;
869                 error = -ENOTBLK;
870                 goto out;
871         }
872
873         if (from_other) {
874                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
875                           from_nodeid, dir_nodeid, r->res_name);
876         }
877
878         if (dir_nodeid == our_nodeid) {
879                 /* When we are the dir nodeid, we can set the master
880                    node immediately */
881                 r->res_master_nodeid = our_nodeid;
882                 r->res_nodeid = 0;
883         } else {
884                 /* set_master will send_lookup to dir_nodeid */
885                 r->res_master_nodeid = 0;
886                 r->res_nodeid = -1;
887         }
888
889  out_add:
890
891         write_lock_bh(&ls->ls_rsbtbl_lock);
892         error = rsb_insert(r, &ls->ls_rsbtbl);
893         if (error == -EEXIST) {
894                 /* somebody else was faster and it seems the
895                  * rsb exists now, we do a whole relookup
896                  */
897                 write_unlock_bh(&ls->ls_rsbtbl_lock);
898                 dlm_free_rsb(r);
899                 goto retry;
900         } else if (!error) {
901                 list_add(&r->res_slow_list, &ls->ls_slow_active);
902         }
903         write_unlock_bh(&ls->ls_rsbtbl_lock);
904  out:
905         *r_ret = r;
906         return error;
907 }
908
909 /* During recovery, other nodes can send us new MSTCPY locks (from
910    dlm_recover_locks) before we've made ourself master (in
911    dlm_recover_masters). */
912
913 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
914                           uint32_t hash, int dir_nodeid, int from_nodeid,
915                           unsigned int flags, struct dlm_rsb **r_ret)
916 {
917         struct dlm_rsb *r = NULL;
918         int our_nodeid = dlm_our_nodeid();
919         int recover = (flags & R_RECEIVE_RECOVER);
920         int error;
921
922  retry:
923         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
924         if (error)
925                 goto do_new;
926
927         /* check if the rsb is in active state under read lock - likely path */
928         read_lock_bh(&ls->ls_rsbtbl_lock);
929         if (!rsb_flag(r, RSB_HASHED)) {
930                 read_unlock_bh(&ls->ls_rsbtbl_lock);
931                 goto do_new;
932         }
933
934         if (rsb_flag(r, RSB_INACTIVE)) {
935                 read_unlock_bh(&ls->ls_rsbtbl_lock);
936                 goto do_inactive;
937         }
938
939         /*
940          * rsb is active, so we can't check master_nodeid without lock_rsb.
941          */
942
943         kref_get(&r->res_ref);
944         read_unlock_bh(&ls->ls_rsbtbl_lock);
945
946         goto out;
947
948
949  do_inactive:
950         write_lock_bh(&ls->ls_rsbtbl_lock);
951
952         /* See comment in find_rsb_dir. */
953         if (rsb_flag(r, RSB_HASHED)) {
954                 if (!rsb_flag(r, RSB_INACTIVE)) {
955                         write_unlock_bh(&ls->ls_rsbtbl_lock);
956                         goto retry;
957                 }
958         } else {
959                 write_unlock_bh(&ls->ls_rsbtbl_lock);
960                 goto do_new;
961         }
962
963
964         /*
965          * rsb found inactive. No other thread is using this rsb because
966          * it's inactive, so we can look at or update res_master_nodeid
967          * without lock_rsb.
968          */
969
970         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
971                 /* our rsb is not master, and another node has sent us a
972                    request; this should never happen */
973                 log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
974                           from_nodeid, r->res_master_nodeid, dir_nodeid);
975                 dlm_print_rsb(r);
976                 write_unlock_bh(&ls->ls_rsbtbl_lock);
977                 error = -ENOTBLK;
978                 goto out;
979         }
980
981         if (!recover && (r->res_master_nodeid != our_nodeid) &&
982             (dir_nodeid == our_nodeid)) {
983                 /* our rsb is not master, and we are dir; may as well fix it;
984                    this should never happen */
985                 log_error(ls, "find_rsb inactive our %d master %d dir %d",
986                           our_nodeid, r->res_master_nodeid, dir_nodeid);
987                 dlm_print_rsb(r);
988                 r->res_master_nodeid = our_nodeid;
989                 r->res_nodeid = 0;
990         }
991
992         list_move(&r->res_slow_list, &ls->ls_slow_active);
993         rsb_clear_flag(r, RSB_INACTIVE);
994         kref_init(&r->res_ref);
995         del_scan(ls, r);
996         write_unlock_bh(&ls->ls_rsbtbl_lock);
997
998         goto out;
999
1000
1001  do_new:
1002         /*
1003          * rsb not found
1004          */
1005
1006         error = get_rsb_struct(ls, name, len, &r);
1007         if (WARN_ON_ONCE(error))
1008                 goto out;
1009
1010         r->res_hash = hash;
1011         r->res_dir_nodeid = dir_nodeid;
1012         r->res_master_nodeid = dir_nodeid;
1013         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
1014         kref_init(&r->res_ref);
1015
1016         write_lock_bh(&ls->ls_rsbtbl_lock);
1017         error = rsb_insert(r, &ls->ls_rsbtbl);
1018         if (error == -EEXIST) {
1019                 /* somebody else was faster and it seems the
1020                  * rsb exists now, we do a whole relookup
1021                  */
1022                 write_unlock_bh(&ls->ls_rsbtbl_lock);
1023                 dlm_free_rsb(r);
1024                 goto retry;
1025         } else if (!error) {
1026                 list_add(&r->res_slow_list, &ls->ls_slow_active);
1027         }
1028         write_unlock_bh(&ls->ls_rsbtbl_lock);
1029
1030  out:
1031         *r_ret = r;
1032         return error;
1033 }
1034
1035 /*
1036  * rsb rcu usage
1037  *
1038  * While rcu read lock is held, the rsb cannot be freed,
1039  * which allows a lookup optimization.
1040  *
1041  * Two threads are accessing the same rsb concurrently,
1042  * the first (A) is trying to use the rsb, the second (B)
1043  * is trying to free the rsb.
1044  *
1045  * thread A                 thread B
1046  * (trying to use rsb)      (trying to free rsb)
1047  *
1048  * A1. rcu read lock
1049  * A2. rsbtbl read lock
1050  * A3. look up rsb in rsbtbl
1051  * A4. rsbtbl read unlock
1052  *                          B1. rsbtbl write lock
1053  *                          B2. look up rsb in rsbtbl
1054  *                          B3. remove rsb from rsbtbl
1055  *                          B4. clear rsb HASHED flag
1056  *                          B5. rsbtbl write unlock
1057  *                          B6. begin freeing rsb using rcu...
1058  *
1059  * (rsb is inactive, so try to make it active again)
1060  * A5. read rsb HASHED flag (safe because rsb is not freed yet)
1061  * A6. the rsb HASHED flag is not set, which it means the rsb
1062  *     is being removed from rsbtbl and freed, so don't use it.
1063  * A7. rcu read unlock
1064  *
1065  *                          B7. ...finish freeing rsb using rcu
1066  * A8. create a new rsb
1067  *
1068  * Without the rcu optimization, steps A5-8 would need to do
1069  * an extra rsbtbl lookup:
1070  * A5. rsbtbl write lock
1071  * A6. look up rsb in rsbtbl, not found
1072  * A7. rsbtbl write unlock
1073  * A8. create a new rsb
1074  */
1075
1076 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1077                     int from_nodeid, unsigned int flags,
1078                     struct dlm_rsb **r_ret)
1079 {
1080         int dir_nodeid;
1081         uint32_t hash;
1082         int rv;
1083
1084         if (len > DLM_RESNAME_MAXLEN)
1085                 return -EINVAL;
1086
1087         hash = jhash(name, len, 0);
1088         dir_nodeid = dlm_hash2nodeid(ls, hash);
1089
1090         rcu_read_lock();
1091         if (dlm_no_directory(ls))
1092                 rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1093                                       from_nodeid, flags, r_ret);
1094         else
1095                 rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
1096                                     from_nodeid, flags, r_ret);
1097         rcu_read_unlock();
1098         return rv;
1099 }
1100
1101 /* we have received a request and found that res_master_nodeid != our_nodeid,
1102    so we need to return an error or make ourself the master */
1103
1104 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1105                                   int from_nodeid)
1106 {
1107         if (dlm_no_directory(ls)) {
1108                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1109                           from_nodeid, r->res_master_nodeid,
1110                           r->res_dir_nodeid);
1111                 dlm_print_rsb(r);
1112                 return -ENOTBLK;
1113         }
1114
1115         if (from_nodeid != r->res_dir_nodeid) {
1116                 /* our rsb is not master, and another node (not the dir node)
1117                    has sent us a request.  this is much more common when our
1118                    master_nodeid is zero, so limit debug to non-zero.  */
1119
1120                 if (r->res_master_nodeid) {
1121                         log_debug(ls, "validate master from_other %d master %d "
1122                                   "dir %d first %x %s", from_nodeid,
1123                                   r->res_master_nodeid, r->res_dir_nodeid,
1124                                   r->res_first_lkid, r->res_name);
1125                 }
1126                 return -ENOTBLK;
1127         } else {
1128                 /* our rsb is not master, but the dir nodeid has sent us a
1129                    request; this could happen with master 0 / res_nodeid -1 */
1130
1131                 if (r->res_master_nodeid) {
1132                         log_error(ls, "validate master from_dir %d master %d "
1133                                   "first %x %s",
1134                                   from_nodeid, r->res_master_nodeid,
1135                                   r->res_first_lkid, r->res_name);
1136                 }
1137
1138                 r->res_master_nodeid = dlm_our_nodeid();
1139                 r->res_nodeid = 0;
1140                 return 0;
1141         }
1142 }
1143
1144 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
1145                                 int from_nodeid, bool is_inactive, unsigned int flags,
1146                                 int *r_nodeid, int *result)
1147 {
1148         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
1149         int from_master = (flags & DLM_LU_RECOVER_DIR);
1150
1151         if (r->res_dir_nodeid != our_nodeid) {
1152                 /* should not happen, but may as well fix it and carry on */
1153                 log_error(ls, "%s res_dir %d our %d %s", __func__,
1154                           r->res_dir_nodeid, our_nodeid, r->res_name);
1155                 r->res_dir_nodeid = our_nodeid;
1156         }
1157
1158         if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
1159                 /* Recovery uses this function to set a new master when
1160                  * the previous master failed.  Setting NEW_MASTER will
1161                  * force dlm_recover_masters to call recover_master on this
1162                  * rsb even though the res_nodeid is no longer removed.
1163                  */
1164
1165                 r->res_master_nodeid = from_nodeid;
1166                 r->res_nodeid = from_nodeid;
1167                 rsb_set_flag(r, RSB_NEW_MASTER);
1168
1169                 if (is_inactive) {
1170                         /* I don't think we should ever find it inactive. */
1171                         log_error(ls, "%s fix_master inactive", __func__);
1172                         dlm_dump_rsb(r);
1173                 }
1174         }
1175
1176         if (from_master && (r->res_master_nodeid != from_nodeid)) {
1177                 /* this will happen if from_nodeid became master during
1178                  * a previous recovery cycle, and we aborted the previous
1179                  * cycle before recovering this master value
1180                  */
1181
1182                 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
1183                           __func__, from_nodeid, r->res_master_nodeid,
1184                           r->res_nodeid, r->res_first_lkid, r->res_name);
1185
1186                 if (r->res_master_nodeid == our_nodeid) {
1187                         log_error(ls, "from_master %d our_master", from_nodeid);
1188                         dlm_dump_rsb(r);
1189                         goto ret_assign;
1190                 }
1191
1192                 r->res_master_nodeid = from_nodeid;
1193                 r->res_nodeid = from_nodeid;
1194                 rsb_set_flag(r, RSB_NEW_MASTER);
1195         }
1196
1197         if (!r->res_master_nodeid) {
1198                 /* this will happen if recovery happens while we're looking
1199                  * up the master for this rsb
1200                  */
1201
1202                 log_debug(ls, "%s master 0 to %d first %x %s", __func__,
1203                           from_nodeid, r->res_first_lkid, r->res_name);
1204                 r->res_master_nodeid = from_nodeid;
1205                 r->res_nodeid = from_nodeid;
1206         }
1207
1208         if (!from_master && !fix_master &&
1209             (r->res_master_nodeid == from_nodeid)) {
1210                 /* this can happen when the master sends remove, the dir node
1211                  * finds the rsb on the active list and ignores the remove,
1212                  * and the former master sends a lookup
1213                  */
1214
1215                 log_limit(ls, "%s from master %d flags %x first %x %s",
1216                           __func__, from_nodeid, flags, r->res_first_lkid,
1217                           r->res_name);
1218         }
1219
1220  ret_assign:
1221         *r_nodeid = r->res_master_nodeid;
1222         if (result)
1223                 *result = DLM_LU_MATCH;
1224 }
1225
1226 /*
1227  * We're the dir node for this res and another node wants to know the
1228  * master nodeid.  During normal operation (non recovery) this is only
1229  * called from receive_lookup(); master lookups when the local node is
1230  * the dir node are done by find_rsb().
1231  *
1232  * normal operation, we are the dir node for a resource
1233  * . _request_lock
1234  * . set_master
1235  * . send_lookup
1236  * . receive_lookup
1237  * . dlm_master_lookup flags 0
1238  *
1239  * recover directory, we are rebuilding dir for all resources
1240  * . dlm_recover_directory
1241  * . dlm_rcom_names
1242  *   remote node sends back the rsb names it is master of and we are dir of
1243  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1244  *   we either create new rsb setting remote node as master, or find existing
1245  *   rsb and set master to be the remote node.
1246  *
1247  * recover masters, we are finding the new master for resources
1248  * . dlm_recover_masters
1249  * . recover_master
1250  * . dlm_send_rcom_lookup
1251  * . receive_rcom_lookup
1252  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1253  */
1254
1255 static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1256                               int len, unsigned int flags, int *r_nodeid, int *result)
1257 {
1258         struct dlm_rsb *r = NULL;
1259         uint32_t hash;
1260         int our_nodeid = dlm_our_nodeid();
1261         int dir_nodeid, error;
1262
1263         if (len > DLM_RESNAME_MAXLEN)
1264                 return -EINVAL;
1265
1266         if (from_nodeid == our_nodeid) {
1267                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1268                           our_nodeid, flags);
1269                 return -EINVAL;
1270         }
1271
1272         hash = jhash(name, len, 0);
1273         dir_nodeid = dlm_hash2nodeid(ls, hash);
1274         if (dir_nodeid != our_nodeid) {
1275                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1276                           from_nodeid, dir_nodeid, our_nodeid, hash,
1277                           ls->ls_num_nodes);
1278                 *r_nodeid = -1;
1279                 return -EINVAL;
1280         }
1281
1282  retry:
1283         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1284         if (error)
1285                 goto not_found;
1286
1287         /* check if the rsb is active under read lock - likely path */
1288         read_lock_bh(&ls->ls_rsbtbl_lock);
1289         if (!rsb_flag(r, RSB_HASHED)) {
1290                 read_unlock_bh(&ls->ls_rsbtbl_lock);
1291                 goto not_found;
1292         }
1293
1294         if (rsb_flag(r, RSB_INACTIVE)) {
1295                 read_unlock_bh(&ls->ls_rsbtbl_lock);
1296                 goto do_inactive;
1297         }
1298
1299         /* because the rsb is active, we need to lock_rsb before
1300          * checking/changing re_master_nodeid
1301          */
1302
1303         hold_rsb(r);
1304         read_unlock_bh(&ls->ls_rsbtbl_lock);
1305         lock_rsb(r);
1306
1307         __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1308                             flags, r_nodeid, result);
1309
1310         /* the rsb was active */
1311         unlock_rsb(r);
1312         put_rsb(r);
1313
1314         return 0;
1315
1316  do_inactive:
1317         /* unlikely path - check if still part of ls_rsbtbl */
1318         write_lock_bh(&ls->ls_rsbtbl_lock);
1319
1320         /* see comment in find_rsb_dir */
1321         if (rsb_flag(r, RSB_HASHED)) {
1322                 if (!rsb_flag(r, RSB_INACTIVE)) {
1323                         write_unlock_bh(&ls->ls_rsbtbl_lock);
1324                         /* something as changed, very unlikely but
1325                          * try again
1326                          */
1327                         goto retry;
1328                 }
1329         } else {
1330                 write_unlock_bh(&ls->ls_rsbtbl_lock);
1331                 goto not_found;
1332         }
1333
1334         /* because the rsb is inactive, it's not refcounted and lock_rsb
1335            is not used, but is protected by the rsbtbl lock */
1336
1337         __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1338                             r_nodeid, result);
1339
1340         /* A dir record rsb should never be on scan list. */
1341         /* Try to fix this with del_scan? */
1342         WARN_ON(!list_empty(&r->res_scan_list));
1343
1344         write_unlock_bh(&ls->ls_rsbtbl_lock);
1345
1346         return 0;
1347
1348  not_found:
1349         error = get_rsb_struct(ls, name, len, &r);
1350         if (WARN_ON_ONCE(error))
1351                 goto out;
1352
1353         r->res_hash = hash;
1354         r->res_dir_nodeid = our_nodeid;
1355         r->res_master_nodeid = from_nodeid;
1356         r->res_nodeid = from_nodeid;
1357         rsb_set_flag(r, RSB_INACTIVE);
1358
1359         write_lock_bh(&ls->ls_rsbtbl_lock);
1360         error = rsb_insert(r, &ls->ls_rsbtbl);
1361         if (error == -EEXIST) {
1362                 /* somebody else was faster and it seems the
1363                  * rsb exists now, we do a whole relookup
1364                  */
1365                 write_unlock_bh(&ls->ls_rsbtbl_lock);
1366                 dlm_free_rsb(r);
1367                 goto retry;
1368         } else if (error) {
1369                 write_unlock_bh(&ls->ls_rsbtbl_lock);
1370                 /* should never happen */
1371                 dlm_free_rsb(r);
1372                 goto retry;
1373         }
1374
1375         list_add(&r->res_slow_list, &ls->ls_slow_inactive);
1376         write_unlock_bh(&ls->ls_rsbtbl_lock);
1377
1378         if (result)
1379                 *result = DLM_LU_ADD;
1380         *r_nodeid = from_nodeid;
1381  out:
1382         return error;
1383 }
1384
1385 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1386                       int len, unsigned int flags, int *r_nodeid, int *result)
1387 {
1388         int rv;
1389         rcu_read_lock();
1390         rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
1391         rcu_read_unlock();
1392         return rv;
1393 }
1394
1395 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1396 {
1397         struct dlm_rsb *r;
1398
1399         read_lock_bh(&ls->ls_rsbtbl_lock);
1400         list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
1401                 if (r->res_hash == hash)
1402                         dlm_dump_rsb(r);
1403         }
1404         read_unlock_bh(&ls->ls_rsbtbl_lock);
1405 }
1406
1407 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1408 {
1409         struct dlm_rsb *r = NULL;
1410         int error;
1411
1412         rcu_read_lock();
1413         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1414         if (!error)
1415                 goto out;
1416
1417         dlm_dump_rsb(r);
1418  out:
1419         rcu_read_unlock();
1420 }
1421
1422 static void deactivate_rsb(struct kref *kref)
1423 {
1424         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1425         struct dlm_ls *ls = r->res_ls;
1426         int our_nodeid = dlm_our_nodeid();
1427
1428         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1429         rsb_set_flag(r, RSB_INACTIVE);
1430         list_move(&r->res_slow_list, &ls->ls_slow_inactive);
1431
1432         /*
1433          * When the rsb becomes unused:
1434          * - If it's not a dir record for a remote master rsb,
1435          *   then it is put on the scan list to be freed.
1436          * - If it's a dir record for a remote master rsb,
1437          *   then it is kept in the inactive state until
1438          *   receive_remove() from the master node.
1439          */
1440         if (!dlm_no_directory(ls) &&
1441             (r->res_master_nodeid != our_nodeid) &&
1442             (dlm_dir_nodeid(r) != our_nodeid))
1443                 add_scan(ls, r);
1444
1445         if (r->res_lvbptr) {
1446                 dlm_free_lvb(r->res_lvbptr);
1447                 r->res_lvbptr = NULL;
1448         }
1449 }
1450
1451 void free_inactive_rsb(struct dlm_rsb *r)
1452 {
1453         WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
1454
1455         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1456         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1457         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1458         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1459         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1460         DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
1461         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1462         DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
1463
1464         dlm_free_rsb(r);
1465 }
1466
1467 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1468    The rsb must exist as long as any lkb's for it do. */
1469
1470 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1471 {
1472         hold_rsb(r);
1473         lkb->lkb_resource = r;
1474 }
1475
1476 static void detach_lkb(struct dlm_lkb *lkb)
1477 {
1478         if (lkb->lkb_resource) {
1479                 put_rsb(lkb->lkb_resource);
1480                 lkb->lkb_resource = NULL;
1481         }
1482 }
1483
1484 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1485                        unsigned long start, unsigned long end)
1486 {
1487         struct xa_limit limit;
1488         struct dlm_lkb *lkb;
1489         int rv;
1490
1491         limit.max = end;
1492         limit.min = start;
1493
1494         lkb = dlm_allocate_lkb();
1495         if (!lkb)
1496                 return -ENOMEM;
1497
1498         lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1499         lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1500         lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1501         lkb->lkb_nodeid = -1;
1502         lkb->lkb_grmode = DLM_LOCK_IV;
1503         kref_init(&lkb->lkb_ref);
1504         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1505         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1506
1507         write_lock_bh(&ls->ls_lkbxa_lock);
1508         rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
1509         write_unlock_bh(&ls->ls_lkbxa_lock);
1510
1511         if (rv < 0) {
1512                 log_error(ls, "create_lkb xa error %d", rv);
1513                 dlm_free_lkb(lkb);
1514                 return rv;
1515         }
1516
1517         *lkb_ret = lkb;
1518         return 0;
1519 }
1520
1521 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1522 {
1523         return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
1524 }
1525
1526 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1527 {
1528         struct dlm_lkb *lkb;
1529
1530         rcu_read_lock();
1531         lkb = xa_load(&ls->ls_lkbxa, lkid);
1532         if (lkb) {
1533                 /* check if lkb is still part of lkbxa under lkbxa_lock as
1534                  * the lkb_ref is tight to the lkbxa data structure, see
1535                  * __put_lkb().
1536                  */
1537                 read_lock_bh(&ls->ls_lkbxa_lock);
1538                 if (kref_read(&lkb->lkb_ref))
1539                         kref_get(&lkb->lkb_ref);
1540                 else
1541                         lkb = NULL;
1542                 read_unlock_bh(&ls->ls_lkbxa_lock);
1543         }
1544         rcu_read_unlock();
1545
1546         *lkb_ret = lkb;
1547         return lkb ? 0 : -ENOENT;
1548 }
1549
1550 static void kill_lkb(struct kref *kref)
1551 {
1552         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1553
1554         /* All work is done after the return from kref_put() so we
1555            can release the write_lock before the detach_lkb */
1556
1557         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1558 }
1559
1560 /* __put_lkb() is used when an lkb may not have an rsb attached to
1561    it so we need to provide the lockspace explicitly */
1562
1563 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1564 {
1565         uint32_t lkid = lkb->lkb_id;
1566         int rv;
1567
1568         rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1569                                         &ls->ls_lkbxa_lock);
1570         if (rv) {
1571                 xa_erase(&ls->ls_lkbxa, lkid);
1572                 write_unlock_bh(&ls->ls_lkbxa_lock);
1573
1574                 detach_lkb(lkb);
1575
1576                 /* for local/process lkbs, lvbptr points to caller's lksb */
1577                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1578                         dlm_free_lvb(lkb->lkb_lvbptr);
1579                 dlm_free_lkb(lkb);
1580         }
1581
1582         return rv;
1583 }
1584
1585 int dlm_put_lkb(struct dlm_lkb *lkb)
1586 {
1587         struct dlm_ls *ls;
1588
1589         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1590         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1591
1592         ls = lkb->lkb_resource->res_ls;
1593         return __put_lkb(ls, lkb);
1594 }
1595
1596 /* This is only called to add a reference when the code already holds
1597    a valid reference to the lkb, so there's no need for locking. */
1598
1599 static inline void hold_lkb(struct dlm_lkb *lkb)
1600 {
1601         kref_get(&lkb->lkb_ref);
1602 }
1603
1604 static void unhold_lkb_assert(struct kref *kref)
1605 {
1606         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1607
1608         DLM_ASSERT(false, dlm_print_lkb(lkb););
1609 }
1610
1611 /* This is called when we need to remove a reference and are certain
1612    it's not the last ref.  e.g. del_lkb is always called between a
1613    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1614    put_lkb would work fine, but would involve unnecessary locking */
1615
1616 static inline void unhold_lkb(struct dlm_lkb *lkb)
1617 {
1618         kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1619 }
1620
1621 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1622                             int mode)
1623 {
1624         struct dlm_lkb *lkb = NULL, *iter;
1625
1626         list_for_each_entry(iter, head, lkb_statequeue)
1627                 if (iter->lkb_rqmode < mode) {
1628                         lkb = iter;
1629                         list_add_tail(new, &iter->lkb_statequeue);
1630                         break;
1631                 }
1632
1633         if (!lkb)
1634                 list_add_tail(new, head);
1635 }
1636
1637 /* add/remove lkb to rsb's grant/convert/wait queue */
1638
1639 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1640 {
1641         kref_get(&lkb->lkb_ref);
1642
1643         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1644
1645         lkb->lkb_timestamp = ktime_get();
1646
1647         lkb->lkb_status = status;
1648
1649         switch (status) {
1650         case DLM_LKSTS_WAITING:
1651                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1652                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1653                 else
1654                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1655                 break;
1656         case DLM_LKSTS_GRANTED:
1657                 /* convention says granted locks kept in order of grmode */
1658                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1659                                 lkb->lkb_grmode);
1660                 break;
1661         case DLM_LKSTS_CONVERT:
1662                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1663                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1664                 else
1665                         list_add_tail(&lkb->lkb_statequeue,
1666                                       &r->res_convertqueue);
1667                 break;
1668         default:
1669                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1670         }
1671 }
1672
1673 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1674 {
1675         lkb->lkb_status = 0;
1676         list_del(&lkb->lkb_statequeue);
1677         unhold_lkb(lkb);
1678 }
1679
1680 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1681 {
1682         del_lkb(r, lkb);
1683         add_lkb(r, lkb, sts);
1684 }
1685
1686 static int msg_reply_type(int mstype)
1687 {
1688         switch (mstype) {
1689         case DLM_MSG_REQUEST:
1690                 return DLM_MSG_REQUEST_REPLY;
1691         case DLM_MSG_CONVERT:
1692                 return DLM_MSG_CONVERT_REPLY;
1693         case DLM_MSG_UNLOCK:
1694                 return DLM_MSG_UNLOCK_REPLY;
1695         case DLM_MSG_CANCEL:
1696                 return DLM_MSG_CANCEL_REPLY;
1697         case DLM_MSG_LOOKUP:
1698                 return DLM_MSG_LOOKUP_REPLY;
1699         }
1700         return -1;
1701 }
1702
1703 /* add/remove lkb from global waiters list of lkb's waiting for
1704    a reply from a remote node */
1705
1706 static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1707 {
1708         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1709
1710         spin_lock_bh(&ls->ls_waiters_lock);
1711         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1712                 switch (mstype) {
1713                 case DLM_MSG_UNLOCK:
1714                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1715                         break;
1716                 case DLM_MSG_CANCEL:
1717                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1718                         break;
1719                 default:
1720                         /* should never happen as validate_lock_args() checks
1721                          * on lkb_wait_type and validate_unlock_args() only
1722                          * creates UNLOCK or CANCEL messages.
1723                          */
1724                         WARN_ON_ONCE(1);
1725                         goto out;
1726                 }
1727                 lkb->lkb_wait_count++;
1728                 hold_lkb(lkb);
1729
1730                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1731                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1732                           lkb->lkb_wait_count, dlm_iflags_val(lkb));
1733                 goto out;
1734         }
1735
1736         DLM_ASSERT(!lkb->lkb_wait_count,
1737                    dlm_print_lkb(lkb);
1738                    printk("wait_count %d\n", lkb->lkb_wait_count););
1739
1740         lkb->lkb_wait_count++;
1741         lkb->lkb_wait_type = mstype;
1742         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1743         hold_lkb(lkb);
1744         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1745  out:
1746         spin_unlock_bh(&ls->ls_waiters_lock);
1747 }
1748
1749 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1750    list as part of process_requestqueue (e.g. a lookup that has an optimized
1751    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1752    set RESEND and dlm_recover_waiters_post() */
1753
1754 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1755                                 const struct dlm_message *ms)
1756 {
1757         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1758         int overlap_done = 0;
1759
1760         if (mstype == DLM_MSG_UNLOCK_REPLY &&
1761             test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1762                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1763                 overlap_done = 1;
1764                 goto out_del;
1765         }
1766
1767         if (mstype == DLM_MSG_CANCEL_REPLY &&
1768             test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1769                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1770                 overlap_done = 1;
1771                 goto out_del;
1772         }
1773
1774         /* Cancel state was preemptively cleared by a successful convert,
1775            see next comment, nothing to do. */
1776
1777         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1778             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1779                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1780                           lkb->lkb_id, lkb->lkb_wait_type);
1781                 return -1;
1782         }
1783
1784         /* Remove for the convert reply, and premptively remove for the
1785            cancel reply.  A convert has been granted while there's still
1786            an outstanding cancel on it (the cancel is moot and the result
1787            in the cancel reply should be 0).  We preempt the cancel reply
1788            because the app gets the convert result and then can follow up
1789            with another op, like convert.  This subsequent op would see the
1790            lingering state of the cancel and fail with -EBUSY. */
1791
1792         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1793             (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1794             test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1795                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1796                           lkb->lkb_id);
1797                 lkb->lkb_wait_type = 0;
1798                 lkb->lkb_wait_count--;
1799                 unhold_lkb(lkb);
1800                 goto out_del;
1801         }
1802
1803         /* N.B. type of reply may not always correspond to type of original
1804            msg due to lookup->request optimization, verify others? */
1805
1806         if (lkb->lkb_wait_type) {
1807                 lkb->lkb_wait_type = 0;
1808                 goto out_del;
1809         }
1810
1811         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1812                   lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1813                   lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1814         return -1;
1815
1816  out_del:
1817         /* the force-unlock/cancel has completed and we haven't recvd a reply
1818            to the op that was in progress prior to the unlock/cancel; we
1819            give up on any reply to the earlier op.  FIXME: not sure when/how
1820            this would happen */
1821
1822         if (overlap_done && lkb->lkb_wait_type) {
1823                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1824                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1825                 lkb->lkb_wait_count--;
1826                 unhold_lkb(lkb);
1827                 lkb->lkb_wait_type = 0;
1828         }
1829
1830         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1831
1832         clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1833         lkb->lkb_wait_count--;
1834         if (!lkb->lkb_wait_count)
1835                 list_del_init(&lkb->lkb_wait_reply);
1836         unhold_lkb(lkb);
1837         return 0;
1838 }
1839
1840 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1841 {
1842         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1843         int error;
1844
1845         spin_lock_bh(&ls->ls_waiters_lock);
1846         error = _remove_from_waiters(lkb, mstype, NULL);
1847         spin_unlock_bh(&ls->ls_waiters_lock);
1848         return error;
1849 }
1850
1851 /* Handles situations where we might be processing a "fake" or "local" reply in
1852  * the recovery context which stops any locking activity. Only debugfs might
1853  * change the lockspace waiters but they will held the recovery lock to ensure
1854  * remove_from_waiters_ms() in local case will be the only user manipulating the
1855  * lockspace waiters in recovery context.
1856  */
1857
1858 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1859                                   const struct dlm_message *ms, bool local)
1860 {
1861         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1862         int error;
1863
1864         if (!local)
1865                 spin_lock_bh(&ls->ls_waiters_lock);
1866         else
1867                 WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1868                              !dlm_locking_stopped(ls));
1869         error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1870         if (!local)
1871                 spin_unlock_bh(&ls->ls_waiters_lock);
1872         return error;
1873 }
1874
1875 /* lkb is master or local copy */
1876
1877 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1878 {
1879         int b, len = r->res_ls->ls_lvblen;
1880
1881         /* b=1 lvb returned to caller
1882            b=0 lvb written to rsb or invalidated
1883            b=-1 do nothing */
1884
1885         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1886
1887         if (b == 1) {
1888                 if (!lkb->lkb_lvbptr)
1889                         return;
1890
1891                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1892                         return;
1893
1894                 if (!r->res_lvbptr)
1895                         return;
1896
1897                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1898                 lkb->lkb_lvbseq = r->res_lvbseq;
1899
1900         } else if (b == 0) {
1901                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1902                         rsb_set_flag(r, RSB_VALNOTVALID);
1903                         return;
1904                 }
1905
1906                 if (!lkb->lkb_lvbptr)
1907                         return;
1908
1909                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1910                         return;
1911
1912                 if (!r->res_lvbptr)
1913                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1914
1915                 if (!r->res_lvbptr)
1916                         return;
1917
1918                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1919                 r->res_lvbseq++;
1920                 lkb->lkb_lvbseq = r->res_lvbseq;
1921                 rsb_clear_flag(r, RSB_VALNOTVALID);
1922         }
1923
1924         if (rsb_flag(r, RSB_VALNOTVALID))
1925                 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1926 }
1927
1928 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1929 {
1930         if (lkb->lkb_grmode < DLM_LOCK_PW)
1931                 return;
1932
1933         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1934                 rsb_set_flag(r, RSB_VALNOTVALID);
1935                 return;
1936         }
1937
1938         if (!lkb->lkb_lvbptr)
1939                 return;
1940
1941         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1942                 return;
1943
1944         if (!r->res_lvbptr)
1945                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1946
1947         if (!r->res_lvbptr)
1948                 return;
1949
1950         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1951         r->res_lvbseq++;
1952         rsb_clear_flag(r, RSB_VALNOTVALID);
1953 }
1954
1955 /* lkb is process copy (pc) */
1956
1957 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1958                             const struct dlm_message *ms)
1959 {
1960         int b;
1961
1962         if (!lkb->lkb_lvbptr)
1963                 return;
1964
1965         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1966                 return;
1967
1968         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1969         if (b == 1) {
1970                 int len = receive_extralen(ms);
1971                 if (len > r->res_ls->ls_lvblen)
1972                         len = r->res_ls->ls_lvblen;
1973                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1974                 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1975         }
1976 }
1977
1978 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1979    remove_lock -- used for unlock, removes lkb from granted
1980    revert_lock -- used for cancel, moves lkb from convert to granted
1981    grant_lock  -- used for request and convert, adds lkb to granted or
1982                   moves lkb from convert or waiting to granted
1983
1984    Each of these is used for master or local copy lkb's.  There is
1985    also a _pc() variation used to make the corresponding change on
1986    a process copy (pc) lkb. */
1987
1988 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1989 {
1990         del_lkb(r, lkb);
1991         lkb->lkb_grmode = DLM_LOCK_IV;
1992         /* this unhold undoes the original ref from create_lkb()
1993            so this leads to the lkb being freed */
1994         unhold_lkb(lkb);
1995 }
1996
1997 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1998 {
1999         set_lvb_unlock(r, lkb);
2000         _remove_lock(r, lkb);
2001 }
2002
2003 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2004 {
2005         _remove_lock(r, lkb);
2006 }
2007
2008 /* returns: 0 did nothing
2009             1 moved lock to granted
2010            -1 removed lock */
2011
2012 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2013 {
2014         int rv = 0;
2015
2016         lkb->lkb_rqmode = DLM_LOCK_IV;
2017
2018         switch (lkb->lkb_status) {
2019         case DLM_LKSTS_GRANTED:
2020                 break;
2021         case DLM_LKSTS_CONVERT:
2022                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2023                 rv = 1;
2024                 break;
2025         case DLM_LKSTS_WAITING:
2026                 del_lkb(r, lkb);
2027                 lkb->lkb_grmode = DLM_LOCK_IV;
2028                 /* this unhold undoes the original ref from create_lkb()
2029                    so this leads to the lkb being freed */
2030                 unhold_lkb(lkb);
2031                 rv = -1;
2032                 break;
2033         default:
2034                 log_print("invalid status for revert %d", lkb->lkb_status);
2035         }
2036         return rv;
2037 }
2038
2039 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2040 {
2041         return revert_lock(r, lkb);
2042 }
2043
2044 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2045 {
2046         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2047                 lkb->lkb_grmode = lkb->lkb_rqmode;
2048                 if (lkb->lkb_status)
2049                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2050                 else
2051                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2052         }
2053
2054         lkb->lkb_rqmode = DLM_LOCK_IV;
2055         lkb->lkb_highbast = 0;
2056 }
2057
2058 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2059 {
2060         set_lvb_lock(r, lkb);
2061         _grant_lock(r, lkb);
2062 }
2063
2064 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2065                           const struct dlm_message *ms)
2066 {
2067         set_lvb_lock_pc(r, lkb, ms);
2068         _grant_lock(r, lkb);
2069 }
2070
2071 /* called by grant_pending_locks() which means an async grant message must
2072    be sent to the requesting node in addition to granting the lock if the
2073    lkb belongs to a remote node. */
2074
2075 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2076 {
2077         grant_lock(r, lkb);
2078         if (is_master_copy(lkb))
2079                 send_grant(r, lkb);
2080         else
2081                 queue_cast(r, lkb, 0);
2082 }
2083
2084 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2085    change the granted/requested modes.  We're munging things accordingly in
2086    the process copy.
2087    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2088    conversion deadlock
2089    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2090    compatible with other granted locks */
2091
2092 static void munge_demoted(struct dlm_lkb *lkb)
2093 {
2094         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2095                 log_print("munge_demoted %x invalid modes gr %d rq %d",
2096                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2097                 return;
2098         }
2099
2100         lkb->lkb_grmode = DLM_LOCK_NL;
2101 }
2102
2103 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2104 {
2105         if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2106             ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2107                 log_print("munge_altmode %x invalid reply type %d",
2108                           lkb->lkb_id, le32_to_cpu(ms->m_type));
2109                 return;
2110         }
2111
2112         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2113                 lkb->lkb_rqmode = DLM_LOCK_PR;
2114         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2115                 lkb->lkb_rqmode = DLM_LOCK_CW;
2116         else {
2117                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2118                 dlm_print_lkb(lkb);
2119         }
2120 }
2121
2122 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2123 {
2124         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2125                                            lkb_statequeue);
2126         if (lkb->lkb_id == first->lkb_id)
2127                 return 1;
2128
2129         return 0;
2130 }
2131
2132 /* Check if the given lkb conflicts with another lkb on the queue. */
2133
2134 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2135 {
2136         struct dlm_lkb *this;
2137
2138         list_for_each_entry(this, head, lkb_statequeue) {
2139                 if (this == lkb)
2140                         continue;
2141                 if (!modes_compat(this, lkb))
2142                         return 1;
2143         }
2144         return 0;
2145 }
2146
2147 /*
2148  * "A conversion deadlock arises with a pair of lock requests in the converting
2149  * queue for one resource.  The granted mode of each lock blocks the requested
2150  * mode of the other lock."
2151  *
2152  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2153  * convert queue from being granted, then deadlk/demote lkb.
2154  *
2155  * Example:
2156  * Granted Queue: empty
2157  * Convert Queue: NL->EX (first lock)
2158  *                PR->EX (second lock)
2159  *
2160  * The first lock can't be granted because of the granted mode of the second
2161  * lock and the second lock can't be granted because it's not first in the
2162  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2163  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2164  * flag set and return DEMOTED in the lksb flags.
2165  *
2166  * Originally, this function detected conv-deadlk in a more limited scope:
2167  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2168  * - if lkb1 was the first entry in the queue (not just earlier), and was
2169  *   blocked by the granted mode of lkb2, and there was nothing on the
2170  *   granted queue preventing lkb1 from being granted immediately, i.e.
2171  *   lkb2 was the only thing preventing lkb1 from being granted.
2172  *
2173  * That second condition meant we'd only say there was conv-deadlk if
2174  * resolving it (by demotion) would lead to the first lock on the convert
2175  * queue being granted right away.  It allowed conversion deadlocks to exist
2176  * between locks on the convert queue while they couldn't be granted anyway.
2177  *
2178  * Now, we detect and take action on conversion deadlocks immediately when
2179  * they're created, even if they may not be immediately consequential.  If
2180  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2181  * mode that would prevent lkb1's conversion from being granted, we do a
2182  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2183  * I think this means that the lkb_is_ahead condition below should always
2184  * be zero, i.e. there will never be conv-deadlk between two locks that are
2185  * both already on the convert queue.
2186  */
2187
2188 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2189 {
2190         struct dlm_lkb *lkb1;
2191         int lkb_is_ahead = 0;
2192
2193         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2194                 if (lkb1 == lkb2) {
2195                         lkb_is_ahead = 1;
2196                         continue;
2197                 }
2198
2199                 if (!lkb_is_ahead) {
2200                         if (!modes_compat(lkb2, lkb1))
2201                                 return 1;
2202                 } else {
2203                         if (!modes_compat(lkb2, lkb1) &&
2204                             !modes_compat(lkb1, lkb2))
2205                                 return 1;
2206                 }
2207         }
2208         return 0;
2209 }
2210
2211 /*
2212  * Return 1 if the lock can be granted, 0 otherwise.
2213  * Also detect and resolve conversion deadlocks.
2214  *
2215  * lkb is the lock to be granted
2216  *
2217  * now is 1 if the function is being called in the context of the
2218  * immediate request, it is 0 if called later, after the lock has been
2219  * queued.
2220  *
2221  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2222  * after recovery.
2223  *
2224  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2225  */
2226
2227 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2228                            int recover)
2229 {
2230         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2231
2232         /*
2233          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2234          * a new request for a NL mode lock being blocked.
2235          *
2236          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2237          * request, then it would be granted.  In essence, the use of this flag
2238          * tells the Lock Manager to expedite theis request by not considering
2239          * what may be in the CONVERTING or WAITING queues...  As of this
2240          * writing, the EXPEDITE flag can be used only with new requests for NL
2241          * mode locks.  This flag is not valid for conversion requests.
2242          *
2243          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2244          * conversion or used with a non-NL requested mode.  We also know an
2245          * EXPEDITE request is always granted immediately, so now must always
2246          * be 1.  The full condition to grant an expedite request: (now &&
2247          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2248          * therefore be shortened to just checking the flag.
2249          */
2250
2251         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2252                 return 1;
2253
2254         /*
2255          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2256          * added to the remaining conditions.
2257          */
2258
2259         if (queue_conflict(&r->res_grantqueue, lkb))
2260                 return 0;
2261
2262         /*
2263          * 6-3: By default, a conversion request is immediately granted if the
2264          * requested mode is compatible with the modes of all other granted
2265          * locks
2266          */
2267
2268         if (queue_conflict(&r->res_convertqueue, lkb))
2269                 return 0;
2270
2271         /*
2272          * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2273          * locks for a recovered rsb, on which lkb's have been rebuilt.
2274          * The lkb's may have been rebuilt on the queues in a different
2275          * order than they were in on the previous master.  So, granting
2276          * queued conversions in order after recovery doesn't make sense
2277          * since the order hasn't been preserved anyway.  The new order
2278          * could also have created a new "in place" conversion deadlock.
2279          * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2280          * After recovery, there would be no granted locks, and possibly
2281          * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2282          * recovery, grant conversions without considering order.
2283          */
2284
2285         if (conv && recover)
2286                 return 1;
2287
2288         /*
2289          * 6-5: But the default algorithm for deciding whether to grant or
2290          * queue conversion requests does not by itself guarantee that such
2291          * requests are serviced on a "first come first serve" basis.  This, in
2292          * turn, can lead to a phenomenon known as "indefinate postponement".
2293          *
2294          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2295          * the system service employed to request a lock conversion.  This flag
2296          * forces certain conversion requests to be queued, even if they are
2297          * compatible with the granted modes of other locks on the same
2298          * resource.  Thus, the use of this flag results in conversion requests
2299          * being ordered on a "first come first servce" basis.
2300          *
2301          * DCT: This condition is all about new conversions being able to occur
2302          * "in place" while the lock remains on the granted queue (assuming
2303          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2304          * doesn't _have_ to go onto the convert queue where it's processed in
2305          * order.  The "now" variable is necessary to distinguish converts
2306          * being received and processed for the first time now, because once a
2307          * convert is moved to the conversion queue the condition below applies
2308          * requiring fifo granting.
2309          */
2310
2311         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2312                 return 1;
2313
2314         /*
2315          * Even if the convert is compat with all granted locks,
2316          * QUECVT forces it behind other locks on the convert queue.
2317          */
2318
2319         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2320                 if (list_empty(&r->res_convertqueue))
2321                         return 1;
2322                 else
2323                         return 0;
2324         }
2325
2326         /*
2327          * The NOORDER flag is set to avoid the standard vms rules on grant
2328          * order.
2329          */
2330
2331         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2332                 return 1;
2333
2334         /*
2335          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2336          * granted until all other conversion requests ahead of it are granted
2337          * and/or canceled.
2338          */
2339
2340         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2341                 return 1;
2342
2343         /*
2344          * 6-4: By default, a new request is immediately granted only if all
2345          * three of the following conditions are satisfied when the request is
2346          * issued:
2347          * - The queue of ungranted conversion requests for the resource is
2348          *   empty.
2349          * - The queue of ungranted new requests for the resource is empty.
2350          * - The mode of the new request is compatible with the most
2351          *   restrictive mode of all granted locks on the resource.
2352          */
2353
2354         if (now && !conv && list_empty(&r->res_convertqueue) &&
2355             list_empty(&r->res_waitqueue))
2356                 return 1;
2357
2358         /*
2359          * 6-4: Once a lock request is in the queue of ungranted new requests,
2360          * it cannot be granted until the queue of ungranted conversion
2361          * requests is empty, all ungranted new requests ahead of it are
2362          * granted and/or canceled, and it is compatible with the granted mode
2363          * of the most restrictive lock granted on the resource.
2364          */
2365
2366         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2367             first_in_list(lkb, &r->res_waitqueue))
2368                 return 1;
2369
2370         return 0;
2371 }
2372
2373 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2374                           int recover, int *err)
2375 {
2376         int rv;
2377         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2378         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2379
2380         if (err)
2381                 *err = 0;
2382
2383         rv = _can_be_granted(r, lkb, now, recover);
2384         if (rv)
2385                 goto out;
2386
2387         /*
2388          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2389          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2390          * cancels one of the locks.
2391          */
2392
2393         if (is_convert && can_be_queued(lkb) &&
2394             conversion_deadlock_detect(r, lkb)) {
2395                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2396                         lkb->lkb_grmode = DLM_LOCK_NL;
2397                         set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2398                 } else if (err) {
2399                         *err = -EDEADLK;
2400                 } else {
2401                         log_print("can_be_granted deadlock %x now %d",
2402                                   lkb->lkb_id, now);
2403                         dlm_dump_rsb(r);
2404                 }
2405                 goto out;
2406         }
2407
2408         /*
2409          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2410          * to grant a request in a mode other than the normal rqmode.  It's a
2411          * simple way to provide a big optimization to applications that can
2412          * use them.
2413          */
2414
2415         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2416                 alt = DLM_LOCK_PR;
2417         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2418                 alt = DLM_LOCK_CW;
2419
2420         if (alt) {
2421                 lkb->lkb_rqmode = alt;
2422                 rv = _can_be_granted(r, lkb, now, 0);
2423                 if (rv)
2424                         set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2425                 else
2426                         lkb->lkb_rqmode = rqmode;
2427         }
2428  out:
2429         return rv;
2430 }
2431
2432 /* Returns the highest requested mode of all blocked conversions; sets
2433    cw if there's a blocked conversion to DLM_LOCK_CW. */
2434
2435 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2436                                  unsigned int *count)
2437 {
2438         struct dlm_lkb *lkb, *s;
2439         int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2440         int hi, demoted, quit, grant_restart, demote_restart;
2441         int deadlk;
2442
2443         quit = 0;
2444  restart:
2445         grant_restart = 0;
2446         demote_restart = 0;
2447         hi = DLM_LOCK_IV;
2448
2449         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2450                 demoted = is_demoted(lkb);
2451                 deadlk = 0;
2452
2453                 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2454                         grant_lock_pending(r, lkb);
2455                         grant_restart = 1;
2456                         if (count)
2457                                 (*count)++;
2458                         continue;
2459                 }
2460
2461                 if (!demoted && is_demoted(lkb)) {
2462                         log_print("WARN: pending demoted %x node %d %s",
2463                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2464                         demote_restart = 1;
2465                         continue;
2466                 }
2467
2468                 if (deadlk) {
2469                         /*
2470                          * If DLM_LKB_NODLKWT flag is set and conversion
2471                          * deadlock is detected, we request blocking AST and
2472                          * down (or cancel) conversion.
2473                          */
2474                         if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2475                                 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2476                                         queue_bast(r, lkb, lkb->lkb_rqmode);
2477                                         lkb->lkb_highbast = lkb->lkb_rqmode;
2478                                 }
2479                         } else {
2480                                 log_print("WARN: pending deadlock %x node %d %s",
2481                                           lkb->lkb_id, lkb->lkb_nodeid,
2482                                           r->res_name);
2483                                 dlm_dump_rsb(r);
2484                         }
2485                         continue;
2486                 }
2487
2488                 hi = max_t(int, lkb->lkb_rqmode, hi);
2489
2490                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2491                         *cw = 1;
2492         }
2493
2494         if (grant_restart)
2495                 goto restart;
2496         if (demote_restart && !quit) {
2497                 quit = 1;
2498                 goto restart;
2499         }
2500
2501         return max_t(int, high, hi);
2502 }
2503
2504 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2505                               unsigned int *count)
2506 {
2507         struct dlm_lkb *lkb, *s;
2508
2509         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2510                 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2511                         grant_lock_pending(r, lkb);
2512                         if (count)
2513                                 (*count)++;
2514                 } else {
2515                         high = max_t(int, lkb->lkb_rqmode, high);
2516                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2517                                 *cw = 1;
2518                 }
2519         }
2520
2521         return high;
2522 }
2523
2524 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2525    on either the convert or waiting queue.
2526    high is the largest rqmode of all locks blocked on the convert or
2527    waiting queue. */
2528
2529 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2530 {
2531         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2532                 if (gr->lkb_highbast < DLM_LOCK_EX)
2533                         return 1;
2534                 return 0;
2535         }
2536
2537         if (gr->lkb_highbast < high &&
2538             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2539                 return 1;
2540         return 0;
2541 }
2542
2543 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2544 {
2545         struct dlm_lkb *lkb, *s;
2546         int high = DLM_LOCK_IV;
2547         int cw = 0;
2548
2549         if (!is_master(r)) {
2550                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2551                 dlm_dump_rsb(r);
2552                 return;
2553         }
2554
2555         high = grant_pending_convert(r, high, &cw, count);
2556         high = grant_pending_wait(r, high, &cw, count);
2557
2558         if (high == DLM_LOCK_IV)
2559                 return;
2560
2561         /*
2562          * If there are locks left on the wait/convert queue then send blocking
2563          * ASTs to granted locks based on the largest requested mode (high)
2564          * found above.
2565          */
2566
2567         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2568                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2569                         if (cw && high == DLM_LOCK_PR &&
2570                             lkb->lkb_grmode == DLM_LOCK_PR)
2571                                 queue_bast(r, lkb, DLM_LOCK_CW);
2572                         else
2573                                 queue_bast(r, lkb, high);
2574                         lkb->lkb_highbast = high;
2575                 }
2576         }
2577 }
2578
2579 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2580 {
2581         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2582             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2583                 if (gr->lkb_highbast < DLM_LOCK_EX)
2584                         return 1;
2585                 return 0;
2586         }
2587
2588         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2589                 return 1;
2590         return 0;
2591 }
2592
2593 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2594                             struct dlm_lkb *lkb)
2595 {
2596         struct dlm_lkb *gr;
2597
2598         list_for_each_entry(gr, head, lkb_statequeue) {
2599                 /* skip self when sending basts to convertqueue */
2600                 if (gr == lkb)
2601                         continue;
2602                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2603                         queue_bast(r, gr, lkb->lkb_rqmode);
2604                         gr->lkb_highbast = lkb->lkb_rqmode;
2605                 }
2606         }
2607 }
2608
2609 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2610 {
2611         send_bast_queue(r, &r->res_grantqueue, lkb);
2612 }
2613
2614 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2615 {
2616         send_bast_queue(r, &r->res_grantqueue, lkb);
2617         send_bast_queue(r, &r->res_convertqueue, lkb);
2618 }
2619
2620 /* set_master(r, lkb) -- set the master nodeid of a resource
2621
2622    The purpose of this function is to set the nodeid field in the given
2623    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2624    known, it can just be copied to the lkb and the function will return
2625    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2626    before it can be copied to the lkb.
2627
2628    When the rsb nodeid is being looked up remotely, the initial lkb
2629    causing the lookup is kept on the ls_waiters list waiting for the
2630    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2631    on the rsb's res_lookup list until the master is verified.
2632
2633    Return values:
2634    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2635    1: the rsb master is not available and the lkb has been placed on
2636       a wait queue
2637 */
2638
2639 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2640 {
2641         int our_nodeid = dlm_our_nodeid();
2642
2643         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2644                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2645                 r->res_first_lkid = lkb->lkb_id;
2646                 lkb->lkb_nodeid = r->res_nodeid;
2647                 return 0;
2648         }
2649
2650         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2651                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2652                 return 1;
2653         }
2654
2655         if (r->res_master_nodeid == our_nodeid) {
2656                 lkb->lkb_nodeid = 0;
2657                 return 0;
2658         }
2659
2660         if (r->res_master_nodeid) {
2661                 lkb->lkb_nodeid = r->res_master_nodeid;
2662                 return 0;
2663         }
2664
2665         if (dlm_dir_nodeid(r) == our_nodeid) {
2666                 /* This is a somewhat unusual case; find_rsb will usually
2667                    have set res_master_nodeid when dir nodeid is local, but
2668                    there are cases where we become the dir node after we've
2669                    past find_rsb and go through _request_lock again.
2670                    confirm_master() or process_lookup_list() needs to be
2671                    called after this. */
2672                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2673                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2674                           r->res_name);
2675                 r->res_master_nodeid = our_nodeid;
2676                 r->res_nodeid = 0;
2677                 lkb->lkb_nodeid = 0;
2678                 return 0;
2679         }
2680
2681         r->res_first_lkid = lkb->lkb_id;
2682         send_lookup(r, lkb);
2683         return 1;
2684 }
2685
2686 static void process_lookup_list(struct dlm_rsb *r)
2687 {
2688         struct dlm_lkb *lkb, *safe;
2689
2690         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2691                 list_del_init(&lkb->lkb_rsb_lookup);
2692                 _request_lock(r, lkb);
2693         }
2694 }
2695
2696 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2697
2698 static void confirm_master(struct dlm_rsb *r, int error)
2699 {
2700         struct dlm_lkb *lkb;
2701
2702         if (!r->res_first_lkid)
2703                 return;
2704
2705         switch (error) {
2706         case 0:
2707         case -EINPROGRESS:
2708                 r->res_first_lkid = 0;
2709                 process_lookup_list(r);
2710                 break;
2711
2712         case -EAGAIN:
2713         case -EBADR:
2714         case -ENOTBLK:
2715                 /* the remote request failed and won't be retried (it was
2716                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2717                    lkb the first_lkid */
2718
2719                 r->res_first_lkid = 0;
2720
2721                 if (!list_empty(&r->res_lookup)) {
2722                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2723                                          lkb_rsb_lookup);
2724                         list_del_init(&lkb->lkb_rsb_lookup);
2725                         r->res_first_lkid = lkb->lkb_id;
2726                         _request_lock(r, lkb);
2727                 }
2728                 break;
2729
2730         default:
2731                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2732         }
2733 }
2734
2735 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2736                          int namelen, void (*ast)(void *astparam),
2737                          void *astparam,
2738                          void (*bast)(void *astparam, int mode),
2739                          struct dlm_args *args)
2740 {
2741         int rv = -EINVAL;
2742
2743         /* check for invalid arg usage */
2744
2745         if (mode < 0 || mode > DLM_LOCK_EX)
2746                 goto out;
2747
2748         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2749                 goto out;
2750
2751         if (flags & DLM_LKF_CANCEL)
2752                 goto out;
2753
2754         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2755                 goto out;
2756
2757         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2758                 goto out;
2759
2760         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2761                 goto out;
2762
2763         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2764                 goto out;
2765
2766         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2767                 goto out;
2768
2769         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2770                 goto out;
2771
2772         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2773                 goto out;
2774
2775         if (!ast || !lksb)
2776                 goto out;
2777
2778         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2779                 goto out;
2780
2781         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2782                 goto out;
2783
2784         /* these args will be copied to the lkb in validate_lock_args,
2785            it cannot be done now because when converting locks, fields in
2786            an active lkb cannot be modified before locking the rsb */
2787
2788         args->flags = flags;
2789         args->astfn = ast;
2790         args->astparam = astparam;
2791         args->bastfn = bast;
2792         args->mode = mode;
2793         args->lksb = lksb;
2794         rv = 0;
2795  out:
2796         return rv;
2797 }
2798
2799 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2800 {
2801         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2802                       DLM_LKF_FORCEUNLOCK))
2803                 return -EINVAL;
2804
2805         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2806                 return -EINVAL;
2807
2808         args->flags = flags;
2809         args->astparam = astarg;
2810         return 0;
2811 }
2812
2813 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2814                               struct dlm_args *args)
2815 {
2816         int rv = -EBUSY;
2817
2818         if (args->flags & DLM_LKF_CONVERT) {
2819                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2820                         goto out;
2821
2822                 /* lock not allowed if there's any op in progress */
2823                 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2824                         goto out;
2825
2826                 if (is_overlap(lkb))
2827                         goto out;
2828
2829                 rv = -EINVAL;
2830                 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2831                         goto out;
2832
2833                 if (args->flags & DLM_LKF_QUECVT &&
2834                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2835                         goto out;
2836         }
2837
2838         lkb->lkb_exflags = args->flags;
2839         dlm_set_sbflags_val(lkb, 0);
2840         lkb->lkb_astfn = args->astfn;
2841         lkb->lkb_astparam = args->astparam;
2842         lkb->lkb_bastfn = args->bastfn;
2843         lkb->lkb_rqmode = args->mode;
2844         lkb->lkb_lksb = args->lksb;
2845         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2846         lkb->lkb_ownpid = (int) current->pid;
2847         rv = 0;
2848  out:
2849         switch (rv) {
2850         case 0:
2851                 break;
2852         case -EINVAL:
2853                 /* annoy the user because dlm usage is wrong */
2854                 WARN_ON(1);
2855                 log_error(ls, "%s %d %x %x %x %d %d", __func__,
2856                           rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2857                           lkb->lkb_status, lkb->lkb_wait_type);
2858                 break;
2859         default:
2860                 log_debug(ls, "%s %d %x %x %x %d %d", __func__,
2861                           rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2862                           lkb->lkb_status, lkb->lkb_wait_type);
2863                 break;
2864         }
2865
2866         return rv;
2867 }
2868
2869 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2870    for success */
2871
2872 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2873    because there may be a lookup in progress and it's valid to do
2874    cancel/unlockf on it */
2875
2876 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2877 {
2878         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2879         int rv = -EBUSY;
2880
2881         /* normal unlock not allowed if there's any op in progress */
2882         if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2883             (lkb->lkb_wait_type || lkb->lkb_wait_count))
2884                 goto out;
2885
2886         /* an lkb may be waiting for an rsb lookup to complete where the
2887            lookup was initiated by another lock */
2888
2889         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2890                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2891                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2892                         list_del_init(&lkb->lkb_rsb_lookup);
2893                         queue_cast(lkb->lkb_resource, lkb,
2894                                    args->flags & DLM_LKF_CANCEL ?
2895                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2896                         unhold_lkb(lkb); /* undoes create_lkb() */
2897                 }
2898                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2899                 goto out;
2900         }
2901
2902         rv = -EINVAL;
2903         if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2904                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2905                 dlm_print_lkb(lkb);
2906                 goto out;
2907         }
2908
2909         /* an lkb may still exist even though the lock is EOL'ed due to a
2910          * cancel, unlock or failed noqueue request; an app can't use these
2911          * locks; return same error as if the lkid had not been found at all
2912          */
2913
2914         if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2915                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2916                 rv = -ENOENT;
2917                 goto out;
2918         }
2919
2920         if (is_overlap_unlock(lkb))
2921                 goto out;
2922
2923         /* cancel not allowed with another cancel/unlock in progress */
2924
2925         if (args->flags & DLM_LKF_CANCEL) {
2926                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2927                         goto out;
2928
2929                 if (is_overlap_cancel(lkb))
2930                         goto out;
2931
2932                 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2933                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2934                         rv = -EBUSY;
2935                         goto out;
2936                 }
2937
2938                 /* there's nothing to cancel */
2939                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2940                     !lkb->lkb_wait_type) {
2941                         rv = -EBUSY;
2942                         goto out;
2943                 }
2944
2945                 switch (lkb->lkb_wait_type) {
2946                 case DLM_MSG_LOOKUP:
2947                 case DLM_MSG_REQUEST:
2948                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2949                         rv = -EBUSY;
2950                         goto out;
2951                 case DLM_MSG_UNLOCK:
2952                 case DLM_MSG_CANCEL:
2953                         goto out;
2954                 }
2955                 /* add_to_waiters() will set OVERLAP_CANCEL */
2956                 goto out_ok;
2957         }
2958
2959         /* do we need to allow a force-unlock if there's a normal unlock
2960            already in progress?  in what conditions could the normal unlock
2961            fail such that we'd want to send a force-unlock to be sure? */
2962
2963         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2964                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2965                         goto out;
2966
2967                 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2968                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2969                         rv = -EBUSY;
2970                         goto out;
2971                 }
2972
2973                 switch (lkb->lkb_wait_type) {
2974                 case DLM_MSG_LOOKUP:
2975                 case DLM_MSG_REQUEST:
2976                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2977                         rv = -EBUSY;
2978                         goto out;
2979                 case DLM_MSG_UNLOCK:
2980                         goto out;
2981                 }
2982                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2983         }
2984
2985  out_ok:
2986         /* an overlapping op shouldn't blow away exflags from other op */
2987         lkb->lkb_exflags |= args->flags;
2988         dlm_set_sbflags_val(lkb, 0);
2989         lkb->lkb_astparam = args->astparam;
2990         rv = 0;
2991  out:
2992         switch (rv) {
2993         case 0:
2994                 break;
2995         case -EINVAL:
2996                 /* annoy the user because dlm usage is wrong */
2997                 WARN_ON(1);
2998                 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
2999                           lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3000                           args->flags, lkb->lkb_wait_type,
3001                           lkb->lkb_resource->res_name);
3002                 break;
3003         default:
3004                 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3005                           lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3006                           args->flags, lkb->lkb_wait_type,
3007                           lkb->lkb_resource->res_name);
3008                 break;
3009         }
3010
3011         return rv;
3012 }
3013
3014 /*
3015  * Four stage 4 varieties:
3016  * do_request(), do_convert(), do_unlock(), do_cancel()
3017  * These are called on the master node for the given lock and
3018  * from the central locking logic.
3019  */
3020
3021 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3022 {
3023         int error = 0;
3024
3025         if (can_be_granted(r, lkb, 1, 0, NULL)) {
3026                 grant_lock(r, lkb);
3027                 queue_cast(r, lkb, 0);
3028                 goto out;
3029         }
3030
3031         if (can_be_queued(lkb)) {
3032                 error = -EINPROGRESS;
3033                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3034                 goto out;
3035         }
3036
3037         error = -EAGAIN;
3038         queue_cast(r, lkb, -EAGAIN);
3039  out:
3040         return error;
3041 }
3042
3043 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3044                                int error)
3045 {
3046         switch (error) {
3047         case -EAGAIN:
3048                 if (force_blocking_asts(lkb))
3049                         send_blocking_asts_all(r, lkb);
3050                 break;
3051         case -EINPROGRESS:
3052                 send_blocking_asts(r, lkb);
3053                 break;
3054         }
3055 }
3056
3057 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3058 {
3059         int error = 0;
3060         int deadlk = 0;
3061
3062         /* changing an existing lock may allow others to be granted */
3063
3064         if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3065                 grant_lock(r, lkb);
3066                 queue_cast(r, lkb, 0);
3067                 goto out;
3068         }
3069
3070         /* can_be_granted() detected that this lock would block in a conversion
3071            deadlock, so we leave it on the granted queue and return EDEADLK in
3072            the ast for the convert. */
3073
3074         if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3075                 /* it's left on the granted queue */
3076                 revert_lock(r, lkb);
3077                 queue_cast(r, lkb, -EDEADLK);
3078                 error = -EDEADLK;
3079                 goto out;
3080         }
3081
3082         /* is_demoted() means the can_be_granted() above set the grmode
3083            to NL, and left us on the granted queue.  This auto-demotion
3084            (due to CONVDEADLK) might mean other locks, and/or this lock, are
3085            now grantable.  We have to try to grant other converting locks
3086            before we try again to grant this one. */
3087
3088         if (is_demoted(lkb)) {
3089                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3090                 if (_can_be_granted(r, lkb, 1, 0)) {
3091                         grant_lock(r, lkb);
3092                         queue_cast(r, lkb, 0);
3093                         goto out;
3094                 }
3095                 /* else fall through and move to convert queue */
3096         }
3097
3098         if (can_be_queued(lkb)) {
3099                 error = -EINPROGRESS;
3100                 del_lkb(r, lkb);
3101                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3102                 goto out;
3103         }
3104
3105         error = -EAGAIN;
3106         queue_cast(r, lkb, -EAGAIN);
3107  out:
3108         return error;
3109 }
3110
3111 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3112                                int error)
3113 {
3114         switch (error) {
3115         case 0:
3116                 grant_pending_locks(r, NULL);
3117                 /* grant_pending_locks also sends basts */
3118                 break;
3119         case -EAGAIN:
3120                 if (force_blocking_asts(lkb))
3121                         send_blocking_asts_all(r, lkb);
3122                 break;
3123         case -EINPROGRESS:
3124                 send_blocking_asts(r, lkb);
3125                 break;
3126         }
3127 }
3128
3129 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3130 {
3131         remove_lock(r, lkb);
3132         queue_cast(r, lkb, -DLM_EUNLOCK);
3133         return -DLM_EUNLOCK;
3134 }
3135
3136 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3137                               int error)
3138 {
3139         grant_pending_locks(r, NULL);
3140 }
3141
3142 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3143
3144 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3145 {
3146         int error;
3147
3148         error = revert_lock(r, lkb);
3149         if (error) {
3150                 queue_cast(r, lkb, -DLM_ECANCEL);
3151                 return -DLM_ECANCEL;
3152         }
3153         return 0;
3154 }
3155
3156 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3157                               int error)
3158 {
3159         if (error)
3160                 grant_pending_locks(r, NULL);
3161 }
3162
3163 /*
3164  * Four stage 3 varieties:
3165  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3166  */
3167
3168 /* add a new lkb to a possibly new rsb, called by requesting process */
3169
3170 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3171 {
3172         int error;
3173
3174         /* set_master: sets lkb nodeid from r */
3175
3176         error = set_master(r, lkb);
3177         if (error < 0)
3178                 goto out;
3179         if (error) {
3180                 error = 0;
3181                 goto out;
3182         }
3183
3184         if (is_remote(r)) {
3185                 /* receive_request() calls do_request() on remote node */
3186                 error = send_request(r, lkb);
3187         } else {
3188                 error = do_request(r, lkb);
3189                 /* for remote locks the request_reply is sent
3190                    between do_request and do_request_effects */
3191                 do_request_effects(r, lkb, error);
3192         }
3193  out:
3194         return error;
3195 }
3196
3197 /* change some property of an existing lkb, e.g. mode */
3198
3199 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3200 {
3201         int error;
3202
3203         if (is_remote(r)) {
3204                 /* receive_convert() calls do_convert() on remote node */
3205                 error = send_convert(r, lkb);
3206         } else {
3207                 error = do_convert(r, lkb);
3208                 /* for remote locks the convert_reply is sent
3209                    between do_convert and do_convert_effects */
3210                 do_convert_effects(r, lkb, error);
3211         }
3212
3213         return error;
3214 }
3215
3216 /* remove an existing lkb from the granted queue */
3217
3218 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3219 {
3220         int error;
3221
3222         if (is_remote(r)) {
3223                 /* receive_unlock() calls do_unlock() on remote node */
3224                 error = send_unlock(r, lkb);
3225         } else {
3226                 error = do_unlock(r, lkb);
3227                 /* for remote locks the unlock_reply is sent
3228                    between do_unlock and do_unlock_effects */
3229                 do_unlock_effects(r, lkb, error);
3230         }
3231
3232         return error;
3233 }
3234
3235 /* remove an existing lkb from the convert or wait queue */
3236
3237 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3238 {
3239         int error;
3240
3241         if (is_remote(r)) {
3242                 /* receive_cancel() calls do_cancel() on remote node */
3243                 error = send_cancel(r, lkb);
3244         } else {
3245                 error = do_cancel(r, lkb);
3246                 /* for remote locks the cancel_reply is sent
3247                    between do_cancel and do_cancel_effects */
3248                 do_cancel_effects(r, lkb, error);
3249         }
3250
3251         return error;
3252 }
3253
3254 /*
3255  * Four stage 2 varieties:
3256  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3257  */
3258
3259 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3260                         const void *name, int len,
3261                         struct dlm_args *args)
3262 {
3263         struct dlm_rsb *r;
3264         int error;
3265
3266         error = validate_lock_args(ls, lkb, args);
3267         if (error)
3268                 return error;
3269
3270         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3271         if (error)
3272                 return error;
3273
3274         lock_rsb(r);
3275
3276         attach_lkb(r, lkb);
3277         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3278
3279         error = _request_lock(r, lkb);
3280
3281         unlock_rsb(r);
3282         put_rsb(r);
3283         return error;
3284 }
3285
3286 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3287                         struct dlm_args *args)
3288 {
3289         struct dlm_rsb *r;
3290         int error;
3291
3292         r = lkb->lkb_resource;
3293
3294         hold_rsb(r);
3295         lock_rsb(r);
3296
3297         error = validate_lock_args(ls, lkb, args);
3298         if (error)
3299                 goto out;
3300
3301         error = _convert_lock(r, lkb);
3302  out:
3303         unlock_rsb(r);
3304         put_rsb(r);
3305         return error;
3306 }
3307
3308 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3309                        struct dlm_args *args)
3310 {
3311         struct dlm_rsb *r;
3312         int error;
3313
3314         r = lkb->lkb_resource;
3315
3316         hold_rsb(r);
3317         lock_rsb(r);
3318
3319         error = validate_unlock_args(lkb, args);
3320         if (error)
3321                 goto out;
3322
3323         error = _unlock_lock(r, lkb);
3324  out:
3325         unlock_rsb(r);
3326         put_rsb(r);
3327         return error;
3328 }
3329
3330 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3331                        struct dlm_args *args)
3332 {
3333         struct dlm_rsb *r;
3334         int error;
3335
3336         r = lkb->lkb_resource;
3337
3338         hold_rsb(r);
3339         lock_rsb(r);
3340
3341         error = validate_unlock_args(lkb, args);
3342         if (error)
3343                 goto out;
3344
3345         error = _cancel_lock(r, lkb);
3346  out:
3347         unlock_rsb(r);
3348         put_rsb(r);
3349         return error;
3350 }
3351
3352 /*
3353  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3354  */
3355
3356 int dlm_lock(dlm_lockspace_t *lockspace,
3357              int mode,
3358              struct dlm_lksb *lksb,
3359              uint32_t flags,
3360              const void *name,
3361              unsigned int namelen,
3362              uint32_t parent_lkid,
3363              void (*ast) (void *astarg),
3364              void *astarg,
3365              void (*bast) (void *astarg, int mode))
3366 {
3367         struct dlm_ls *ls;
3368         struct dlm_lkb *lkb;
3369         struct dlm_args args;
3370         int error, convert = flags & DLM_LKF_CONVERT;
3371
3372         ls = dlm_find_lockspace_local(lockspace);
3373         if (!ls)
3374                 return -EINVAL;
3375
3376         dlm_lock_recovery(ls);
3377
3378         if (convert)
3379                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3380         else
3381                 error = create_lkb(ls, &lkb);
3382
3383         if (error)
3384                 goto out;
3385
3386         trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3387
3388         error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3389                               &args);
3390         if (error)
3391                 goto out_put;
3392
3393         if (convert)
3394                 error = convert_lock(ls, lkb, &args);
3395         else
3396                 error = request_lock(ls, lkb, name, namelen, &args);
3397
3398         if (error == -EINPROGRESS)
3399                 error = 0;
3400  out_put:
3401         trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3402
3403         if (convert || error)
3404                 __put_lkb(ls, lkb);
3405         if (error == -EAGAIN || error == -EDEADLK)
3406                 error = 0;
3407  out:
3408         dlm_unlock_recovery(ls);
3409         dlm_put_lockspace(ls);
3410         return error;
3411 }
3412
3413 int dlm_unlock(dlm_lockspace_t *lockspace,
3414                uint32_t lkid,
3415                uint32_t flags,
3416                struct dlm_lksb *lksb,
3417                void *astarg)
3418 {
3419         struct dlm_ls *ls;
3420         struct dlm_lkb *lkb;
3421         struct dlm_args args;
3422         int error;
3423
3424         ls = dlm_find_lockspace_local(lockspace);
3425         if (!ls)
3426                 return -EINVAL;
3427
3428         dlm_lock_recovery(ls);
3429
3430         error = find_lkb(ls, lkid, &lkb);
3431         if (error)
3432                 goto out;
3433
3434         trace_dlm_unlock_start(ls, lkb, flags);
3435
3436         error = set_unlock_args(flags, astarg, &args);
3437         if (error)
3438                 goto out_put;
3439
3440         if (flags & DLM_LKF_CANCEL)
3441                 error = cancel_lock(ls, lkb, &args);
3442         else
3443                 error = unlock_lock(ls, lkb, &args);
3444
3445         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3446                 error = 0;
3447         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3448                 error = 0;
3449  out_put:
3450         trace_dlm_unlock_end(ls, lkb, flags, error);
3451
3452         dlm_put_lkb(lkb);
3453  out:
3454         dlm_unlock_recovery(ls);
3455         dlm_put_lockspace(ls);
3456         return error;
3457 }
3458
3459 /*
3460  * send/receive routines for remote operations and replies
3461  *
3462  * send_args
3463  * send_common
3464  * send_request                 receive_request
3465  * send_convert                 receive_convert
3466  * send_unlock                  receive_unlock
3467  * send_cancel                  receive_cancel
3468  * send_grant                   receive_grant
3469  * send_bast                    receive_bast
3470  * send_lookup                  receive_lookup
3471  * send_remove                  receive_remove
3472  *
3473  *                              send_common_reply
3474  * receive_request_reply        send_request_reply
3475  * receive_convert_reply        send_convert_reply
3476  * receive_unlock_reply         send_unlock_reply
3477  * receive_cancel_reply         send_cancel_reply
3478  * receive_lookup_reply         send_lookup_reply
3479  */
3480
3481 static int _create_message(struct dlm_ls *ls, int mb_len,
3482                            int to_nodeid, int mstype,
3483                            struct dlm_message **ms_ret,
3484                            struct dlm_mhandle **mh_ret)
3485 {
3486         struct dlm_message *ms;
3487         struct dlm_mhandle *mh;
3488         char *mb;
3489
3490         /* get_buffer gives us a message handle (mh) that we need to
3491            pass into midcomms_commit and a message buffer (mb) that we
3492            write our data into */
3493
3494         mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3495         if (!mh)
3496                 return -ENOBUFS;
3497
3498         ms = (struct dlm_message *) mb;
3499
3500         ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3501         ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3502         ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3503         ms->m_header.h_length = cpu_to_le16(mb_len);
3504         ms->m_header.h_cmd = DLM_MSG;
3505
3506         ms->m_type = cpu_to_le32(mstype);
3507
3508         *mh_ret = mh;
3509         *ms_ret = ms;
3510         return 0;
3511 }
3512
3513 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3514                           int to_nodeid, int mstype,
3515                           struct dlm_message **ms_ret,
3516                           struct dlm_mhandle **mh_ret)
3517 {
3518         int mb_len = sizeof(struct dlm_message);
3519
3520         switch (mstype) {
3521         case DLM_MSG_REQUEST:
3522         case DLM_MSG_LOOKUP:
3523         case DLM_MSG_REMOVE:
3524                 mb_len += r->res_length;
3525                 break;
3526         case DLM_MSG_CONVERT:
3527         case DLM_MSG_UNLOCK:
3528         case DLM_MSG_REQUEST_REPLY:
3529         case DLM_MSG_CONVERT_REPLY:
3530         case DLM_MSG_GRANT:
3531                 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3532                         mb_len += r->res_ls->ls_lvblen;
3533                 break;
3534         }
3535
3536         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3537                                ms_ret, mh_ret);
3538 }
3539
3540 /* further lowcomms enhancements or alternate implementations may make
3541    the return value from this function useful at some point */
3542
3543 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3544                         const void *name, int namelen)
3545 {
3546         dlm_midcomms_commit_mhandle(mh, name, namelen);
3547         return 0;
3548 }
3549
3550 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3551                       struct dlm_message *ms)
3552 {
3553         ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3554         ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3555         ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3556         ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3557         ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3558         ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3559         ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3560         ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3561         ms->m_status   = cpu_to_le32(lkb->lkb_status);
3562         ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3563         ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3564         ms->m_hash     = cpu_to_le32(r->res_hash);
3565
3566         /* m_result and m_bastmode are set from function args,
3567            not from lkb fields */
3568
3569         if (lkb->lkb_bastfn)
3570                 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3571         if (lkb->lkb_astfn)
3572                 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3573
3574         /* compare with switch in create_message; send_remove() doesn't
3575            use send_args() */
3576
3577         switch (ms->m_type) {
3578         case cpu_to_le32(DLM_MSG_REQUEST):
3579         case cpu_to_le32(DLM_MSG_LOOKUP):
3580                 memcpy(ms->m_extra, r->res_name, r->res_length);
3581                 break;
3582         case cpu_to_le32(DLM_MSG_CONVERT):
3583         case cpu_to_le32(DLM_MSG_UNLOCK):
3584         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3585         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3586         case cpu_to_le32(DLM_MSG_GRANT):
3587                 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3588                         break;
3589                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3590                 break;
3591         }
3592 }
3593
3594 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3595 {
3596         struct dlm_message *ms;
3597         struct dlm_mhandle *mh;
3598         int to_nodeid, error;
3599
3600         to_nodeid = r->res_nodeid;
3601
3602         add_to_waiters(lkb, mstype, to_nodeid);
3603         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3604         if (error)
3605                 goto fail;
3606
3607         send_args(r, lkb, ms);
3608
3609         error = send_message(mh, ms, r->res_name, r->res_length);
3610         if (error)
3611                 goto fail;
3612         return 0;
3613
3614  fail:
3615         remove_from_waiters(lkb, msg_reply_type(mstype));
3616         return error;
3617 }
3618
3619 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3620 {
3621         return send_common(r, lkb, DLM_MSG_REQUEST);
3622 }
3623
3624 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3625 {
3626         int error;
3627
3628         error = send_common(r, lkb, DLM_MSG_CONVERT);
3629
3630         /* down conversions go without a reply from the master */
3631         if (!error && down_conversion(lkb)) {
3632                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3633                 r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3634                 r->res_ls->ls_local_ms.m_result = 0;
3635                 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3636         }
3637
3638         return error;
3639 }
3640
3641 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3642    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3643    that the master is still correct. */
3644
3645 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3646 {
3647         return send_common(r, lkb, DLM_MSG_UNLOCK);
3648 }
3649
3650 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3651 {
3652         return send_common(r, lkb, DLM_MSG_CANCEL);
3653 }
3654
3655 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3656 {
3657         struct dlm_message *ms;
3658         struct dlm_mhandle *mh;
3659         int to_nodeid, error;
3660
3661         to_nodeid = lkb->lkb_nodeid;
3662
3663         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3664         if (error)
3665                 goto out;
3666
3667         send_args(r, lkb, ms);
3668
3669         ms->m_result = 0;
3670
3671         error = send_message(mh, ms, r->res_name, r->res_length);
3672  out:
3673         return error;
3674 }
3675
3676 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3677 {
3678         struct dlm_message *ms;
3679         struct dlm_mhandle *mh;
3680         int to_nodeid, error;
3681
3682         to_nodeid = lkb->lkb_nodeid;
3683
3684         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3685         if (error)
3686                 goto out;
3687
3688         send_args(r, lkb, ms);
3689
3690         ms->m_bastmode = cpu_to_le32(mode);
3691
3692         error = send_message(mh, ms, r->res_name, r->res_length);
3693  out:
3694         return error;
3695 }
3696
3697 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3698 {
3699         struct dlm_message *ms;
3700         struct dlm_mhandle *mh;
3701         int to_nodeid, error;
3702
3703         to_nodeid = dlm_dir_nodeid(r);
3704
3705         add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3706         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3707         if (error)
3708                 goto fail;
3709
3710         send_args(r, lkb, ms);
3711
3712         error = send_message(mh, ms, r->res_name, r->res_length);
3713         if (error)
3714                 goto fail;
3715         return 0;
3716
3717  fail:
3718         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3719         return error;
3720 }
3721
3722 static int send_remove(struct dlm_rsb *r)
3723 {
3724         struct dlm_message *ms;
3725         struct dlm_mhandle *mh;
3726         int to_nodeid, error;
3727
3728         to_nodeid = dlm_dir_nodeid(r);
3729
3730         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3731         if (error)
3732                 goto out;
3733
3734         memcpy(ms->m_extra, r->res_name, r->res_length);
3735         ms->m_hash = cpu_to_le32(r->res_hash);
3736
3737         error = send_message(mh, ms, r->res_name, r->res_length);
3738  out:
3739         return error;
3740 }
3741
3742 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3743                              int mstype, int rv)
3744 {
3745         struct dlm_message *ms;
3746         struct dlm_mhandle *mh;
3747         int to_nodeid, error;
3748
3749         to_nodeid = lkb->lkb_nodeid;
3750
3751         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3752         if (error)
3753                 goto out;
3754
3755         send_args(r, lkb, ms);
3756
3757         ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3758
3759         error = send_message(mh, ms, r->res_name, r->res_length);
3760  out:
3761         return error;
3762 }
3763
3764 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3765 {
3766         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3767 }
3768
3769 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3770 {
3771         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3772 }
3773
3774 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3775 {
3776         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3777 }
3778
3779 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3780 {
3781         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3782 }
3783
3784 static int send_lookup_reply(struct dlm_ls *ls,
3785                              const struct dlm_message *ms_in, int ret_nodeid,
3786                              int rv)
3787 {
3788         struct dlm_rsb *r = &ls->ls_local_rsb;
3789         struct dlm_message *ms;
3790         struct dlm_mhandle *mh;
3791         int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3792
3793         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3794         if (error)
3795                 goto out;
3796
3797         ms->m_lkid = ms_in->m_lkid;
3798         ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3799         ms->m_nodeid = cpu_to_le32(ret_nodeid);
3800
3801         error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3802  out:
3803         return error;
3804 }
3805
3806 /* which args we save from a received message depends heavily on the type
3807    of message, unlike the send side where we can safely send everything about
3808    the lkb for any type of message */
3809
3810 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3811 {
3812         lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3813         dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3814         dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3815 }
3816
3817 static void receive_flags_reply(struct dlm_lkb *lkb,
3818                                 const struct dlm_message *ms,
3819                                 bool local)
3820 {
3821         if (local)
3822                 return;
3823
3824         dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3825         dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3826 }
3827
3828 static int receive_extralen(const struct dlm_message *ms)
3829 {
3830         return (le16_to_cpu(ms->m_header.h_length) -
3831                 sizeof(struct dlm_message));
3832 }
3833
3834 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3835                        const struct dlm_message *ms)
3836 {
3837         int len;
3838
3839         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3840                 if (!lkb->lkb_lvbptr)
3841                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3842                 if (!lkb->lkb_lvbptr)
3843                         return -ENOMEM;
3844                 len = receive_extralen(ms);
3845                 if (len > ls->ls_lvblen)
3846                         len = ls->ls_lvblen;
3847                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3848         }
3849         return 0;
3850 }
3851
3852 static void fake_bastfn(void *astparam, int mode)
3853 {
3854         log_print("fake_bastfn should not be called");
3855 }
3856
3857 static void fake_astfn(void *astparam)
3858 {
3859         log_print("fake_astfn should not be called");
3860 }
3861
3862 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3863                                 const struct dlm_message *ms)
3864 {
3865         lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3866         lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3867         lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3868         lkb->lkb_grmode = DLM_LOCK_IV;
3869         lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3870
3871         lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3872         lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3873
3874         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3875                 /* lkb was just created so there won't be an lvb yet */
3876                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3877                 if (!lkb->lkb_lvbptr)
3878                         return -ENOMEM;
3879         }
3880
3881         return 0;
3882 }
3883
3884 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3885                                 const struct dlm_message *ms)
3886 {
3887         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3888                 return -EBUSY;
3889
3890         if (receive_lvb(ls, lkb, ms))
3891                 return -ENOMEM;
3892
3893         lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3894         lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3895
3896         return 0;
3897 }
3898
3899 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3900                                const struct dlm_message *ms)
3901 {
3902         if (receive_lvb(ls, lkb, ms))
3903                 return -ENOMEM;
3904         return 0;
3905 }
3906
3907 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3908    uses to send a reply and that the remote end uses to process the reply. */
3909
3910 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3911 {
3912         struct dlm_lkb *lkb = &ls->ls_local_lkb;
3913         lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3914         lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3915 }
3916
3917 /* This is called after the rsb is locked so that we can safely inspect
3918    fields in the lkb. */
3919
3920 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3921 {
3922         int from = le32_to_cpu(ms->m_header.h_nodeid);
3923         int error = 0;
3924
3925         /* currently mixing of user/kernel locks are not supported */
3926         if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3927             !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3928                 log_error(lkb->lkb_resource->res_ls,
3929                           "got user dlm message for a kernel lock");
3930                 error = -EINVAL;
3931                 goto out;
3932         }
3933
3934         switch (ms->m_type) {
3935         case cpu_to_le32(DLM_MSG_CONVERT):
3936         case cpu_to_le32(DLM_MSG_UNLOCK):
3937         case cpu_to_le32(DLM_MSG_CANCEL):
3938                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3939                         error = -EINVAL;
3940                 break;
3941
3942         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3943         case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3944         case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3945         case cpu_to_le32(DLM_MSG_GRANT):
3946         case cpu_to_le32(DLM_MSG_BAST):
3947                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3948                         error = -EINVAL;
3949                 break;
3950
3951         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3952                 if (!is_process_copy(lkb))
3953                         error = -EINVAL;
3954                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3955                         error = -EINVAL;
3956                 break;
3957
3958         default:
3959                 error = -EINVAL;
3960         }
3961
3962 out:
3963         if (error)
3964                 log_error(lkb->lkb_resource->res_ls,
3965                           "ignore invalid message %d from %d %x %x %x %d",
3966                           le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3967                           lkb->lkb_remid, dlm_iflags_val(lkb),
3968                           lkb->lkb_nodeid);
3969         return error;
3970 }
3971
3972 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3973 {
3974         struct dlm_lkb *lkb;
3975         struct dlm_rsb *r;
3976         int from_nodeid;
3977         int error, namelen = 0;
3978
3979         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3980
3981         error = create_lkb(ls, &lkb);
3982         if (error)
3983                 goto fail;
3984
3985         receive_flags(lkb, ms);
3986         set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
3987         error = receive_request_args(ls, lkb, ms);
3988         if (error) {
3989                 __put_lkb(ls, lkb);
3990                 goto fail;
3991         }
3992
3993         /* The dir node is the authority on whether we are the master
3994            for this rsb or not, so if the master sends us a request, we should
3995            recreate the rsb if we've destroyed it.   This race happens when we
3996            send a remove message to the dir node at the same time that the dir
3997            node sends us a request for the rsb. */
3998
3999         namelen = receive_extralen(ms);
4000
4001         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4002                          R_RECEIVE_REQUEST, &r);
4003         if (error) {
4004                 __put_lkb(ls, lkb);
4005                 goto fail;
4006         }
4007
4008         lock_rsb(r);
4009
4010         if (r->res_master_nodeid != dlm_our_nodeid()) {
4011                 error = validate_master_nodeid(ls, r, from_nodeid);
4012                 if (error) {
4013                         unlock_rsb(r);
4014                         put_rsb(r);
4015                         __put_lkb(ls, lkb);
4016                         goto fail;
4017                 }
4018         }
4019
4020         attach_lkb(r, lkb);
4021         error = do_request(r, lkb);
4022         send_request_reply(r, lkb, error);
4023         do_request_effects(r, lkb, error);
4024
4025         unlock_rsb(r);
4026         put_rsb(r);
4027
4028         if (error == -EINPROGRESS)
4029                 error = 0;
4030         if (error)
4031                 dlm_put_lkb(lkb);
4032         return 0;
4033
4034  fail:
4035         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4036            and do this receive_request again from process_lookup_list once
4037            we get the lookup reply.  This would avoid a many repeated
4038            ENOTBLK request failures when the lookup reply designating us
4039            as master is delayed. */
4040
4041         if (error != -ENOTBLK) {
4042                 log_limit(ls, "receive_request %x from %d %d",
4043                           le32_to_cpu(ms->m_lkid), from_nodeid, error);
4044         }
4045
4046         setup_local_lkb(ls, ms);
4047         send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4048         return error;
4049 }
4050
4051 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4052 {
4053         struct dlm_lkb *lkb;
4054         struct dlm_rsb *r;
4055         int error, reply = 1;
4056
4057         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4058         if (error)
4059                 goto fail;
4060
4061         if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4062                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4063                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4064                           (unsigned long long)lkb->lkb_recover_seq,
4065                           le32_to_cpu(ms->m_header.h_nodeid),
4066                           le32_to_cpu(ms->m_lkid));
4067                 error = -ENOENT;
4068                 dlm_put_lkb(lkb);
4069                 goto fail;
4070         }
4071
4072         r = lkb->lkb_resource;
4073
4074         hold_rsb(r);
4075         lock_rsb(r);
4076
4077         error = validate_message(lkb, ms);
4078         if (error)
4079                 goto out;
4080
4081         receive_flags(lkb, ms);
4082
4083         error = receive_convert_args(ls, lkb, ms);
4084         if (error) {
4085                 send_convert_reply(r, lkb, error);
4086                 goto out;
4087         }
4088
4089         reply = !down_conversion(lkb);
4090
4091         error = do_convert(r, lkb);
4092         if (reply)
4093                 send_convert_reply(r, lkb, error);
4094         do_convert_effects(r, lkb, error);
4095  out:
4096         unlock_rsb(r);
4097         put_rsb(r);
4098         dlm_put_lkb(lkb);
4099         return 0;
4100
4101  fail:
4102         setup_local_lkb(ls, ms);
4103         send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4104         return error;
4105 }
4106
4107 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4108 {
4109         struct dlm_lkb *lkb;
4110         struct dlm_rsb *r;
4111         int error;
4112
4113         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4114         if (error)
4115                 goto fail;
4116
4117         if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4118                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4119                           lkb->lkb_id, lkb->lkb_remid,
4120                           le32_to_cpu(ms->m_header.h_nodeid),
4121                           le32_to_cpu(ms->m_lkid));
4122                 error = -ENOENT;
4123                 dlm_put_lkb(lkb);
4124                 goto fail;
4125         }
4126
4127         r = lkb->lkb_resource;
4128
4129         hold_rsb(r);
4130         lock_rsb(r);
4131
4132         error = validate_message(lkb, ms);
4133         if (error)
4134                 goto out;
4135
4136         receive_flags(lkb, ms);
4137
4138         error = receive_unlock_args(ls, lkb, ms);
4139         if (error) {
4140                 send_unlock_reply(r, lkb, error);
4141                 goto out;
4142         }
4143
4144         error = do_unlock(r, lkb);
4145         send_unlock_reply(r, lkb, error);
4146         do_unlock_effects(r, lkb, error);
4147  out:
4148         unlock_rsb(r);
4149         put_rsb(r);
4150         dlm_put_lkb(lkb);
4151         return 0;
4152
4153  fail:
4154         setup_local_lkb(ls, ms);
4155         send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4156         return error;
4157 }
4158
4159 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4160 {
4161         struct dlm_lkb *lkb;
4162         struct dlm_rsb *r;
4163         int error;
4164
4165         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4166         if (error)
4167                 goto fail;
4168
4169         receive_flags(lkb, ms);
4170
4171         r = lkb->lkb_resource;
4172
4173         hold_rsb(r);
4174         lock_rsb(r);
4175
4176         error = validate_message(lkb, ms);
4177         if (error)
4178                 goto out;
4179
4180         error = do_cancel(r, lkb);
4181         send_cancel_reply(r, lkb, error);
4182         do_cancel_effects(r, lkb, error);
4183  out:
4184         unlock_rsb(r);
4185         put_rsb(r);
4186         dlm_put_lkb(lkb);
4187         return 0;
4188
4189  fail:
4190         setup_local_lkb(ls, ms);
4191         send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4192         return error;
4193 }
4194
4195 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4196 {
4197         struct dlm_lkb *lkb;
4198         struct dlm_rsb *r;
4199         int error;
4200
4201         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4202         if (error)
4203                 return error;
4204
4205         r = lkb->lkb_resource;
4206
4207         hold_rsb(r);
4208         lock_rsb(r);
4209
4210         error = validate_message(lkb, ms);
4211         if (error)
4212                 goto out;
4213
4214         receive_flags_reply(lkb, ms, false);
4215         if (is_altmode(lkb))
4216                 munge_altmode(lkb, ms);
4217         grant_lock_pc(r, lkb, ms);
4218         queue_cast(r, lkb, 0);
4219  out:
4220         unlock_rsb(r);
4221         put_rsb(r);
4222         dlm_put_lkb(lkb);
4223         return 0;
4224 }
4225
4226 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4227 {
4228         struct dlm_lkb *lkb;
4229         struct dlm_rsb *r;
4230         int error;
4231
4232         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4233         if (error)
4234                 return error;
4235
4236         r = lkb->lkb_resource;
4237
4238         hold_rsb(r);
4239         lock_rsb(r);
4240
4241         error = validate_message(lkb, ms);
4242         if (error)
4243                 goto out;
4244
4245         queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4246         lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4247  out:
4248         unlock_rsb(r);
4249         put_rsb(r);
4250         dlm_put_lkb(lkb);
4251         return 0;
4252 }
4253
4254 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4255 {
4256         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4257
4258         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4259         our_nodeid = dlm_our_nodeid();
4260
4261         len = receive_extralen(ms);
4262
4263         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4264                                   &ret_nodeid, NULL);
4265
4266         /* Optimization: we're master so treat lookup as a request */
4267         if (!error && ret_nodeid == our_nodeid) {
4268                 receive_request(ls, ms);
4269                 return;
4270         }
4271         send_lookup_reply(ls, ms, ret_nodeid, error);
4272 }
4273
4274 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4275 {
4276         char name[DLM_RESNAME_MAXLEN+1];
4277         struct dlm_rsb *r;
4278         int rv, len, dir_nodeid, from_nodeid;
4279
4280         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4281
4282         len = receive_extralen(ms);
4283
4284         if (len > DLM_RESNAME_MAXLEN) {
4285                 log_error(ls, "receive_remove from %d bad len %d",
4286                           from_nodeid, len);
4287                 return;
4288         }
4289
4290         dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4291         if (dir_nodeid != dlm_our_nodeid()) {
4292                 log_error(ls, "receive_remove from %d bad nodeid %d",
4293                           from_nodeid, dir_nodeid);
4294                 return;
4295         }
4296
4297         /*
4298          * Look for inactive rsb, if it's there, free it.
4299          * If the rsb is active, it's being used, and we should ignore this
4300          * message.  This is an expected race between the dir node sending a
4301          * request to the master node at the same time as the master node sends
4302          * a remove to the dir node.  The resolution to that race is for the
4303          * dir node to ignore the remove message, and the master node to
4304          * recreate the master rsb when it gets a request from the dir node for
4305          * an rsb it doesn't have.
4306          */
4307
4308         memset(name, 0, sizeof(name));
4309         memcpy(name, ms->m_extra, len);
4310
4311         rcu_read_lock();
4312         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4313         if (rv) {
4314                 rcu_read_unlock();
4315                 /* should not happen */
4316                 log_error(ls, "%s from %d not found %s", __func__,
4317                           from_nodeid, name);
4318                 return;
4319         }
4320
4321         write_lock_bh(&ls->ls_rsbtbl_lock);
4322         if (!rsb_flag(r, RSB_HASHED)) {
4323                 rcu_read_unlock();
4324                 write_unlock_bh(&ls->ls_rsbtbl_lock);
4325                 /* should not happen */
4326                 log_error(ls, "%s from %d got removed during removal %s",
4327                           __func__, from_nodeid, name);
4328                 return;
4329         }
4330         /* at this stage the rsb can only being freed here */
4331         rcu_read_unlock();
4332
4333         if (!rsb_flag(r, RSB_INACTIVE)) {
4334                 if (r->res_master_nodeid != from_nodeid) {
4335                         /* should not happen */
4336                         log_error(ls, "receive_remove on active rsb from %d master %d",
4337                                   from_nodeid, r->res_master_nodeid);
4338                         dlm_print_rsb(r);
4339                         write_unlock_bh(&ls->ls_rsbtbl_lock);
4340                         return;
4341                 }
4342
4343                 /* Ignore the remove message, see race comment above. */
4344
4345                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4346                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4347                           name);
4348                 write_unlock_bh(&ls->ls_rsbtbl_lock);
4349                 return;
4350         }
4351
4352         if (r->res_master_nodeid != from_nodeid) {
4353                 log_error(ls, "receive_remove inactive from %d master %d",
4354                           from_nodeid, r->res_master_nodeid);
4355                 dlm_print_rsb(r);
4356                 write_unlock_bh(&ls->ls_rsbtbl_lock);
4357                 return;
4358         }
4359
4360         list_del(&r->res_slow_list);
4361         rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4362                                dlm_rhash_rsb_params);
4363         rsb_clear_flag(r, RSB_HASHED);
4364         write_unlock_bh(&ls->ls_rsbtbl_lock);
4365
4366         free_inactive_rsb(r);
4367 }
4368
4369 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4370 {
4371         do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4372 }
4373
4374 static int receive_request_reply(struct dlm_ls *ls,
4375                                  const struct dlm_message *ms)
4376 {
4377         struct dlm_lkb *lkb;
4378         struct dlm_rsb *r;
4379         int error, mstype, result;
4380         int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4381
4382         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4383         if (error)
4384                 return error;
4385
4386         r = lkb->lkb_resource;
4387         hold_rsb(r);
4388         lock_rsb(r);
4389
4390         error = validate_message(lkb, ms);
4391         if (error)
4392                 goto out;
4393
4394         mstype = lkb->lkb_wait_type;
4395         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4396         if (error) {
4397                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4398                           lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4399                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4400                 dlm_dump_rsb(r);
4401                 goto out;
4402         }
4403
4404         /* Optimization: the dir node was also the master, so it took our
4405            lookup as a request and sent request reply instead of lookup reply */
4406         if (mstype == DLM_MSG_LOOKUP) {
4407                 r->res_master_nodeid = from_nodeid;
4408                 r->res_nodeid = from_nodeid;
4409                 lkb->lkb_nodeid = from_nodeid;
4410         }
4411
4412         /* this is the value returned from do_request() on the master */
4413         result = from_dlm_errno(le32_to_cpu(ms->m_result));
4414
4415         switch (result) {
4416         case -EAGAIN:
4417                 /* request would block (be queued) on remote master */
4418                 queue_cast(r, lkb, -EAGAIN);
4419                 confirm_master(r, -EAGAIN);
4420                 unhold_lkb(lkb); /* undoes create_lkb() */
4421                 break;
4422
4423         case -EINPROGRESS:
4424         case 0:
4425                 /* request was queued or granted on remote master */
4426                 receive_flags_reply(lkb, ms, false);
4427                 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4428                 if (is_altmode(lkb))
4429                         munge_altmode(lkb, ms);
4430                 if (result) {
4431                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4432                 } else {
4433                         grant_lock_pc(r, lkb, ms);
4434                         queue_cast(r, lkb, 0);
4435                 }
4436                 confirm_master(r, result);
4437                 break;
4438
4439         case -EBADR:
4440         case -ENOTBLK:
4441                 /* find_rsb failed to find rsb or rsb wasn't master */
4442                 log_limit(ls, "receive_request_reply %x from %d %d "
4443                           "master %d dir %d first %x %s", lkb->lkb_id,
4444                           from_nodeid, result, r->res_master_nodeid,
4445                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4446
4447                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4448                     r->res_master_nodeid != dlm_our_nodeid()) {
4449                         /* cause _request_lock->set_master->send_lookup */
4450                         r->res_master_nodeid = 0;
4451                         r->res_nodeid = -1;
4452                         lkb->lkb_nodeid = -1;
4453                 }
4454
4455                 if (is_overlap(lkb)) {
4456                         /* we'll ignore error in cancel/unlock reply */
4457                         queue_cast_overlap(r, lkb);
4458                         confirm_master(r, result);
4459                         unhold_lkb(lkb); /* undoes create_lkb() */
4460                 } else {
4461                         _request_lock(r, lkb);
4462
4463                         if (r->res_master_nodeid == dlm_our_nodeid())
4464                                 confirm_master(r, 0);
4465                 }
4466                 break;
4467
4468         default:
4469                 log_error(ls, "receive_request_reply %x error %d",
4470                           lkb->lkb_id, result);
4471         }
4472
4473         if ((result == 0 || result == -EINPROGRESS) &&
4474             test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4475                 log_debug(ls, "receive_request_reply %x result %d unlock",
4476                           lkb->lkb_id, result);
4477                 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4478                 send_unlock(r, lkb);
4479         } else if ((result == -EINPROGRESS) &&
4480                    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4481                                       &lkb->lkb_iflags)) {
4482                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4483                 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4484                 send_cancel(r, lkb);
4485         } else {
4486                 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4487                 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4488         }
4489  out:
4490         unlock_rsb(r);
4491         put_rsb(r);
4492         dlm_put_lkb(lkb);
4493         return 0;
4494 }
4495
4496 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4497                                     const struct dlm_message *ms, bool local)
4498 {
4499         /* this is the value returned from do_convert() on the master */
4500         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4501         case -EAGAIN:
4502                 /* convert would block (be queued) on remote master */
4503                 queue_cast(r, lkb, -EAGAIN);
4504                 break;
4505
4506         case -EDEADLK:
4507                 receive_flags_reply(lkb, ms, local);
4508                 revert_lock_pc(r, lkb);
4509                 queue_cast(r, lkb, -EDEADLK);
4510                 break;
4511
4512         case -EINPROGRESS:
4513                 /* convert was queued on remote master */
4514                 receive_flags_reply(lkb, ms, local);
4515                 if (is_demoted(lkb))
4516                         munge_demoted(lkb);
4517                 del_lkb(r, lkb);
4518                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4519                 break;
4520
4521         case 0:
4522                 /* convert was granted on remote master */
4523                 receive_flags_reply(lkb, ms, local);
4524                 if (is_demoted(lkb))
4525                         munge_demoted(lkb);
4526                 grant_lock_pc(r, lkb, ms);
4527                 queue_cast(r, lkb, 0);
4528                 break;
4529
4530         default:
4531                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4532                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4533                           le32_to_cpu(ms->m_lkid),
4534                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4535                 dlm_print_rsb(r);
4536                 dlm_print_lkb(lkb);
4537         }
4538 }
4539
4540 static void _receive_convert_reply(struct dlm_lkb *lkb,
4541                                    const struct dlm_message *ms, bool local)
4542 {
4543         struct dlm_rsb *r = lkb->lkb_resource;
4544         int error;
4545
4546         hold_rsb(r);
4547         lock_rsb(r);
4548
4549         error = validate_message(lkb, ms);
4550         if (error)
4551                 goto out;
4552
4553         error = remove_from_waiters_ms(lkb, ms, local);
4554         if (error)
4555                 goto out;
4556
4557         __receive_convert_reply(r, lkb, ms, local);
4558  out:
4559         unlock_rsb(r);
4560         put_rsb(r);
4561 }
4562
4563 static int receive_convert_reply(struct dlm_ls *ls,
4564                                  const struct dlm_message *ms)
4565 {
4566         struct dlm_lkb *lkb;
4567         int error;
4568
4569         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4570         if (error)
4571                 return error;
4572
4573         _receive_convert_reply(lkb, ms, false);
4574         dlm_put_lkb(lkb);
4575         return 0;
4576 }
4577
4578 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4579                                   const struct dlm_message *ms, bool local)
4580 {
4581         struct dlm_rsb *r = lkb->lkb_resource;
4582         int error;
4583
4584         hold_rsb(r);
4585         lock_rsb(r);
4586
4587         error = validate_message(lkb, ms);
4588         if (error)
4589                 goto out;
4590
4591         error = remove_from_waiters_ms(lkb, ms, local);
4592         if (error)
4593                 goto out;
4594
4595         /* this is the value returned from do_unlock() on the master */
4596
4597         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4598         case -DLM_EUNLOCK:
4599                 receive_flags_reply(lkb, ms, local);
4600                 remove_lock_pc(r, lkb);
4601                 queue_cast(r, lkb, -DLM_EUNLOCK);
4602                 break;
4603         case -ENOENT:
4604                 break;
4605         default:
4606                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4607                           lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4608         }
4609  out:
4610         unlock_rsb(r);
4611         put_rsb(r);
4612 }
4613
4614 static int receive_unlock_reply(struct dlm_ls *ls,
4615                                 const struct dlm_message *ms)
4616 {
4617         struct dlm_lkb *lkb;
4618         int error;
4619
4620         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4621         if (error)
4622                 return error;
4623
4624         _receive_unlock_reply(lkb, ms, false);
4625         dlm_put_lkb(lkb);
4626         return 0;
4627 }
4628
4629 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4630                                   const struct dlm_message *ms, bool local)
4631 {
4632         struct dlm_rsb *r = lkb->lkb_resource;
4633         int error;
4634
4635         hold_rsb(r);
4636         lock_rsb(r);
4637
4638         error = validate_message(lkb, ms);
4639         if (error)
4640                 goto out;
4641
4642         error = remove_from_waiters_ms(lkb, ms, local);
4643         if (error)
4644                 goto out;
4645
4646         /* this is the value returned from do_cancel() on the master */
4647
4648         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4649         case -DLM_ECANCEL:
4650                 receive_flags_reply(lkb, ms, local);
4651                 revert_lock_pc(r, lkb);
4652                 queue_cast(r, lkb, -DLM_ECANCEL);
4653                 break;
4654         case 0:
4655                 break;
4656         default:
4657                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4658                           lkb->lkb_id,
4659                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4660         }
4661  out:
4662         unlock_rsb(r);
4663         put_rsb(r);
4664 }
4665
4666 static int receive_cancel_reply(struct dlm_ls *ls,
4667                                 const struct dlm_message *ms)
4668 {
4669         struct dlm_lkb *lkb;
4670         int error;
4671
4672         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4673         if (error)
4674                 return error;
4675
4676         _receive_cancel_reply(lkb, ms, false);
4677         dlm_put_lkb(lkb);
4678         return 0;
4679 }
4680
4681 static void receive_lookup_reply(struct dlm_ls *ls,
4682                                  const struct dlm_message *ms)
4683 {
4684         struct dlm_lkb *lkb;
4685         struct dlm_rsb *r;
4686         int error, ret_nodeid;
4687         int do_lookup_list = 0;
4688
4689         error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4690         if (error) {
4691                 log_error(ls, "%s no lkid %x", __func__,
4692                           le32_to_cpu(ms->m_lkid));
4693                 return;
4694         }
4695
4696         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4697            FIXME: will a non-zero error ever be returned? */
4698
4699         r = lkb->lkb_resource;
4700         hold_rsb(r);
4701         lock_rsb(r);
4702
4703         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4704         if (error)
4705                 goto out;
4706
4707         ret_nodeid = le32_to_cpu(ms->m_nodeid);
4708
4709         /* We sometimes receive a request from the dir node for this
4710            rsb before we've received the dir node's loookup_reply for it.
4711            The request from the dir node implies we're the master, so we set
4712            ourself as master in receive_request_reply, and verify here that
4713            we are indeed the master. */
4714
4715         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4716                 /* This should never happen */
4717                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4718                           "master %d dir %d our %d first %x %s",
4719                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4720                           ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4721                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4722         }
4723
4724         if (ret_nodeid == dlm_our_nodeid()) {
4725                 r->res_master_nodeid = ret_nodeid;
4726                 r->res_nodeid = 0;
4727                 do_lookup_list = 1;
4728                 r->res_first_lkid = 0;
4729         } else if (ret_nodeid == -1) {
4730                 /* the remote node doesn't believe it's the dir node */
4731                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4732                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4733                 r->res_master_nodeid = 0;
4734                 r->res_nodeid = -1;
4735                 lkb->lkb_nodeid = -1;
4736         } else {
4737                 /* set_master() will set lkb_nodeid from r */
4738                 r->res_master_nodeid = ret_nodeid;
4739                 r->res_nodeid = ret_nodeid;
4740         }
4741
4742         if (is_overlap(lkb)) {
4743                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4744                           lkb->lkb_id, dlm_iflags_val(lkb));
4745                 queue_cast_overlap(r, lkb);
4746                 unhold_lkb(lkb); /* undoes create_lkb() */
4747                 goto out_list;
4748         }
4749
4750         _request_lock(r, lkb);
4751
4752  out_list:
4753         if (do_lookup_list)
4754                 process_lookup_list(r);
4755  out:
4756         unlock_rsb(r);
4757         put_rsb(r);
4758         dlm_put_lkb(lkb);
4759 }
4760
4761 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4762                              uint32_t saved_seq)
4763 {
4764         int error = 0, noent = 0;
4765
4766         if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4767                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4768                           le32_to_cpu(ms->m_type),
4769                           le32_to_cpu(ms->m_header.h_nodeid),
4770                           le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4771                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4772                 return;
4773         }
4774
4775         switch (ms->m_type) {
4776
4777         /* messages sent to a master node */
4778
4779         case cpu_to_le32(DLM_MSG_REQUEST):
4780                 error = receive_request(ls, ms);
4781                 break;
4782
4783         case cpu_to_le32(DLM_MSG_CONVERT):
4784                 error = receive_convert(ls, ms);
4785                 break;
4786
4787         case cpu_to_le32(DLM_MSG_UNLOCK):
4788                 error = receive_unlock(ls, ms);
4789                 break;
4790
4791         case cpu_to_le32(DLM_MSG_CANCEL):
4792                 noent = 1;
4793                 error = receive_cancel(ls, ms);
4794                 break;
4795
4796         /* messages sent from a master node (replies to above) */
4797
4798         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4799                 error = receive_request_reply(ls, ms);
4800                 break;
4801
4802         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4803                 error = receive_convert_reply(ls, ms);
4804                 break;
4805
4806         case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4807                 error = receive_unlock_reply(ls, ms);
4808                 break;
4809
4810         case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4811                 error = receive_cancel_reply(ls, ms);
4812                 break;
4813
4814         /* messages sent from a master node (only two types of async msg) */
4815
4816         case cpu_to_le32(DLM_MSG_GRANT):
4817                 noent = 1;
4818                 error = receive_grant(ls, ms);
4819                 break;
4820
4821         case cpu_to_le32(DLM_MSG_BAST):
4822                 noent = 1;
4823                 error = receive_bast(ls, ms);
4824                 break;
4825
4826         /* messages sent to a dir node */
4827
4828         case cpu_to_le32(DLM_MSG_LOOKUP):
4829                 receive_lookup(ls, ms);
4830                 break;
4831
4832         case cpu_to_le32(DLM_MSG_REMOVE):
4833                 receive_remove(ls, ms);
4834                 break;
4835
4836         /* messages sent from a dir node (remove has no reply) */
4837
4838         case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4839                 receive_lookup_reply(ls, ms);
4840                 break;
4841
4842         /* other messages */
4843
4844         case cpu_to_le32(DLM_MSG_PURGE):
4845                 receive_purge(ls, ms);
4846                 break;
4847
4848         default:
4849                 log_error(ls, "unknown message type %d",
4850                           le32_to_cpu(ms->m_type));
4851         }
4852
4853         /*
4854          * When checking for ENOENT, we're checking the result of
4855          * find_lkb(m_remid):
4856          *
4857          * The lock id referenced in the message wasn't found.  This may
4858          * happen in normal usage for the async messages and cancel, so
4859          * only use log_debug for them.
4860          *
4861          * Some errors are expected and normal.
4862          */
4863
4864         if (error == -ENOENT && noent) {
4865                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4866                           le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4867                           le32_to_cpu(ms->m_header.h_nodeid),
4868                           le32_to_cpu(ms->m_lkid), saved_seq);
4869         } else if (error == -ENOENT) {
4870                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4871                           le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4872                           le32_to_cpu(ms->m_header.h_nodeid),
4873                           le32_to_cpu(ms->m_lkid), saved_seq);
4874
4875                 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4876                         dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4877         }
4878
4879         if (error == -EINVAL) {
4880                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4881                           "saved_seq %u",
4882                           le32_to_cpu(ms->m_type),
4883                           le32_to_cpu(ms->m_header.h_nodeid),
4884                           le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4885                           saved_seq);
4886         }
4887 }
4888
4889 /* If the lockspace is in recovery mode (locking stopped), then normal
4890    messages are saved on the requestqueue for processing after recovery is
4891    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4892    messages off the requestqueue before we process new ones. This occurs right
4893    after recovery completes when we transition from saving all messages on
4894    requestqueue, to processing all the saved messages, to processing new
4895    messages as they arrive. */
4896
4897 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4898                                 int nodeid)
4899 {
4900 try_again:
4901         read_lock_bh(&ls->ls_requestqueue_lock);
4902         if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4903                 /* If we were a member of this lockspace, left, and rejoined,
4904                    other nodes may still be sending us messages from the
4905                    lockspace generation before we left. */
4906                 if (WARN_ON_ONCE(!ls->ls_generation)) {
4907                         read_unlock_bh(&ls->ls_requestqueue_lock);
4908                         log_limit(ls, "receive %d from %d ignore old gen",
4909                                   le32_to_cpu(ms->m_type), nodeid);
4910                         return;
4911                 }
4912
4913                 read_unlock_bh(&ls->ls_requestqueue_lock);
4914                 write_lock_bh(&ls->ls_requestqueue_lock);
4915                 /* recheck because we hold writelock now */
4916                 if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4917                         write_unlock_bh(&ls->ls_requestqueue_lock);
4918                         goto try_again;
4919                 }
4920
4921                 dlm_add_requestqueue(ls, nodeid, ms);
4922                 write_unlock_bh(&ls->ls_requestqueue_lock);
4923         } else {
4924                 _receive_message(ls, ms, 0);
4925                 read_unlock_bh(&ls->ls_requestqueue_lock);
4926         }
4927 }
4928
4929 /* This is called by dlm_recoverd to process messages that were saved on
4930    the requestqueue. */
4931
4932 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4933                                uint32_t saved_seq)
4934 {
4935         _receive_message(ls, ms, saved_seq);
4936 }
4937
4938 /* This is called by the midcomms layer when something is received for
4939    the lockspace.  It could be either a MSG (normal message sent as part of
4940    standard locking activity) or an RCOM (recovery message sent as part of
4941    lockspace recovery). */
4942
4943 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4944 {
4945         const struct dlm_header *hd = &p->header;
4946         struct dlm_ls *ls;
4947         int type = 0;
4948
4949         switch (hd->h_cmd) {
4950         case DLM_MSG:
4951                 type = le32_to_cpu(p->message.m_type);
4952                 break;
4953         case DLM_RCOM:
4954                 type = le32_to_cpu(p->rcom.rc_type);
4955                 break;
4956         default:
4957                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4958                 return;
4959         }
4960
4961         if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4962                 log_print("invalid h_nodeid %d from %d lockspace %x",
4963                           le32_to_cpu(hd->h_nodeid), nodeid,
4964                           le32_to_cpu(hd->u.h_lockspace));
4965                 return;
4966         }
4967
4968         ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4969         if (!ls) {
4970                 if (dlm_config.ci_log_debug) {
4971                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4972                                 "%u from %d cmd %d type %d\n",
4973                                 le32_to_cpu(hd->u.h_lockspace), nodeid,
4974                                 hd->h_cmd, type);
4975                 }
4976
4977                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4978                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4979                 return;
4980         }
4981
4982         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4983            be inactive (in this ls) before transitioning to recovery mode */
4984
4985         read_lock_bh(&ls->ls_recv_active);
4986         if (hd->h_cmd == DLM_MSG)
4987                 dlm_receive_message(ls, &p->message, nodeid);
4988         else if (hd->h_cmd == DLM_RCOM)
4989                 dlm_receive_rcom(ls, &p->rcom, nodeid);
4990         else
4991                 log_error(ls, "invalid h_cmd %d from %d lockspace %x",
4992                           hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
4993         read_unlock_bh(&ls->ls_recv_active);
4994
4995         dlm_put_lockspace(ls);
4996 }
4997
4998 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4999                                    struct dlm_message *ms_local)
5000 {
5001         if (middle_conversion(lkb)) {
5002                 log_rinfo(ls, "%s %x middle convert in progress", __func__,
5003                          lkb->lkb_id);
5004
5005                 /* We sent this lock to the new master. The new master will
5006                  * tell us when it's granted.  We no longer need a reply, so
5007                  * use a fake reply to put the lkb into the right state.
5008                  */
5009                 hold_lkb(lkb);
5010                 memset(ms_local, 0, sizeof(struct dlm_message));
5011                 ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5012                 ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5013                 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5014                 _receive_convert_reply(lkb, ms_local, true);
5015                 unhold_lkb(lkb);
5016
5017         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5018                 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5019         }
5020
5021         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5022            conversions are async; there's no reply from the remote master */
5023 }
5024
5025 /* A waiting lkb needs recovery if the master node has failed, or
5026    the master node is changing (only when no directory is used) */
5027
5028 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5029                                  int dir_nodeid)
5030 {
5031         if (dlm_no_directory(ls))
5032                 return 1;
5033
5034         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5035                 return 1;
5036
5037         return 0;
5038 }
5039
5040 /* Recovery for locks that are waiting for replies from nodes that are now
5041    gone.  We can just complete unlocks and cancels by faking a reply from the
5042    dead node.  Requests and up-conversions we flag to be resent after
5043    recovery.  Down-conversions can just be completed with a fake reply like
5044    unlocks.  Conversions between PR and CW need special attention. */
5045
5046 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5047 {
5048         struct dlm_lkb *lkb, *safe;
5049         struct dlm_message *ms_local;
5050         int wait_type, local_unlock_result, local_cancel_result;
5051         int dir_nodeid;
5052
5053         ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
5054         if (!ms_local)
5055                 return;
5056
5057         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5058
5059                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5060
5061                 /* exclude debug messages about unlocks because there can be so
5062                    many and they aren't very interesting */
5063
5064                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5065                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5066                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5067                                   lkb->lkb_id,
5068                                   lkb->lkb_remid,
5069                                   lkb->lkb_wait_type,
5070                                   lkb->lkb_resource->res_nodeid,
5071                                   lkb->lkb_nodeid,
5072                                   lkb->lkb_wait_nodeid,
5073                                   dir_nodeid);
5074                 }
5075
5076                 /* all outstanding lookups, regardless of destination  will be
5077                    resent after recovery is done */
5078
5079                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5080                         set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5081                         continue;
5082                 }
5083
5084                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5085                         continue;
5086
5087                 wait_type = lkb->lkb_wait_type;
5088                 local_unlock_result = -DLM_EUNLOCK;
5089                 local_cancel_result = -DLM_ECANCEL;
5090
5091                 /* Main reply may have been received leaving a zero wait_type,
5092                    but a reply for the overlapping op may not have been
5093                    received.  In that case we need to fake the appropriate
5094                    reply for the overlap op. */
5095
5096                 if (!wait_type) {
5097                         if (is_overlap_cancel(lkb)) {
5098                                 wait_type = DLM_MSG_CANCEL;
5099                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5100                                         local_cancel_result = 0;
5101                         }
5102                         if (is_overlap_unlock(lkb)) {
5103                                 wait_type = DLM_MSG_UNLOCK;
5104                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5105                                         local_unlock_result = -ENOENT;
5106                         }
5107
5108                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
5109                                   lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5110                                   local_cancel_result, local_unlock_result);
5111                 }
5112
5113                 switch (wait_type) {
5114
5115                 case DLM_MSG_REQUEST:
5116                         set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5117                         break;
5118
5119                 case DLM_MSG_CONVERT:
5120                         recover_convert_waiter(ls, lkb, ms_local);
5121                         break;
5122
5123                 case DLM_MSG_UNLOCK:
5124                         hold_lkb(lkb);
5125                         memset(ms_local, 0, sizeof(struct dlm_message));
5126                         ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5127                         ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5128                         ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5129                         _receive_unlock_reply(lkb, ms_local, true);
5130                         dlm_put_lkb(lkb);
5131                         break;
5132
5133                 case DLM_MSG_CANCEL:
5134                         hold_lkb(lkb);
5135                         memset(ms_local, 0, sizeof(struct dlm_message));
5136                         ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5137                         ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5138                         ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5139                         _receive_cancel_reply(lkb, ms_local, true);
5140                         dlm_put_lkb(lkb);
5141                         break;
5142
5143                 default:
5144                         log_error(ls, "invalid lkb wait_type %d %d",
5145                                   lkb->lkb_wait_type, wait_type);
5146                 }
5147                 schedule();
5148         }
5149         kfree(ms_local);
5150 }
5151
5152 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5153 {
5154         struct dlm_lkb *lkb = NULL, *iter;
5155
5156         spin_lock_bh(&ls->ls_waiters_lock);
5157         list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5158                 if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5159                         hold_lkb(iter);
5160                         lkb = iter;
5161                         break;
5162                 }
5163         }
5164         spin_unlock_bh(&ls->ls_waiters_lock);
5165
5166         return lkb;
5167 }
5168
5169 /*
5170  * Forced state reset for locks that were in the middle of remote operations
5171  * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5172  * for a reply from a remote operation.)  The lkbs remaining on the waiters
5173  * list need to be reevaluated; some may need resending to a different node
5174  * than previously, and some may now need local handling rather than remote.
5175  *
5176  * First, the lkb state for the voided remote operation is forcibly reset,
5177  * equivalent to what remove_from_waiters() would normally do:
5178  * . lkb removed from ls_waiters list
5179  * . lkb wait_type cleared
5180  * . lkb waiters_count cleared
5181  * . lkb ref count decremented for each waiters_count (almost always 1,
5182  *   but possibly 2 in case of cancel/unlock overlapping, which means
5183  *   two remote replies were being expected for the lkb.)
5184  *
5185  * Second, the lkb is reprocessed like an original operation would be,
5186  * by passing it to _request_lock or _convert_lock, which will either
5187  * process the lkb operation locally, or send it to a remote node again
5188  * and put the lkb back onto the waiters list.
5189  *
5190  * When reprocessing the lkb, we may find that it's flagged for an overlapping
5191  * force-unlock or cancel, either from before recovery began, or after recovery
5192  * finished.  If this is the case, the unlock/cancel is done directly, and the
5193  * original operation is not initiated again (no _request_lock/_convert_lock.)
5194  */
5195
5196 int dlm_recover_waiters_post(struct dlm_ls *ls)
5197 {
5198         struct dlm_lkb *lkb;
5199         struct dlm_rsb *r;
5200         int error = 0, mstype, err, oc, ou;
5201
5202         while (1) {
5203                 if (dlm_locking_stopped(ls)) {
5204                         log_debug(ls, "recover_waiters_post aborted");
5205                         error = -EINTR;
5206                         break;
5207                 }
5208
5209                 /* 
5210                  * Find an lkb from the waiters list that's been affected by
5211                  * recovery node changes, and needs to be reprocessed.  Does
5212                  * hold_lkb(), adding a refcount.
5213                  */
5214                 lkb = find_resend_waiter(ls);
5215                 if (!lkb)
5216                         break;
5217
5218                 r = lkb->lkb_resource;
5219                 hold_rsb(r);
5220                 lock_rsb(r);
5221
5222                 /*
5223                  * If the lkb has been flagged for a force unlock or cancel,
5224                  * then the reprocessing below will be replaced by just doing
5225                  * the unlock/cancel directly.
5226                  */
5227                 mstype = lkb->lkb_wait_type;
5228                 oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5229                                         &lkb->lkb_iflags);
5230                 ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5231                                         &lkb->lkb_iflags);
5232                 err = 0;
5233
5234                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5235                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5236                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5237                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5238                           dlm_dir_nodeid(r), oc, ou);
5239
5240                 /*
5241                  * No reply to the pre-recovery operation will now be received,
5242                  * so a forced equivalent of remove_from_waiters() is needed to
5243                  * reset the waiters state that was in place before recovery.
5244                  */
5245
5246                 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5247
5248                 /* Forcibly clear wait_type */
5249                 lkb->lkb_wait_type = 0;
5250
5251                 /*
5252                  * Forcibly reset wait_count and associated refcount.  The
5253                  * wait_count will almost always be 1, but in case of an
5254                  * overlapping unlock/cancel it could be 2: see where
5255                  * add_to_waiters() finds the lkb is already on the waiters
5256                  * list and does lkb_wait_count++; hold_lkb().
5257                  */
5258                 while (lkb->lkb_wait_count) {
5259                         lkb->lkb_wait_count--;
5260                         unhold_lkb(lkb);
5261                 }
5262
5263                 /* Forcibly remove from waiters list */
5264                 spin_lock_bh(&ls->ls_waiters_lock);
5265                 list_del_init(&lkb->lkb_wait_reply);
5266                 spin_unlock_bh(&ls->ls_waiters_lock);
5267
5268                 /*
5269                  * The lkb is now clear of all prior waiters state and can be
5270                  * processed locally, or sent to remote node again, or directly
5271                  * cancelled/unlocked.
5272                  */
5273
5274                 if (oc || ou) {
5275                         /* do an unlock or cancel instead of resending */
5276                         switch (mstype) {
5277                         case DLM_MSG_LOOKUP:
5278                         case DLM_MSG_REQUEST:
5279                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5280                                                         -DLM_ECANCEL);
5281                                 unhold_lkb(lkb); /* undoes create_lkb() */
5282                                 break;
5283                         case DLM_MSG_CONVERT:
5284                                 if (oc) {
5285                                         queue_cast(r, lkb, -DLM_ECANCEL);
5286                                 } else {
5287                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5288                                         _unlock_lock(r, lkb);
5289                                 }
5290                                 break;
5291                         default:
5292                                 err = 1;
5293                         }
5294                 } else {
5295                         switch (mstype) {
5296                         case DLM_MSG_LOOKUP:
5297                         case DLM_MSG_REQUEST:
5298                                 _request_lock(r, lkb);
5299                                 if (r->res_nodeid != -1 && is_master(r))
5300                                         confirm_master(r, 0);
5301                                 break;
5302                         case DLM_MSG_CONVERT:
5303                                 _convert_lock(r, lkb);
5304                                 break;
5305                         default:
5306                                 err = 1;
5307                         }
5308                 }
5309
5310                 if (err) {
5311                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5312                                   "dir_nodeid %d overlap %d %d",
5313                                   lkb->lkb_id, mstype, r->res_nodeid,
5314                                   dlm_dir_nodeid(r), oc, ou);
5315                 }
5316                 unlock_rsb(r);
5317                 put_rsb(r);
5318                 dlm_put_lkb(lkb);
5319         }
5320
5321         return error;
5322 }
5323
5324 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5325                               struct list_head *list)
5326 {
5327         struct dlm_lkb *lkb, *safe;
5328
5329         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5330                 if (!is_master_copy(lkb))
5331                         continue;
5332
5333                 /* don't purge lkbs we've added in recover_master_copy for
5334                    the current recovery seq */
5335
5336                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5337                         continue;
5338
5339                 del_lkb(r, lkb);
5340
5341                 /* this put should free the lkb */
5342                 if (!dlm_put_lkb(lkb))
5343                         log_error(ls, "purged mstcpy lkb not released");
5344         }
5345 }
5346
5347 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5348 {
5349         struct dlm_ls *ls = r->res_ls;
5350
5351         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5352         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5353         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5354 }
5355
5356 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5357                             struct list_head *list,
5358                             int nodeid_gone, unsigned int *count)
5359 {
5360         struct dlm_lkb *lkb, *safe;
5361
5362         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5363                 if (!is_master_copy(lkb))
5364                         continue;
5365
5366                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5367                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5368
5369                         /* tell recover_lvb to invalidate the lvb
5370                            because a node holding EX/PW failed */
5371                         if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5372                             (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5373                                 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5374                         }
5375
5376                         del_lkb(r, lkb);
5377
5378                         /* this put should free the lkb */
5379                         if (!dlm_put_lkb(lkb))
5380                                 log_error(ls, "purged dead lkb not released");
5381
5382                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5383
5384                         (*count)++;
5385                 }
5386         }
5387 }
5388
5389 /* Get rid of locks held by nodes that are gone. */
5390
5391 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5392 {
5393         struct dlm_rsb *r;
5394         struct dlm_member *memb;
5395         int nodes_count = 0;
5396         int nodeid_gone = 0;
5397         unsigned int lkb_count = 0;
5398
5399         /* cache one removed nodeid to optimize the common
5400            case of a single node removed */
5401
5402         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5403                 nodes_count++;
5404                 nodeid_gone = memb->nodeid;
5405         }
5406
5407         if (!nodes_count)
5408                 return;
5409
5410         list_for_each_entry(r, root_list, res_root_list) {
5411                 lock_rsb(r);
5412                 if (r->res_nodeid != -1 && is_master(r)) {
5413                         purge_dead_list(ls, r, &r->res_grantqueue,
5414                                         nodeid_gone, &lkb_count);
5415                         purge_dead_list(ls, r, &r->res_convertqueue,
5416                                         nodeid_gone, &lkb_count);
5417                         purge_dead_list(ls, r, &r->res_waitqueue,
5418                                         nodeid_gone, &lkb_count);
5419                 }
5420                 unlock_rsb(r);
5421
5422                 cond_resched();
5423         }
5424
5425         if (lkb_count)
5426                 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5427                           lkb_count, nodes_count);
5428 }
5429
5430 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5431 {
5432         struct dlm_rsb *r;
5433
5434         read_lock_bh(&ls->ls_rsbtbl_lock);
5435         list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
5436                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5437                         continue;
5438                 if (!is_master(r)) {
5439                         rsb_clear_flag(r, RSB_RECOVER_GRANT);
5440                         continue;
5441                 }
5442                 hold_rsb(r);
5443                 read_unlock_bh(&ls->ls_rsbtbl_lock);
5444                 return r;
5445         }
5446         read_unlock_bh(&ls->ls_rsbtbl_lock);
5447         return NULL;
5448 }
5449
5450 /*
5451  * Attempt to grant locks on resources that we are the master of.
5452  * Locks may have become grantable during recovery because locks
5453  * from departed nodes have been purged (or not rebuilt), allowing
5454  * previously blocked locks to now be granted.  The subset of rsb's
5455  * we are interested in are those with lkb's on either the convert or
5456  * waiting queues.
5457  *
5458  * Simplest would be to go through each master rsb and check for non-empty
5459  * convert or waiting queues, and attempt to grant on those rsbs.
5460  * Checking the queues requires lock_rsb, though, for which we'd need
5461  * to release the rsbtbl lock.  This would make iterating through all
5462  * rsb's very inefficient.  So, we rely on earlier recovery routines
5463  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5464  * locks for.
5465  */
5466
5467 void dlm_recover_grant(struct dlm_ls *ls)
5468 {
5469         struct dlm_rsb *r;
5470         unsigned int count = 0;
5471         unsigned int rsb_count = 0;
5472         unsigned int lkb_count = 0;
5473
5474         while (1) {
5475                 r = find_grant_rsb(ls);
5476                 if (!r)
5477                         break;
5478
5479                 rsb_count++;
5480                 count = 0;
5481                 lock_rsb(r);
5482                 /* the RECOVER_GRANT flag is checked in the grant path */
5483                 grant_pending_locks(r, &count);
5484                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5485                 lkb_count += count;
5486                 confirm_master(r, 0);
5487                 unlock_rsb(r);
5488                 put_rsb(r);
5489                 cond_resched();
5490         }
5491
5492         if (lkb_count)
5493                 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5494                           lkb_count, rsb_count);
5495 }
5496
5497 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5498                                          uint32_t remid)
5499 {
5500         struct dlm_lkb *lkb;
5501
5502         list_for_each_entry(lkb, head, lkb_statequeue) {
5503                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5504                         return lkb;
5505         }
5506         return NULL;
5507 }
5508
5509 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5510                                     uint32_t remid)
5511 {
5512         struct dlm_lkb *lkb;
5513
5514         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5515         if (lkb)
5516                 return lkb;
5517         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5518         if (lkb)
5519                 return lkb;
5520         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5521         if (lkb)
5522                 return lkb;
5523         return NULL;
5524 }
5525
5526 /* needs at least dlm_rcom + rcom_lock */
5527 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5528                                   struct dlm_rsb *r, const struct dlm_rcom *rc)
5529 {
5530         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5531
5532         lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5533         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5534         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5535         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5536         dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5537         set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5538         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5539         lkb->lkb_rqmode = rl->rl_rqmode;
5540         lkb->lkb_grmode = rl->rl_grmode;
5541         /* don't set lkb_status because add_lkb wants to itself */
5542
5543         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5544         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5545
5546         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5547                 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5548                         sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5549                 if (lvblen > ls->ls_lvblen)
5550                         return -EINVAL;
5551                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5552                 if (!lkb->lkb_lvbptr)
5553                         return -ENOMEM;
5554                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5555         }
5556
5557         /* Conversions between PR and CW (middle modes) need special handling.
5558            The real granted mode of these converting locks cannot be determined
5559            until all locks have been rebuilt on the rsb (recover_conversion) */
5560
5561         if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
5562                 /* We may need to adjust grmode depending on other granted locks. */
5563                 log_limit(ls, "%s %x middle convert gr %d rq %d remote %d %x",
5564                           __func__, lkb->lkb_id, lkb->lkb_grmode,
5565                           lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid);
5566                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5567         }
5568
5569         return 0;
5570 }
5571
5572 /* This lkb may have been recovered in a previous aborted recovery so we need
5573    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5574    If so we just send back a standard reply.  If not, we create a new lkb with
5575    the given values and send back our lkid.  We send back our lkid by sending
5576    back the rcom_lock struct we got but with the remid field filled in. */
5577
5578 /* needs at least dlm_rcom + rcom_lock */
5579 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5580                             __le32 *rl_remid, __le32 *rl_result)
5581 {
5582         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5583         struct dlm_rsb *r;
5584         struct dlm_lkb *lkb;
5585         uint32_t remid = 0;
5586         int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5587         int error;
5588
5589         /* init rl_remid with rcom lock rl_remid */
5590         *rl_remid = rl->rl_remid;
5591
5592         if (rl->rl_parent_lkid) {
5593                 error = -EOPNOTSUPP;
5594                 goto out;
5595         }
5596
5597         remid = le32_to_cpu(rl->rl_lkid);
5598
5599         /* In general we expect the rsb returned to be R_MASTER, but we don't
5600            have to require it.  Recovery of masters on one node can overlap
5601            recovery of locks on another node, so one node can send us MSTCPY
5602            locks before we've made ourselves master of this rsb.  We can still
5603            add new MSTCPY locks that we receive here without any harm; when
5604            we make ourselves master, dlm_recover_masters() won't touch the
5605            MSTCPY locks we've received early. */
5606
5607         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5608                          from_nodeid, R_RECEIVE_RECOVER, &r);
5609         if (error)
5610                 goto out;
5611
5612         lock_rsb(r);
5613
5614         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5615                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5616                           from_nodeid, remid);
5617                 error = -EBADR;
5618                 goto out_unlock;
5619         }
5620
5621         lkb = search_remid(r, from_nodeid, remid);
5622         if (lkb) {
5623                 error = -EEXIST;
5624                 goto out_remid;
5625         }
5626
5627         error = create_lkb(ls, &lkb);
5628         if (error)
5629                 goto out_unlock;
5630
5631         error = receive_rcom_lock_args(ls, lkb, r, rc);
5632         if (error) {
5633                 __put_lkb(ls, lkb);
5634                 goto out_unlock;
5635         }
5636
5637         attach_lkb(r, lkb);
5638         add_lkb(r, lkb, rl->rl_status);
5639         ls->ls_recover_locks_in++;
5640
5641         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5642                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5643
5644  out_remid:
5645         /* this is the new value returned to the lock holder for
5646            saving in its process-copy lkb */
5647         *rl_remid = cpu_to_le32(lkb->lkb_id);
5648
5649         lkb->lkb_recover_seq = ls->ls_recover_seq;
5650
5651  out_unlock:
5652         unlock_rsb(r);
5653         put_rsb(r);
5654  out:
5655         if (error && error != -EEXIST)
5656                 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5657                           from_nodeid, remid, error);
5658         *rl_result = cpu_to_le32(error);
5659         return error;
5660 }
5661
5662 /* needs at least dlm_rcom + rcom_lock */
5663 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5664                              uint64_t seq)
5665 {
5666         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5667         struct dlm_rsb *r;
5668         struct dlm_lkb *lkb;
5669         uint32_t lkid, remid;
5670         int error, result;
5671
5672         lkid = le32_to_cpu(rl->rl_lkid);
5673         remid = le32_to_cpu(rl->rl_remid);
5674         result = le32_to_cpu(rl->rl_result);
5675
5676         error = find_lkb(ls, lkid, &lkb);
5677         if (error) {
5678                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5679                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5680                           result);
5681                 return error;
5682         }
5683
5684         r = lkb->lkb_resource;
5685         hold_rsb(r);
5686         lock_rsb(r);
5687
5688         if (!is_process_copy(lkb)) {
5689                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5690                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5691                           result);
5692                 dlm_dump_rsb(r);
5693                 unlock_rsb(r);
5694                 put_rsb(r);
5695                 dlm_put_lkb(lkb);
5696                 return -EINVAL;
5697         }
5698
5699         switch (result) {
5700         case -EBADR:
5701                 /* There's a chance the new master received our lock before
5702                    dlm_recover_master_reply(), this wouldn't happen if we did
5703                    a barrier between recover_masters and recover_locks. */
5704
5705                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5706                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5707                           result);
5708         
5709                 dlm_send_rcom_lock(r, lkb, seq);
5710                 goto out;
5711         case -EEXIST:
5712         case 0:
5713                 lkb->lkb_remid = remid;
5714                 break;
5715         default:
5716                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5717                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5718                           result);
5719         }
5720
5721         /* an ack for dlm_recover_locks() which waits for replies from
5722            all the locks it sends to new masters */
5723         dlm_recovered_lock(r);
5724  out:
5725         unlock_rsb(r);
5726         put_rsb(r);
5727         dlm_put_lkb(lkb);
5728
5729         return 0;
5730 }
5731
5732 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5733                      int mode, uint32_t flags, void *name, unsigned int namelen)
5734 {
5735         struct dlm_lkb *lkb;
5736         struct dlm_args args;
5737         bool do_put = true;
5738         int error;
5739
5740         dlm_lock_recovery(ls);
5741
5742         error = create_lkb(ls, &lkb);
5743         if (error) {
5744                 kfree(ua);
5745                 goto out;
5746         }
5747
5748         trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5749
5750         if (flags & DLM_LKF_VALBLK) {
5751                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5752                 if (!ua->lksb.sb_lvbptr) {
5753                         kfree(ua);
5754                         error = -ENOMEM;
5755                         goto out_put;
5756                 }
5757         }
5758         error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5759                               fake_bastfn, &args);
5760         if (error) {
5761                 kfree(ua->lksb.sb_lvbptr);
5762                 ua->lksb.sb_lvbptr = NULL;
5763                 kfree(ua);
5764                 goto out_put;
5765         }
5766
5767         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5768            When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5769            lock and that lkb_astparam is the dlm_user_args structure. */
5770         set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5771         error = request_lock(ls, lkb, name, namelen, &args);
5772
5773         switch (error) {
5774         case 0:
5775                 break;
5776         case -EINPROGRESS:
5777                 error = 0;
5778                 break;
5779         case -EAGAIN:
5780                 error = 0;
5781                 fallthrough;
5782         default:
5783                 goto out_put;
5784         }
5785
5786         /* add this new lkb to the per-process list of locks */
5787         spin_lock_bh(&ua->proc->locks_spin);
5788         hold_lkb(lkb);
5789         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5790         spin_unlock_bh(&ua->proc->locks_spin);
5791         do_put = false;
5792  out_put:
5793         trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5794         if (do_put)
5795                 __put_lkb(ls, lkb);
5796  out:
5797         dlm_unlock_recovery(ls);
5798         return error;
5799 }
5800
5801 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5802                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5803 {
5804         struct dlm_lkb *lkb;
5805         struct dlm_args args;
5806         struct dlm_user_args *ua;
5807         int error;
5808
5809         dlm_lock_recovery(ls);
5810
5811         error = find_lkb(ls, lkid, &lkb);
5812         if (error)
5813                 goto out;
5814
5815         trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5816
5817         /* user can change the params on its lock when it converts it, or
5818            add an lvb that didn't exist before */
5819
5820         ua = lkb->lkb_ua;
5821
5822         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5823                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5824                 if (!ua->lksb.sb_lvbptr) {
5825                         error = -ENOMEM;
5826                         goto out_put;
5827                 }
5828         }
5829         if (lvb_in && ua->lksb.sb_lvbptr)
5830                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5831
5832         ua->xid = ua_tmp->xid;
5833         ua->castparam = ua_tmp->castparam;
5834         ua->castaddr = ua_tmp->castaddr;
5835         ua->bastparam = ua_tmp->bastparam;
5836         ua->bastaddr = ua_tmp->bastaddr;
5837         ua->user_lksb = ua_tmp->user_lksb;
5838
5839         error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5840                               fake_bastfn, &args);
5841         if (error)
5842                 goto out_put;
5843
5844         error = convert_lock(ls, lkb, &args);
5845
5846         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5847                 error = 0;
5848  out_put:
5849         trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5850         dlm_put_lkb(lkb);
5851  out:
5852         dlm_unlock_recovery(ls);
5853         kfree(ua_tmp);
5854         return error;
5855 }
5856
5857 /*
5858  * The caller asks for an orphan lock on a given resource with a given mode.
5859  * If a matching lock exists, it's moved to the owner's list of locks and
5860  * the lkid is returned.
5861  */
5862
5863 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5864                      int mode, uint32_t flags, void *name, unsigned int namelen,
5865                      uint32_t *lkid)
5866 {
5867         struct dlm_lkb *lkb = NULL, *iter;
5868         struct dlm_user_args *ua;
5869         int found_other_mode = 0;
5870         int rv = 0;
5871
5872         spin_lock_bh(&ls->ls_orphans_lock);
5873         list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5874                 if (iter->lkb_resource->res_length != namelen)
5875                         continue;
5876                 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5877                         continue;
5878                 if (iter->lkb_grmode != mode) {
5879                         found_other_mode = 1;
5880                         continue;
5881                 }
5882
5883                 lkb = iter;
5884                 list_del_init(&iter->lkb_ownqueue);
5885                 clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5886                 *lkid = iter->lkb_id;
5887                 break;
5888         }
5889         spin_unlock_bh(&ls->ls_orphans_lock);
5890
5891         if (!lkb && found_other_mode) {
5892                 rv = -EAGAIN;
5893                 goto out;
5894         }
5895
5896         if (!lkb) {
5897                 rv = -ENOENT;
5898                 goto out;
5899         }
5900
5901         lkb->lkb_exflags = flags;
5902         lkb->lkb_ownpid = (int) current->pid;
5903
5904         ua = lkb->lkb_ua;
5905
5906         ua->proc = ua_tmp->proc;
5907         ua->xid = ua_tmp->xid;
5908         ua->castparam = ua_tmp->castparam;
5909         ua->castaddr = ua_tmp->castaddr;
5910         ua->bastparam = ua_tmp->bastparam;
5911         ua->bastaddr = ua_tmp->bastaddr;
5912         ua->user_lksb = ua_tmp->user_lksb;
5913
5914         /*
5915          * The lkb reference from the ls_orphans list was not
5916          * removed above, and is now considered the reference
5917          * for the proc locks list.
5918          */
5919
5920         spin_lock_bh(&ua->proc->locks_spin);
5921         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5922         spin_unlock_bh(&ua->proc->locks_spin);
5923  out:
5924         kfree(ua_tmp);
5925         return rv;
5926 }
5927
5928 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5929                     uint32_t flags, uint32_t lkid, char *lvb_in)
5930 {
5931         struct dlm_lkb *lkb;
5932         struct dlm_args args;
5933         struct dlm_user_args *ua;
5934         int error;
5935
5936         dlm_lock_recovery(ls);
5937
5938         error = find_lkb(ls, lkid, &lkb);
5939         if (error)
5940                 goto out;
5941
5942         trace_dlm_unlock_start(ls, lkb, flags);
5943
5944         ua = lkb->lkb_ua;
5945
5946         if (lvb_in && ua->lksb.sb_lvbptr)
5947                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5948         if (ua_tmp->castparam)
5949                 ua->castparam = ua_tmp->castparam;
5950         ua->user_lksb = ua_tmp->user_lksb;
5951
5952         error = set_unlock_args(flags, ua, &args);
5953         if (error)
5954                 goto out_put;
5955
5956         error = unlock_lock(ls, lkb, &args);
5957
5958         if (error == -DLM_EUNLOCK)
5959                 error = 0;
5960         /* from validate_unlock_args() */
5961         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5962                 error = 0;
5963         if (error)
5964                 goto out_put;
5965
5966         spin_lock_bh(&ua->proc->locks_spin);
5967         /* dlm_user_add_cb() may have already taken lkb off the proc list */
5968         if (!list_empty(&lkb->lkb_ownqueue))
5969                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5970         spin_unlock_bh(&ua->proc->locks_spin);
5971  out_put:
5972         trace_dlm_unlock_end(ls, lkb, flags, error);
5973         dlm_put_lkb(lkb);
5974  out:
5975         dlm_unlock_recovery(ls);
5976         kfree(ua_tmp);
5977         return error;
5978 }
5979
5980 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5981                     uint32_t flags, uint32_t lkid)
5982 {
5983         struct dlm_lkb *lkb;
5984         struct dlm_args args;
5985         struct dlm_user_args *ua;
5986         int error;
5987
5988         dlm_lock_recovery(ls);
5989
5990         error = find_lkb(ls, lkid, &lkb);
5991         if (error)
5992                 goto out;
5993
5994         trace_dlm_unlock_start(ls, lkb, flags);
5995
5996         ua = lkb->lkb_ua;
5997         if (ua_tmp->castparam)
5998                 ua->castparam = ua_tmp->castparam;
5999         ua->user_lksb = ua_tmp->user_lksb;
6000
6001         error = set_unlock_args(flags, ua, &args);
6002         if (error)
6003                 goto out_put;
6004
6005         error = cancel_lock(ls, lkb, &args);
6006
6007         if (error == -DLM_ECANCEL)
6008                 error = 0;
6009         /* from validate_unlock_args() */
6010         if (error == -EBUSY)
6011                 error = 0;
6012  out_put:
6013         trace_dlm_unlock_end(ls, lkb, flags, error);
6014         dlm_put_lkb(lkb);
6015  out:
6016         dlm_unlock_recovery(ls);
6017         kfree(ua_tmp);
6018         return error;
6019 }
6020
6021 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6022 {
6023         struct dlm_lkb *lkb;
6024         struct dlm_args args;
6025         struct dlm_user_args *ua;
6026         struct dlm_rsb *r;
6027         int error;
6028
6029         dlm_lock_recovery(ls);
6030
6031         error = find_lkb(ls, lkid, &lkb);
6032         if (error)
6033                 goto out;
6034
6035         trace_dlm_unlock_start(ls, lkb, flags);
6036
6037         ua = lkb->lkb_ua;
6038
6039         error = set_unlock_args(flags, ua, &args);
6040         if (error)
6041                 goto out_put;
6042
6043         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6044
6045         r = lkb->lkb_resource;
6046         hold_rsb(r);
6047         lock_rsb(r);
6048
6049         error = validate_unlock_args(lkb, &args);
6050         if (error)
6051                 goto out_r;
6052         set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6053
6054         error = _cancel_lock(r, lkb);
6055  out_r:
6056         unlock_rsb(r);
6057         put_rsb(r);
6058
6059         if (error == -DLM_ECANCEL)
6060                 error = 0;
6061         /* from validate_unlock_args() */
6062         if (error == -EBUSY)
6063                 error = 0;
6064  out_put:
6065         trace_dlm_unlock_end(ls, lkb, flags, error);
6066         dlm_put_lkb(lkb);
6067  out:
6068         dlm_unlock_recovery(ls);
6069         return error;
6070 }
6071
6072 /* lkb's that are removed from the waiters list by revert are just left on the
6073    orphans list with the granted orphan locks, to be freed by purge */
6074
6075 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6076 {
6077         struct dlm_args args;
6078         int error;
6079
6080         hold_lkb(lkb); /* reference for the ls_orphans list */
6081         spin_lock_bh(&ls->ls_orphans_lock);
6082         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6083         spin_unlock_bh(&ls->ls_orphans_lock);
6084
6085         set_unlock_args(0, lkb->lkb_ua, &args);
6086
6087         error = cancel_lock(ls, lkb, &args);
6088         if (error == -DLM_ECANCEL)
6089                 error = 0;
6090         return error;
6091 }
6092
6093 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6094    granted.  Regardless of what rsb queue the lock is on, it's removed and
6095    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6096    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6097
6098 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6099 {
6100         struct dlm_args args;
6101         int error;
6102
6103         set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6104                         lkb->lkb_ua, &args);
6105
6106         error = unlock_lock(ls, lkb, &args);
6107         if (error == -DLM_EUNLOCK)
6108                 error = 0;
6109         return error;
6110 }
6111
6112 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6113    (which does lock_rsb) due to deadlock with receiving a message that does
6114    lock_rsb followed by dlm_user_add_cb() */
6115
6116 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6117                                      struct dlm_user_proc *proc)
6118 {
6119         struct dlm_lkb *lkb = NULL;
6120
6121         spin_lock_bh(&ls->ls_clear_proc_locks);
6122         if (list_empty(&proc->locks))
6123                 goto out;
6124
6125         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6126         list_del_init(&lkb->lkb_ownqueue);
6127
6128         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6129                 set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6130         else
6131                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6132  out:
6133         spin_unlock_bh(&ls->ls_clear_proc_locks);
6134         return lkb;
6135 }
6136
6137 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6138    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6139    which we clear here. */
6140
6141 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6142    list, and no more device_writes should add lkb's to proc->locks list; so we
6143    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6144    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6145    them ourself. */
6146
6147 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6148 {
6149         struct dlm_callback *cb, *cb_safe;
6150         struct dlm_lkb *lkb, *safe;
6151
6152         dlm_lock_recovery(ls);
6153
6154         while (1) {
6155                 lkb = del_proc_lock(ls, proc);
6156                 if (!lkb)
6157                         break;
6158                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6159                         orphan_proc_lock(ls, lkb);
6160                 else
6161                         unlock_proc_lock(ls, lkb);
6162
6163                 /* this removes the reference for the proc->locks list
6164                    added by dlm_user_request, it may result in the lkb
6165                    being freed */
6166
6167                 dlm_put_lkb(lkb);
6168         }
6169
6170         spin_lock_bh(&ls->ls_clear_proc_locks);
6171
6172         /* in-progress unlocks */
6173         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6174                 list_del_init(&lkb->lkb_ownqueue);
6175                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6176                 dlm_put_lkb(lkb);
6177         }
6178
6179         list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6180                 list_del(&cb->list);
6181                 dlm_free_cb(cb);
6182         }
6183
6184         spin_unlock_bh(&ls->ls_clear_proc_locks);
6185         dlm_unlock_recovery(ls);
6186 }
6187
6188 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6189 {
6190         struct dlm_callback *cb, *cb_safe;
6191         struct dlm_lkb *lkb, *safe;
6192
6193         while (1) {
6194                 lkb = NULL;
6195                 spin_lock_bh(&proc->locks_spin);
6196                 if (!list_empty(&proc->locks)) {
6197                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
6198                                          lkb_ownqueue);
6199                         list_del_init(&lkb->lkb_ownqueue);
6200                 }
6201                 spin_unlock_bh(&proc->locks_spin);
6202
6203                 if (!lkb)
6204                         break;
6205
6206                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6207                 unlock_proc_lock(ls, lkb);
6208                 dlm_put_lkb(lkb); /* ref from proc->locks list */
6209         }
6210
6211         spin_lock_bh(&proc->locks_spin);
6212         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6213                 list_del_init(&lkb->lkb_ownqueue);
6214                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6215                 dlm_put_lkb(lkb);
6216         }
6217         spin_unlock_bh(&proc->locks_spin);
6218
6219         spin_lock_bh(&proc->asts_spin);
6220         list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6221                 list_del(&cb->list);
6222                 dlm_free_cb(cb);
6223         }
6224         spin_unlock_bh(&proc->asts_spin);
6225 }
6226
6227 /* pid of 0 means purge all orphans */
6228
6229 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6230 {
6231         struct dlm_lkb *lkb, *safe;
6232
6233         spin_lock_bh(&ls->ls_orphans_lock);
6234         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6235                 if (pid && lkb->lkb_ownpid != pid)
6236                         continue;
6237                 unlock_proc_lock(ls, lkb);
6238                 list_del_init(&lkb->lkb_ownqueue);
6239                 dlm_put_lkb(lkb);
6240         }
6241         spin_unlock_bh(&ls->ls_orphans_lock);
6242 }
6243
6244 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6245 {
6246         struct dlm_message *ms;
6247         struct dlm_mhandle *mh;
6248         int error;
6249
6250         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6251                                 DLM_MSG_PURGE, &ms, &mh);
6252         if (error)
6253                 return error;
6254         ms->m_nodeid = cpu_to_le32(nodeid);
6255         ms->m_pid = cpu_to_le32(pid);
6256
6257         return send_message(mh, ms, NULL, 0);
6258 }
6259
6260 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6261                    int nodeid, int pid)
6262 {
6263         int error = 0;
6264
6265         if (nodeid && (nodeid != dlm_our_nodeid())) {
6266                 error = send_purge(ls, nodeid, pid);
6267         } else {
6268                 dlm_lock_recovery(ls);
6269                 if (pid == current->pid)
6270                         purge_proc_locks(ls, proc);
6271                 else
6272                         do_purge(ls, nodeid, pid);
6273                 dlm_unlock_recovery(ls);
6274         }
6275         return error;
6276 }
6277
6278 /* debug functionality */
6279 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6280                       int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6281 {
6282         struct dlm_lksb *lksb;
6283         struct dlm_lkb *lkb;
6284         struct dlm_rsb *r;
6285         int error;
6286
6287         /* we currently can't set a valid user lock */
6288         if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6289                 return -EOPNOTSUPP;
6290
6291         lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6292         if (!lksb)
6293                 return -ENOMEM;
6294
6295         error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6296         if (error) {
6297                 kfree(lksb);
6298                 return error;
6299         }
6300
6301         dlm_set_dflags_val(lkb, lkb_dflags);
6302         lkb->lkb_nodeid = lkb_nodeid;
6303         lkb->lkb_lksb = lksb;
6304         /* user specific pointer, just don't have it NULL for kernel locks */
6305         if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6306                 lkb->lkb_astparam = (void *)0xDEADBEEF;
6307
6308         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6309         if (error) {
6310                 kfree(lksb);
6311                 __put_lkb(ls, lkb);
6312                 return error;
6313         }
6314
6315         lock_rsb(r);
6316         attach_lkb(r, lkb);
6317         add_lkb(r, lkb, lkb_status);
6318         unlock_rsb(r);
6319         put_rsb(r);
6320
6321         return 0;
6322 }
6323
6324 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6325                                  int mstype, int to_nodeid)
6326 {
6327         struct dlm_lkb *lkb;
6328         int error;
6329
6330         error = find_lkb(ls, lkb_id, &lkb);
6331         if (error)
6332                 return error;
6333
6334         add_to_waiters(lkb, mstype, to_nodeid);
6335         dlm_put_lkb(lkb);
6336         return 0;
6337 }
6338
This page took 0.37713 seconds and 4 git commands to generate.