]> Git Repo - linux.git/blob - fs/dlm/lock.c
kconfig: recursive checks drop file/lineno
[linux.git] / fs / dlm / lock.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10
11 /* Central locking logic has four stages:
12
13    dlm_lock()
14    dlm_unlock()
15
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
56 #include <trace/events/dlm.h>
57
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     const struct dlm_message *ms, bool local);
90 static int receive_extralen(const struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void toss_rsb(struct kref *kref);
93
94 /*
95  * Lock compatibilty matrix - thanks Steve
96  * UN = Unlocked state. Not really a state, used as a flag
97  * PD = Padding. Used to make the matrix a nice power of two in size
98  * Other states are the same as the VMS DLM.
99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
134
135 #define modes_compat(gr, rq) \
136         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
160
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166                dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168                (unsigned long long)lkb->lkb_recover_seq);
169 }
170
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174                "rlc %d name %s\n",
175                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177                r->res_name);
178 }
179
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182         struct dlm_lkb *lkb;
183
184         dlm_print_rsb(r);
185
186         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188         printk(KERN_ERR "rsb lookup list\n");
189         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190                 dlm_print_lkb(lkb);
191         printk(KERN_ERR "rsb grant queue:\n");
192         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193                 dlm_print_lkb(lkb);
194         printk(KERN_ERR "rsb convert queue:\n");
195         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196                 dlm_print_lkb(lkb);
197         printk(KERN_ERR "rsb wait queue:\n");
198         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199                 dlm_print_lkb(lkb);
200 }
201
202 /* Threads cannot use the lockspace while it's being recovered */
203
204 void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206         down_read(&ls->ls_in_recovery);
207 }
208
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211         up_read(&ls->ls_in_recovery);
212 }
213
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216         return down_read_trylock(&ls->ls_in_recovery);
217 }
218
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231         return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236         return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247         return !!r->res_nodeid;
248 }
249
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252         return lkb->lkb_nodeid &&
253                !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287                test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294
295         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296
297         if (rv == -DLM_ECANCEL &&
298             test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299                 rv = -EDEADLK;
300
301         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306         queue_cast(r, lkb,
307                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312         if (is_master_copy(lkb)) {
313                 send_bast(r, lkb, rqmode);
314         } else {
315                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316         }
317 }
318
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322
323 static inline unsigned long rsb_toss_jiffies(void)
324 {
325         return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326 }
327
328 /* This is only called to add a reference when the code already holds
329    a valid reference to the rsb, so there's no need for locking. */
330
331 static inline void hold_rsb(struct dlm_rsb *r)
332 {
333         /* rsbs in toss state never get referenced */
334         WARN_ON(rsb_flag(r, RSB_TOSS));
335         kref_get(&r->res_ref);
336 }
337
338 void dlm_hold_rsb(struct dlm_rsb *r)
339 {
340         hold_rsb(r);
341 }
342
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(lock)
347 {
348         if (refcount_dec_not_one(r))
349                 return false;
350
351         write_lock_bh(lock);
352         if (!refcount_dec_and_test(r)) {
353                 write_unlock_bh(lock);
354                 return false;
355         }
356
357         return true;
358 }
359
360 /* TODO move this to include/linux/kref.h */
361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362                                              void (*release)(struct kref *kref),
363                                              rwlock_t *lock)
364 {
365         if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366                 release(kref);
367                 return 1;
368         }
369
370         return 0;
371 }
372
373 /* When all references to the rsb are gone it's transferred to
374    the tossed list for later disposal. */
375
376 static void put_rsb(struct dlm_rsb *r)
377 {
378         struct dlm_ls *ls = r->res_ls;
379         int rv;
380
381         rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb,
382                                         &ls->ls_rsbtbl_lock);
383         if (rv)
384                 write_unlock_bh(&ls->ls_rsbtbl_lock);
385 }
386
387 void dlm_put_rsb(struct dlm_rsb *r)
388 {
389         put_rsb(r);
390 }
391
392 static int pre_rsb_struct(struct dlm_ls *ls)
393 {
394         struct dlm_rsb *r1, *r2;
395         int count = 0;
396
397         spin_lock_bh(&ls->ls_new_rsb_spin);
398         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
399                 spin_unlock_bh(&ls->ls_new_rsb_spin);
400                 return 0;
401         }
402         spin_unlock_bh(&ls->ls_new_rsb_spin);
403
404         r1 = dlm_allocate_rsb(ls);
405         r2 = dlm_allocate_rsb(ls);
406
407         spin_lock_bh(&ls->ls_new_rsb_spin);
408         if (r1) {
409                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
410                 ls->ls_new_rsb_count++;
411         }
412         if (r2) {
413                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
414                 ls->ls_new_rsb_count++;
415         }
416         count = ls->ls_new_rsb_count;
417         spin_unlock_bh(&ls->ls_new_rsb_spin);
418
419         if (!count)
420                 return -ENOMEM;
421         return 0;
422 }
423
424 /* connected with timer_delete_sync() in dlm_ls_stop() to stop
425  * new timers when recovery is triggered and don't run them
426  * again until a dlm_timer_resume() tries it again.
427  */
428 static void __rsb_mod_timer(struct dlm_ls *ls, unsigned long jiffies)
429 {
430         if (!dlm_locking_stopped(ls))
431                 mod_timer(&ls->ls_timer, jiffies);
432 }
433
434 /* This function tries to resume the timer callback if a rsb
435  * is on the toss list and no timer is pending. It might that
436  * the first entry is on currently executed as timer callback
437  * but we don't care if a timer queued up again and does
438  * nothing. Should be a rare case.
439  */
440 void dlm_timer_resume(struct dlm_ls *ls)
441 {
442         struct dlm_rsb *r;
443
444         spin_lock_bh(&ls->ls_toss_q_lock);
445         r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
446                                      res_toss_q_list);
447         if (r && !timer_pending(&ls->ls_timer))
448                 __rsb_mod_timer(ls, r->res_toss_time);
449         spin_unlock_bh(&ls->ls_toss_q_lock);
450 }
451
452 /* ls_rsbtbl_lock must be held and being sure the rsb is in toss state */
453 static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r)
454 {
455         struct dlm_rsb *first;
456
457         spin_lock_bh(&ls->ls_toss_q_lock);
458         r->res_toss_time = 0;
459
460         /* if the rsb is not queued do nothing */
461         if (list_empty(&r->res_toss_q_list))
462                 goto out;
463
464         /* get the first element before delete */
465         first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb,
466                                  res_toss_q_list);
467         list_del_init(&r->res_toss_q_list);
468         /* check if the first element was the rsb we deleted */
469         if (first == r) {
470                 /* try to get the new first element, if the list
471                  * is empty now try to delete the timer, if we are
472                  * too late we don't care.
473                  *
474                  * if the list isn't empty and a new first element got
475                  * in place, set the new timer expire time.
476                  */
477                 first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
478                                                  res_toss_q_list);
479                 if (!first)
480                         timer_delete(&ls->ls_timer);
481                 else
482                         __rsb_mod_timer(ls, first->res_toss_time);
483         }
484
485 out:
486         spin_unlock_bh(&ls->ls_toss_q_lock);
487 }
488
489 /* Caller must held ls_rsbtbl_lock and need to be called every time
490  * when either the rsb enters toss state or the toss state changes
491  * the dir/master nodeid.
492  */
493 static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r)
494 {
495         int our_nodeid = dlm_our_nodeid();
496         struct dlm_rsb *first;
497
498         /* If we're the directory record for this rsb, and
499          * we're not the master of it, then we need to wait
500          * for the master node to send us a dir remove for
501          * before removing the dir record.
502          */
503         if (!dlm_no_directory(ls) &&
504             (r->res_master_nodeid != our_nodeid) &&
505             (dlm_dir_nodeid(r) == our_nodeid)) {
506                 rsb_delete_toss_timer(ls, r);
507                 return;
508         }
509
510         spin_lock_bh(&ls->ls_toss_q_lock);
511         /* set the new rsb absolute expire time in the rsb */
512         r->res_toss_time = rsb_toss_jiffies();
513         if (list_empty(&ls->ls_toss_q)) {
514                 /* if the queue is empty add the element and it's
515                  * our new expire time
516                  */
517                 list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q);
518                 __rsb_mod_timer(ls, r->res_toss_time);
519         } else {
520                 /* check if the rsb was already queued, if so delete
521                  * it from the toss queue
522                  */
523                 if (!list_empty(&r->res_toss_q_list))
524                         list_del(&r->res_toss_q_list);
525
526                 /* try to get the maybe new first element and then add
527                  * to this rsb with the oldest expire time to the end
528                  * of the queue. If the list was empty before this
529                  * rsb expire time is our next expiration if it wasn't
530                  * the now new first elemet is our new expiration time
531                  */
532                 first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
533                                                  res_toss_q_list);
534                 list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q);
535                 if (!first)
536                         __rsb_mod_timer(ls, r->res_toss_time);
537                 else
538                         __rsb_mod_timer(ls, first->res_toss_time);
539         }
540         spin_unlock_bh(&ls->ls_toss_q_lock);
541 }
542
543 /* if we hit contention we do in 250 ms a retry to trylock.
544  * if there is any other mod_timer in between we don't care
545  * about that it expires earlier again this is only for the
546  * unlikely case nothing happened in this time.
547  */
548 #define DLM_TOSS_TIMER_RETRY    (jiffies + msecs_to_jiffies(250))
549
550 void dlm_rsb_toss_timer(struct timer_list *timer)
551 {
552         struct dlm_ls *ls = from_timer(ls, timer, ls_timer);
553         int our_nodeid = dlm_our_nodeid();
554         struct dlm_rsb *r;
555         int rv;
556
557         while (1) {
558                 /* interrupting point to leave iteration when
559                  * recovery waits for timer_delete_sync(), recovery
560                  * will take care to delete everything in toss queue.
561                  */
562                 if (dlm_locking_stopped(ls))
563                         break;
564
565                 rv = spin_trylock(&ls->ls_toss_q_lock);
566                 if (!rv) {
567                         /* rearm again try timer */
568                         __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY);
569                         break;
570                 }
571
572                 r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
573                                              res_toss_q_list);
574                 if (!r) {
575                         /* nothing to do anymore next rsb queue will
576                          * set next mod_timer() expire.
577                          */
578                         spin_unlock(&ls->ls_toss_q_lock);
579                         break;
580                 }
581
582                 /* test if the first rsb isn't expired yet, if
583                  * so we stop freeing rsb from toss queue as
584                  * the order in queue is ascending to the
585                  * absolute res_toss_time jiffies
586                  */
587                 if (time_before(jiffies, r->res_toss_time)) {
588                         /* rearm with the next rsb to expire in the future */
589                         __rsb_mod_timer(ls, r->res_toss_time);
590                         spin_unlock(&ls->ls_toss_q_lock);
591                         break;
592                 }
593
594                 /* in find_rsb_dir/nodir there is a reverse order of this
595                  * lock, however this is only a trylock if we hit some
596                  * possible contention we try it again.
597                  *
598                  * This lock synchronized while holding ls_toss_q_lock
599                  * synchronize everything that rsb_delete_toss_timer()
600                  * or rsb_mod_timer() can't run after this timer callback
601                  * deletes the rsb from the ls_toss_q. Whereas the other
602                  * holders have always a priority to run as this is only
603                  * a caching handling and the other holders might to put
604                  * this rsb out of the toss state.
605                  */
606                 rv = write_trylock(&ls->ls_rsbtbl_lock);
607                 if (!rv) {
608                         spin_unlock(&ls->ls_toss_q_lock);
609                         /* rearm again try timer */
610                         __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY);
611                         break;
612                 }
613
614                 list_del(&r->res_rsbs_list);
615                 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
616                                        dlm_rhash_rsb_params);
617
618                 /* not necessary to held the ls_rsbtbl_lock when
619                  * calling send_remove()
620                  */
621                 write_unlock(&ls->ls_rsbtbl_lock);
622
623                 /* remove the rsb out of the toss queue its gone
624                  * drom DLM now
625                  */
626                 list_del_init(&r->res_toss_q_list);
627                 spin_unlock(&ls->ls_toss_q_lock);
628
629                 /* no rsb in this state should ever run a timer */
630                 WARN_ON(!dlm_no_directory(ls) &&
631                         (r->res_master_nodeid != our_nodeid) &&
632                         (dlm_dir_nodeid(r) == our_nodeid));
633
634                 /* We're the master of this rsb but we're not
635                  * the directory record, so we need to tell the
636                  * dir node to remove the dir record
637                  */
638                 if (!dlm_no_directory(ls) &&
639                     (r->res_master_nodeid == our_nodeid) &&
640                     (dlm_dir_nodeid(r) != our_nodeid))
641                         send_remove(r);
642
643                 free_toss_rsb(r);
644         }
645 }
646
647 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
648    unlock any spinlocks, go back and call pre_rsb_struct again.
649    Otherwise, take an rsb off the list and return it. */
650
651 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
652                           struct dlm_rsb **r_ret)
653 {
654         struct dlm_rsb *r;
655         int count;
656
657         spin_lock_bh(&ls->ls_new_rsb_spin);
658         if (list_empty(&ls->ls_new_rsb)) {
659                 count = ls->ls_new_rsb_count;
660                 spin_unlock_bh(&ls->ls_new_rsb_spin);
661                 log_debug(ls, "find_rsb retry %d %d %s",
662                           count, dlm_config.ci_new_rsb_count,
663                           (const char *)name);
664                 return -EAGAIN;
665         }
666
667         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
668         list_del(&r->res_hashchain);
669         ls->ls_new_rsb_count--;
670         spin_unlock_bh(&ls->ls_new_rsb_spin);
671
672         r->res_ls = ls;
673         r->res_length = len;
674         memcpy(r->res_name, name, len);
675         spin_lock_init(&r->res_lock);
676
677         INIT_LIST_HEAD(&r->res_lookup);
678         INIT_LIST_HEAD(&r->res_grantqueue);
679         INIT_LIST_HEAD(&r->res_convertqueue);
680         INIT_LIST_HEAD(&r->res_waitqueue);
681         INIT_LIST_HEAD(&r->res_root_list);
682         INIT_LIST_HEAD(&r->res_toss_q_list);
683         INIT_LIST_HEAD(&r->res_recover_list);
684         INIT_LIST_HEAD(&r->res_masters_list);
685
686         *r_ret = r;
687         return 0;
688 }
689
690 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
691                         struct dlm_rsb **r_ret)
692 {
693         char key[DLM_RESNAME_MAXLEN] = {};
694
695         memcpy(key, name, len);
696         *r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
697         if (*r_ret)
698                 return 0;
699
700         return -EBADR;
701 }
702
703 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
704 {
705         return rhashtable_insert_fast(rhash, &rsb->res_node,
706                                       dlm_rhash_rsb_params);
707 }
708
709 /*
710  * Find rsb in rsbtbl and potentially create/add one
711  *
712  * Delaying the release of rsb's has a similar benefit to applications keeping
713  * NL locks on an rsb, but without the guarantee that the cached master value
714  * will still be valid when the rsb is reused.  Apps aren't always smart enough
715  * to keep NL locks on an rsb that they may lock again shortly; this can lead
716  * to excessive master lookups and removals if we don't delay the release.
717  *
718  * Searching for an rsb means looking through both the normal list and toss
719  * list.  When found on the toss list the rsb is moved to the normal list with
720  * ref count of 1; when found on normal list the ref count is incremented.
721  *
722  * rsb's on the keep list are being used locally and refcounted.
723  * rsb's on the toss list are not being used locally, and are not refcounted.
724  *
725  * The toss list rsb's were either
726  * - previously used locally but not any more (were on keep list, then
727  *   moved to toss list when last refcount dropped)
728  * - created and put on toss list as a directory record for a lookup
729  *   (we are the dir node for the res, but are not using the res right now,
730  *   but some other node is)
731  *
732  * The purpose of find_rsb() is to return a refcounted rsb for local use.
733  * So, if the given rsb is on the toss list, it is moved to the keep list
734  * before being returned.
735  *
736  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
737  * more refcounts exist, so the rsb is moved from the keep list to the
738  * toss list.
739  *
740  * rsb's on both keep and toss lists are used for doing a name to master
741  * lookups.  rsb's that are in use locally (and being refcounted) are on
742  * the keep list, rsb's that are not in use locally (not refcounted) and
743  * only exist for name/master lookups are on the toss list.
744  *
745  * rsb's on the toss list who's dir_nodeid is not local can have stale
746  * name/master mappings.  So, remote requests on such rsb's can potentially
747  * return with an error, which means the mapping is stale and needs to
748  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
749  * first_lkid is to keep only a single outstanding request on an rsb
750  * while that rsb has a potentially stale master.)
751  */
752
753 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
754                         uint32_t hash, int dir_nodeid, int from_nodeid,
755                         unsigned int flags, struct dlm_rsb **r_ret)
756 {
757         struct dlm_rsb *r = NULL;
758         int our_nodeid = dlm_our_nodeid();
759         int from_local = 0;
760         int from_other = 0;
761         int from_dir = 0;
762         int create = 0;
763         int error;
764
765         if (flags & R_RECEIVE_REQUEST) {
766                 if (from_nodeid == dir_nodeid)
767                         from_dir = 1;
768                 else
769                         from_other = 1;
770         } else if (flags & R_REQUEST) {
771                 from_local = 1;
772         }
773
774         /*
775          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
776          * from_nodeid has sent us a lock in dlm_recover_locks, believing
777          * we're the new master.  Our local recovery may not have set
778          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
779          * create the rsb; dlm_recover_process_copy() will handle EBADR
780          * by resending.
781          *
782          * If someone sends us a request, we are the dir node, and we do
783          * not find the rsb anywhere, then recreate it.  This happens if
784          * someone sends us a request after we have removed/freed an rsb
785          * from our toss list.  (They sent a request instead of lookup
786          * because they are using an rsb from their toss list.)
787          */
788
789         if (from_local || from_dir ||
790             (from_other && (dir_nodeid == our_nodeid))) {
791                 create = 1;
792         }
793
794  retry:
795         if (create) {
796                 error = pre_rsb_struct(ls);
797                 if (error < 0)
798                         goto out;
799         }
800
801  retry_lookup:
802
803         /* check if the rsb is in keep state under read lock - likely path */
804         read_lock_bh(&ls->ls_rsbtbl_lock);
805         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
806         if (error) {
807                 read_unlock_bh(&ls->ls_rsbtbl_lock);
808                 goto do_new;
809         }
810         
811         /*
812          * rsb is active, so we can't check master_nodeid without lock_rsb.
813          */
814
815         if (rsb_flag(r, RSB_TOSS)) {
816                 read_unlock_bh(&ls->ls_rsbtbl_lock);
817                 goto do_toss;
818         }
819
820         kref_get(&r->res_ref);
821         read_unlock_bh(&ls->ls_rsbtbl_lock);
822         goto out;
823
824
825  do_toss:
826         write_lock_bh(&ls->ls_rsbtbl_lock);
827
828         /* retry lookup under write lock to see if its still in toss state
829          * if not it's in keep state and we relookup - unlikely path.
830          */
831         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
832         if (!error) {
833                 if (!rsb_flag(r, RSB_TOSS)) {
834                         write_unlock_bh(&ls->ls_rsbtbl_lock);
835                         goto retry_lookup;
836                 }
837         } else {
838                 write_unlock_bh(&ls->ls_rsbtbl_lock);
839                 goto do_new;
840         }
841
842         /*
843          * rsb found inactive (master_nodeid may be out of date unless
844          * we are the dir_nodeid or were the master)  No other thread
845          * is using this rsb because it's on the toss list, so we can
846          * look at or update res_master_nodeid without lock_rsb.
847          */
848
849         if ((r->res_master_nodeid != our_nodeid) && from_other) {
850                 /* our rsb was not master, and another node (not the dir node)
851                    has sent us a request */
852                 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
853                           from_nodeid, r->res_master_nodeid, dir_nodeid,
854                           r->res_name);
855                 write_unlock_bh(&ls->ls_rsbtbl_lock);
856                 error = -ENOTBLK;
857                 goto out;
858         }
859
860         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
861                 /* don't think this should ever happen */
862                 log_error(ls, "find_rsb toss from_dir %d master %d",
863                           from_nodeid, r->res_master_nodeid);
864                 dlm_print_rsb(r);
865                 /* fix it and go on */
866                 r->res_master_nodeid = our_nodeid;
867                 r->res_nodeid = 0;
868                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
869                 r->res_first_lkid = 0;
870         }
871
872         if (from_local && (r->res_master_nodeid != our_nodeid)) {
873                 /* Because we have held no locks on this rsb,
874                    res_master_nodeid could have become stale. */
875                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
876                 r->res_first_lkid = 0;
877         }
878
879         list_move(&r->res_rsbs_list, &ls->ls_keep);
880         rsb_clear_flag(r, RSB_TOSS);
881         /* rsb got out of toss state, it becomes alive again
882          * and we reinit the reference counter that is only
883          * valid for keep state rsbs
884          */
885         kref_init(&r->res_ref);
886         rsb_delete_toss_timer(ls, r);
887         write_unlock_bh(&ls->ls_rsbtbl_lock);
888
889         goto out;
890
891
892  do_new:
893         /*
894          * rsb not found
895          */
896
897         if (error == -EBADR && !create)
898                 goto out;
899
900         error = get_rsb_struct(ls, name, len, &r);
901         if (error == -EAGAIN)
902                 goto retry;
903         if (error)
904                 goto out;
905
906         r->res_hash = hash;
907         r->res_dir_nodeid = dir_nodeid;
908         kref_init(&r->res_ref);
909
910         if (from_dir) {
911                 /* want to see how often this happens */
912                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
913                           from_nodeid, r->res_name);
914                 r->res_master_nodeid = our_nodeid;
915                 r->res_nodeid = 0;
916                 goto out_add;
917         }
918
919         if (from_other && (dir_nodeid != our_nodeid)) {
920                 /* should never happen */
921                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
922                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
923                 dlm_free_rsb(r);
924                 r = NULL;
925                 error = -ENOTBLK;
926                 goto out;
927         }
928
929         if (from_other) {
930                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
931                           from_nodeid, dir_nodeid, r->res_name);
932         }
933
934         if (dir_nodeid == our_nodeid) {
935                 /* When we are the dir nodeid, we can set the master
936                    node immediately */
937                 r->res_master_nodeid = our_nodeid;
938                 r->res_nodeid = 0;
939         } else {
940                 /* set_master will send_lookup to dir_nodeid */
941                 r->res_master_nodeid = 0;
942                 r->res_nodeid = -1;
943         }
944
945  out_add:
946
947         write_lock_bh(&ls->ls_rsbtbl_lock);
948         error = rsb_insert(r, &ls->ls_rsbtbl);
949         if (error == -EEXIST) {
950                 /* somebody else was faster and it seems the
951                  * rsb exists now, we do a whole relookup
952                  */
953                 write_unlock_bh(&ls->ls_rsbtbl_lock);
954                 dlm_free_rsb(r);
955                 goto retry_lookup;
956         } else if (!error) {
957                 list_add(&r->res_rsbs_list, &ls->ls_keep);
958         }
959         write_unlock_bh(&ls->ls_rsbtbl_lock);
960  out:
961         *r_ret = r;
962         return error;
963 }
964
965 /* During recovery, other nodes can send us new MSTCPY locks (from
966    dlm_recover_locks) before we've made ourself master (in
967    dlm_recover_masters). */
968
969 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
970                           uint32_t hash, int dir_nodeid, int from_nodeid,
971                           unsigned int flags, struct dlm_rsb **r_ret)
972 {
973         struct dlm_rsb *r = NULL;
974         int our_nodeid = dlm_our_nodeid();
975         int recover = (flags & R_RECEIVE_RECOVER);
976         int error;
977
978  retry:
979         error = pre_rsb_struct(ls);
980         if (error < 0)
981                 goto out;
982
983  retry_lookup:
984
985         /* check if the rsb is in keep state under read lock - likely path */
986         read_lock_bh(&ls->ls_rsbtbl_lock);
987         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
988         if (error) {
989                 read_unlock_bh(&ls->ls_rsbtbl_lock);
990                 goto do_new;
991         }
992
993         if (rsb_flag(r, RSB_TOSS)) {
994                 read_unlock_bh(&ls->ls_rsbtbl_lock);
995                 goto do_toss;
996         }
997
998         /*
999          * rsb is active, so we can't check master_nodeid without lock_rsb.
1000          */
1001
1002         kref_get(&r->res_ref);
1003         read_unlock_bh(&ls->ls_rsbtbl_lock);
1004
1005         goto out;
1006
1007
1008  do_toss:
1009         write_lock_bh(&ls->ls_rsbtbl_lock);
1010
1011         /* retry lookup under write lock to see if its still in toss state
1012          * if not it's in keep state and we relookup - unlikely path.
1013          */
1014         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1015         if (!error) {
1016                 if (!rsb_flag(r, RSB_TOSS)) {
1017                         write_unlock_bh(&ls->ls_rsbtbl_lock);
1018                         goto retry_lookup;
1019                 }
1020         } else {
1021                 write_unlock_bh(&ls->ls_rsbtbl_lock);
1022                 goto do_new;
1023         }
1024
1025
1026         /*
1027          * rsb found inactive. No other thread is using this rsb because
1028          * it's on the toss list, so we can look at or update
1029          * res_master_nodeid without lock_rsb.
1030          */
1031
1032         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
1033                 /* our rsb is not master, and another node has sent us a
1034                    request; this should never happen */
1035                 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
1036                           from_nodeid, r->res_master_nodeid, dir_nodeid);
1037                 dlm_print_rsb(r);
1038                 write_unlock_bh(&ls->ls_rsbtbl_lock);
1039                 error = -ENOTBLK;
1040                 goto out;
1041         }
1042
1043         if (!recover && (r->res_master_nodeid != our_nodeid) &&
1044             (dir_nodeid == our_nodeid)) {
1045                 /* our rsb is not master, and we are dir; may as well fix it;
1046                    this should never happen */
1047                 log_error(ls, "find_rsb toss our %d master %d dir %d",
1048                           our_nodeid, r->res_master_nodeid, dir_nodeid);
1049                 dlm_print_rsb(r);
1050                 r->res_master_nodeid = our_nodeid;
1051                 r->res_nodeid = 0;
1052         }
1053
1054         list_move(&r->res_rsbs_list, &ls->ls_keep);
1055         rsb_clear_flag(r, RSB_TOSS);
1056         /* rsb got out of toss state, it becomes alive again
1057          * and we reinit the reference counter that is only
1058          * valid for keep state rsbs
1059          */
1060         kref_init(&r->res_ref);
1061         rsb_delete_toss_timer(ls, r);
1062         write_unlock_bh(&ls->ls_rsbtbl_lock);
1063
1064         goto out;
1065
1066
1067  do_new:
1068         /*
1069          * rsb not found
1070          */
1071
1072         error = get_rsb_struct(ls, name, len, &r);
1073         if (error == -EAGAIN) {
1074                 goto retry;
1075         }
1076         if (error)
1077                 goto out;
1078
1079         r->res_hash = hash;
1080         r->res_dir_nodeid = dir_nodeid;
1081         r->res_master_nodeid = dir_nodeid;
1082         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
1083         kref_init(&r->res_ref);
1084
1085         write_lock_bh(&ls->ls_rsbtbl_lock);
1086         error = rsb_insert(r, &ls->ls_rsbtbl);
1087         if (error == -EEXIST) {
1088                 /* somebody else was faster and it seems the
1089                  * rsb exists now, we do a whole relookup
1090                  */
1091                 write_unlock_bh(&ls->ls_rsbtbl_lock);
1092                 dlm_free_rsb(r);
1093                 goto retry_lookup;
1094         } else if (!error) {
1095                 list_add(&r->res_rsbs_list, &ls->ls_keep);
1096         }
1097         write_unlock_bh(&ls->ls_rsbtbl_lock);
1098
1099  out:
1100         *r_ret = r;
1101         return error;
1102 }
1103
1104 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1105                     int from_nodeid, unsigned int flags,
1106                     struct dlm_rsb **r_ret)
1107 {
1108         int dir_nodeid;
1109         uint32_t hash;
1110
1111         if (len > DLM_RESNAME_MAXLEN)
1112                 return -EINVAL;
1113
1114         hash = jhash(name, len, 0);
1115         dir_nodeid = dlm_hash2nodeid(ls, hash);
1116
1117         if (dlm_no_directory(ls))
1118                 return find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1119                                       from_nodeid, flags, r_ret);
1120         else
1121                 return find_rsb_dir(ls, name, len, hash, dir_nodeid,
1122                                     from_nodeid, flags, r_ret);
1123 }
1124
1125 /* we have received a request and found that res_master_nodeid != our_nodeid,
1126    so we need to return an error or make ourself the master */
1127
1128 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1129                                   int from_nodeid)
1130 {
1131         if (dlm_no_directory(ls)) {
1132                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1133                           from_nodeid, r->res_master_nodeid,
1134                           r->res_dir_nodeid);
1135                 dlm_print_rsb(r);
1136                 return -ENOTBLK;
1137         }
1138
1139         if (from_nodeid != r->res_dir_nodeid) {
1140                 /* our rsb is not master, and another node (not the dir node)
1141                    has sent us a request.  this is much more common when our
1142                    master_nodeid is zero, so limit debug to non-zero.  */
1143
1144                 if (r->res_master_nodeid) {
1145                         log_debug(ls, "validate master from_other %d master %d "
1146                                   "dir %d first %x %s", from_nodeid,
1147                                   r->res_master_nodeid, r->res_dir_nodeid,
1148                                   r->res_first_lkid, r->res_name);
1149                 }
1150                 return -ENOTBLK;
1151         } else {
1152                 /* our rsb is not master, but the dir nodeid has sent us a
1153                    request; this could happen with master 0 / res_nodeid -1 */
1154
1155                 if (r->res_master_nodeid) {
1156                         log_error(ls, "validate master from_dir %d master %d "
1157                                   "first %x %s",
1158                                   from_nodeid, r->res_master_nodeid,
1159                                   r->res_first_lkid, r->res_name);
1160                 }
1161
1162                 r->res_master_nodeid = dlm_our_nodeid();
1163                 r->res_nodeid = 0;
1164                 return 0;
1165         }
1166 }
1167
1168 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
1169                                 int from_nodeid, bool toss_list, unsigned int flags,
1170                                 int *r_nodeid, int *result)
1171 {
1172         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
1173         int from_master = (flags & DLM_LU_RECOVER_DIR);
1174
1175         if (r->res_dir_nodeid != our_nodeid) {
1176                 /* should not happen, but may as well fix it and carry on */
1177                 log_error(ls, "%s res_dir %d our %d %s", __func__,
1178                           r->res_dir_nodeid, our_nodeid, r->res_name);
1179                 r->res_dir_nodeid = our_nodeid;
1180         }
1181
1182         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
1183                 /* Recovery uses this function to set a new master when
1184                  * the previous master failed.  Setting NEW_MASTER will
1185                  * force dlm_recover_masters to call recover_master on this
1186                  * rsb even though the res_nodeid is no longer removed.
1187                  */
1188
1189                 r->res_master_nodeid = from_nodeid;
1190                 r->res_nodeid = from_nodeid;
1191                 rsb_set_flag(r, RSB_NEW_MASTER);
1192
1193                 if (toss_list) {
1194                         /* I don't think we should ever find it on toss list. */
1195                         log_error(ls, "%s fix_master on toss", __func__);
1196                         dlm_dump_rsb(r);
1197                 }
1198         }
1199
1200         if (from_master && (r->res_master_nodeid != from_nodeid)) {
1201                 /* this will happen if from_nodeid became master during
1202                  * a previous recovery cycle, and we aborted the previous
1203                  * cycle before recovering this master value
1204                  */
1205
1206                 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
1207                           __func__, from_nodeid, r->res_master_nodeid,
1208                           r->res_nodeid, r->res_first_lkid, r->res_name);
1209
1210                 if (r->res_master_nodeid == our_nodeid) {
1211                         log_error(ls, "from_master %d our_master", from_nodeid);
1212                         dlm_dump_rsb(r);
1213                         goto ret_assign;
1214                 }
1215
1216                 r->res_master_nodeid = from_nodeid;
1217                 r->res_nodeid = from_nodeid;
1218                 rsb_set_flag(r, RSB_NEW_MASTER);
1219         }
1220
1221         if (!r->res_master_nodeid) {
1222                 /* this will happen if recovery happens while we're looking
1223                  * up the master for this rsb
1224                  */
1225
1226                 log_debug(ls, "%s master 0 to %d first %x %s", __func__,
1227                           from_nodeid, r->res_first_lkid, r->res_name);
1228                 r->res_master_nodeid = from_nodeid;
1229                 r->res_nodeid = from_nodeid;
1230         }
1231
1232         if (!from_master && !fix_master &&
1233             (r->res_master_nodeid == from_nodeid)) {
1234                 /* this can happen when the master sends remove, the dir node
1235                  * finds the rsb on the keep list and ignores the remove,
1236                  * and the former master sends a lookup
1237                  */
1238
1239                 log_limit(ls, "%s from master %d flags %x first %x %s",
1240                           __func__, from_nodeid, flags, r->res_first_lkid,
1241                           r->res_name);
1242         }
1243
1244  ret_assign:
1245         *r_nodeid = r->res_master_nodeid;
1246         if (result)
1247                 *result = DLM_LU_MATCH;
1248 }
1249
1250 /*
1251  * We're the dir node for this res and another node wants to know the
1252  * master nodeid.  During normal operation (non recovery) this is only
1253  * called from receive_lookup(); master lookups when the local node is
1254  * the dir node are done by find_rsb().
1255  *
1256  * normal operation, we are the dir node for a resource
1257  * . _request_lock
1258  * . set_master
1259  * . send_lookup
1260  * . receive_lookup
1261  * . dlm_master_lookup flags 0
1262  *
1263  * recover directory, we are rebuilding dir for all resources
1264  * . dlm_recover_directory
1265  * . dlm_rcom_names
1266  *   remote node sends back the rsb names it is master of and we are dir of
1267  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1268  *   we either create new rsb setting remote node as master, or find existing
1269  *   rsb and set master to be the remote node.
1270  *
1271  * recover masters, we are finding the new master for resources
1272  * . dlm_recover_masters
1273  * . recover_master
1274  * . dlm_send_rcom_lookup
1275  * . receive_rcom_lookup
1276  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1277  */
1278
1279 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1280                       int len, unsigned int flags, int *r_nodeid, int *result)
1281 {
1282         struct dlm_rsb *r = NULL;
1283         uint32_t hash;
1284         int our_nodeid = dlm_our_nodeid();
1285         int dir_nodeid, error;
1286
1287         if (len > DLM_RESNAME_MAXLEN)
1288                 return -EINVAL;
1289
1290         if (from_nodeid == our_nodeid) {
1291                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1292                           our_nodeid, flags);
1293                 return -EINVAL;
1294         }
1295
1296         hash = jhash(name, len, 0);
1297         dir_nodeid = dlm_hash2nodeid(ls, hash);
1298         if (dir_nodeid != our_nodeid) {
1299                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1300                           from_nodeid, dir_nodeid, our_nodeid, hash,
1301                           ls->ls_num_nodes);
1302                 *r_nodeid = -1;
1303                 return -EINVAL;
1304         }
1305
1306  retry:
1307         error = pre_rsb_struct(ls);
1308         if (error < 0)
1309                 return error;
1310
1311  retry_lookup:
1312
1313         /* check if the rsb is in keep state under read lock - likely path */
1314         read_lock_bh(&ls->ls_rsbtbl_lock);
1315         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1316         if (!error) {
1317                 if (rsb_flag(r, RSB_TOSS)) {
1318                         read_unlock_bh(&ls->ls_rsbtbl_lock);
1319                         goto do_toss;
1320                 }
1321
1322                 /* because the rsb is active, we need to lock_rsb before
1323                  * checking/changing re_master_nodeid
1324                  */
1325
1326                 hold_rsb(r);
1327                 read_unlock_bh(&ls->ls_rsbtbl_lock);
1328                 lock_rsb(r);
1329
1330                 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1331                                     flags, r_nodeid, result);
1332
1333                 /* the rsb was active */
1334                 unlock_rsb(r);
1335                 put_rsb(r);
1336
1337                 return 0;
1338         } else {
1339                 read_unlock_bh(&ls->ls_rsbtbl_lock);
1340                 goto not_found;
1341         }
1342
1343  do_toss:
1344         /* unlikely path - relookup under write */
1345         write_lock_bh(&ls->ls_rsbtbl_lock);
1346
1347         /* rsb_mod_timer() requires to held ls_rsbtbl_lock in write lock
1348          * check if the rsb is still in toss state, if not relookup
1349          */
1350         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1351         if (!error) {
1352                 if (!rsb_flag(r, RSB_TOSS)) {
1353                         write_unlock_bh(&ls->ls_rsbtbl_lock);
1354                         /* something as changed, very unlikely but
1355                          * try again
1356                          */
1357                         goto retry_lookup;
1358                 }
1359         } else {
1360                 write_unlock_bh(&ls->ls_rsbtbl_lock);
1361                 goto not_found;
1362         }
1363
1364         /* because the rsb is inactive (on toss list), it's not refcounted
1365          * and lock_rsb is not used, but is protected by the rsbtbl lock
1366          */
1367
1368         __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1369                             r_nodeid, result);
1370
1371         rsb_mod_timer(ls, r);
1372         /* the rsb was inactive (on toss list) */
1373         write_unlock_bh(&ls->ls_rsbtbl_lock);
1374
1375         return 0;
1376
1377  not_found:
1378         error = get_rsb_struct(ls, name, len, &r);
1379         if (error == -EAGAIN)
1380                 goto retry;
1381         if (error)
1382                 goto out;
1383
1384         r->res_hash = hash;
1385         r->res_dir_nodeid = our_nodeid;
1386         r->res_master_nodeid = from_nodeid;
1387         r->res_nodeid = from_nodeid;
1388         kref_init(&r->res_ref);
1389         rsb_set_flag(r, RSB_TOSS);
1390
1391         write_lock_bh(&ls->ls_rsbtbl_lock);
1392         error = rsb_insert(r, &ls->ls_rsbtbl);
1393         if (error == -EEXIST) {
1394                 /* somebody else was faster and it seems the
1395                  * rsb exists now, we do a whole relookup
1396                  */
1397                 write_unlock_bh(&ls->ls_rsbtbl_lock);
1398                 dlm_free_rsb(r);
1399                 goto retry_lookup;
1400         } else if (error) {
1401                 write_unlock_bh(&ls->ls_rsbtbl_lock);
1402                 /* should never happen */
1403                 dlm_free_rsb(r);
1404                 goto retry;
1405         }
1406
1407         list_add(&r->res_rsbs_list, &ls->ls_toss);
1408         rsb_mod_timer(ls, r);
1409         write_unlock_bh(&ls->ls_rsbtbl_lock);
1410
1411         if (result)
1412                 *result = DLM_LU_ADD;
1413         *r_nodeid = from_nodeid;
1414  out:
1415         return error;
1416 }
1417
1418 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1419 {
1420         struct dlm_rsb *r;
1421
1422         read_lock_bh(&ls->ls_rsbtbl_lock);
1423         list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
1424                 if (r->res_hash == hash)
1425                         dlm_dump_rsb(r);
1426         }
1427         read_unlock_bh(&ls->ls_rsbtbl_lock);
1428 }
1429
1430 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1431 {
1432         struct dlm_rsb *r = NULL;
1433         int error;
1434
1435         read_lock_bh(&ls->ls_rsbtbl_lock);
1436         error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1437         if (!error)
1438                 goto out;
1439
1440         dlm_dump_rsb(r);
1441  out:
1442         read_unlock_bh(&ls->ls_rsbtbl_lock);
1443 }
1444
1445 static void toss_rsb(struct kref *kref)
1446 {
1447         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1448         struct dlm_ls *ls = r->res_ls;
1449
1450         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1451         rsb_set_flag(r, RSB_TOSS);
1452         list_move(&r->res_rsbs_list, &ls->ls_toss);
1453         rsb_mod_timer(ls, r);
1454
1455         if (r->res_lvbptr) {
1456                 dlm_free_lvb(r->res_lvbptr);
1457                 r->res_lvbptr = NULL;
1458         }
1459 }
1460
1461 /* See comment for unhold_lkb */
1462
1463 static void unhold_rsb(struct dlm_rsb *r)
1464 {
1465         int rv;
1466
1467         /* rsbs in toss state never get referenced */
1468         WARN_ON(rsb_flag(r, RSB_TOSS));
1469         rv = kref_put(&r->res_ref, toss_rsb);
1470         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1471 }
1472
1473 void free_toss_rsb(struct dlm_rsb *r)
1474 {
1475         WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS));
1476
1477         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1478         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1479         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1480         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1481         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1482         DLM_ASSERT(list_empty(&r->res_toss_q_list), dlm_dump_rsb(r););
1483         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1484         DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
1485
1486         dlm_free_rsb(r);
1487 }
1488
1489 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1490    The rsb must exist as long as any lkb's for it do. */
1491
1492 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1493 {
1494         hold_rsb(r);
1495         lkb->lkb_resource = r;
1496 }
1497
1498 static void detach_lkb(struct dlm_lkb *lkb)
1499 {
1500         if (lkb->lkb_resource) {
1501                 put_rsb(lkb->lkb_resource);
1502                 lkb->lkb_resource = NULL;
1503         }
1504 }
1505
1506 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1507                        int start, int end)
1508 {
1509         struct dlm_lkb *lkb;
1510         int rv;
1511
1512         lkb = dlm_allocate_lkb(ls);
1513         if (!lkb)
1514                 return -ENOMEM;
1515
1516         lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1517         lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1518         lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1519         lkb->lkb_nodeid = -1;
1520         lkb->lkb_grmode = DLM_LOCK_IV;
1521         kref_init(&lkb->lkb_ref);
1522         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1523         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1524
1525         write_lock_bh(&ls->ls_lkbidr_lock);
1526         rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
1527         if (rv >= 0)
1528                 lkb->lkb_id = rv;
1529         write_unlock_bh(&ls->ls_lkbidr_lock);
1530
1531         if (rv < 0) {
1532                 log_error(ls, "create_lkb idr error %d", rv);
1533                 dlm_free_lkb(lkb);
1534                 return rv;
1535         }
1536
1537         *lkb_ret = lkb;
1538         return 0;
1539 }
1540
1541 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1542 {
1543         return _create_lkb(ls, lkb_ret, 1, 0);
1544 }
1545
1546 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1547 {
1548         struct dlm_lkb *lkb;
1549
1550         read_lock_bh(&ls->ls_lkbidr_lock);
1551         lkb = idr_find(&ls->ls_lkbidr, lkid);
1552         if (lkb)
1553                 kref_get(&lkb->lkb_ref);
1554         read_unlock_bh(&ls->ls_lkbidr_lock);
1555
1556         *lkb_ret = lkb;
1557         return lkb ? 0 : -ENOENT;
1558 }
1559
1560 static void kill_lkb(struct kref *kref)
1561 {
1562         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1563
1564         /* All work is done after the return from kref_put() so we
1565            can release the write_lock before the detach_lkb */
1566
1567         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1568 }
1569
1570 /* __put_lkb() is used when an lkb may not have an rsb attached to
1571    it so we need to provide the lockspace explicitly */
1572
1573 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1574 {
1575         uint32_t lkid = lkb->lkb_id;
1576         int rv;
1577
1578         rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1579                                         &ls->ls_lkbidr_lock);
1580         if (rv) {
1581                 idr_remove(&ls->ls_lkbidr, lkid);
1582                 write_unlock_bh(&ls->ls_lkbidr_lock);
1583
1584                 detach_lkb(lkb);
1585
1586                 /* for local/process lkbs, lvbptr points to caller's lksb */
1587                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1588                         dlm_free_lvb(lkb->lkb_lvbptr);
1589                 dlm_free_lkb(lkb);
1590         }
1591
1592         return rv;
1593 }
1594
1595 int dlm_put_lkb(struct dlm_lkb *lkb)
1596 {
1597         struct dlm_ls *ls;
1598
1599         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1600         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1601
1602         ls = lkb->lkb_resource->res_ls;
1603         return __put_lkb(ls, lkb);
1604 }
1605
1606 /* This is only called to add a reference when the code already holds
1607    a valid reference to the lkb, so there's no need for locking. */
1608
1609 static inline void hold_lkb(struct dlm_lkb *lkb)
1610 {
1611         kref_get(&lkb->lkb_ref);
1612 }
1613
1614 static void unhold_lkb_assert(struct kref *kref)
1615 {
1616         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1617
1618         DLM_ASSERT(false, dlm_print_lkb(lkb););
1619 }
1620
1621 /* This is called when we need to remove a reference and are certain
1622    it's not the last ref.  e.g. del_lkb is always called between a
1623    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1624    put_lkb would work fine, but would involve unnecessary locking */
1625
1626 static inline void unhold_lkb(struct dlm_lkb *lkb)
1627 {
1628         kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1629 }
1630
1631 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1632                             int mode)
1633 {
1634         struct dlm_lkb *lkb = NULL, *iter;
1635
1636         list_for_each_entry(iter, head, lkb_statequeue)
1637                 if (iter->lkb_rqmode < mode) {
1638                         lkb = iter;
1639                         list_add_tail(new, &iter->lkb_statequeue);
1640                         break;
1641                 }
1642
1643         if (!lkb)
1644                 list_add_tail(new, head);
1645 }
1646
1647 /* add/remove lkb to rsb's grant/convert/wait queue */
1648
1649 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1650 {
1651         kref_get(&lkb->lkb_ref);
1652
1653         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1654
1655         lkb->lkb_timestamp = ktime_get();
1656
1657         lkb->lkb_status = status;
1658
1659         switch (status) {
1660         case DLM_LKSTS_WAITING:
1661                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1662                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1663                 else
1664                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1665                 break;
1666         case DLM_LKSTS_GRANTED:
1667                 /* convention says granted locks kept in order of grmode */
1668                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1669                                 lkb->lkb_grmode);
1670                 break;
1671         case DLM_LKSTS_CONVERT:
1672                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1673                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1674                 else
1675                         list_add_tail(&lkb->lkb_statequeue,
1676                                       &r->res_convertqueue);
1677                 break;
1678         default:
1679                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1680         }
1681 }
1682
1683 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1684 {
1685         lkb->lkb_status = 0;
1686         list_del(&lkb->lkb_statequeue);
1687         unhold_lkb(lkb);
1688 }
1689
1690 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1691 {
1692         hold_lkb(lkb);
1693         del_lkb(r, lkb);
1694         add_lkb(r, lkb, sts);
1695         unhold_lkb(lkb);
1696 }
1697
1698 static int msg_reply_type(int mstype)
1699 {
1700         switch (mstype) {
1701         case DLM_MSG_REQUEST:
1702                 return DLM_MSG_REQUEST_REPLY;
1703         case DLM_MSG_CONVERT:
1704                 return DLM_MSG_CONVERT_REPLY;
1705         case DLM_MSG_UNLOCK:
1706                 return DLM_MSG_UNLOCK_REPLY;
1707         case DLM_MSG_CANCEL:
1708                 return DLM_MSG_CANCEL_REPLY;
1709         case DLM_MSG_LOOKUP:
1710                 return DLM_MSG_LOOKUP_REPLY;
1711         }
1712         return -1;
1713 }
1714
1715 /* add/remove lkb from global waiters list of lkb's waiting for
1716    a reply from a remote node */
1717
1718 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1719 {
1720         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1721         int error = 0;
1722
1723         spin_lock_bh(&ls->ls_waiters_lock);
1724
1725         if (is_overlap_unlock(lkb) ||
1726             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1727                 error = -EINVAL;
1728                 goto out;
1729         }
1730
1731         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1732                 switch (mstype) {
1733                 case DLM_MSG_UNLOCK:
1734                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1735                         break;
1736                 case DLM_MSG_CANCEL:
1737                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1738                         break;
1739                 default:
1740                         error = -EBUSY;
1741                         goto out;
1742                 }
1743                 lkb->lkb_wait_count++;
1744                 hold_lkb(lkb);
1745
1746                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1747                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1748                           lkb->lkb_wait_count, dlm_iflags_val(lkb));
1749                 goto out;
1750         }
1751
1752         DLM_ASSERT(!lkb->lkb_wait_count,
1753                    dlm_print_lkb(lkb);
1754                    printk("wait_count %d\n", lkb->lkb_wait_count););
1755
1756         lkb->lkb_wait_count++;
1757         lkb->lkb_wait_type = mstype;
1758         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1759         hold_lkb(lkb);
1760         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1761  out:
1762         if (error)
1763                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1764                           lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
1765                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1766         spin_unlock_bh(&ls->ls_waiters_lock);
1767         return error;
1768 }
1769
1770 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1771    list as part of process_requestqueue (e.g. a lookup that has an optimized
1772    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1773    set RESEND and dlm_recover_waiters_post() */
1774
1775 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1776                                 const struct dlm_message *ms)
1777 {
1778         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1779         int overlap_done = 0;
1780
1781         if (mstype == DLM_MSG_UNLOCK_REPLY &&
1782             test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1783                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1784                 overlap_done = 1;
1785                 goto out_del;
1786         }
1787
1788         if (mstype == DLM_MSG_CANCEL_REPLY &&
1789             test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1790                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1791                 overlap_done = 1;
1792                 goto out_del;
1793         }
1794
1795         /* Cancel state was preemptively cleared by a successful convert,
1796            see next comment, nothing to do. */
1797
1798         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1799             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1800                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1801                           lkb->lkb_id, lkb->lkb_wait_type);
1802                 return -1;
1803         }
1804
1805         /* Remove for the convert reply, and premptively remove for the
1806            cancel reply.  A convert has been granted while there's still
1807            an outstanding cancel on it (the cancel is moot and the result
1808            in the cancel reply should be 0).  We preempt the cancel reply
1809            because the app gets the convert result and then can follow up
1810            with another op, like convert.  This subsequent op would see the
1811            lingering state of the cancel and fail with -EBUSY. */
1812
1813         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1814             (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1815             test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1816                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1817                           lkb->lkb_id);
1818                 lkb->lkb_wait_type = 0;
1819                 lkb->lkb_wait_count--;
1820                 unhold_lkb(lkb);
1821                 goto out_del;
1822         }
1823
1824         /* N.B. type of reply may not always correspond to type of original
1825            msg due to lookup->request optimization, verify others? */
1826
1827         if (lkb->lkb_wait_type) {
1828                 lkb->lkb_wait_type = 0;
1829                 goto out_del;
1830         }
1831
1832         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1833                   lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1834                   lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1835         return -1;
1836
1837  out_del:
1838         /* the force-unlock/cancel has completed and we haven't recvd a reply
1839            to the op that was in progress prior to the unlock/cancel; we
1840            give up on any reply to the earlier op.  FIXME: not sure when/how
1841            this would happen */
1842
1843         if (overlap_done && lkb->lkb_wait_type) {
1844                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1845                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1846                 lkb->lkb_wait_count--;
1847                 unhold_lkb(lkb);
1848                 lkb->lkb_wait_type = 0;
1849         }
1850
1851         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1852
1853         clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1854         lkb->lkb_wait_count--;
1855         if (!lkb->lkb_wait_count)
1856                 list_del_init(&lkb->lkb_wait_reply);
1857         unhold_lkb(lkb);
1858         return 0;
1859 }
1860
1861 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1862 {
1863         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1864         int error;
1865
1866         spin_lock_bh(&ls->ls_waiters_lock);
1867         error = _remove_from_waiters(lkb, mstype, NULL);
1868         spin_unlock_bh(&ls->ls_waiters_lock);
1869         return error;
1870 }
1871
1872 /* Handles situations where we might be processing a "fake" or "local" reply in
1873  * the recovery context which stops any locking activity. Only debugfs might
1874  * change the lockspace waiters but they will held the recovery lock to ensure
1875  * remove_from_waiters_ms() in local case will be the only user manipulating the
1876  * lockspace waiters in recovery context.
1877  */
1878
1879 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1880                                   const struct dlm_message *ms, bool local)
1881 {
1882         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1883         int error;
1884
1885         if (!local)
1886                 spin_lock_bh(&ls->ls_waiters_lock);
1887         else
1888                 WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1889                              !dlm_locking_stopped(ls));
1890         error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1891         if (!local)
1892                 spin_unlock_bh(&ls->ls_waiters_lock);
1893         return error;
1894 }
1895
1896 /* lkb is master or local copy */
1897
1898 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1899 {
1900         int b, len = r->res_ls->ls_lvblen;
1901
1902         /* b=1 lvb returned to caller
1903            b=0 lvb written to rsb or invalidated
1904            b=-1 do nothing */
1905
1906         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1907
1908         if (b == 1) {
1909                 if (!lkb->lkb_lvbptr)
1910                         return;
1911
1912                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1913                         return;
1914
1915                 if (!r->res_lvbptr)
1916                         return;
1917
1918                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1919                 lkb->lkb_lvbseq = r->res_lvbseq;
1920
1921         } else if (b == 0) {
1922                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1923                         rsb_set_flag(r, RSB_VALNOTVALID);
1924                         return;
1925                 }
1926
1927                 if (!lkb->lkb_lvbptr)
1928                         return;
1929
1930                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1931                         return;
1932
1933                 if (!r->res_lvbptr)
1934                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1935
1936                 if (!r->res_lvbptr)
1937                         return;
1938
1939                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1940                 r->res_lvbseq++;
1941                 lkb->lkb_lvbseq = r->res_lvbseq;
1942                 rsb_clear_flag(r, RSB_VALNOTVALID);
1943         }
1944
1945         if (rsb_flag(r, RSB_VALNOTVALID))
1946                 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1947 }
1948
1949 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1950 {
1951         if (lkb->lkb_grmode < DLM_LOCK_PW)
1952                 return;
1953
1954         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1955                 rsb_set_flag(r, RSB_VALNOTVALID);
1956                 return;
1957         }
1958
1959         if (!lkb->lkb_lvbptr)
1960                 return;
1961
1962         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1963                 return;
1964
1965         if (!r->res_lvbptr)
1966                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1967
1968         if (!r->res_lvbptr)
1969                 return;
1970
1971         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1972         r->res_lvbseq++;
1973         rsb_clear_flag(r, RSB_VALNOTVALID);
1974 }
1975
1976 /* lkb is process copy (pc) */
1977
1978 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1979                             const struct dlm_message *ms)
1980 {
1981         int b;
1982
1983         if (!lkb->lkb_lvbptr)
1984                 return;
1985
1986         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1987                 return;
1988
1989         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1990         if (b == 1) {
1991                 int len = receive_extralen(ms);
1992                 if (len > r->res_ls->ls_lvblen)
1993                         len = r->res_ls->ls_lvblen;
1994                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1995                 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1996         }
1997 }
1998
1999 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2000    remove_lock -- used for unlock, removes lkb from granted
2001    revert_lock -- used for cancel, moves lkb from convert to granted
2002    grant_lock  -- used for request and convert, adds lkb to granted or
2003                   moves lkb from convert or waiting to granted
2004
2005    Each of these is used for master or local copy lkb's.  There is
2006    also a _pc() variation used to make the corresponding change on
2007    a process copy (pc) lkb. */
2008
2009 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2010 {
2011         del_lkb(r, lkb);
2012         lkb->lkb_grmode = DLM_LOCK_IV;
2013         /* this unhold undoes the original ref from create_lkb()
2014            so this leads to the lkb being freed */
2015         unhold_lkb(lkb);
2016 }
2017
2018 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2019 {
2020         set_lvb_unlock(r, lkb);
2021         _remove_lock(r, lkb);
2022 }
2023
2024 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2025 {
2026         _remove_lock(r, lkb);
2027 }
2028
2029 /* returns: 0 did nothing
2030             1 moved lock to granted
2031            -1 removed lock */
2032
2033 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2034 {
2035         int rv = 0;
2036
2037         lkb->lkb_rqmode = DLM_LOCK_IV;
2038
2039         switch (lkb->lkb_status) {
2040         case DLM_LKSTS_GRANTED:
2041                 break;
2042         case DLM_LKSTS_CONVERT:
2043                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2044                 rv = 1;
2045                 break;
2046         case DLM_LKSTS_WAITING:
2047                 del_lkb(r, lkb);
2048                 lkb->lkb_grmode = DLM_LOCK_IV;
2049                 /* this unhold undoes the original ref from create_lkb()
2050                    so this leads to the lkb being freed */
2051                 unhold_lkb(lkb);
2052                 rv = -1;
2053                 break;
2054         default:
2055                 log_print("invalid status for revert %d", lkb->lkb_status);
2056         }
2057         return rv;
2058 }
2059
2060 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2061 {
2062         return revert_lock(r, lkb);
2063 }
2064
2065 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2066 {
2067         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2068                 lkb->lkb_grmode = lkb->lkb_rqmode;
2069                 if (lkb->lkb_status)
2070                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2071                 else
2072                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2073         }
2074
2075         lkb->lkb_rqmode = DLM_LOCK_IV;
2076         lkb->lkb_highbast = 0;
2077 }
2078
2079 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2080 {
2081         set_lvb_lock(r, lkb);
2082         _grant_lock(r, lkb);
2083 }
2084
2085 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2086                           const struct dlm_message *ms)
2087 {
2088         set_lvb_lock_pc(r, lkb, ms);
2089         _grant_lock(r, lkb);
2090 }
2091
2092 /* called by grant_pending_locks() which means an async grant message must
2093    be sent to the requesting node in addition to granting the lock if the
2094    lkb belongs to a remote node. */
2095
2096 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2097 {
2098         grant_lock(r, lkb);
2099         if (is_master_copy(lkb))
2100                 send_grant(r, lkb);
2101         else
2102                 queue_cast(r, lkb, 0);
2103 }
2104
2105 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2106    change the granted/requested modes.  We're munging things accordingly in
2107    the process copy.
2108    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2109    conversion deadlock
2110    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2111    compatible with other granted locks */
2112
2113 static void munge_demoted(struct dlm_lkb *lkb)
2114 {
2115         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2116                 log_print("munge_demoted %x invalid modes gr %d rq %d",
2117                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2118                 return;
2119         }
2120
2121         lkb->lkb_grmode = DLM_LOCK_NL;
2122 }
2123
2124 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2125 {
2126         if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2127             ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2128                 log_print("munge_altmode %x invalid reply type %d",
2129                           lkb->lkb_id, le32_to_cpu(ms->m_type));
2130                 return;
2131         }
2132
2133         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2134                 lkb->lkb_rqmode = DLM_LOCK_PR;
2135         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2136                 lkb->lkb_rqmode = DLM_LOCK_CW;
2137         else {
2138                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2139                 dlm_print_lkb(lkb);
2140         }
2141 }
2142
2143 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2144 {
2145         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2146                                            lkb_statequeue);
2147         if (lkb->lkb_id == first->lkb_id)
2148                 return 1;
2149
2150         return 0;
2151 }
2152
2153 /* Check if the given lkb conflicts with another lkb on the queue. */
2154
2155 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2156 {
2157         struct dlm_lkb *this;
2158
2159         list_for_each_entry(this, head, lkb_statequeue) {
2160                 if (this == lkb)
2161                         continue;
2162                 if (!modes_compat(this, lkb))
2163                         return 1;
2164         }
2165         return 0;
2166 }
2167
2168 /*
2169  * "A conversion deadlock arises with a pair of lock requests in the converting
2170  * queue for one resource.  The granted mode of each lock blocks the requested
2171  * mode of the other lock."
2172  *
2173  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2174  * convert queue from being granted, then deadlk/demote lkb.
2175  *
2176  * Example:
2177  * Granted Queue: empty
2178  * Convert Queue: NL->EX (first lock)
2179  *                PR->EX (second lock)
2180  *
2181  * The first lock can't be granted because of the granted mode of the second
2182  * lock and the second lock can't be granted because it's not first in the
2183  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2184  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2185  * flag set and return DEMOTED in the lksb flags.
2186  *
2187  * Originally, this function detected conv-deadlk in a more limited scope:
2188  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2189  * - if lkb1 was the first entry in the queue (not just earlier), and was
2190  *   blocked by the granted mode of lkb2, and there was nothing on the
2191  *   granted queue preventing lkb1 from being granted immediately, i.e.
2192  *   lkb2 was the only thing preventing lkb1 from being granted.
2193  *
2194  * That second condition meant we'd only say there was conv-deadlk if
2195  * resolving it (by demotion) would lead to the first lock on the convert
2196  * queue being granted right away.  It allowed conversion deadlocks to exist
2197  * between locks on the convert queue while they couldn't be granted anyway.
2198  *
2199  * Now, we detect and take action on conversion deadlocks immediately when
2200  * they're created, even if they may not be immediately consequential.  If
2201  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2202  * mode that would prevent lkb1's conversion from being granted, we do a
2203  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2204  * I think this means that the lkb_is_ahead condition below should always
2205  * be zero, i.e. there will never be conv-deadlk between two locks that are
2206  * both already on the convert queue.
2207  */
2208
2209 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2210 {
2211         struct dlm_lkb *lkb1;
2212         int lkb_is_ahead = 0;
2213
2214         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2215                 if (lkb1 == lkb2) {
2216                         lkb_is_ahead = 1;
2217                         continue;
2218                 }
2219
2220                 if (!lkb_is_ahead) {
2221                         if (!modes_compat(lkb2, lkb1))
2222                                 return 1;
2223                 } else {
2224                         if (!modes_compat(lkb2, lkb1) &&
2225                             !modes_compat(lkb1, lkb2))
2226                                 return 1;
2227                 }
2228         }
2229         return 0;
2230 }
2231
2232 /*
2233  * Return 1 if the lock can be granted, 0 otherwise.
2234  * Also detect and resolve conversion deadlocks.
2235  *
2236  * lkb is the lock to be granted
2237  *
2238  * now is 1 if the function is being called in the context of the
2239  * immediate request, it is 0 if called later, after the lock has been
2240  * queued.
2241  *
2242  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2243  * after recovery.
2244  *
2245  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2246  */
2247
2248 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2249                            int recover)
2250 {
2251         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2252
2253         /*
2254          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2255          * a new request for a NL mode lock being blocked.
2256          *
2257          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2258          * request, then it would be granted.  In essence, the use of this flag
2259          * tells the Lock Manager to expedite theis request by not considering
2260          * what may be in the CONVERTING or WAITING queues...  As of this
2261          * writing, the EXPEDITE flag can be used only with new requests for NL
2262          * mode locks.  This flag is not valid for conversion requests.
2263          *
2264          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2265          * conversion or used with a non-NL requested mode.  We also know an
2266          * EXPEDITE request is always granted immediately, so now must always
2267          * be 1.  The full condition to grant an expedite request: (now &&
2268          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2269          * therefore be shortened to just checking the flag.
2270          */
2271
2272         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2273                 return 1;
2274
2275         /*
2276          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2277          * added to the remaining conditions.
2278          */
2279
2280         if (queue_conflict(&r->res_grantqueue, lkb))
2281                 return 0;
2282
2283         /*
2284          * 6-3: By default, a conversion request is immediately granted if the
2285          * requested mode is compatible with the modes of all other granted
2286          * locks
2287          */
2288
2289         if (queue_conflict(&r->res_convertqueue, lkb))
2290                 return 0;
2291
2292         /*
2293          * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2294          * locks for a recovered rsb, on which lkb's have been rebuilt.
2295          * The lkb's may have been rebuilt on the queues in a different
2296          * order than they were in on the previous master.  So, granting
2297          * queued conversions in order after recovery doesn't make sense
2298          * since the order hasn't been preserved anyway.  The new order
2299          * could also have created a new "in place" conversion deadlock.
2300          * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2301          * After recovery, there would be no granted locks, and possibly
2302          * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2303          * recovery, grant conversions without considering order.
2304          */
2305
2306         if (conv && recover)
2307                 return 1;
2308
2309         /*
2310          * 6-5: But the default algorithm for deciding whether to grant or
2311          * queue conversion requests does not by itself guarantee that such
2312          * requests are serviced on a "first come first serve" basis.  This, in
2313          * turn, can lead to a phenomenon known as "indefinate postponement".
2314          *
2315          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2316          * the system service employed to request a lock conversion.  This flag
2317          * forces certain conversion requests to be queued, even if they are
2318          * compatible with the granted modes of other locks on the same
2319          * resource.  Thus, the use of this flag results in conversion requests
2320          * being ordered on a "first come first servce" basis.
2321          *
2322          * DCT: This condition is all about new conversions being able to occur
2323          * "in place" while the lock remains on the granted queue (assuming
2324          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2325          * doesn't _have_ to go onto the convert queue where it's processed in
2326          * order.  The "now" variable is necessary to distinguish converts
2327          * being received and processed for the first time now, because once a
2328          * convert is moved to the conversion queue the condition below applies
2329          * requiring fifo granting.
2330          */
2331
2332         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2333                 return 1;
2334
2335         /*
2336          * Even if the convert is compat with all granted locks,
2337          * QUECVT forces it behind other locks on the convert queue.
2338          */
2339
2340         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2341                 if (list_empty(&r->res_convertqueue))
2342                         return 1;
2343                 else
2344                         return 0;
2345         }
2346
2347         /*
2348          * The NOORDER flag is set to avoid the standard vms rules on grant
2349          * order.
2350          */
2351
2352         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2353                 return 1;
2354
2355         /*
2356          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2357          * granted until all other conversion requests ahead of it are granted
2358          * and/or canceled.
2359          */
2360
2361         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2362                 return 1;
2363
2364         /*
2365          * 6-4: By default, a new request is immediately granted only if all
2366          * three of the following conditions are satisfied when the request is
2367          * issued:
2368          * - The queue of ungranted conversion requests for the resource is
2369          *   empty.
2370          * - The queue of ungranted new requests for the resource is empty.
2371          * - The mode of the new request is compatible with the most
2372          *   restrictive mode of all granted locks on the resource.
2373          */
2374
2375         if (now && !conv && list_empty(&r->res_convertqueue) &&
2376             list_empty(&r->res_waitqueue))
2377                 return 1;
2378
2379         /*
2380          * 6-4: Once a lock request is in the queue of ungranted new requests,
2381          * it cannot be granted until the queue of ungranted conversion
2382          * requests is empty, all ungranted new requests ahead of it are
2383          * granted and/or canceled, and it is compatible with the granted mode
2384          * of the most restrictive lock granted on the resource.
2385          */
2386
2387         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2388             first_in_list(lkb, &r->res_waitqueue))
2389                 return 1;
2390
2391         return 0;
2392 }
2393
2394 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2395                           int recover, int *err)
2396 {
2397         int rv;
2398         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2399         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2400
2401         if (err)
2402                 *err = 0;
2403
2404         rv = _can_be_granted(r, lkb, now, recover);
2405         if (rv)
2406                 goto out;
2407
2408         /*
2409          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2410          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2411          * cancels one of the locks.
2412          */
2413
2414         if (is_convert && can_be_queued(lkb) &&
2415             conversion_deadlock_detect(r, lkb)) {
2416                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2417                         lkb->lkb_grmode = DLM_LOCK_NL;
2418                         set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2419                 } else if (err) {
2420                         *err = -EDEADLK;
2421                 } else {
2422                         log_print("can_be_granted deadlock %x now %d",
2423                                   lkb->lkb_id, now);
2424                         dlm_dump_rsb(r);
2425                 }
2426                 goto out;
2427         }
2428
2429         /*
2430          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2431          * to grant a request in a mode other than the normal rqmode.  It's a
2432          * simple way to provide a big optimization to applications that can
2433          * use them.
2434          */
2435
2436         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2437                 alt = DLM_LOCK_PR;
2438         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2439                 alt = DLM_LOCK_CW;
2440
2441         if (alt) {
2442                 lkb->lkb_rqmode = alt;
2443                 rv = _can_be_granted(r, lkb, now, 0);
2444                 if (rv)
2445                         set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2446                 else
2447                         lkb->lkb_rqmode = rqmode;
2448         }
2449  out:
2450         return rv;
2451 }
2452
2453 /* Returns the highest requested mode of all blocked conversions; sets
2454    cw if there's a blocked conversion to DLM_LOCK_CW. */
2455
2456 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2457                                  unsigned int *count)
2458 {
2459         struct dlm_lkb *lkb, *s;
2460         int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2461         int hi, demoted, quit, grant_restart, demote_restart;
2462         int deadlk;
2463
2464         quit = 0;
2465  restart:
2466         grant_restart = 0;
2467         demote_restart = 0;
2468         hi = DLM_LOCK_IV;
2469
2470         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2471                 demoted = is_demoted(lkb);
2472                 deadlk = 0;
2473
2474                 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2475                         grant_lock_pending(r, lkb);
2476                         grant_restart = 1;
2477                         if (count)
2478                                 (*count)++;
2479                         continue;
2480                 }
2481
2482                 if (!demoted && is_demoted(lkb)) {
2483                         log_print("WARN: pending demoted %x node %d %s",
2484                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2485                         demote_restart = 1;
2486                         continue;
2487                 }
2488
2489                 if (deadlk) {
2490                         /*
2491                          * If DLM_LKB_NODLKWT flag is set and conversion
2492                          * deadlock is detected, we request blocking AST and
2493                          * down (or cancel) conversion.
2494                          */
2495                         if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2496                                 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2497                                         queue_bast(r, lkb, lkb->lkb_rqmode);
2498                                         lkb->lkb_highbast = lkb->lkb_rqmode;
2499                                 }
2500                         } else {
2501                                 log_print("WARN: pending deadlock %x node %d %s",
2502                                           lkb->lkb_id, lkb->lkb_nodeid,
2503                                           r->res_name);
2504                                 dlm_dump_rsb(r);
2505                         }
2506                         continue;
2507                 }
2508
2509                 hi = max_t(int, lkb->lkb_rqmode, hi);
2510
2511                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2512                         *cw = 1;
2513         }
2514
2515         if (grant_restart)
2516                 goto restart;
2517         if (demote_restart && !quit) {
2518                 quit = 1;
2519                 goto restart;
2520         }
2521
2522         return max_t(int, high, hi);
2523 }
2524
2525 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2526                               unsigned int *count)
2527 {
2528         struct dlm_lkb *lkb, *s;
2529
2530         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2531                 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2532                         grant_lock_pending(r, lkb);
2533                         if (count)
2534                                 (*count)++;
2535                 } else {
2536                         high = max_t(int, lkb->lkb_rqmode, high);
2537                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2538                                 *cw = 1;
2539                 }
2540         }
2541
2542         return high;
2543 }
2544
2545 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2546    on either the convert or waiting queue.
2547    high is the largest rqmode of all locks blocked on the convert or
2548    waiting queue. */
2549
2550 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2551 {
2552         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2553                 if (gr->lkb_highbast < DLM_LOCK_EX)
2554                         return 1;
2555                 return 0;
2556         }
2557
2558         if (gr->lkb_highbast < high &&
2559             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2560                 return 1;
2561         return 0;
2562 }
2563
2564 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2565 {
2566         struct dlm_lkb *lkb, *s;
2567         int high = DLM_LOCK_IV;
2568         int cw = 0;
2569
2570         if (!is_master(r)) {
2571                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2572                 dlm_dump_rsb(r);
2573                 return;
2574         }
2575
2576         high = grant_pending_convert(r, high, &cw, count);
2577         high = grant_pending_wait(r, high, &cw, count);
2578
2579         if (high == DLM_LOCK_IV)
2580                 return;
2581
2582         /*
2583          * If there are locks left on the wait/convert queue then send blocking
2584          * ASTs to granted locks based on the largest requested mode (high)
2585          * found above.
2586          */
2587
2588         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2589                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2590                         if (cw && high == DLM_LOCK_PR &&
2591                             lkb->lkb_grmode == DLM_LOCK_PR)
2592                                 queue_bast(r, lkb, DLM_LOCK_CW);
2593                         else
2594                                 queue_bast(r, lkb, high);
2595                         lkb->lkb_highbast = high;
2596                 }
2597         }
2598 }
2599
2600 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2601 {
2602         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2603             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2604                 if (gr->lkb_highbast < DLM_LOCK_EX)
2605                         return 1;
2606                 return 0;
2607         }
2608
2609         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2610                 return 1;
2611         return 0;
2612 }
2613
2614 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2615                             struct dlm_lkb *lkb)
2616 {
2617         struct dlm_lkb *gr;
2618
2619         list_for_each_entry(gr, head, lkb_statequeue) {
2620                 /* skip self when sending basts to convertqueue */
2621                 if (gr == lkb)
2622                         continue;
2623                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2624                         queue_bast(r, gr, lkb->lkb_rqmode);
2625                         gr->lkb_highbast = lkb->lkb_rqmode;
2626                 }
2627         }
2628 }
2629
2630 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2631 {
2632         send_bast_queue(r, &r->res_grantqueue, lkb);
2633 }
2634
2635 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2636 {
2637         send_bast_queue(r, &r->res_grantqueue, lkb);
2638         send_bast_queue(r, &r->res_convertqueue, lkb);
2639 }
2640
2641 /* set_master(r, lkb) -- set the master nodeid of a resource
2642
2643    The purpose of this function is to set the nodeid field in the given
2644    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2645    known, it can just be copied to the lkb and the function will return
2646    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2647    before it can be copied to the lkb.
2648
2649    When the rsb nodeid is being looked up remotely, the initial lkb
2650    causing the lookup is kept on the ls_waiters list waiting for the
2651    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2652    on the rsb's res_lookup list until the master is verified.
2653
2654    Return values:
2655    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2656    1: the rsb master is not available and the lkb has been placed on
2657       a wait queue
2658 */
2659
2660 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2661 {
2662         int our_nodeid = dlm_our_nodeid();
2663
2664         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2665                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2666                 r->res_first_lkid = lkb->lkb_id;
2667                 lkb->lkb_nodeid = r->res_nodeid;
2668                 return 0;
2669         }
2670
2671         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2672                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2673                 return 1;
2674         }
2675
2676         if (r->res_master_nodeid == our_nodeid) {
2677                 lkb->lkb_nodeid = 0;
2678                 return 0;
2679         }
2680
2681         if (r->res_master_nodeid) {
2682                 lkb->lkb_nodeid = r->res_master_nodeid;
2683                 return 0;
2684         }
2685
2686         if (dlm_dir_nodeid(r) == our_nodeid) {
2687                 /* This is a somewhat unusual case; find_rsb will usually
2688                    have set res_master_nodeid when dir nodeid is local, but
2689                    there are cases where we become the dir node after we've
2690                    past find_rsb and go through _request_lock again.
2691                    confirm_master() or process_lookup_list() needs to be
2692                    called after this. */
2693                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2694                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2695                           r->res_name);
2696                 r->res_master_nodeid = our_nodeid;
2697                 r->res_nodeid = 0;
2698                 lkb->lkb_nodeid = 0;
2699                 return 0;
2700         }
2701
2702         r->res_first_lkid = lkb->lkb_id;
2703         send_lookup(r, lkb);
2704         return 1;
2705 }
2706
2707 static void process_lookup_list(struct dlm_rsb *r)
2708 {
2709         struct dlm_lkb *lkb, *safe;
2710
2711         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2712                 list_del_init(&lkb->lkb_rsb_lookup);
2713                 _request_lock(r, lkb);
2714         }
2715 }
2716
2717 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2718
2719 static void confirm_master(struct dlm_rsb *r, int error)
2720 {
2721         struct dlm_lkb *lkb;
2722
2723         if (!r->res_first_lkid)
2724                 return;
2725
2726         switch (error) {
2727         case 0:
2728         case -EINPROGRESS:
2729                 r->res_first_lkid = 0;
2730                 process_lookup_list(r);
2731                 break;
2732
2733         case -EAGAIN:
2734         case -EBADR:
2735         case -ENOTBLK:
2736                 /* the remote request failed and won't be retried (it was
2737                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2738                    lkb the first_lkid */
2739
2740                 r->res_first_lkid = 0;
2741
2742                 if (!list_empty(&r->res_lookup)) {
2743                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2744                                          lkb_rsb_lookup);
2745                         list_del_init(&lkb->lkb_rsb_lookup);
2746                         r->res_first_lkid = lkb->lkb_id;
2747                         _request_lock(r, lkb);
2748                 }
2749                 break;
2750
2751         default:
2752                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2753         }
2754 }
2755
2756 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2757                          int namelen, void (*ast)(void *astparam),
2758                          void *astparam,
2759                          void (*bast)(void *astparam, int mode),
2760                          struct dlm_args *args)
2761 {
2762         int rv = -EINVAL;
2763
2764         /* check for invalid arg usage */
2765
2766         if (mode < 0 || mode > DLM_LOCK_EX)
2767                 goto out;
2768
2769         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2770                 goto out;
2771
2772         if (flags & DLM_LKF_CANCEL)
2773                 goto out;
2774
2775         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2776                 goto out;
2777
2778         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2779                 goto out;
2780
2781         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2782                 goto out;
2783
2784         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2785                 goto out;
2786
2787         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2788                 goto out;
2789
2790         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2791                 goto out;
2792
2793         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2794                 goto out;
2795
2796         if (!ast || !lksb)
2797                 goto out;
2798
2799         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2800                 goto out;
2801
2802         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2803                 goto out;
2804
2805         /* these args will be copied to the lkb in validate_lock_args,
2806            it cannot be done now because when converting locks, fields in
2807            an active lkb cannot be modified before locking the rsb */
2808
2809         args->flags = flags;
2810         args->astfn = ast;
2811         args->astparam = astparam;
2812         args->bastfn = bast;
2813         args->mode = mode;
2814         args->lksb = lksb;
2815         rv = 0;
2816  out:
2817         return rv;
2818 }
2819
2820 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2821 {
2822         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2823                       DLM_LKF_FORCEUNLOCK))
2824                 return -EINVAL;
2825
2826         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2827                 return -EINVAL;
2828
2829         args->flags = flags;
2830         args->astparam = astarg;
2831         return 0;
2832 }
2833
2834 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2835                               struct dlm_args *args)
2836 {
2837         int rv = -EBUSY;
2838
2839         if (args->flags & DLM_LKF_CONVERT) {
2840                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2841                         goto out;
2842
2843                 /* lock not allowed if there's any op in progress */
2844                 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2845                         goto out;
2846
2847                 if (is_overlap(lkb))
2848                         goto out;
2849
2850                 rv = -EINVAL;
2851                 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2852                         goto out;
2853
2854                 if (args->flags & DLM_LKF_QUECVT &&
2855                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2856                         goto out;
2857         }
2858
2859         lkb->lkb_exflags = args->flags;
2860         dlm_set_sbflags_val(lkb, 0);
2861         lkb->lkb_astfn = args->astfn;
2862         lkb->lkb_astparam = args->astparam;
2863         lkb->lkb_bastfn = args->bastfn;
2864         lkb->lkb_rqmode = args->mode;
2865         lkb->lkb_lksb = args->lksb;
2866         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2867         lkb->lkb_ownpid = (int) current->pid;
2868         rv = 0;
2869  out:
2870         switch (rv) {
2871         case 0:
2872                 break;
2873         case -EINVAL:
2874                 /* annoy the user because dlm usage is wrong */
2875                 WARN_ON(1);
2876                 log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
2877                           rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2878                           lkb->lkb_status, lkb->lkb_wait_type,
2879                           lkb->lkb_resource->res_name);
2880                 break;
2881         default:
2882                 log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
2883                           rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2884                           lkb->lkb_status, lkb->lkb_wait_type,
2885                           lkb->lkb_resource->res_name);
2886                 break;
2887         }
2888
2889         return rv;
2890 }
2891
2892 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2893    for success */
2894
2895 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2896    because there may be a lookup in progress and it's valid to do
2897    cancel/unlockf on it */
2898
2899 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2900 {
2901         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2902         int rv = -EBUSY;
2903
2904         /* normal unlock not allowed if there's any op in progress */
2905         if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2906             (lkb->lkb_wait_type || lkb->lkb_wait_count))
2907                 goto out;
2908
2909         /* an lkb may be waiting for an rsb lookup to complete where the
2910            lookup was initiated by another lock */
2911
2912         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2913                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2914                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2915                         list_del_init(&lkb->lkb_rsb_lookup);
2916                         queue_cast(lkb->lkb_resource, lkb,
2917                                    args->flags & DLM_LKF_CANCEL ?
2918                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2919                         unhold_lkb(lkb); /* undoes create_lkb() */
2920                 }
2921                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2922                 goto out;
2923         }
2924
2925         rv = -EINVAL;
2926         if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2927                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2928                 dlm_print_lkb(lkb);
2929                 goto out;
2930         }
2931
2932         /* an lkb may still exist even though the lock is EOL'ed due to a
2933          * cancel, unlock or failed noqueue request; an app can't use these
2934          * locks; return same error as if the lkid had not been found at all
2935          */
2936
2937         if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2938                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2939                 rv = -ENOENT;
2940                 goto out;
2941         }
2942
2943         /* cancel not allowed with another cancel/unlock in progress */
2944
2945         if (args->flags & DLM_LKF_CANCEL) {
2946                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2947                         goto out;
2948
2949                 if (is_overlap(lkb))
2950                         goto out;
2951
2952                 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2953                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2954                         rv = -EBUSY;
2955                         goto out;
2956                 }
2957
2958                 /* there's nothing to cancel */
2959                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2960                     !lkb->lkb_wait_type) {
2961                         rv = -EBUSY;
2962                         goto out;
2963                 }
2964
2965                 switch (lkb->lkb_wait_type) {
2966                 case DLM_MSG_LOOKUP:
2967                 case DLM_MSG_REQUEST:
2968                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2969                         rv = -EBUSY;
2970                         goto out;
2971                 case DLM_MSG_UNLOCK:
2972                 case DLM_MSG_CANCEL:
2973                         goto out;
2974                 }
2975                 /* add_to_waiters() will set OVERLAP_CANCEL */
2976                 goto out_ok;
2977         }
2978
2979         /* do we need to allow a force-unlock if there's a normal unlock
2980            already in progress?  in what conditions could the normal unlock
2981            fail such that we'd want to send a force-unlock to be sure? */
2982
2983         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2984                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2985                         goto out;
2986
2987                 if (is_overlap_unlock(lkb))
2988                         goto out;
2989
2990                 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2991                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2992                         rv = -EBUSY;
2993                         goto out;
2994                 }
2995
2996                 switch (lkb->lkb_wait_type) {
2997                 case DLM_MSG_LOOKUP:
2998                 case DLM_MSG_REQUEST:
2999                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
3000                         rv = -EBUSY;
3001                         goto out;
3002                 case DLM_MSG_UNLOCK:
3003                         goto out;
3004                 }
3005                 /* add_to_waiters() will set OVERLAP_UNLOCK */
3006         }
3007
3008  out_ok:
3009         /* an overlapping op shouldn't blow away exflags from other op */
3010         lkb->lkb_exflags |= args->flags;
3011         dlm_set_sbflags_val(lkb, 0);
3012         lkb->lkb_astparam = args->astparam;
3013         rv = 0;
3014  out:
3015         switch (rv) {
3016         case 0:
3017                 break;
3018         case -EINVAL:
3019                 /* annoy the user because dlm usage is wrong */
3020                 WARN_ON(1);
3021                 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3022                           lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3023                           args->flags, lkb->lkb_wait_type,
3024                           lkb->lkb_resource->res_name);
3025                 break;
3026         default:
3027                 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3028                           lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3029                           args->flags, lkb->lkb_wait_type,
3030                           lkb->lkb_resource->res_name);
3031                 break;
3032         }
3033
3034         return rv;
3035 }
3036
3037 /*
3038  * Four stage 4 varieties:
3039  * do_request(), do_convert(), do_unlock(), do_cancel()
3040  * These are called on the master node for the given lock and
3041  * from the central locking logic.
3042  */
3043
3044 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3045 {
3046         int error = 0;
3047
3048         if (can_be_granted(r, lkb, 1, 0, NULL)) {
3049                 grant_lock(r, lkb);
3050                 queue_cast(r, lkb, 0);
3051                 goto out;
3052         }
3053
3054         if (can_be_queued(lkb)) {
3055                 error = -EINPROGRESS;
3056                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3057                 goto out;
3058         }
3059
3060         error = -EAGAIN;
3061         queue_cast(r, lkb, -EAGAIN);
3062  out:
3063         return error;
3064 }
3065
3066 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3067                                int error)
3068 {
3069         switch (error) {
3070         case -EAGAIN:
3071                 if (force_blocking_asts(lkb))
3072                         send_blocking_asts_all(r, lkb);
3073                 break;
3074         case -EINPROGRESS:
3075                 send_blocking_asts(r, lkb);
3076                 break;
3077         }
3078 }
3079
3080 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3081 {
3082         int error = 0;
3083         int deadlk = 0;
3084
3085         /* changing an existing lock may allow others to be granted */
3086
3087         if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3088                 grant_lock(r, lkb);
3089                 queue_cast(r, lkb, 0);
3090                 goto out;
3091         }
3092
3093         /* can_be_granted() detected that this lock would block in a conversion
3094            deadlock, so we leave it on the granted queue and return EDEADLK in
3095            the ast for the convert. */
3096
3097         if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3098                 /* it's left on the granted queue */
3099                 revert_lock(r, lkb);
3100                 queue_cast(r, lkb, -EDEADLK);
3101                 error = -EDEADLK;
3102                 goto out;
3103         }
3104
3105         /* is_demoted() means the can_be_granted() above set the grmode
3106            to NL, and left us on the granted queue.  This auto-demotion
3107            (due to CONVDEADLK) might mean other locks, and/or this lock, are
3108            now grantable.  We have to try to grant other converting locks
3109            before we try again to grant this one. */
3110
3111         if (is_demoted(lkb)) {
3112                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3113                 if (_can_be_granted(r, lkb, 1, 0)) {
3114                         grant_lock(r, lkb);
3115                         queue_cast(r, lkb, 0);
3116                         goto out;
3117                 }
3118                 /* else fall through and move to convert queue */
3119         }
3120
3121         if (can_be_queued(lkb)) {
3122                 error = -EINPROGRESS;
3123                 del_lkb(r, lkb);
3124                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3125                 goto out;
3126         }
3127
3128         error = -EAGAIN;
3129         queue_cast(r, lkb, -EAGAIN);
3130  out:
3131         return error;
3132 }
3133
3134 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3135                                int error)
3136 {
3137         switch (error) {
3138         case 0:
3139                 grant_pending_locks(r, NULL);
3140                 /* grant_pending_locks also sends basts */
3141                 break;
3142         case -EAGAIN:
3143                 if (force_blocking_asts(lkb))
3144                         send_blocking_asts_all(r, lkb);
3145                 break;
3146         case -EINPROGRESS:
3147                 send_blocking_asts(r, lkb);
3148                 break;
3149         }
3150 }
3151
3152 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3153 {
3154         remove_lock(r, lkb);
3155         queue_cast(r, lkb, -DLM_EUNLOCK);
3156         return -DLM_EUNLOCK;
3157 }
3158
3159 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3160                               int error)
3161 {
3162         grant_pending_locks(r, NULL);
3163 }
3164
3165 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3166
3167 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3168 {
3169         int error;
3170
3171         error = revert_lock(r, lkb);
3172         if (error) {
3173                 queue_cast(r, lkb, -DLM_ECANCEL);
3174                 return -DLM_ECANCEL;
3175         }
3176         return 0;
3177 }
3178
3179 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3180                               int error)
3181 {
3182         if (error)
3183                 grant_pending_locks(r, NULL);
3184 }
3185
3186 /*
3187  * Four stage 3 varieties:
3188  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3189  */
3190
3191 /* add a new lkb to a possibly new rsb, called by requesting process */
3192
3193 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3194 {
3195         int error;
3196
3197         /* set_master: sets lkb nodeid from r */
3198
3199         error = set_master(r, lkb);
3200         if (error < 0)
3201                 goto out;
3202         if (error) {
3203                 error = 0;
3204                 goto out;
3205         }
3206
3207         if (is_remote(r)) {
3208                 /* receive_request() calls do_request() on remote node */
3209                 error = send_request(r, lkb);
3210         } else {
3211                 error = do_request(r, lkb);
3212                 /* for remote locks the request_reply is sent
3213                    between do_request and do_request_effects */
3214                 do_request_effects(r, lkb, error);
3215         }
3216  out:
3217         return error;
3218 }
3219
3220 /* change some property of an existing lkb, e.g. mode */
3221
3222 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3223 {
3224         int error;
3225
3226         if (is_remote(r)) {
3227                 /* receive_convert() calls do_convert() on remote node */
3228                 error = send_convert(r, lkb);
3229         } else {
3230                 error = do_convert(r, lkb);
3231                 /* for remote locks the convert_reply is sent
3232                    between do_convert and do_convert_effects */
3233                 do_convert_effects(r, lkb, error);
3234         }
3235
3236         return error;
3237 }
3238
3239 /* remove an existing lkb from the granted queue */
3240
3241 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3242 {
3243         int error;
3244
3245         if (is_remote(r)) {
3246                 /* receive_unlock() calls do_unlock() on remote node */
3247                 error = send_unlock(r, lkb);
3248         } else {
3249                 error = do_unlock(r, lkb);
3250                 /* for remote locks the unlock_reply is sent
3251                    between do_unlock and do_unlock_effects */
3252                 do_unlock_effects(r, lkb, error);
3253         }
3254
3255         return error;
3256 }
3257
3258 /* remove an existing lkb from the convert or wait queue */
3259
3260 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3261 {
3262         int error;
3263
3264         if (is_remote(r)) {
3265                 /* receive_cancel() calls do_cancel() on remote node */
3266                 error = send_cancel(r, lkb);
3267         } else {
3268                 error = do_cancel(r, lkb);
3269                 /* for remote locks the cancel_reply is sent
3270                    between do_cancel and do_cancel_effects */
3271                 do_cancel_effects(r, lkb, error);
3272         }
3273
3274         return error;
3275 }
3276
3277 /*
3278  * Four stage 2 varieties:
3279  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3280  */
3281
3282 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3283                         const void *name, int len,
3284                         struct dlm_args *args)
3285 {
3286         struct dlm_rsb *r;
3287         int error;
3288
3289         error = validate_lock_args(ls, lkb, args);
3290         if (error)
3291                 return error;
3292
3293         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3294         if (error)
3295                 return error;
3296
3297         lock_rsb(r);
3298
3299         attach_lkb(r, lkb);
3300         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3301
3302         error = _request_lock(r, lkb);
3303
3304         unlock_rsb(r);
3305         put_rsb(r);
3306         return error;
3307 }
3308
3309 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3310                         struct dlm_args *args)
3311 {
3312         struct dlm_rsb *r;
3313         int error;
3314
3315         r = lkb->lkb_resource;
3316
3317         hold_rsb(r);
3318         lock_rsb(r);
3319
3320         error = validate_lock_args(ls, lkb, args);
3321         if (error)
3322                 goto out;
3323
3324         error = _convert_lock(r, lkb);
3325  out:
3326         unlock_rsb(r);
3327         put_rsb(r);
3328         return error;
3329 }
3330
3331 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3332                        struct dlm_args *args)
3333 {
3334         struct dlm_rsb *r;
3335         int error;
3336
3337         r = lkb->lkb_resource;
3338
3339         hold_rsb(r);
3340         lock_rsb(r);
3341
3342         error = validate_unlock_args(lkb, args);
3343         if (error)
3344                 goto out;
3345
3346         error = _unlock_lock(r, lkb);
3347  out:
3348         unlock_rsb(r);
3349         put_rsb(r);
3350         return error;
3351 }
3352
3353 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3354                        struct dlm_args *args)
3355 {
3356         struct dlm_rsb *r;
3357         int error;
3358
3359         r = lkb->lkb_resource;
3360
3361         hold_rsb(r);
3362         lock_rsb(r);
3363
3364         error = validate_unlock_args(lkb, args);
3365         if (error)
3366                 goto out;
3367
3368         error = _cancel_lock(r, lkb);
3369  out:
3370         unlock_rsb(r);
3371         put_rsb(r);
3372         return error;
3373 }
3374
3375 /*
3376  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3377  */
3378
3379 int dlm_lock(dlm_lockspace_t *lockspace,
3380              int mode,
3381              struct dlm_lksb *lksb,
3382              uint32_t flags,
3383              const void *name,
3384              unsigned int namelen,
3385              uint32_t parent_lkid,
3386              void (*ast) (void *astarg),
3387              void *astarg,
3388              void (*bast) (void *astarg, int mode))
3389 {
3390         struct dlm_ls *ls;
3391         struct dlm_lkb *lkb;
3392         struct dlm_args args;
3393         int error, convert = flags & DLM_LKF_CONVERT;
3394
3395         ls = dlm_find_lockspace_local(lockspace);
3396         if (!ls)
3397                 return -EINVAL;
3398
3399         dlm_lock_recovery(ls);
3400
3401         if (convert)
3402                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3403         else
3404                 error = create_lkb(ls, &lkb);
3405
3406         if (error)
3407                 goto out;
3408
3409         trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3410
3411         error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3412                               &args);
3413         if (error)
3414                 goto out_put;
3415
3416         if (convert)
3417                 error = convert_lock(ls, lkb, &args);
3418         else
3419                 error = request_lock(ls, lkb, name, namelen, &args);
3420
3421         if (error == -EINPROGRESS)
3422                 error = 0;
3423  out_put:
3424         trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3425
3426         if (convert || error)
3427                 __put_lkb(ls, lkb);
3428         if (error == -EAGAIN || error == -EDEADLK)
3429                 error = 0;
3430  out:
3431         dlm_unlock_recovery(ls);
3432         dlm_put_lockspace(ls);
3433         return error;
3434 }
3435
3436 int dlm_unlock(dlm_lockspace_t *lockspace,
3437                uint32_t lkid,
3438                uint32_t flags,
3439                struct dlm_lksb *lksb,
3440                void *astarg)
3441 {
3442         struct dlm_ls *ls;
3443         struct dlm_lkb *lkb;
3444         struct dlm_args args;
3445         int error;
3446
3447         ls = dlm_find_lockspace_local(lockspace);
3448         if (!ls)
3449                 return -EINVAL;
3450
3451         dlm_lock_recovery(ls);
3452
3453         error = find_lkb(ls, lkid, &lkb);
3454         if (error)
3455                 goto out;
3456
3457         trace_dlm_unlock_start(ls, lkb, flags);
3458
3459         error = set_unlock_args(flags, astarg, &args);
3460         if (error)
3461                 goto out_put;
3462
3463         if (flags & DLM_LKF_CANCEL)
3464                 error = cancel_lock(ls, lkb, &args);
3465         else
3466                 error = unlock_lock(ls, lkb, &args);
3467
3468         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3469                 error = 0;
3470         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3471                 error = 0;
3472  out_put:
3473         trace_dlm_unlock_end(ls, lkb, flags, error);
3474
3475         dlm_put_lkb(lkb);
3476  out:
3477         dlm_unlock_recovery(ls);
3478         dlm_put_lockspace(ls);
3479         return error;
3480 }
3481
3482 /*
3483  * send/receive routines for remote operations and replies
3484  *
3485  * send_args
3486  * send_common
3487  * send_request                 receive_request
3488  * send_convert                 receive_convert
3489  * send_unlock                  receive_unlock
3490  * send_cancel                  receive_cancel
3491  * send_grant                   receive_grant
3492  * send_bast                    receive_bast
3493  * send_lookup                  receive_lookup
3494  * send_remove                  receive_remove
3495  *
3496  *                              send_common_reply
3497  * receive_request_reply        send_request_reply
3498  * receive_convert_reply        send_convert_reply
3499  * receive_unlock_reply         send_unlock_reply
3500  * receive_cancel_reply         send_cancel_reply
3501  * receive_lookup_reply         send_lookup_reply
3502  */
3503
3504 static int _create_message(struct dlm_ls *ls, int mb_len,
3505                            int to_nodeid, int mstype,
3506                            struct dlm_message **ms_ret,
3507                            struct dlm_mhandle **mh_ret)
3508 {
3509         struct dlm_message *ms;
3510         struct dlm_mhandle *mh;
3511         char *mb;
3512
3513         /* get_buffer gives us a message handle (mh) that we need to
3514            pass into midcomms_commit and a message buffer (mb) that we
3515            write our data into */
3516
3517         mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3518         if (!mh)
3519                 return -ENOBUFS;
3520
3521         ms = (struct dlm_message *) mb;
3522
3523         ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3524         ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3525         ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3526         ms->m_header.h_length = cpu_to_le16(mb_len);
3527         ms->m_header.h_cmd = DLM_MSG;
3528
3529         ms->m_type = cpu_to_le32(mstype);
3530
3531         *mh_ret = mh;
3532         *ms_ret = ms;
3533         return 0;
3534 }
3535
3536 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3537                           int to_nodeid, int mstype,
3538                           struct dlm_message **ms_ret,
3539                           struct dlm_mhandle **mh_ret)
3540 {
3541         int mb_len = sizeof(struct dlm_message);
3542
3543         switch (mstype) {
3544         case DLM_MSG_REQUEST:
3545         case DLM_MSG_LOOKUP:
3546         case DLM_MSG_REMOVE:
3547                 mb_len += r->res_length;
3548                 break;
3549         case DLM_MSG_CONVERT:
3550         case DLM_MSG_UNLOCK:
3551         case DLM_MSG_REQUEST_REPLY:
3552         case DLM_MSG_CONVERT_REPLY:
3553         case DLM_MSG_GRANT:
3554                 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3555                         mb_len += r->res_ls->ls_lvblen;
3556                 break;
3557         }
3558
3559         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3560                                ms_ret, mh_ret);
3561 }
3562
3563 /* further lowcomms enhancements or alternate implementations may make
3564    the return value from this function useful at some point */
3565
3566 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3567                         const void *name, int namelen)
3568 {
3569         dlm_midcomms_commit_mhandle(mh, name, namelen);
3570         return 0;
3571 }
3572
3573 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3574                       struct dlm_message *ms)
3575 {
3576         ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3577         ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3578         ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3579         ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3580         ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3581         ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3582         ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3583         ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3584         ms->m_status   = cpu_to_le32(lkb->lkb_status);
3585         ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3586         ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3587         ms->m_hash     = cpu_to_le32(r->res_hash);
3588
3589         /* m_result and m_bastmode are set from function args,
3590            not from lkb fields */
3591
3592         if (lkb->lkb_bastfn)
3593                 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3594         if (lkb->lkb_astfn)
3595                 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3596
3597         /* compare with switch in create_message; send_remove() doesn't
3598            use send_args() */
3599
3600         switch (ms->m_type) {
3601         case cpu_to_le32(DLM_MSG_REQUEST):
3602         case cpu_to_le32(DLM_MSG_LOOKUP):
3603                 memcpy(ms->m_extra, r->res_name, r->res_length);
3604                 break;
3605         case cpu_to_le32(DLM_MSG_CONVERT):
3606         case cpu_to_le32(DLM_MSG_UNLOCK):
3607         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3608         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3609         case cpu_to_le32(DLM_MSG_GRANT):
3610                 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3611                         break;
3612                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3613                 break;
3614         }
3615 }
3616
3617 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3618 {
3619         struct dlm_message *ms;
3620         struct dlm_mhandle *mh;
3621         int to_nodeid, error;
3622
3623         to_nodeid = r->res_nodeid;
3624
3625         error = add_to_waiters(lkb, mstype, to_nodeid);
3626         if (error)
3627                 return error;
3628
3629         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3630         if (error)
3631                 goto fail;
3632
3633         send_args(r, lkb, ms);
3634
3635         error = send_message(mh, ms, r->res_name, r->res_length);
3636         if (error)
3637                 goto fail;
3638         return 0;
3639
3640  fail:
3641         remove_from_waiters(lkb, msg_reply_type(mstype));
3642         return error;
3643 }
3644
3645 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3646 {
3647         return send_common(r, lkb, DLM_MSG_REQUEST);
3648 }
3649
3650 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3651 {
3652         int error;
3653
3654         error = send_common(r, lkb, DLM_MSG_CONVERT);
3655
3656         /* down conversions go without a reply from the master */
3657         if (!error && down_conversion(lkb)) {
3658                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3659                 r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3660                 r->res_ls->ls_local_ms.m_result = 0;
3661                 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3662         }
3663
3664         return error;
3665 }
3666
3667 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3668    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3669    that the master is still correct. */
3670
3671 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3672 {
3673         return send_common(r, lkb, DLM_MSG_UNLOCK);
3674 }
3675
3676 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3677 {
3678         return send_common(r, lkb, DLM_MSG_CANCEL);
3679 }
3680
3681 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3682 {
3683         struct dlm_message *ms;
3684         struct dlm_mhandle *mh;
3685         int to_nodeid, error;
3686
3687         to_nodeid = lkb->lkb_nodeid;
3688
3689         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3690         if (error)
3691                 goto out;
3692
3693         send_args(r, lkb, ms);
3694
3695         ms->m_result = 0;
3696
3697         error = send_message(mh, ms, r->res_name, r->res_length);
3698  out:
3699         return error;
3700 }
3701
3702 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3703 {
3704         struct dlm_message *ms;
3705         struct dlm_mhandle *mh;
3706         int to_nodeid, error;
3707
3708         to_nodeid = lkb->lkb_nodeid;
3709
3710         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3711         if (error)
3712                 goto out;
3713
3714         send_args(r, lkb, ms);
3715
3716         ms->m_bastmode = cpu_to_le32(mode);
3717
3718         error = send_message(mh, ms, r->res_name, r->res_length);
3719  out:
3720         return error;
3721 }
3722
3723 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3724 {
3725         struct dlm_message *ms;
3726         struct dlm_mhandle *mh;
3727         int to_nodeid, error;
3728
3729         to_nodeid = dlm_dir_nodeid(r);
3730
3731         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3732         if (error)
3733                 return error;
3734
3735         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3736         if (error)
3737                 goto fail;
3738
3739         send_args(r, lkb, ms);
3740
3741         error = send_message(mh, ms, r->res_name, r->res_length);
3742         if (error)
3743                 goto fail;
3744         return 0;
3745
3746  fail:
3747         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3748         return error;
3749 }
3750
3751 static int send_remove(struct dlm_rsb *r)
3752 {
3753         struct dlm_message *ms;
3754         struct dlm_mhandle *mh;
3755         int to_nodeid, error;
3756
3757         to_nodeid = dlm_dir_nodeid(r);
3758
3759         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3760         if (error)
3761                 goto out;
3762
3763         memcpy(ms->m_extra, r->res_name, r->res_length);
3764         ms->m_hash = cpu_to_le32(r->res_hash);
3765
3766         error = send_message(mh, ms, r->res_name, r->res_length);
3767  out:
3768         return error;
3769 }
3770
3771 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3772                              int mstype, int rv)
3773 {
3774         struct dlm_message *ms;
3775         struct dlm_mhandle *mh;
3776         int to_nodeid, error;
3777
3778         to_nodeid = lkb->lkb_nodeid;
3779
3780         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3781         if (error)
3782                 goto out;
3783
3784         send_args(r, lkb, ms);
3785
3786         ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3787
3788         error = send_message(mh, ms, r->res_name, r->res_length);
3789  out:
3790         return error;
3791 }
3792
3793 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3794 {
3795         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3796 }
3797
3798 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3799 {
3800         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3801 }
3802
3803 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3804 {
3805         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3806 }
3807
3808 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3809 {
3810         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3811 }
3812
3813 static int send_lookup_reply(struct dlm_ls *ls,
3814                              const struct dlm_message *ms_in, int ret_nodeid,
3815                              int rv)
3816 {
3817         struct dlm_rsb *r = &ls->ls_local_rsb;
3818         struct dlm_message *ms;
3819         struct dlm_mhandle *mh;
3820         int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3821
3822         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3823         if (error)
3824                 goto out;
3825
3826         ms->m_lkid = ms_in->m_lkid;
3827         ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3828         ms->m_nodeid = cpu_to_le32(ret_nodeid);
3829
3830         error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3831  out:
3832         return error;
3833 }
3834
3835 /* which args we save from a received message depends heavily on the type
3836    of message, unlike the send side where we can safely send everything about
3837    the lkb for any type of message */
3838
3839 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3840 {
3841         lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3842         dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3843         dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3844 }
3845
3846 static void receive_flags_reply(struct dlm_lkb *lkb,
3847                                 const struct dlm_message *ms,
3848                                 bool local)
3849 {
3850         if (local)
3851                 return;
3852
3853         dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3854         dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3855 }
3856
3857 static int receive_extralen(const struct dlm_message *ms)
3858 {
3859         return (le16_to_cpu(ms->m_header.h_length) -
3860                 sizeof(struct dlm_message));
3861 }
3862
3863 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3864                        const struct dlm_message *ms)
3865 {
3866         int len;
3867
3868         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3869                 if (!lkb->lkb_lvbptr)
3870                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3871                 if (!lkb->lkb_lvbptr)
3872                         return -ENOMEM;
3873                 len = receive_extralen(ms);
3874                 if (len > ls->ls_lvblen)
3875                         len = ls->ls_lvblen;
3876                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3877         }
3878         return 0;
3879 }
3880
3881 static void fake_bastfn(void *astparam, int mode)
3882 {
3883         log_print("fake_bastfn should not be called");
3884 }
3885
3886 static void fake_astfn(void *astparam)
3887 {
3888         log_print("fake_astfn should not be called");
3889 }
3890
3891 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3892                                 const struct dlm_message *ms)
3893 {
3894         lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3895         lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3896         lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3897         lkb->lkb_grmode = DLM_LOCK_IV;
3898         lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3899
3900         lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3901         lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3902
3903         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3904                 /* lkb was just created so there won't be an lvb yet */
3905                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3906                 if (!lkb->lkb_lvbptr)
3907                         return -ENOMEM;
3908         }
3909
3910         return 0;
3911 }
3912
3913 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3914                                 const struct dlm_message *ms)
3915 {
3916         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3917                 return -EBUSY;
3918
3919         if (receive_lvb(ls, lkb, ms))
3920                 return -ENOMEM;
3921
3922         lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3923         lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3924
3925         return 0;
3926 }
3927
3928 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3929                                const struct dlm_message *ms)
3930 {
3931         if (receive_lvb(ls, lkb, ms))
3932                 return -ENOMEM;
3933         return 0;
3934 }
3935
3936 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3937    uses to send a reply and that the remote end uses to process the reply. */
3938
3939 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3940 {
3941         struct dlm_lkb *lkb = &ls->ls_local_lkb;
3942         lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3943         lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3944 }
3945
3946 /* This is called after the rsb is locked so that we can safely inspect
3947    fields in the lkb. */
3948
3949 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3950 {
3951         int from = le32_to_cpu(ms->m_header.h_nodeid);
3952         int error = 0;
3953
3954         /* currently mixing of user/kernel locks are not supported */
3955         if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3956             !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3957                 log_error(lkb->lkb_resource->res_ls,
3958                           "got user dlm message for a kernel lock");
3959                 error = -EINVAL;
3960                 goto out;
3961         }
3962
3963         switch (ms->m_type) {
3964         case cpu_to_le32(DLM_MSG_CONVERT):
3965         case cpu_to_le32(DLM_MSG_UNLOCK):
3966         case cpu_to_le32(DLM_MSG_CANCEL):
3967                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3968                         error = -EINVAL;
3969                 break;
3970
3971         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3972         case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3973         case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3974         case cpu_to_le32(DLM_MSG_GRANT):
3975         case cpu_to_le32(DLM_MSG_BAST):
3976                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3977                         error = -EINVAL;
3978                 break;
3979
3980         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3981                 if (!is_process_copy(lkb))
3982                         error = -EINVAL;
3983                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3984                         error = -EINVAL;
3985                 break;
3986
3987         default:
3988                 error = -EINVAL;
3989         }
3990
3991 out:
3992         if (error)
3993                 log_error(lkb->lkb_resource->res_ls,
3994                           "ignore invalid message %d from %d %x %x %x %d",
3995                           le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3996                           lkb->lkb_remid, dlm_iflags_val(lkb),
3997                           lkb->lkb_nodeid);
3998         return error;
3999 }
4000
4001 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
4002 {
4003         struct dlm_lkb *lkb;
4004         struct dlm_rsb *r;
4005         int from_nodeid;
4006         int error, namelen = 0;
4007
4008         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4009
4010         error = create_lkb(ls, &lkb);
4011         if (error)
4012                 goto fail;
4013
4014         receive_flags(lkb, ms);
4015         set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
4016         error = receive_request_args(ls, lkb, ms);
4017         if (error) {
4018                 __put_lkb(ls, lkb);
4019                 goto fail;
4020         }
4021
4022         /* The dir node is the authority on whether we are the master
4023            for this rsb or not, so if the master sends us a request, we should
4024            recreate the rsb if we've destroyed it.   This race happens when we
4025            send a remove message to the dir node at the same time that the dir
4026            node sends us a request for the rsb. */
4027
4028         namelen = receive_extralen(ms);
4029
4030         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4031                          R_RECEIVE_REQUEST, &r);
4032         if (error) {
4033                 __put_lkb(ls, lkb);
4034                 goto fail;
4035         }
4036
4037         lock_rsb(r);
4038
4039         if (r->res_master_nodeid != dlm_our_nodeid()) {
4040                 error = validate_master_nodeid(ls, r, from_nodeid);
4041                 if (error) {
4042                         unlock_rsb(r);
4043                         put_rsb(r);
4044                         __put_lkb(ls, lkb);
4045                         goto fail;
4046                 }
4047         }
4048
4049         attach_lkb(r, lkb);
4050         error = do_request(r, lkb);
4051         send_request_reply(r, lkb, error);
4052         do_request_effects(r, lkb, error);
4053
4054         unlock_rsb(r);
4055         put_rsb(r);
4056
4057         if (error == -EINPROGRESS)
4058                 error = 0;
4059         if (error)
4060                 dlm_put_lkb(lkb);
4061         return 0;
4062
4063  fail:
4064         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4065            and do this receive_request again from process_lookup_list once
4066            we get the lookup reply.  This would avoid a many repeated
4067            ENOTBLK request failures when the lookup reply designating us
4068            as master is delayed. */
4069
4070         if (error != -ENOTBLK) {
4071                 log_limit(ls, "receive_request %x from %d %d",
4072                           le32_to_cpu(ms->m_lkid), from_nodeid, error);
4073         }
4074
4075         setup_local_lkb(ls, ms);
4076         send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4077         return error;
4078 }
4079
4080 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4081 {
4082         struct dlm_lkb *lkb;
4083         struct dlm_rsb *r;
4084         int error, reply = 1;
4085
4086         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4087         if (error)
4088                 goto fail;
4089
4090         if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4091                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4092                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4093                           (unsigned long long)lkb->lkb_recover_seq,
4094                           le32_to_cpu(ms->m_header.h_nodeid),
4095                           le32_to_cpu(ms->m_lkid));
4096                 error = -ENOENT;
4097                 dlm_put_lkb(lkb);
4098                 goto fail;
4099         }
4100
4101         r = lkb->lkb_resource;
4102
4103         hold_rsb(r);
4104         lock_rsb(r);
4105
4106         error = validate_message(lkb, ms);
4107         if (error)
4108                 goto out;
4109
4110         receive_flags(lkb, ms);
4111
4112         error = receive_convert_args(ls, lkb, ms);
4113         if (error) {
4114                 send_convert_reply(r, lkb, error);
4115                 goto out;
4116         }
4117
4118         reply = !down_conversion(lkb);
4119
4120         error = do_convert(r, lkb);
4121         if (reply)
4122                 send_convert_reply(r, lkb, error);
4123         do_convert_effects(r, lkb, error);
4124  out:
4125         unlock_rsb(r);
4126         put_rsb(r);
4127         dlm_put_lkb(lkb);
4128         return 0;
4129
4130  fail:
4131         setup_local_lkb(ls, ms);
4132         send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4133         return error;
4134 }
4135
4136 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4137 {
4138         struct dlm_lkb *lkb;
4139         struct dlm_rsb *r;
4140         int error;
4141
4142         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4143         if (error)
4144                 goto fail;
4145
4146         if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4147                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4148                           lkb->lkb_id, lkb->lkb_remid,
4149                           le32_to_cpu(ms->m_header.h_nodeid),
4150                           le32_to_cpu(ms->m_lkid));
4151                 error = -ENOENT;
4152                 dlm_put_lkb(lkb);
4153                 goto fail;
4154         }
4155
4156         r = lkb->lkb_resource;
4157
4158         hold_rsb(r);
4159         lock_rsb(r);
4160
4161         error = validate_message(lkb, ms);
4162         if (error)
4163                 goto out;
4164
4165         receive_flags(lkb, ms);
4166
4167         error = receive_unlock_args(ls, lkb, ms);
4168         if (error) {
4169                 send_unlock_reply(r, lkb, error);
4170                 goto out;
4171         }
4172
4173         error = do_unlock(r, lkb);
4174         send_unlock_reply(r, lkb, error);
4175         do_unlock_effects(r, lkb, error);
4176  out:
4177         unlock_rsb(r);
4178         put_rsb(r);
4179         dlm_put_lkb(lkb);
4180         return 0;
4181
4182  fail:
4183         setup_local_lkb(ls, ms);
4184         send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4185         return error;
4186 }
4187
4188 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4189 {
4190         struct dlm_lkb *lkb;
4191         struct dlm_rsb *r;
4192         int error;
4193
4194         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4195         if (error)
4196                 goto fail;
4197
4198         receive_flags(lkb, ms);
4199
4200         r = lkb->lkb_resource;
4201
4202         hold_rsb(r);
4203         lock_rsb(r);
4204
4205         error = validate_message(lkb, ms);
4206         if (error)
4207                 goto out;
4208
4209         error = do_cancel(r, lkb);
4210         send_cancel_reply(r, lkb, error);
4211         do_cancel_effects(r, lkb, error);
4212  out:
4213         unlock_rsb(r);
4214         put_rsb(r);
4215         dlm_put_lkb(lkb);
4216         return 0;
4217
4218  fail:
4219         setup_local_lkb(ls, ms);
4220         send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4221         return error;
4222 }
4223
4224 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4225 {
4226         struct dlm_lkb *lkb;
4227         struct dlm_rsb *r;
4228         int error;
4229
4230         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4231         if (error)
4232                 return error;
4233
4234         r = lkb->lkb_resource;
4235
4236         hold_rsb(r);
4237         lock_rsb(r);
4238
4239         error = validate_message(lkb, ms);
4240         if (error)
4241                 goto out;
4242
4243         receive_flags_reply(lkb, ms, false);
4244         if (is_altmode(lkb))
4245                 munge_altmode(lkb, ms);
4246         grant_lock_pc(r, lkb, ms);
4247         queue_cast(r, lkb, 0);
4248  out:
4249         unlock_rsb(r);
4250         put_rsb(r);
4251         dlm_put_lkb(lkb);
4252         return 0;
4253 }
4254
4255 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4256 {
4257         struct dlm_lkb *lkb;
4258         struct dlm_rsb *r;
4259         int error;
4260
4261         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4262         if (error)
4263                 return error;
4264
4265         r = lkb->lkb_resource;
4266
4267         hold_rsb(r);
4268         lock_rsb(r);
4269
4270         error = validate_message(lkb, ms);
4271         if (error)
4272                 goto out;
4273
4274         queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4275         lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4276  out:
4277         unlock_rsb(r);
4278         put_rsb(r);
4279         dlm_put_lkb(lkb);
4280         return 0;
4281 }
4282
4283 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4284 {
4285         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4286
4287         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4288         our_nodeid = dlm_our_nodeid();
4289
4290         len = receive_extralen(ms);
4291
4292         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4293                                   &ret_nodeid, NULL);
4294
4295         /* Optimization: we're master so treat lookup as a request */
4296         if (!error && ret_nodeid == our_nodeid) {
4297                 receive_request(ls, ms);
4298                 return;
4299         }
4300         send_lookup_reply(ls, ms, ret_nodeid, error);
4301 }
4302
4303 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4304 {
4305         char name[DLM_RESNAME_MAXLEN+1];
4306         struct dlm_rsb *r;
4307         int rv, len, dir_nodeid, from_nodeid;
4308
4309         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4310
4311         len = receive_extralen(ms);
4312
4313         if (len > DLM_RESNAME_MAXLEN) {
4314                 log_error(ls, "receive_remove from %d bad len %d",
4315                           from_nodeid, len);
4316                 return;
4317         }
4318
4319         dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4320         if (dir_nodeid != dlm_our_nodeid()) {
4321                 log_error(ls, "receive_remove from %d bad nodeid %d",
4322                           from_nodeid, dir_nodeid);
4323                 return;
4324         }
4325
4326         /* Look for name in rsb toss state, if it's there, kill it.
4327          * If it's in non toss state, it's being used, and we should ignore this
4328          * message.  This is an expected race between the dir node sending a
4329          * request to the master node at the same time as the master node sends
4330          * a remove to the dir node.  The resolution to that race is for the
4331          * dir node to ignore the remove message, and the master node to
4332          * recreate the master rsb when it gets a request from the dir node for
4333          * an rsb it doesn't have.
4334          */
4335
4336         memset(name, 0, sizeof(name));
4337         memcpy(name, ms->m_extra, len);
4338
4339         write_lock_bh(&ls->ls_rsbtbl_lock);
4340
4341         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4342         if (rv) {
4343                 /* should not happen */
4344                 log_error(ls, "%s from %d not found %s", __func__,
4345                           from_nodeid, name);
4346                 write_unlock_bh(&ls->ls_rsbtbl_lock);
4347                 return;
4348         }
4349
4350         if (!rsb_flag(r, RSB_TOSS)) {
4351                 if (r->res_master_nodeid != from_nodeid) {
4352                         /* should not happen */
4353                         log_error(ls, "receive_remove keep from %d master %d",
4354                                   from_nodeid, r->res_master_nodeid);
4355                         dlm_print_rsb(r);
4356                         write_unlock_bh(&ls->ls_rsbtbl_lock);
4357                         return;
4358                 }
4359
4360                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4361                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4362                           name);
4363                 write_unlock_bh(&ls->ls_rsbtbl_lock);
4364                 return;
4365         }
4366
4367         if (r->res_master_nodeid != from_nodeid) {
4368                 log_error(ls, "receive_remove toss from %d master %d",
4369                           from_nodeid, r->res_master_nodeid);
4370                 dlm_print_rsb(r);
4371                 write_unlock_bh(&ls->ls_rsbtbl_lock);
4372                 return;
4373         }
4374
4375         list_del(&r->res_rsbs_list);
4376         rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4377                                dlm_rhash_rsb_params);
4378         write_unlock_bh(&ls->ls_rsbtbl_lock);
4379
4380         free_toss_rsb(r);
4381 }
4382
4383 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4384 {
4385         do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4386 }
4387
4388 static int receive_request_reply(struct dlm_ls *ls,
4389                                  const struct dlm_message *ms)
4390 {
4391         struct dlm_lkb *lkb;
4392         struct dlm_rsb *r;
4393         int error, mstype, result;
4394         int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4395
4396         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4397         if (error)
4398                 return error;
4399
4400         r = lkb->lkb_resource;
4401         hold_rsb(r);
4402         lock_rsb(r);
4403
4404         error = validate_message(lkb, ms);
4405         if (error)
4406                 goto out;
4407
4408         mstype = lkb->lkb_wait_type;
4409         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4410         if (error) {
4411                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4412                           lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4413                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4414                 dlm_dump_rsb(r);
4415                 goto out;
4416         }
4417
4418         /* Optimization: the dir node was also the master, so it took our
4419            lookup as a request and sent request reply instead of lookup reply */
4420         if (mstype == DLM_MSG_LOOKUP) {
4421                 r->res_master_nodeid = from_nodeid;
4422                 r->res_nodeid = from_nodeid;
4423                 lkb->lkb_nodeid = from_nodeid;
4424         }
4425
4426         /* this is the value returned from do_request() on the master */
4427         result = from_dlm_errno(le32_to_cpu(ms->m_result));
4428
4429         switch (result) {
4430         case -EAGAIN:
4431                 /* request would block (be queued) on remote master */
4432                 queue_cast(r, lkb, -EAGAIN);
4433                 confirm_master(r, -EAGAIN);
4434                 unhold_lkb(lkb); /* undoes create_lkb() */
4435                 break;
4436
4437         case -EINPROGRESS:
4438         case 0:
4439                 /* request was queued or granted on remote master */
4440                 receive_flags_reply(lkb, ms, false);
4441                 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4442                 if (is_altmode(lkb))
4443                         munge_altmode(lkb, ms);
4444                 if (result) {
4445                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4446                 } else {
4447                         grant_lock_pc(r, lkb, ms);
4448                         queue_cast(r, lkb, 0);
4449                 }
4450                 confirm_master(r, result);
4451                 break;
4452
4453         case -EBADR:
4454         case -ENOTBLK:
4455                 /* find_rsb failed to find rsb or rsb wasn't master */
4456                 log_limit(ls, "receive_request_reply %x from %d %d "
4457                           "master %d dir %d first %x %s", lkb->lkb_id,
4458                           from_nodeid, result, r->res_master_nodeid,
4459                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4460
4461                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4462                     r->res_master_nodeid != dlm_our_nodeid()) {
4463                         /* cause _request_lock->set_master->send_lookup */
4464                         r->res_master_nodeid = 0;
4465                         r->res_nodeid = -1;
4466                         lkb->lkb_nodeid = -1;
4467                 }
4468
4469                 if (is_overlap(lkb)) {
4470                         /* we'll ignore error in cancel/unlock reply */
4471                         queue_cast_overlap(r, lkb);
4472                         confirm_master(r, result);
4473                         unhold_lkb(lkb); /* undoes create_lkb() */
4474                 } else {
4475                         _request_lock(r, lkb);
4476
4477                         if (r->res_master_nodeid == dlm_our_nodeid())
4478                                 confirm_master(r, 0);
4479                 }
4480                 break;
4481
4482         default:
4483                 log_error(ls, "receive_request_reply %x error %d",
4484                           lkb->lkb_id, result);
4485         }
4486
4487         if ((result == 0 || result == -EINPROGRESS) &&
4488             test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4489                 log_debug(ls, "receive_request_reply %x result %d unlock",
4490                           lkb->lkb_id, result);
4491                 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4492                 send_unlock(r, lkb);
4493         } else if ((result == -EINPROGRESS) &&
4494                    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4495                                       &lkb->lkb_iflags)) {
4496                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4497                 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4498                 send_cancel(r, lkb);
4499         } else {
4500                 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4501                 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4502         }
4503  out:
4504         unlock_rsb(r);
4505         put_rsb(r);
4506         dlm_put_lkb(lkb);
4507         return 0;
4508 }
4509
4510 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4511                                     const struct dlm_message *ms, bool local)
4512 {
4513         /* this is the value returned from do_convert() on the master */
4514         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4515         case -EAGAIN:
4516                 /* convert would block (be queued) on remote master */
4517                 queue_cast(r, lkb, -EAGAIN);
4518                 break;
4519
4520         case -EDEADLK:
4521                 receive_flags_reply(lkb, ms, local);
4522                 revert_lock_pc(r, lkb);
4523                 queue_cast(r, lkb, -EDEADLK);
4524                 break;
4525
4526         case -EINPROGRESS:
4527                 /* convert was queued on remote master */
4528                 receive_flags_reply(lkb, ms, local);
4529                 if (is_demoted(lkb))
4530                         munge_demoted(lkb);
4531                 del_lkb(r, lkb);
4532                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4533                 break;
4534
4535         case 0:
4536                 /* convert was granted on remote master */
4537                 receive_flags_reply(lkb, ms, local);
4538                 if (is_demoted(lkb))
4539                         munge_demoted(lkb);
4540                 grant_lock_pc(r, lkb, ms);
4541                 queue_cast(r, lkb, 0);
4542                 break;
4543
4544         default:
4545                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4546                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4547                           le32_to_cpu(ms->m_lkid),
4548                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4549                 dlm_print_rsb(r);
4550                 dlm_print_lkb(lkb);
4551         }
4552 }
4553
4554 static void _receive_convert_reply(struct dlm_lkb *lkb,
4555                                    const struct dlm_message *ms, bool local)
4556 {
4557         struct dlm_rsb *r = lkb->lkb_resource;
4558         int error;
4559
4560         hold_rsb(r);
4561         lock_rsb(r);
4562
4563         error = validate_message(lkb, ms);
4564         if (error)
4565                 goto out;
4566
4567         error = remove_from_waiters_ms(lkb, ms, local);
4568         if (error)
4569                 goto out;
4570
4571         __receive_convert_reply(r, lkb, ms, local);
4572  out:
4573         unlock_rsb(r);
4574         put_rsb(r);
4575 }
4576
4577 static int receive_convert_reply(struct dlm_ls *ls,
4578                                  const struct dlm_message *ms)
4579 {
4580         struct dlm_lkb *lkb;
4581         int error;
4582
4583         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4584         if (error)
4585                 return error;
4586
4587         _receive_convert_reply(lkb, ms, false);
4588         dlm_put_lkb(lkb);
4589         return 0;
4590 }
4591
4592 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4593                                   const struct dlm_message *ms, bool local)
4594 {
4595         struct dlm_rsb *r = lkb->lkb_resource;
4596         int error;
4597
4598         hold_rsb(r);
4599         lock_rsb(r);
4600
4601         error = validate_message(lkb, ms);
4602         if (error)
4603                 goto out;
4604
4605         error = remove_from_waiters_ms(lkb, ms, local);
4606         if (error)
4607                 goto out;
4608
4609         /* this is the value returned from do_unlock() on the master */
4610
4611         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4612         case -DLM_EUNLOCK:
4613                 receive_flags_reply(lkb, ms, local);
4614                 remove_lock_pc(r, lkb);
4615                 queue_cast(r, lkb, -DLM_EUNLOCK);
4616                 break;
4617         case -ENOENT:
4618                 break;
4619         default:
4620                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4621                           lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4622         }
4623  out:
4624         unlock_rsb(r);
4625         put_rsb(r);
4626 }
4627
4628 static int receive_unlock_reply(struct dlm_ls *ls,
4629                                 const struct dlm_message *ms)
4630 {
4631         struct dlm_lkb *lkb;
4632         int error;
4633
4634         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4635         if (error)
4636                 return error;
4637
4638         _receive_unlock_reply(lkb, ms, false);
4639         dlm_put_lkb(lkb);
4640         return 0;
4641 }
4642
4643 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4644                                   const struct dlm_message *ms, bool local)
4645 {
4646         struct dlm_rsb *r = lkb->lkb_resource;
4647         int error;
4648
4649         hold_rsb(r);
4650         lock_rsb(r);
4651
4652         error = validate_message(lkb, ms);
4653         if (error)
4654                 goto out;
4655
4656         error = remove_from_waiters_ms(lkb, ms, local);
4657         if (error)
4658                 goto out;
4659
4660         /* this is the value returned from do_cancel() on the master */
4661
4662         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4663         case -DLM_ECANCEL:
4664                 receive_flags_reply(lkb, ms, local);
4665                 revert_lock_pc(r, lkb);
4666                 queue_cast(r, lkb, -DLM_ECANCEL);
4667                 break;
4668         case 0:
4669                 break;
4670         default:
4671                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4672                           lkb->lkb_id,
4673                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4674         }
4675  out:
4676         unlock_rsb(r);
4677         put_rsb(r);
4678 }
4679
4680 static int receive_cancel_reply(struct dlm_ls *ls,
4681                                 const struct dlm_message *ms)
4682 {
4683         struct dlm_lkb *lkb;
4684         int error;
4685
4686         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4687         if (error)
4688                 return error;
4689
4690         _receive_cancel_reply(lkb, ms, false);
4691         dlm_put_lkb(lkb);
4692         return 0;
4693 }
4694
4695 static void receive_lookup_reply(struct dlm_ls *ls,
4696                                  const struct dlm_message *ms)
4697 {
4698         struct dlm_lkb *lkb;
4699         struct dlm_rsb *r;
4700         int error, ret_nodeid;
4701         int do_lookup_list = 0;
4702
4703         error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4704         if (error) {
4705                 log_error(ls, "%s no lkid %x", __func__,
4706                           le32_to_cpu(ms->m_lkid));
4707                 return;
4708         }
4709
4710         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4711            FIXME: will a non-zero error ever be returned? */
4712
4713         r = lkb->lkb_resource;
4714         hold_rsb(r);
4715         lock_rsb(r);
4716
4717         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4718         if (error)
4719                 goto out;
4720
4721         ret_nodeid = le32_to_cpu(ms->m_nodeid);
4722
4723         /* We sometimes receive a request from the dir node for this
4724            rsb before we've received the dir node's loookup_reply for it.
4725            The request from the dir node implies we're the master, so we set
4726            ourself as master in receive_request_reply, and verify here that
4727            we are indeed the master. */
4728
4729         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4730                 /* This should never happen */
4731                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4732                           "master %d dir %d our %d first %x %s",
4733                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4734                           ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4735                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4736         }
4737
4738         if (ret_nodeid == dlm_our_nodeid()) {
4739                 r->res_master_nodeid = ret_nodeid;
4740                 r->res_nodeid = 0;
4741                 do_lookup_list = 1;
4742                 r->res_first_lkid = 0;
4743         } else if (ret_nodeid == -1) {
4744                 /* the remote node doesn't believe it's the dir node */
4745                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4746                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4747                 r->res_master_nodeid = 0;
4748                 r->res_nodeid = -1;
4749                 lkb->lkb_nodeid = -1;
4750         } else {
4751                 /* set_master() will set lkb_nodeid from r */
4752                 r->res_master_nodeid = ret_nodeid;
4753                 r->res_nodeid = ret_nodeid;
4754         }
4755
4756         if (is_overlap(lkb)) {
4757                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4758                           lkb->lkb_id, dlm_iflags_val(lkb));
4759                 queue_cast_overlap(r, lkb);
4760                 unhold_lkb(lkb); /* undoes create_lkb() */
4761                 goto out_list;
4762         }
4763
4764         _request_lock(r, lkb);
4765
4766  out_list:
4767         if (do_lookup_list)
4768                 process_lookup_list(r);
4769  out:
4770         unlock_rsb(r);
4771         put_rsb(r);
4772         dlm_put_lkb(lkb);
4773 }
4774
4775 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4776                              uint32_t saved_seq)
4777 {
4778         int error = 0, noent = 0;
4779
4780         if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4781                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4782                           le32_to_cpu(ms->m_type),
4783                           le32_to_cpu(ms->m_header.h_nodeid),
4784                           le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4785                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4786                 return;
4787         }
4788
4789         switch (ms->m_type) {
4790
4791         /* messages sent to a master node */
4792
4793         case cpu_to_le32(DLM_MSG_REQUEST):
4794                 error = receive_request(ls, ms);
4795                 break;
4796
4797         case cpu_to_le32(DLM_MSG_CONVERT):
4798                 error = receive_convert(ls, ms);
4799                 break;
4800
4801         case cpu_to_le32(DLM_MSG_UNLOCK):
4802                 error = receive_unlock(ls, ms);
4803                 break;
4804
4805         case cpu_to_le32(DLM_MSG_CANCEL):
4806                 noent = 1;
4807                 error = receive_cancel(ls, ms);
4808                 break;
4809
4810         /* messages sent from a master node (replies to above) */
4811
4812         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4813                 error = receive_request_reply(ls, ms);
4814                 break;
4815
4816         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4817                 error = receive_convert_reply(ls, ms);
4818                 break;
4819
4820         case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4821                 error = receive_unlock_reply(ls, ms);
4822                 break;
4823
4824         case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4825                 error = receive_cancel_reply(ls, ms);
4826                 break;
4827
4828         /* messages sent from a master node (only two types of async msg) */
4829
4830         case cpu_to_le32(DLM_MSG_GRANT):
4831                 noent = 1;
4832                 error = receive_grant(ls, ms);
4833                 break;
4834
4835         case cpu_to_le32(DLM_MSG_BAST):
4836                 noent = 1;
4837                 error = receive_bast(ls, ms);
4838                 break;
4839
4840         /* messages sent to a dir node */
4841
4842         case cpu_to_le32(DLM_MSG_LOOKUP):
4843                 receive_lookup(ls, ms);
4844                 break;
4845
4846         case cpu_to_le32(DLM_MSG_REMOVE):
4847                 receive_remove(ls, ms);
4848                 break;
4849
4850         /* messages sent from a dir node (remove has no reply) */
4851
4852         case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4853                 receive_lookup_reply(ls, ms);
4854                 break;
4855
4856         /* other messages */
4857
4858         case cpu_to_le32(DLM_MSG_PURGE):
4859                 receive_purge(ls, ms);
4860                 break;
4861
4862         default:
4863                 log_error(ls, "unknown message type %d",
4864                           le32_to_cpu(ms->m_type));
4865         }
4866
4867         /*
4868          * When checking for ENOENT, we're checking the result of
4869          * find_lkb(m_remid):
4870          *
4871          * The lock id referenced in the message wasn't found.  This may
4872          * happen in normal usage for the async messages and cancel, so
4873          * only use log_debug for them.
4874          *
4875          * Some errors are expected and normal.
4876          */
4877
4878         if (error == -ENOENT && noent) {
4879                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4880                           le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4881                           le32_to_cpu(ms->m_header.h_nodeid),
4882                           le32_to_cpu(ms->m_lkid), saved_seq);
4883         } else if (error == -ENOENT) {
4884                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4885                           le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4886                           le32_to_cpu(ms->m_header.h_nodeid),
4887                           le32_to_cpu(ms->m_lkid), saved_seq);
4888
4889                 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4890                         dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4891         }
4892
4893         if (error == -EINVAL) {
4894                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4895                           "saved_seq %u",
4896                           le32_to_cpu(ms->m_type),
4897                           le32_to_cpu(ms->m_header.h_nodeid),
4898                           le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4899                           saved_seq);
4900         }
4901 }
4902
4903 /* If the lockspace is in recovery mode (locking stopped), then normal
4904    messages are saved on the requestqueue for processing after recovery is
4905    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4906    messages off the requestqueue before we process new ones. This occurs right
4907    after recovery completes when we transition from saving all messages on
4908    requestqueue, to processing all the saved messages, to processing new
4909    messages as they arrive. */
4910
4911 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4912                                 int nodeid)
4913 {
4914 try_again:
4915         read_lock_bh(&ls->ls_requestqueue_lock);
4916         if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4917                 /* If we were a member of this lockspace, left, and rejoined,
4918                    other nodes may still be sending us messages from the
4919                    lockspace generation before we left. */
4920                 if (WARN_ON_ONCE(!ls->ls_generation)) {
4921                         read_unlock_bh(&ls->ls_requestqueue_lock);
4922                         log_limit(ls, "receive %d from %d ignore old gen",
4923                                   le32_to_cpu(ms->m_type), nodeid);
4924                         return;
4925                 }
4926
4927                 read_unlock_bh(&ls->ls_requestqueue_lock);
4928                 write_lock_bh(&ls->ls_requestqueue_lock);
4929                 /* recheck because we hold writelock now */
4930                 if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4931                         write_unlock_bh(&ls->ls_requestqueue_lock);
4932                         goto try_again;
4933                 }
4934
4935                 dlm_add_requestqueue(ls, nodeid, ms);
4936                 write_unlock_bh(&ls->ls_requestqueue_lock);
4937         } else {
4938                 _receive_message(ls, ms, 0);
4939                 read_unlock_bh(&ls->ls_requestqueue_lock);
4940         }
4941 }
4942
4943 /* This is called by dlm_recoverd to process messages that were saved on
4944    the requestqueue. */
4945
4946 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4947                                uint32_t saved_seq)
4948 {
4949         _receive_message(ls, ms, saved_seq);
4950 }
4951
4952 /* This is called by the midcomms layer when something is received for
4953    the lockspace.  It could be either a MSG (normal message sent as part of
4954    standard locking activity) or an RCOM (recovery message sent as part of
4955    lockspace recovery). */
4956
4957 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4958 {
4959         const struct dlm_header *hd = &p->header;
4960         struct dlm_ls *ls;
4961         int type = 0;
4962
4963         switch (hd->h_cmd) {
4964         case DLM_MSG:
4965                 type = le32_to_cpu(p->message.m_type);
4966                 break;
4967         case DLM_RCOM:
4968                 type = le32_to_cpu(p->rcom.rc_type);
4969                 break;
4970         default:
4971                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4972                 return;
4973         }
4974
4975         if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4976                 log_print("invalid h_nodeid %d from %d lockspace %x",
4977                           le32_to_cpu(hd->h_nodeid), nodeid,
4978                           le32_to_cpu(hd->u.h_lockspace));
4979                 return;
4980         }
4981
4982         ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4983         if (!ls) {
4984                 if (dlm_config.ci_log_debug) {
4985                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4986                                 "%u from %d cmd %d type %d\n",
4987                                 le32_to_cpu(hd->u.h_lockspace), nodeid,
4988                                 hd->h_cmd, type);
4989                 }
4990
4991                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4992                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4993                 return;
4994         }
4995
4996         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4997            be inactive (in this ls) before transitioning to recovery mode */
4998
4999         read_lock_bh(&ls->ls_recv_active);
5000         if (hd->h_cmd == DLM_MSG)
5001                 dlm_receive_message(ls, &p->message, nodeid);
5002         else if (hd->h_cmd == DLM_RCOM)
5003                 dlm_receive_rcom(ls, &p->rcom, nodeid);
5004         else
5005                 log_error(ls, "invalid h_cmd %d from %d lockspace %x",
5006                           hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
5007         read_unlock_bh(&ls->ls_recv_active);
5008
5009         dlm_put_lockspace(ls);
5010 }
5011
5012 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5013                                    struct dlm_message *ms_local)
5014 {
5015         if (middle_conversion(lkb)) {
5016                 hold_lkb(lkb);
5017                 memset(ms_local, 0, sizeof(struct dlm_message));
5018                 ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5019                 ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5020                 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5021                 _receive_convert_reply(lkb, ms_local, true);
5022
5023                 /* Same special case as in receive_rcom_lock_args() */
5024                 lkb->lkb_grmode = DLM_LOCK_IV;
5025                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5026                 unhold_lkb(lkb);
5027
5028         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5029                 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5030         }
5031
5032         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5033            conversions are async; there's no reply from the remote master */
5034 }
5035
5036 /* A waiting lkb needs recovery if the master node has failed, or
5037    the master node is changing (only when no directory is used) */
5038
5039 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5040                                  int dir_nodeid)
5041 {
5042         if (dlm_no_directory(ls))
5043                 return 1;
5044
5045         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5046                 return 1;
5047
5048         return 0;
5049 }
5050
5051 /* Recovery for locks that are waiting for replies from nodes that are now
5052    gone.  We can just complete unlocks and cancels by faking a reply from the
5053    dead node.  Requests and up-conversions we flag to be resent after
5054    recovery.  Down-conversions can just be completed with a fake reply like
5055    unlocks.  Conversions between PR and CW need special attention. */
5056
5057 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5058 {
5059         struct dlm_lkb *lkb, *safe;
5060         struct dlm_message *ms_local;
5061         int wait_type, local_unlock_result, local_cancel_result;
5062         int dir_nodeid;
5063
5064         ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
5065         if (!ms_local)
5066                 return;
5067
5068         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5069
5070                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5071
5072                 /* exclude debug messages about unlocks because there can be so
5073                    many and they aren't very interesting */
5074
5075                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5076                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5077                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5078                                   lkb->lkb_id,
5079                                   lkb->lkb_remid,
5080                                   lkb->lkb_wait_type,
5081                                   lkb->lkb_resource->res_nodeid,
5082                                   lkb->lkb_nodeid,
5083                                   lkb->lkb_wait_nodeid,
5084                                   dir_nodeid);
5085                 }
5086
5087                 /* all outstanding lookups, regardless of destination  will be
5088                    resent after recovery is done */
5089
5090                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5091                         set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5092                         continue;
5093                 }
5094
5095                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5096                         continue;
5097
5098                 wait_type = lkb->lkb_wait_type;
5099                 local_unlock_result = -DLM_EUNLOCK;
5100                 local_cancel_result = -DLM_ECANCEL;
5101
5102                 /* Main reply may have been received leaving a zero wait_type,
5103                    but a reply for the overlapping op may not have been
5104                    received.  In that case we need to fake the appropriate
5105                    reply for the overlap op. */
5106
5107                 if (!wait_type) {
5108                         if (is_overlap_cancel(lkb)) {
5109                                 wait_type = DLM_MSG_CANCEL;
5110                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5111                                         local_cancel_result = 0;
5112                         }
5113                         if (is_overlap_unlock(lkb)) {
5114                                 wait_type = DLM_MSG_UNLOCK;
5115                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5116                                         local_unlock_result = -ENOENT;
5117                         }
5118
5119                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
5120                                   lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5121                                   local_cancel_result, local_unlock_result);
5122                 }
5123
5124                 switch (wait_type) {
5125
5126                 case DLM_MSG_REQUEST:
5127                         set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5128                         break;
5129
5130                 case DLM_MSG_CONVERT:
5131                         recover_convert_waiter(ls, lkb, ms_local);
5132                         break;
5133
5134                 case DLM_MSG_UNLOCK:
5135                         hold_lkb(lkb);
5136                         memset(ms_local, 0, sizeof(struct dlm_message));
5137                         ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5138                         ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5139                         ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5140                         _receive_unlock_reply(lkb, ms_local, true);
5141                         dlm_put_lkb(lkb);
5142                         break;
5143
5144                 case DLM_MSG_CANCEL:
5145                         hold_lkb(lkb);
5146                         memset(ms_local, 0, sizeof(struct dlm_message));
5147                         ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5148                         ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5149                         ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5150                         _receive_cancel_reply(lkb, ms_local, true);
5151                         dlm_put_lkb(lkb);
5152                         break;
5153
5154                 default:
5155                         log_error(ls, "invalid lkb wait_type %d %d",
5156                                   lkb->lkb_wait_type, wait_type);
5157                 }
5158                 schedule();
5159         }
5160         kfree(ms_local);
5161 }
5162
5163 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5164 {
5165         struct dlm_lkb *lkb = NULL, *iter;
5166
5167         spin_lock_bh(&ls->ls_waiters_lock);
5168         list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5169                 if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5170                         hold_lkb(iter);
5171                         lkb = iter;
5172                         break;
5173                 }
5174         }
5175         spin_unlock_bh(&ls->ls_waiters_lock);
5176
5177         return lkb;
5178 }
5179
5180 /*
5181  * Forced state reset for locks that were in the middle of remote operations
5182  * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5183  * for a reply from a remote operation.)  The lkbs remaining on the waiters
5184  * list need to be reevaluated; some may need resending to a different node
5185  * than previously, and some may now need local handling rather than remote.
5186  *
5187  * First, the lkb state for the voided remote operation is forcibly reset,
5188  * equivalent to what remove_from_waiters() would normally do:
5189  * . lkb removed from ls_waiters list
5190  * . lkb wait_type cleared
5191  * . lkb waiters_count cleared
5192  * . lkb ref count decremented for each waiters_count (almost always 1,
5193  *   but possibly 2 in case of cancel/unlock overlapping, which means
5194  *   two remote replies were being expected for the lkb.)
5195  *
5196  * Second, the lkb is reprocessed like an original operation would be,
5197  * by passing it to _request_lock or _convert_lock, which will either
5198  * process the lkb operation locally, or send it to a remote node again
5199  * and put the lkb back onto the waiters list.
5200  *
5201  * When reprocessing the lkb, we may find that it's flagged for an overlapping
5202  * force-unlock or cancel, either from before recovery began, or after recovery
5203  * finished.  If this is the case, the unlock/cancel is done directly, and the
5204  * original operation is not initiated again (no _request_lock/_convert_lock.)
5205  */
5206
5207 int dlm_recover_waiters_post(struct dlm_ls *ls)
5208 {
5209         struct dlm_lkb *lkb;
5210         struct dlm_rsb *r;
5211         int error = 0, mstype, err, oc, ou;
5212
5213         while (1) {
5214                 if (dlm_locking_stopped(ls)) {
5215                         log_debug(ls, "recover_waiters_post aborted");
5216                         error = -EINTR;
5217                         break;
5218                 }
5219
5220                 /* 
5221                  * Find an lkb from the waiters list that's been affected by
5222                  * recovery node changes, and needs to be reprocessed.  Does
5223                  * hold_lkb(), adding a refcount.
5224                  */
5225                 lkb = find_resend_waiter(ls);
5226                 if (!lkb)
5227                         break;
5228
5229                 r = lkb->lkb_resource;
5230                 hold_rsb(r);
5231                 lock_rsb(r);
5232
5233                 /*
5234                  * If the lkb has been flagged for a force unlock or cancel,
5235                  * then the reprocessing below will be replaced by just doing
5236                  * the unlock/cancel directly.
5237                  */
5238                 mstype = lkb->lkb_wait_type;
5239                 oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5240                                         &lkb->lkb_iflags);
5241                 ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5242                                         &lkb->lkb_iflags);
5243                 err = 0;
5244
5245                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5246                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5247                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5248                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5249                           dlm_dir_nodeid(r), oc, ou);
5250
5251                 /*
5252                  * No reply to the pre-recovery operation will now be received,
5253                  * so a forced equivalent of remove_from_waiters() is needed to
5254                  * reset the waiters state that was in place before recovery.
5255                  */
5256
5257                 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5258
5259                 /* Forcibly clear wait_type */
5260                 lkb->lkb_wait_type = 0;
5261
5262                 /*
5263                  * Forcibly reset wait_count and associated refcount.  The
5264                  * wait_count will almost always be 1, but in case of an
5265                  * overlapping unlock/cancel it could be 2: see where
5266                  * add_to_waiters() finds the lkb is already on the waiters
5267                  * list and does lkb_wait_count++; hold_lkb().
5268                  */
5269                 while (lkb->lkb_wait_count) {
5270                         lkb->lkb_wait_count--;
5271                         unhold_lkb(lkb);
5272                 }
5273
5274                 /* Forcibly remove from waiters list */
5275                 spin_lock_bh(&ls->ls_waiters_lock);
5276                 list_del_init(&lkb->lkb_wait_reply);
5277                 spin_unlock_bh(&ls->ls_waiters_lock);
5278
5279                 /*
5280                  * The lkb is now clear of all prior waiters state and can be
5281                  * processed locally, or sent to remote node again, or directly
5282                  * cancelled/unlocked.
5283                  */
5284
5285                 if (oc || ou) {
5286                         /* do an unlock or cancel instead of resending */
5287                         switch (mstype) {
5288                         case DLM_MSG_LOOKUP:
5289                         case DLM_MSG_REQUEST:
5290                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5291                                                         -DLM_ECANCEL);
5292                                 unhold_lkb(lkb); /* undoes create_lkb() */
5293                                 break;
5294                         case DLM_MSG_CONVERT:
5295                                 if (oc) {
5296                                         queue_cast(r, lkb, -DLM_ECANCEL);
5297                                 } else {
5298                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5299                                         _unlock_lock(r, lkb);
5300                                 }
5301                                 break;
5302                         default:
5303                                 err = 1;
5304                         }
5305                 } else {
5306                         switch (mstype) {
5307                         case DLM_MSG_LOOKUP:
5308                         case DLM_MSG_REQUEST:
5309                                 _request_lock(r, lkb);
5310                                 if (is_master(r))
5311                                         confirm_master(r, 0);
5312                                 break;
5313                         case DLM_MSG_CONVERT:
5314                                 _convert_lock(r, lkb);
5315                                 break;
5316                         default:
5317                                 err = 1;
5318                         }
5319                 }
5320
5321                 if (err) {
5322                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5323                                   "dir_nodeid %d overlap %d %d",
5324                                   lkb->lkb_id, mstype, r->res_nodeid,
5325                                   dlm_dir_nodeid(r), oc, ou);
5326                 }
5327                 unlock_rsb(r);
5328                 put_rsb(r);
5329                 dlm_put_lkb(lkb);
5330         }
5331
5332         return error;
5333 }
5334
5335 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5336                               struct list_head *list)
5337 {
5338         struct dlm_lkb *lkb, *safe;
5339
5340         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5341                 if (!is_master_copy(lkb))
5342                         continue;
5343
5344                 /* don't purge lkbs we've added in recover_master_copy for
5345                    the current recovery seq */
5346
5347                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5348                         continue;
5349
5350                 del_lkb(r, lkb);
5351
5352                 /* this put should free the lkb */
5353                 if (!dlm_put_lkb(lkb))
5354                         log_error(ls, "purged mstcpy lkb not released");
5355         }
5356 }
5357
5358 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5359 {
5360         struct dlm_ls *ls = r->res_ls;
5361
5362         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5363         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5364         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5365 }
5366
5367 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5368                             struct list_head *list,
5369                             int nodeid_gone, unsigned int *count)
5370 {
5371         struct dlm_lkb *lkb, *safe;
5372
5373         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5374                 if (!is_master_copy(lkb))
5375                         continue;
5376
5377                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5378                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5379
5380                         /* tell recover_lvb to invalidate the lvb
5381                            because a node holding EX/PW failed */
5382                         if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5383                             (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5384                                 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5385                         }
5386
5387                         del_lkb(r, lkb);
5388
5389                         /* this put should free the lkb */
5390                         if (!dlm_put_lkb(lkb))
5391                                 log_error(ls, "purged dead lkb not released");
5392
5393                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5394
5395                         (*count)++;
5396                 }
5397         }
5398 }
5399
5400 /* Get rid of locks held by nodes that are gone. */
5401
5402 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5403 {
5404         struct dlm_rsb *r;
5405         struct dlm_member *memb;
5406         int nodes_count = 0;
5407         int nodeid_gone = 0;
5408         unsigned int lkb_count = 0;
5409
5410         /* cache one removed nodeid to optimize the common
5411            case of a single node removed */
5412
5413         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5414                 nodes_count++;
5415                 nodeid_gone = memb->nodeid;
5416         }
5417
5418         if (!nodes_count)
5419                 return;
5420
5421         list_for_each_entry(r, root_list, res_root_list) {
5422                 hold_rsb(r);
5423                 lock_rsb(r);
5424                 if (is_master(r)) {
5425                         purge_dead_list(ls, r, &r->res_grantqueue,
5426                                         nodeid_gone, &lkb_count);
5427                         purge_dead_list(ls, r, &r->res_convertqueue,
5428                                         nodeid_gone, &lkb_count);
5429                         purge_dead_list(ls, r, &r->res_waitqueue,
5430                                         nodeid_gone, &lkb_count);
5431                 }
5432                 unlock_rsb(r);
5433                 unhold_rsb(r);
5434                 cond_resched();
5435         }
5436
5437         if (lkb_count)
5438                 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5439                           lkb_count, nodes_count);
5440 }
5441
5442 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5443 {
5444         struct dlm_rsb *r;
5445
5446         read_lock_bh(&ls->ls_rsbtbl_lock);
5447         list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
5448                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5449                         continue;
5450                 if (!is_master(r)) {
5451                         rsb_clear_flag(r, RSB_RECOVER_GRANT);
5452                         continue;
5453                 }
5454                 hold_rsb(r);
5455                 read_unlock_bh(&ls->ls_rsbtbl_lock);
5456                 return r;
5457         }
5458         read_unlock_bh(&ls->ls_rsbtbl_lock);
5459         return NULL;
5460 }
5461
5462 /*
5463  * Attempt to grant locks on resources that we are the master of.
5464  * Locks may have become grantable during recovery because locks
5465  * from departed nodes have been purged (or not rebuilt), allowing
5466  * previously blocked locks to now be granted.  The subset of rsb's
5467  * we are interested in are those with lkb's on either the convert or
5468  * waiting queues.
5469  *
5470  * Simplest would be to go through each master rsb and check for non-empty
5471  * convert or waiting queues, and attempt to grant on those rsbs.
5472  * Checking the queues requires lock_rsb, though, for which we'd need
5473  * to release the rsbtbl lock.  This would make iterating through all
5474  * rsb's very inefficient.  So, we rely on earlier recovery routines
5475  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5476  * locks for.
5477  */
5478
5479 void dlm_recover_grant(struct dlm_ls *ls)
5480 {
5481         struct dlm_rsb *r;
5482         unsigned int count = 0;
5483         unsigned int rsb_count = 0;
5484         unsigned int lkb_count = 0;
5485
5486         while (1) {
5487                 r = find_grant_rsb(ls);
5488                 if (!r)
5489                         break;
5490
5491                 rsb_count++;
5492                 count = 0;
5493                 lock_rsb(r);
5494                 /* the RECOVER_GRANT flag is checked in the grant path */
5495                 grant_pending_locks(r, &count);
5496                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5497                 lkb_count += count;
5498                 confirm_master(r, 0);
5499                 unlock_rsb(r);
5500                 put_rsb(r);
5501                 cond_resched();
5502         }
5503
5504         if (lkb_count)
5505                 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5506                           lkb_count, rsb_count);
5507 }
5508
5509 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5510                                          uint32_t remid)
5511 {
5512         struct dlm_lkb *lkb;
5513
5514         list_for_each_entry(lkb, head, lkb_statequeue) {
5515                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5516                         return lkb;
5517         }
5518         return NULL;
5519 }
5520
5521 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5522                                     uint32_t remid)
5523 {
5524         struct dlm_lkb *lkb;
5525
5526         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5527         if (lkb)
5528                 return lkb;
5529         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5530         if (lkb)
5531                 return lkb;
5532         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5533         if (lkb)
5534                 return lkb;
5535         return NULL;
5536 }
5537
5538 /* needs at least dlm_rcom + rcom_lock */
5539 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5540                                   struct dlm_rsb *r, const struct dlm_rcom *rc)
5541 {
5542         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5543
5544         lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5545         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5546         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5547         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5548         dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5549         set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5550         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5551         lkb->lkb_rqmode = rl->rl_rqmode;
5552         lkb->lkb_grmode = rl->rl_grmode;
5553         /* don't set lkb_status because add_lkb wants to itself */
5554
5555         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5556         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5557
5558         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5559                 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5560                         sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5561                 if (lvblen > ls->ls_lvblen)
5562                         return -EINVAL;
5563                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5564                 if (!lkb->lkb_lvbptr)
5565                         return -ENOMEM;
5566                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5567         }
5568
5569         /* Conversions between PR and CW (middle modes) need special handling.
5570            The real granted mode of these converting locks cannot be determined
5571            until all locks have been rebuilt on the rsb (recover_conversion) */
5572
5573         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5574             middle_conversion(lkb)) {
5575                 rl->rl_status = DLM_LKSTS_CONVERT;
5576                 lkb->lkb_grmode = DLM_LOCK_IV;
5577                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5578         }
5579
5580         return 0;
5581 }
5582
5583 /* This lkb may have been recovered in a previous aborted recovery so we need
5584    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5585    If so we just send back a standard reply.  If not, we create a new lkb with
5586    the given values and send back our lkid.  We send back our lkid by sending
5587    back the rcom_lock struct we got but with the remid field filled in. */
5588
5589 /* needs at least dlm_rcom + rcom_lock */
5590 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5591                             __le32 *rl_remid, __le32 *rl_result)
5592 {
5593         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5594         struct dlm_rsb *r;
5595         struct dlm_lkb *lkb;
5596         uint32_t remid = 0;
5597         int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5598         int error;
5599
5600         /* init rl_remid with rcom lock rl_remid */
5601         *rl_remid = rl->rl_remid;
5602
5603         if (rl->rl_parent_lkid) {
5604                 error = -EOPNOTSUPP;
5605                 goto out;
5606         }
5607
5608         remid = le32_to_cpu(rl->rl_lkid);
5609
5610         /* In general we expect the rsb returned to be R_MASTER, but we don't
5611            have to require it.  Recovery of masters on one node can overlap
5612            recovery of locks on another node, so one node can send us MSTCPY
5613            locks before we've made ourselves master of this rsb.  We can still
5614            add new MSTCPY locks that we receive here without any harm; when
5615            we make ourselves master, dlm_recover_masters() won't touch the
5616            MSTCPY locks we've received early. */
5617
5618         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5619                          from_nodeid, R_RECEIVE_RECOVER, &r);
5620         if (error)
5621                 goto out;
5622
5623         lock_rsb(r);
5624
5625         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5626                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5627                           from_nodeid, remid);
5628                 error = -EBADR;
5629                 goto out_unlock;
5630         }
5631
5632         lkb = search_remid(r, from_nodeid, remid);
5633         if (lkb) {
5634                 error = -EEXIST;
5635                 goto out_remid;
5636         }
5637
5638         error = create_lkb(ls, &lkb);
5639         if (error)
5640                 goto out_unlock;
5641
5642         error = receive_rcom_lock_args(ls, lkb, r, rc);
5643         if (error) {
5644                 __put_lkb(ls, lkb);
5645                 goto out_unlock;
5646         }
5647
5648         attach_lkb(r, lkb);
5649         add_lkb(r, lkb, rl->rl_status);
5650         ls->ls_recover_locks_in++;
5651
5652         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5653                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5654
5655  out_remid:
5656         /* this is the new value returned to the lock holder for
5657            saving in its process-copy lkb */
5658         *rl_remid = cpu_to_le32(lkb->lkb_id);
5659
5660         lkb->lkb_recover_seq = ls->ls_recover_seq;
5661
5662  out_unlock:
5663         unlock_rsb(r);
5664         put_rsb(r);
5665  out:
5666         if (error && error != -EEXIST)
5667                 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5668                           from_nodeid, remid, error);
5669         *rl_result = cpu_to_le32(error);
5670         return error;
5671 }
5672
5673 /* needs at least dlm_rcom + rcom_lock */
5674 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5675                              uint64_t seq)
5676 {
5677         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5678         struct dlm_rsb *r;
5679         struct dlm_lkb *lkb;
5680         uint32_t lkid, remid;
5681         int error, result;
5682
5683         lkid = le32_to_cpu(rl->rl_lkid);
5684         remid = le32_to_cpu(rl->rl_remid);
5685         result = le32_to_cpu(rl->rl_result);
5686
5687         error = find_lkb(ls, lkid, &lkb);
5688         if (error) {
5689                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5690                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5691                           result);
5692                 return error;
5693         }
5694
5695         r = lkb->lkb_resource;
5696         hold_rsb(r);
5697         lock_rsb(r);
5698
5699         if (!is_process_copy(lkb)) {
5700                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5701                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5702                           result);
5703                 dlm_dump_rsb(r);
5704                 unlock_rsb(r);
5705                 put_rsb(r);
5706                 dlm_put_lkb(lkb);
5707                 return -EINVAL;
5708         }
5709
5710         switch (result) {
5711         case -EBADR:
5712                 /* There's a chance the new master received our lock before
5713                    dlm_recover_master_reply(), this wouldn't happen if we did
5714                    a barrier between recover_masters and recover_locks. */
5715
5716                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5717                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5718                           result);
5719         
5720                 dlm_send_rcom_lock(r, lkb, seq);
5721                 goto out;
5722         case -EEXIST:
5723         case 0:
5724                 lkb->lkb_remid = remid;
5725                 break;
5726         default:
5727                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5728                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5729                           result);
5730         }
5731
5732         /* an ack for dlm_recover_locks() which waits for replies from
5733            all the locks it sends to new masters */
5734         dlm_recovered_lock(r);
5735  out:
5736         unlock_rsb(r);
5737         put_rsb(r);
5738         dlm_put_lkb(lkb);
5739
5740         return 0;
5741 }
5742
5743 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5744                      int mode, uint32_t flags, void *name, unsigned int namelen)
5745 {
5746         struct dlm_lkb *lkb;
5747         struct dlm_args args;
5748         bool do_put = true;
5749         int error;
5750
5751         dlm_lock_recovery(ls);
5752
5753         error = create_lkb(ls, &lkb);
5754         if (error) {
5755                 kfree(ua);
5756                 goto out;
5757         }
5758
5759         trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5760
5761         if (flags & DLM_LKF_VALBLK) {
5762                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5763                 if (!ua->lksb.sb_lvbptr) {
5764                         kfree(ua);
5765                         error = -ENOMEM;
5766                         goto out_put;
5767                 }
5768         }
5769         error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5770                               fake_bastfn, &args);
5771         if (error) {
5772                 kfree(ua->lksb.sb_lvbptr);
5773                 ua->lksb.sb_lvbptr = NULL;
5774                 kfree(ua);
5775                 goto out_put;
5776         }
5777
5778         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5779            When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5780            lock and that lkb_astparam is the dlm_user_args structure. */
5781         set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5782         error = request_lock(ls, lkb, name, namelen, &args);
5783
5784         switch (error) {
5785         case 0:
5786                 break;
5787         case -EINPROGRESS:
5788                 error = 0;
5789                 break;
5790         case -EAGAIN:
5791                 error = 0;
5792                 fallthrough;
5793         default:
5794                 goto out_put;
5795         }
5796
5797         /* add this new lkb to the per-process list of locks */
5798         spin_lock_bh(&ua->proc->locks_spin);
5799         hold_lkb(lkb);
5800         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5801         spin_unlock_bh(&ua->proc->locks_spin);
5802         do_put = false;
5803  out_put:
5804         trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5805         if (do_put)
5806                 __put_lkb(ls, lkb);
5807  out:
5808         dlm_unlock_recovery(ls);
5809         return error;
5810 }
5811
5812 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5813                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5814 {
5815         struct dlm_lkb *lkb;
5816         struct dlm_args args;
5817         struct dlm_user_args *ua;
5818         int error;
5819
5820         dlm_lock_recovery(ls);
5821
5822         error = find_lkb(ls, lkid, &lkb);
5823         if (error)
5824                 goto out;
5825
5826         trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5827
5828         /* user can change the params on its lock when it converts it, or
5829            add an lvb that didn't exist before */
5830
5831         ua = lkb->lkb_ua;
5832
5833         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5834                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5835                 if (!ua->lksb.sb_lvbptr) {
5836                         error = -ENOMEM;
5837                         goto out_put;
5838                 }
5839         }
5840         if (lvb_in && ua->lksb.sb_lvbptr)
5841                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5842
5843         ua->xid = ua_tmp->xid;
5844         ua->castparam = ua_tmp->castparam;
5845         ua->castaddr = ua_tmp->castaddr;
5846         ua->bastparam = ua_tmp->bastparam;
5847         ua->bastaddr = ua_tmp->bastaddr;
5848         ua->user_lksb = ua_tmp->user_lksb;
5849
5850         error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5851                               fake_bastfn, &args);
5852         if (error)
5853                 goto out_put;
5854
5855         error = convert_lock(ls, lkb, &args);
5856
5857         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5858                 error = 0;
5859  out_put:
5860         trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5861         dlm_put_lkb(lkb);
5862  out:
5863         dlm_unlock_recovery(ls);
5864         kfree(ua_tmp);
5865         return error;
5866 }
5867
5868 /*
5869  * The caller asks for an orphan lock on a given resource with a given mode.
5870  * If a matching lock exists, it's moved to the owner's list of locks and
5871  * the lkid is returned.
5872  */
5873
5874 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5875                      int mode, uint32_t flags, void *name, unsigned int namelen,
5876                      uint32_t *lkid)
5877 {
5878         struct dlm_lkb *lkb = NULL, *iter;
5879         struct dlm_user_args *ua;
5880         int found_other_mode = 0;
5881         int rv = 0;
5882
5883         spin_lock_bh(&ls->ls_orphans_lock);
5884         list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5885                 if (iter->lkb_resource->res_length != namelen)
5886                         continue;
5887                 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5888                         continue;
5889                 if (iter->lkb_grmode != mode) {
5890                         found_other_mode = 1;
5891                         continue;
5892                 }
5893
5894                 lkb = iter;
5895                 list_del_init(&iter->lkb_ownqueue);
5896                 clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5897                 *lkid = iter->lkb_id;
5898                 break;
5899         }
5900         spin_unlock_bh(&ls->ls_orphans_lock);
5901
5902         if (!lkb && found_other_mode) {
5903                 rv = -EAGAIN;
5904                 goto out;
5905         }
5906
5907         if (!lkb) {
5908                 rv = -ENOENT;
5909                 goto out;
5910         }
5911
5912         lkb->lkb_exflags = flags;
5913         lkb->lkb_ownpid = (int) current->pid;
5914
5915         ua = lkb->lkb_ua;
5916
5917         ua->proc = ua_tmp->proc;
5918         ua->xid = ua_tmp->xid;
5919         ua->castparam = ua_tmp->castparam;
5920         ua->castaddr = ua_tmp->castaddr;
5921         ua->bastparam = ua_tmp->bastparam;
5922         ua->bastaddr = ua_tmp->bastaddr;
5923         ua->user_lksb = ua_tmp->user_lksb;
5924
5925         /*
5926          * The lkb reference from the ls_orphans list was not
5927          * removed above, and is now considered the reference
5928          * for the proc locks list.
5929          */
5930
5931         spin_lock_bh(&ua->proc->locks_spin);
5932         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5933         spin_unlock_bh(&ua->proc->locks_spin);
5934  out:
5935         kfree(ua_tmp);
5936         return rv;
5937 }
5938
5939 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5940                     uint32_t flags, uint32_t lkid, char *lvb_in)
5941 {
5942         struct dlm_lkb *lkb;
5943         struct dlm_args args;
5944         struct dlm_user_args *ua;
5945         int error;
5946
5947         dlm_lock_recovery(ls);
5948
5949         error = find_lkb(ls, lkid, &lkb);
5950         if (error)
5951                 goto out;
5952
5953         trace_dlm_unlock_start(ls, lkb, flags);
5954
5955         ua = lkb->lkb_ua;
5956
5957         if (lvb_in && ua->lksb.sb_lvbptr)
5958                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5959         if (ua_tmp->castparam)
5960                 ua->castparam = ua_tmp->castparam;
5961         ua->user_lksb = ua_tmp->user_lksb;
5962
5963         error = set_unlock_args(flags, ua, &args);
5964         if (error)
5965                 goto out_put;
5966
5967         error = unlock_lock(ls, lkb, &args);
5968
5969         if (error == -DLM_EUNLOCK)
5970                 error = 0;
5971         /* from validate_unlock_args() */
5972         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5973                 error = 0;
5974         if (error)
5975                 goto out_put;
5976
5977         spin_lock_bh(&ua->proc->locks_spin);
5978         /* dlm_user_add_cb() may have already taken lkb off the proc list */
5979         if (!list_empty(&lkb->lkb_ownqueue))
5980                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5981         spin_unlock_bh(&ua->proc->locks_spin);
5982  out_put:
5983         trace_dlm_unlock_end(ls, lkb, flags, error);
5984         dlm_put_lkb(lkb);
5985  out:
5986         dlm_unlock_recovery(ls);
5987         kfree(ua_tmp);
5988         return error;
5989 }
5990
5991 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5992                     uint32_t flags, uint32_t lkid)
5993 {
5994         struct dlm_lkb *lkb;
5995         struct dlm_args args;
5996         struct dlm_user_args *ua;
5997         int error;
5998
5999         dlm_lock_recovery(ls);
6000
6001         error = find_lkb(ls, lkid, &lkb);
6002         if (error)
6003                 goto out;
6004
6005         trace_dlm_unlock_start(ls, lkb, flags);
6006
6007         ua = lkb->lkb_ua;
6008         if (ua_tmp->castparam)
6009                 ua->castparam = ua_tmp->castparam;
6010         ua->user_lksb = ua_tmp->user_lksb;
6011
6012         error = set_unlock_args(flags, ua, &args);
6013         if (error)
6014                 goto out_put;
6015
6016         error = cancel_lock(ls, lkb, &args);
6017
6018         if (error == -DLM_ECANCEL)
6019                 error = 0;
6020         /* from validate_unlock_args() */
6021         if (error == -EBUSY)
6022                 error = 0;
6023  out_put:
6024         trace_dlm_unlock_end(ls, lkb, flags, error);
6025         dlm_put_lkb(lkb);
6026  out:
6027         dlm_unlock_recovery(ls);
6028         kfree(ua_tmp);
6029         return error;
6030 }
6031
6032 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6033 {
6034         struct dlm_lkb *lkb;
6035         struct dlm_args args;
6036         struct dlm_user_args *ua;
6037         struct dlm_rsb *r;
6038         int error;
6039
6040         dlm_lock_recovery(ls);
6041
6042         error = find_lkb(ls, lkid, &lkb);
6043         if (error)
6044                 goto out;
6045
6046         trace_dlm_unlock_start(ls, lkb, flags);
6047
6048         ua = lkb->lkb_ua;
6049
6050         error = set_unlock_args(flags, ua, &args);
6051         if (error)
6052                 goto out_put;
6053
6054         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6055
6056         r = lkb->lkb_resource;
6057         hold_rsb(r);
6058         lock_rsb(r);
6059
6060         error = validate_unlock_args(lkb, &args);
6061         if (error)
6062                 goto out_r;
6063         set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6064
6065         error = _cancel_lock(r, lkb);
6066  out_r:
6067         unlock_rsb(r);
6068         put_rsb(r);
6069
6070         if (error == -DLM_ECANCEL)
6071                 error = 0;
6072         /* from validate_unlock_args() */
6073         if (error == -EBUSY)
6074                 error = 0;
6075  out_put:
6076         trace_dlm_unlock_end(ls, lkb, flags, error);
6077         dlm_put_lkb(lkb);
6078  out:
6079         dlm_unlock_recovery(ls);
6080         return error;
6081 }
6082
6083 /* lkb's that are removed from the waiters list by revert are just left on the
6084    orphans list with the granted orphan locks, to be freed by purge */
6085
6086 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6087 {
6088         struct dlm_args args;
6089         int error;
6090
6091         hold_lkb(lkb); /* reference for the ls_orphans list */
6092         spin_lock_bh(&ls->ls_orphans_lock);
6093         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6094         spin_unlock_bh(&ls->ls_orphans_lock);
6095
6096         set_unlock_args(0, lkb->lkb_ua, &args);
6097
6098         error = cancel_lock(ls, lkb, &args);
6099         if (error == -DLM_ECANCEL)
6100                 error = 0;
6101         return error;
6102 }
6103
6104 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6105    granted.  Regardless of what rsb queue the lock is on, it's removed and
6106    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6107    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6108
6109 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6110 {
6111         struct dlm_args args;
6112         int error;
6113
6114         set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6115                         lkb->lkb_ua, &args);
6116
6117         error = unlock_lock(ls, lkb, &args);
6118         if (error == -DLM_EUNLOCK)
6119                 error = 0;
6120         return error;
6121 }
6122
6123 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6124    (which does lock_rsb) due to deadlock with receiving a message that does
6125    lock_rsb followed by dlm_user_add_cb() */
6126
6127 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6128                                      struct dlm_user_proc *proc)
6129 {
6130         struct dlm_lkb *lkb = NULL;
6131
6132         spin_lock_bh(&ls->ls_clear_proc_locks);
6133         if (list_empty(&proc->locks))
6134                 goto out;
6135
6136         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6137         list_del_init(&lkb->lkb_ownqueue);
6138
6139         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6140                 set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6141         else
6142                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6143  out:
6144         spin_unlock_bh(&ls->ls_clear_proc_locks);
6145         return lkb;
6146 }
6147
6148 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6149    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6150    which we clear here. */
6151
6152 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6153    list, and no more device_writes should add lkb's to proc->locks list; so we
6154    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6155    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6156    them ourself. */
6157
6158 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6159 {
6160         struct dlm_callback *cb, *cb_safe;
6161         struct dlm_lkb *lkb, *safe;
6162
6163         dlm_lock_recovery(ls);
6164
6165         while (1) {
6166                 lkb = del_proc_lock(ls, proc);
6167                 if (!lkb)
6168                         break;
6169                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6170                         orphan_proc_lock(ls, lkb);
6171                 else
6172                         unlock_proc_lock(ls, lkb);
6173
6174                 /* this removes the reference for the proc->locks list
6175                    added by dlm_user_request, it may result in the lkb
6176                    being freed */
6177
6178                 dlm_put_lkb(lkb);
6179         }
6180
6181         spin_lock_bh(&ls->ls_clear_proc_locks);
6182
6183         /* in-progress unlocks */
6184         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6185                 list_del_init(&lkb->lkb_ownqueue);
6186                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6187                 dlm_put_lkb(lkb);
6188         }
6189
6190         list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6191                 list_del(&cb->list);
6192                 dlm_free_cb(cb);
6193         }
6194
6195         spin_unlock_bh(&ls->ls_clear_proc_locks);
6196         dlm_unlock_recovery(ls);
6197 }
6198
6199 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6200 {
6201         struct dlm_callback *cb, *cb_safe;
6202         struct dlm_lkb *lkb, *safe;
6203
6204         while (1) {
6205                 lkb = NULL;
6206                 spin_lock_bh(&proc->locks_spin);
6207                 if (!list_empty(&proc->locks)) {
6208                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
6209                                          lkb_ownqueue);
6210                         list_del_init(&lkb->lkb_ownqueue);
6211                 }
6212                 spin_unlock_bh(&proc->locks_spin);
6213
6214                 if (!lkb)
6215                         break;
6216
6217                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6218                 unlock_proc_lock(ls, lkb);
6219                 dlm_put_lkb(lkb); /* ref from proc->locks list */
6220         }
6221
6222         spin_lock_bh(&proc->locks_spin);
6223         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6224                 list_del_init(&lkb->lkb_ownqueue);
6225                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6226                 dlm_put_lkb(lkb);
6227         }
6228         spin_unlock_bh(&proc->locks_spin);
6229
6230         spin_lock_bh(&proc->asts_spin);
6231         list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6232                 list_del(&cb->list);
6233                 dlm_free_cb(cb);
6234         }
6235         spin_unlock_bh(&proc->asts_spin);
6236 }
6237
6238 /* pid of 0 means purge all orphans */
6239
6240 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6241 {
6242         struct dlm_lkb *lkb, *safe;
6243
6244         spin_lock_bh(&ls->ls_orphans_lock);
6245         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6246                 if (pid && lkb->lkb_ownpid != pid)
6247                         continue;
6248                 unlock_proc_lock(ls, lkb);
6249                 list_del_init(&lkb->lkb_ownqueue);
6250                 dlm_put_lkb(lkb);
6251         }
6252         spin_unlock_bh(&ls->ls_orphans_lock);
6253 }
6254
6255 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6256 {
6257         struct dlm_message *ms;
6258         struct dlm_mhandle *mh;
6259         int error;
6260
6261         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6262                                 DLM_MSG_PURGE, &ms, &mh);
6263         if (error)
6264                 return error;
6265         ms->m_nodeid = cpu_to_le32(nodeid);
6266         ms->m_pid = cpu_to_le32(pid);
6267
6268         return send_message(mh, ms, NULL, 0);
6269 }
6270
6271 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6272                    int nodeid, int pid)
6273 {
6274         int error = 0;
6275
6276         if (nodeid && (nodeid != dlm_our_nodeid())) {
6277                 error = send_purge(ls, nodeid, pid);
6278         } else {
6279                 dlm_lock_recovery(ls);
6280                 if (pid == current->pid)
6281                         purge_proc_locks(ls, proc);
6282                 else
6283                         do_purge(ls, nodeid, pid);
6284                 dlm_unlock_recovery(ls);
6285         }
6286         return error;
6287 }
6288
6289 /* debug functionality */
6290 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6291                       int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6292 {
6293         struct dlm_lksb *lksb;
6294         struct dlm_lkb *lkb;
6295         struct dlm_rsb *r;
6296         int error;
6297
6298         /* we currently can't set a valid user lock */
6299         if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6300                 return -EOPNOTSUPP;
6301
6302         lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6303         if (!lksb)
6304                 return -ENOMEM;
6305
6306         error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6307         if (error) {
6308                 kfree(lksb);
6309                 return error;
6310         }
6311
6312         dlm_set_dflags_val(lkb, lkb_dflags);
6313         lkb->lkb_nodeid = lkb_nodeid;
6314         lkb->lkb_lksb = lksb;
6315         /* user specific pointer, just don't have it NULL for kernel locks */
6316         if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6317                 lkb->lkb_astparam = (void *)0xDEADBEEF;
6318
6319         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6320         if (error) {
6321                 kfree(lksb);
6322                 __put_lkb(ls, lkb);
6323                 return error;
6324         }
6325
6326         lock_rsb(r);
6327         attach_lkb(r, lkb);
6328         add_lkb(r, lkb, lkb_status);
6329         unlock_rsb(r);
6330         put_rsb(r);
6331
6332         return 0;
6333 }
6334
6335 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6336                                  int mstype, int to_nodeid)
6337 {
6338         struct dlm_lkb *lkb;
6339         int error;
6340
6341         error = find_lkb(ls, lkb_id, &lkb);
6342         if (error)
6343                 return error;
6344
6345         error = add_to_waiters(lkb, mstype, to_nodeid);
6346         dlm_put_lkb(lkb);
6347         return error;
6348 }
6349
This page took 0.388246 seconds and 4 git commands to generate.