1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
5 ** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
8 *******************************************************************************
9 ******************************************************************************/
11 /* Central locking logic has four stages:
31 Stage 1 (lock, unlock) is mainly about checking input args and
32 splitting into one of the four main operations:
34 dlm_lock = request_lock
35 dlm_lock+CONVERT = convert_lock
36 dlm_unlock = unlock_lock
37 dlm_unlock+CANCEL = cancel_lock
39 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40 provided to the next stage.
42 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43 When remote, it calls send_xxxx(), when local it calls do_xxxx().
45 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
46 given rsb and lkb and queues callbacks.
48 For remote operations, send_xxxx() results in the corresponding do_xxxx()
49 function being executed on the remote node. The connecting send/receive
50 calls on local (L) and remote (R) nodes:
52 L: send_xxxx() -> R: receive_xxxx()
54 L: receive_xxxx_reply() <- R: send_xxxx_reply()
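
   As a hedged illustration (a sketch of the call chain implied above, not a
   verbatim trace): a dlm_lock() request on a resource mastered by remote
   node R flows roughly as

   L: dlm_lock() -> request_lock() -> _request_lock() -> send_request()
   R: receive_request() -> do_request() -> send_request_reply()
   L: receive_request_reply() -> queue_cast() (completion callback)

   while the same request on a locally mastered resource short-circuits at
   stage 3 and calls do_request() directly.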
56 #include <trace/events/dlm.h>
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
65 #include "requestqueue.h"
69 #include "lockspace.h"
74 #include "lvb_table.h"
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 const struct dlm_message *ms, bool local);
90 static int receive_extralen(const struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void toss_rsb(struct kref *kref);
 95 * Lock compatibility matrix - thanks Steve
96 * UN = Unlocked state. Not really a state, used as a flag
97 * PD = Padding. Used to make the matrix a nice power of two in size
98 * Other states are the same as the VMS DLM.
99 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
102 static const int __dlm_compat_matrix[8][8] = {
103 /* UN NL CR CW PR PW EX PD */
104 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */
105 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */
106 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */
107 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */
108 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */
109 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */
110 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */
111 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
115 * This defines the direction of transfer of LVB data.
116 * Granted mode is the row; requested mode is the column.
117 * Usage: matrix[grmode+1][rqmode+1]
118 * 1 = LVB is returned to the caller
119 * 0 = LVB is written to the resource
120 * -1 = nothing happens to the LVB
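 *
 * A worked example of reading this table (illustrative only): converting a
 * lock down from EX to NL gives dlm_lvb_operations[EX+1][NL+1] == 0, so the
 * caller's LVB is written to the resource; converting up from NL to EX gives
 * dlm_lvb_operations[NL+1][EX+1] == 1, so the resource's LVB is returned to
 * the caller.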
123 const int dlm_lvb_operations[8][8] = {
124 /* UN NL CR CW PR PW EX PD*/
125 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */
126 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */
127 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */
128 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */
129 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */
130 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */
131 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
132 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
135 #define modes_compat(gr, rq) \
136 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
138 int dlm_modes_compat(int mode1, int mode2)
140 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
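
/* Hedged usage sketch (hypothetical caller, not part of this file): two
 * protected-read locks can coexist, so dlm_modes_compat(DLM_LOCK_PR,
 * DLM_LOCK_PR) returns 1, while dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX)
 * returns 0 because a read lock blocks an exclusive one. */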
144 * Compatibility matrix for conversions with QUECVT set.
145 * Granted mode is the row; requested mode is the column.
146 * Usage: matrix[grmode+1][rqmode+1]
149 static const int __quecvt_compat_matrix[8][8] = {
150 /* UN NL CR CW PR PW EX PD */
151 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
152 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */
153 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */
154 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */
155 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
156 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
157 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
158 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
161 void dlm_print_lkb(struct dlm_lkb *lkb)
163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 (unsigned long long)lkb->lkb_recover_seq);
171 static void dlm_print_rsb(struct dlm_rsb *r)
173 printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
175 r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176 r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
180 void dlm_dump_rsb(struct dlm_rsb *r)
186 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187 list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188 printk(KERN_ERR "rsb lookup list\n");
189 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
191 printk(KERN_ERR "rsb grant queue:\n");
192 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
194 printk(KERN_ERR "rsb convert queue:\n");
195 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
197 printk(KERN_ERR "rsb wait queue:\n");
198 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
202 /* Threads cannot use the lockspace while it's being recovered */
204 void dlm_lock_recovery(struct dlm_ls *ls)
206 down_read(&ls->ls_in_recovery);
209 void dlm_unlock_recovery(struct dlm_ls *ls)
211 up_read(&ls->ls_in_recovery);
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
216 return down_read_trylock(&ls->ls_in_recovery);
219 static inline int can_be_queued(struct dlm_lkb *lkb)
221 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
229 static inline int is_demoted(struct dlm_lkb *lkb)
231 return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
234 static inline int is_altmode(struct dlm_lkb *lkb)
236 return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
239 static inline int is_granted(struct dlm_lkb *lkb)
241 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
244 static inline int is_remote(struct dlm_rsb *r)
246 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247 return !!r->res_nodeid;
250 static inline int is_process_copy(struct dlm_lkb *lkb)
252 return lkb->lkb_nodeid &&
253 !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
256 static inline int is_master_copy(struct dlm_lkb *lkb)
258 return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
261 static inline int middle_conversion(struct dlm_lkb *lkb)
263 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
269 static inline int down_conversion(struct dlm_lkb *lkb)
271 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
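
/* For example (illustrative): PR->CW and CW->PR are "middle" conversions,
 * since neither mode is strictly stronger than the other; EX->PR is a
 * down-conversion (rqmode < grmode); NL->EX is neither. */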
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
276 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
281 return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
284 static inline int is_overlap(struct dlm_lkb *lkb)
286 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287 test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
292 if (is_master_copy(lkb))
295 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
297 if (rv == -DLM_ECANCEL &&
298 test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
301 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
307 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
312 if (is_master_copy(lkb)) {
313 send_bast(r, lkb, rqmode);
315 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
320 * Basic operations on rsb's and lkb's
323 static inline unsigned long rsb_toss_jiffies(void)
325 return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
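
/* e.g. with ci_toss_secs set to 10, this yields an absolute expiry ten
 * seconds (10 * HZ jiffies) from now. */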
328 /* This is only called to add a reference when the code already holds
329 a valid reference to the rsb, so there's no need for locking. */
331 static inline void hold_rsb(struct dlm_rsb *r)
333 /* rsbs in toss state never get referenced */
334 WARN_ON(rsb_flag(r, RSB_TOSS));
335 kref_get(&r->res_ref);
338 void dlm_hold_rsb(struct dlm_rsb *r)
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(lock)
348 if (refcount_dec_not_one(r))
352 if (!refcount_dec_and_test(r)) {
353 write_unlock_bh(lock);
360 /* TODO move this to include/linux/kref.h */
361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362 void (*release)(struct kref *kref),
365 if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
373 /* When all references to the rsb are gone it's transferred to
374 the tossed list for later disposal. */
376 static void put_rsb(struct dlm_rsb *r)
378 struct dlm_ls *ls = r->res_ls;
381 rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb,
382 &ls->ls_rsbtbl_lock);
384 write_unlock_bh(&ls->ls_rsbtbl_lock);
387 void dlm_put_rsb(struct dlm_rsb *r)
392 static int pre_rsb_struct(struct dlm_ls *ls)
394 struct dlm_rsb *r1, *r2;
397 spin_lock_bh(&ls->ls_new_rsb_spin);
398 if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
399 spin_unlock_bh(&ls->ls_new_rsb_spin);
402 spin_unlock_bh(&ls->ls_new_rsb_spin);
404 r1 = dlm_allocate_rsb(ls);
405 r2 = dlm_allocate_rsb(ls);
407 spin_lock_bh(&ls->ls_new_rsb_spin);
409 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
410 ls->ls_new_rsb_count++;
413 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
414 ls->ls_new_rsb_count++;
416 count = ls->ls_new_rsb_count;
417 spin_unlock_bh(&ls->ls_new_rsb_spin);
 424 /* Coordinated with timer_delete_sync() in dlm_ls_stop(): stop arming
 425  * new timers when recovery is triggered, and don't run them again
 426  * until dlm_timer_resume() restarts them.
428 static void __rsb_mod_timer(struct dlm_ls *ls, unsigned long jiffies)
430 if (!dlm_locking_stopped(ls))
431 mod_timer(&ls->ls_timer, jiffies);
 434 /* This function tries to resume the timer callback if an rsb
 435  * is on the toss list and no timer is pending. It may be that
 436  * the first entry is currently being executed as the timer callback,
 437  * but we don't care if a timer is queued up again and then does
 438  * nothing. That should be a rare case.
440 void dlm_timer_resume(struct dlm_ls *ls)
444 spin_lock_bh(&ls->ls_toss_q_lock);
445 r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
447 if (r && !timer_pending(&ls->ls_timer))
448 __rsb_mod_timer(ls, r->res_toss_time);
449 spin_unlock_bh(&ls->ls_toss_q_lock);
 452 /* ls_rsbtbl_lock must be held, and the rsb must be in toss state */
453 static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r)
455 struct dlm_rsb *first;
457 spin_lock_bh(&ls->ls_toss_q_lock);
458 r->res_toss_time = 0;
460 /* if the rsb is not queued do nothing */
461 if (list_empty(&r->res_toss_q_list))
464 /* get the first element before delete */
465 first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb,
467 list_del_init(&r->res_toss_q_list);
468 /* check if the first element was the rsb we deleted */
 470 /* get the new first element; if the list is now empty,
 471  * try to delete the timer (if we are too late we don't
 472  * care).
 474  * if the list isn't empty and a new first element is
 475  * in place, set the new timer expire time.
477 first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
480 timer_delete(&ls->ls_timer);
482 __rsb_mod_timer(ls, first->res_toss_time);
486 spin_unlock_bh(&ls->ls_toss_q_lock);
 489 /* Caller must hold ls_rsbtbl_lock. This must be called every time
 490  * the rsb enters toss state, or a tossed rsb's dir/master nodeid
 491  * changes.
493 static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r)
495 int our_nodeid = dlm_our_nodeid();
496 struct dlm_rsb *first;
 498 /* If we hold the directory record for this rsb, and
 499  * we're not the master of it, then we need to wait
 500  * for the master node to send us a dir remove
 501  * before removing the dir record.
503 if (!dlm_no_directory(ls) &&
504 (r->res_master_nodeid != our_nodeid) &&
505 (dlm_dir_nodeid(r) == our_nodeid)) {
506 rsb_delete_toss_timer(ls, r);
510 spin_lock_bh(&ls->ls_toss_q_lock);
511 /* set the new rsb absolute expire time in the rsb */
512 r->res_toss_time = rsb_toss_jiffies();
513 if (list_empty(&ls->ls_toss_q)) {
 514 /* if the queue is empty, add the element; its expire
 515  * time becomes our new timer expiration
517 list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q);
518 __rsb_mod_timer(ls, r->res_toss_time);
520 /* check if the rsb was already queued, if so delete
521 * it from the toss queue
523 if (!list_empty(&r->res_toss_q_list))
524 list_del(&r->res_toss_q_list);
 526 /* get the (possibly new) first element, then add this
 527  * rsb, which now has the latest expire time, to the end
 528  * of the queue. If the list was empty before, this rsb's
 529  * expire time is our next expiration; if it wasn't, the
 530  * new first element's expire time is.
532 first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
534 list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q);
536 __rsb_mod_timer(ls, r->res_toss_time);
538 __rsb_mod_timer(ls, first->res_toss_time);
540 spin_unlock_bh(&ls->ls_toss_q_lock);
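
/* Net effect (a summary of the above, not new behaviour): ls_toss_q stays
 * sorted by ascending res_toss_time, because a refreshed rsb always gets the
 * latest expiry and is appended at the tail, and ls_timer is armed for the
 * head of the queue. */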
 543 /* if we hit contention we retry the trylock after 250 ms.
 544  * if any other mod_timer happens in between we don't care
 545  * that it expires earlier; this retry is only for the
 546  * unlikely case that nothing else happened in this time.
548 #define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250))
550 void dlm_rsb_toss_timer(struct timer_list *timer)
552 struct dlm_ls *ls = from_timer(ls, timer, ls_timer);
553 int our_nodeid = dlm_our_nodeid();
 558 /* interruption point to leave the iteration when
 559  * recovery waits for timer_delete_sync(); recovery
 560  * will take care of deleting everything in the toss queue.
562 if (dlm_locking_stopped(ls))
565 rv = spin_trylock(&ls->ls_toss_q_lock);
 567 /* rearm the retry timer */
568 __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY);
572 r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
 575 /* nothing more to do; the next rsb queued will
 576  * set the next mod_timer() expiration.
578 spin_unlock(&ls->ls_toss_q_lock);
 582 /* if the first rsb hasn't expired yet, stop
 583  * freeing rsbs from the toss queue, as the queue
 584  * is ordered by ascending absolute res_toss_time
 585  * jiffies
587 if (time_before(jiffies, r->res_toss_time)) {
588 /* rearm with the next rsb to expire in the future */
589 __rsb_mod_timer(ls, r->res_toss_time);
590 spin_unlock(&ls->ls_toss_q_lock);
 594 /* find_rsb_dir/nodir take these locks in the reverse
 595  * order; however, this is only a trylock, so if we hit
 596  * possible contention we simply try again.
 598  * Taking this lock while holding ls_toss_q_lock ensures
 599  * that rsb_delete_toss_timer() or rsb_mod_timer() cannot
 600  * run after this timer callback deletes the rsb from
 601  * ls_toss_q. The other holders always have priority to
 602  * run, as this is only cache handling and the other
 603  * holders might be taking this rsb out of the toss
 604  * state.
606 rv = write_trylock(&ls->ls_rsbtbl_lock);
608 spin_unlock(&ls->ls_toss_q_lock);
 609 /* rearm the retry timer */
610 __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY);
614 list_del(&r->res_rsbs_list);
615 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
616 dlm_rhash_rsb_params);
 618 /* not necessary to hold ls_rsbtbl_lock when
 619  * calling send_remove()
621 write_unlock(&ls->ls_rsbtbl_lock);
 623 /* remove the rsb from the toss queue; it's gone
626 list_del_init(&r->res_toss_q_list);
627 spin_unlock(&ls->ls_toss_q_lock);
629 /* no rsb in this state should ever run a timer */
630 WARN_ON(!dlm_no_directory(ls) &&
631 (r->res_master_nodeid != our_nodeid) &&
632 (dlm_dir_nodeid(r) == our_nodeid));
 634 /* We're the master of this rsb but we don't
 635  * hold the directory record, so we need to tell
 636  * the dir node to remove the dir record
638 if (!dlm_no_directory(ls) &&
639 (r->res_master_nodeid == our_nodeid) &&
640 (dlm_dir_nodeid(r) != our_nodeid))
647 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
648 unlock any spinlocks, go back and call pre_rsb_struct again.
649 Otherwise, take an rsb off the list and return it. */
651 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
652 struct dlm_rsb **r_ret)
657 spin_lock_bh(&ls->ls_new_rsb_spin);
658 if (list_empty(&ls->ls_new_rsb)) {
659 count = ls->ls_new_rsb_count;
660 spin_unlock_bh(&ls->ls_new_rsb_spin);
661 log_debug(ls, "find_rsb retry %d %d %s",
662 count, dlm_config.ci_new_rsb_count,
667 r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
668 list_del(&r->res_hashchain);
669 ls->ls_new_rsb_count--;
670 spin_unlock_bh(&ls->ls_new_rsb_spin);
674 memcpy(r->res_name, name, len);
675 spin_lock_init(&r->res_lock);
677 INIT_LIST_HEAD(&r->res_lookup);
678 INIT_LIST_HEAD(&r->res_grantqueue);
679 INIT_LIST_HEAD(&r->res_convertqueue);
680 INIT_LIST_HEAD(&r->res_waitqueue);
681 INIT_LIST_HEAD(&r->res_root_list);
682 INIT_LIST_HEAD(&r->res_toss_q_list);
683 INIT_LIST_HEAD(&r->res_recover_list);
684 INIT_LIST_HEAD(&r->res_masters_list);
690 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
691 struct dlm_rsb **r_ret)
693 char key[DLM_RESNAME_MAXLEN] = {};
695 memcpy(key, name, len);
696 *r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
703 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
705 return rhashtable_insert_fast(rhash, &rsb->res_node,
706 dlm_rhash_rsb_params);
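
/* Typical caller pattern (a sketch; see find_rsb_dir() below): search the
 * table under read_lock_bh(&ls->ls_rsbtbl_lock); if nothing is found,
 * allocate via get_rsb_struct() and insert under the write lock, retrying
 * the whole lookup if rsb_insert() returns -EEXIST because another thread
 * raced us. */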
710 * Find rsb in rsbtbl and potentially create/add one
712 * Delaying the release of rsb's has a similar benefit to applications keeping
713 * NL locks on an rsb, but without the guarantee that the cached master value
714 * will still be valid when the rsb is reused. Apps aren't always smart enough
715 * to keep NL locks on an rsb that they may lock again shortly; this can lead
716 * to excessive master lookups and removals if we don't delay the release.
718 * Searching for an rsb means looking through both the normal list and toss
719 * list. When found on the toss list the rsb is moved to the normal list with
720 * ref count of 1; when found on normal list the ref count is incremented.
722 * rsb's on the keep list are being used locally and refcounted.
723 * rsb's on the toss list are not being used locally, and are not refcounted.
725 * The toss list rsb's were either
726 * - previously used locally but not any more (were on keep list, then
727 * moved to toss list when last refcount dropped)
728 * - created and put on toss list as a directory record for a lookup
729 * (we are the dir node for the res, but are not using the res right now,
730 * but some other node is)
732 * The purpose of find_rsb() is to return a refcounted rsb for local use.
733 * So, if the given rsb is on the toss list, it is moved to the keep list
734 * before being returned.
736 * toss_rsb() happens when all local usage of the rsb is done, i.e. no
737 * more refcounts exist, so the rsb is moved from the keep list to the
740 * rsb's on both keep and toss lists are used for doing a name to master
741 * lookups. rsb's that are in use locally (and being refcounted) are on
742 * the keep list, rsb's that are not in use locally (not refcounted) and
743 * only exist for name/master lookups are on the toss list.
 745 * rsb's on the toss list whose dir_nodeid is not local can have stale
746 * name/master mappings. So, remote requests on such rsb's can potentially
747 * return with an error, which means the mapping is stale and needs to
748 * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and
749 * first_lkid is to keep only a single outstanding request on an rsb
750 * while that rsb has a potentially stale master.)
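 *
 * Lifecycle sketch implied by the above (not additional behaviour):
 *
 *   get_rsb_struct() -> keep list (refcounted via res_ref)
 *     -> last kref dropped -> toss_rsb(): moved to toss list, toss timer armed
 *       -> revived by find_rsb(): back to keep list, kref_init()
 *       -> or expired: dlm_rsb_toss_timer() removes and frees it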
753 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
754 uint32_t hash, int dir_nodeid, int from_nodeid,
755 unsigned int flags, struct dlm_rsb **r_ret)
757 struct dlm_rsb *r = NULL;
758 int our_nodeid = dlm_our_nodeid();
765 if (flags & R_RECEIVE_REQUEST) {
766 if (from_nodeid == dir_nodeid)
770 } else if (flags & R_REQUEST) {
775 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
776 * from_nodeid has sent us a lock in dlm_recover_locks, believing
777 * we're the new master. Our local recovery may not have set
778 * res_master_nodeid to our_nodeid yet, so allow either. Don't
779 * create the rsb; dlm_recover_process_copy() will handle EBADR
782 * If someone sends us a request, we are the dir node, and we do
783 * not find the rsb anywhere, then recreate it. This happens if
784 * someone sends us a request after we have removed/freed an rsb
785 * from our toss list. (They sent a request instead of lookup
786 * because they are using an rsb from their toss list.)
789 if (from_local || from_dir ||
790 (from_other && (dir_nodeid == our_nodeid))) {
796 error = pre_rsb_struct(ls);
803 /* check if the rsb is in keep state under read lock - likely path */
804 read_lock_bh(&ls->ls_rsbtbl_lock);
805 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
807 read_unlock_bh(&ls->ls_rsbtbl_lock);
812 * rsb is active, so we can't check master_nodeid without lock_rsb.
815 if (rsb_flag(r, RSB_TOSS)) {
816 read_unlock_bh(&ls->ls_rsbtbl_lock);
820 kref_get(&r->res_ref);
821 read_unlock_bh(&ls->ls_rsbtbl_lock);
826 write_lock_bh(&ls->ls_rsbtbl_lock);
 828 /* retry the lookup under the write lock to see if it's still in toss
 829  * state; if not, it's in keep state and we relookup - unlikely path.
831 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
833 if (!rsb_flag(r, RSB_TOSS)) {
834 write_unlock_bh(&ls->ls_rsbtbl_lock);
838 write_unlock_bh(&ls->ls_rsbtbl_lock);
843 * rsb found inactive (master_nodeid may be out of date unless
844 * we are the dir_nodeid or were the master) No other thread
845 * is using this rsb because it's on the toss list, so we can
846 * look at or update res_master_nodeid without lock_rsb.
849 if ((r->res_master_nodeid != our_nodeid) && from_other) {
850 /* our rsb was not master, and another node (not the dir node)
851 has sent us a request */
852 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
853 from_nodeid, r->res_master_nodeid, dir_nodeid,
855 write_unlock_bh(&ls->ls_rsbtbl_lock);
860 if ((r->res_master_nodeid != our_nodeid) && from_dir) {
861 /* don't think this should ever happen */
862 log_error(ls, "find_rsb toss from_dir %d master %d",
863 from_nodeid, r->res_master_nodeid);
865 /* fix it and go on */
866 r->res_master_nodeid = our_nodeid;
868 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
869 r->res_first_lkid = 0;
872 if (from_local && (r->res_master_nodeid != our_nodeid)) {
873 /* Because we have held no locks on this rsb,
874 res_master_nodeid could have become stale. */
875 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
876 r->res_first_lkid = 0;
879 list_move(&r->res_rsbs_list, &ls->ls_keep);
880 rsb_clear_flag(r, RSB_TOSS);
881 /* rsb got out of toss state, it becomes alive again
882 * and we reinit the reference counter that is only
883 * valid for keep state rsbs
885 kref_init(&r->res_ref);
886 rsb_delete_toss_timer(ls, r);
887 write_unlock_bh(&ls->ls_rsbtbl_lock);
897 if (error == -EBADR && !create)
900 error = get_rsb_struct(ls, name, len, &r);
901 if (error == -EAGAIN)
907 r->res_dir_nodeid = dir_nodeid;
908 kref_init(&r->res_ref);
911 /* want to see how often this happens */
912 log_debug(ls, "find_rsb new from_dir %d recreate %s",
913 from_nodeid, r->res_name);
914 r->res_master_nodeid = our_nodeid;
919 if (from_other && (dir_nodeid != our_nodeid)) {
920 /* should never happen */
921 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
922 from_nodeid, dir_nodeid, our_nodeid, r->res_name);
930 log_debug(ls, "find_rsb new from_other %d dir %d %s",
931 from_nodeid, dir_nodeid, r->res_name);
934 if (dir_nodeid == our_nodeid) {
935 /* When we are the dir nodeid, we can set the master
937 r->res_master_nodeid = our_nodeid;
940 /* set_master will send_lookup to dir_nodeid */
941 r->res_master_nodeid = 0;
947 write_lock_bh(&ls->ls_rsbtbl_lock);
948 error = rsb_insert(r, &ls->ls_rsbtbl);
949 if (error == -EEXIST) {
950 /* somebody else was faster and it seems the
951 * rsb exists now, we do a whole relookup
953 write_unlock_bh(&ls->ls_rsbtbl_lock);
957 list_add(&r->res_rsbs_list, &ls->ls_keep);
959 write_unlock_bh(&ls->ls_rsbtbl_lock);
965 /* During recovery, other nodes can send us new MSTCPY locks (from
966 dlm_recover_locks) before we've made ourself master (in
967 dlm_recover_masters). */
969 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
970 uint32_t hash, int dir_nodeid, int from_nodeid,
971 unsigned int flags, struct dlm_rsb **r_ret)
973 struct dlm_rsb *r = NULL;
974 int our_nodeid = dlm_our_nodeid();
975 int recover = (flags & R_RECEIVE_RECOVER);
979 error = pre_rsb_struct(ls);
985 /* check if the rsb is in keep state under read lock - likely path */
986 read_lock_bh(&ls->ls_rsbtbl_lock);
987 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
989 read_unlock_bh(&ls->ls_rsbtbl_lock);
993 if (rsb_flag(r, RSB_TOSS)) {
994 read_unlock_bh(&ls->ls_rsbtbl_lock);
999 * rsb is active, so we can't check master_nodeid without lock_rsb.
1002 kref_get(&r->res_ref);
1003 read_unlock_bh(&ls->ls_rsbtbl_lock);
1009 write_lock_bh(&ls->ls_rsbtbl_lock);
 1011 /* retry the lookup under the write lock to see if it's still in toss
 1012  * state; if not, it's in keep state and we relookup - unlikely path.
1014 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1016 if (!rsb_flag(r, RSB_TOSS)) {
1017 write_unlock_bh(&ls->ls_rsbtbl_lock);
1021 write_unlock_bh(&ls->ls_rsbtbl_lock);
1027 * rsb found inactive. No other thread is using this rsb because
1028 * it's on the toss list, so we can look at or update
1029 * res_master_nodeid without lock_rsb.
1032 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
1033 /* our rsb is not master, and another node has sent us a
1034 request; this should never happen */
1035 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
1036 from_nodeid, r->res_master_nodeid, dir_nodeid);
1038 write_unlock_bh(&ls->ls_rsbtbl_lock);
1043 if (!recover && (r->res_master_nodeid != our_nodeid) &&
1044 (dir_nodeid == our_nodeid)) {
1045 /* our rsb is not master, and we are dir; may as well fix it;
1046 this should never happen */
1047 log_error(ls, "find_rsb toss our %d master %d dir %d",
1048 our_nodeid, r->res_master_nodeid, dir_nodeid);
1050 r->res_master_nodeid = our_nodeid;
1054 list_move(&r->res_rsbs_list, &ls->ls_keep);
1055 rsb_clear_flag(r, RSB_TOSS);
1056 /* rsb got out of toss state, it becomes alive again
1057 * and we reinit the reference counter that is only
1058 * valid for keep state rsbs
1060 kref_init(&r->res_ref);
1061 rsb_delete_toss_timer(ls, r);
1062 write_unlock_bh(&ls->ls_rsbtbl_lock);
1072 error = get_rsb_struct(ls, name, len, &r);
1073 if (error == -EAGAIN) {
1080 r->res_dir_nodeid = dir_nodeid;
1081 r->res_master_nodeid = dir_nodeid;
1082 r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
1083 kref_init(&r->res_ref);
1085 write_lock_bh(&ls->ls_rsbtbl_lock);
1086 error = rsb_insert(r, &ls->ls_rsbtbl);
1087 if (error == -EEXIST) {
1088 /* somebody else was faster and it seems the
1089 * rsb exists now, we do a whole relookup
1091 write_unlock_bh(&ls->ls_rsbtbl_lock);
1094 } else if (!error) {
1095 list_add(&r->res_rsbs_list, &ls->ls_keep);
1097 write_unlock_bh(&ls->ls_rsbtbl_lock);
1104 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1105 int from_nodeid, unsigned int flags,
1106 struct dlm_rsb **r_ret)
1111 if (len > DLM_RESNAME_MAXLEN)
1114 hash = jhash(name, len, 0);
1115 dir_nodeid = dlm_hash2nodeid(ls, hash);
1117 if (dlm_no_directory(ls))
1118 return find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1119 from_nodeid, flags, r_ret);
1121 return find_rsb_dir(ls, name, len, hash, dir_nodeid,
1122 from_nodeid, flags, r_ret);
1125 /* we have received a request and found that res_master_nodeid != our_nodeid,
1126 so we need to return an error or make ourself the master */
1128 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1131 if (dlm_no_directory(ls)) {
1132 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1133 from_nodeid, r->res_master_nodeid,
1139 if (from_nodeid != r->res_dir_nodeid) {
1140 /* our rsb is not master, and another node (not the dir node)
1141 has sent us a request. this is much more common when our
1142 master_nodeid is zero, so limit debug to non-zero. */
1144 if (r->res_master_nodeid) {
1145 log_debug(ls, "validate master from_other %d master %d "
1146 "dir %d first %x %s", from_nodeid,
1147 r->res_master_nodeid, r->res_dir_nodeid,
1148 r->res_first_lkid, r->res_name);
1152 /* our rsb is not master, but the dir nodeid has sent us a
1153 request; this could happen with master 0 / res_nodeid -1 */
1155 if (r->res_master_nodeid) {
1156 log_error(ls, "validate master from_dir %d master %d "
1158 from_nodeid, r->res_master_nodeid,
1159 r->res_first_lkid, r->res_name);
1162 r->res_master_nodeid = dlm_our_nodeid();
1168 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
1169 int from_nodeid, bool toss_list, unsigned int flags,
1170 int *r_nodeid, int *result)
1172 int fix_master = (flags & DLM_LU_RECOVER_MASTER);
1173 int from_master = (flags & DLM_LU_RECOVER_DIR);
1175 if (r->res_dir_nodeid != our_nodeid) {
1176 /* should not happen, but may as well fix it and carry on */
1177 log_error(ls, "%s res_dir %d our %d %s", __func__,
1178 r->res_dir_nodeid, our_nodeid, r->res_name);
1179 r->res_dir_nodeid = our_nodeid;
1182 if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
1183 /* Recovery uses this function to set a new master when
1184 * the previous master failed. Setting NEW_MASTER will
1185 * force dlm_recover_masters to call recover_master on this
1186 * rsb even though the res_nodeid is no longer removed.
1189 r->res_master_nodeid = from_nodeid;
1190 r->res_nodeid = from_nodeid;
1191 rsb_set_flag(r, RSB_NEW_MASTER);
1194 /* I don't think we should ever find it on toss list. */
1195 log_error(ls, "%s fix_master on toss", __func__);
1200 if (from_master && (r->res_master_nodeid != from_nodeid)) {
1201 /* this will happen if from_nodeid became master during
1202 * a previous recovery cycle, and we aborted the previous
1203 * cycle before recovering this master value
1206 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
1207 __func__, from_nodeid, r->res_master_nodeid,
1208 r->res_nodeid, r->res_first_lkid, r->res_name);
1210 if (r->res_master_nodeid == our_nodeid) {
1211 log_error(ls, "from_master %d our_master", from_nodeid);
1216 r->res_master_nodeid = from_nodeid;
1217 r->res_nodeid = from_nodeid;
1218 rsb_set_flag(r, RSB_NEW_MASTER);
1221 if (!r->res_master_nodeid) {
1222 /* this will happen if recovery happens while we're looking
1223 * up the master for this rsb
1226 log_debug(ls, "%s master 0 to %d first %x %s", __func__,
1227 from_nodeid, r->res_first_lkid, r->res_name);
1228 r->res_master_nodeid = from_nodeid;
1229 r->res_nodeid = from_nodeid;
1232 if (!from_master && !fix_master &&
1233 (r->res_master_nodeid == from_nodeid)) {
1234 /* this can happen when the master sends remove, the dir node
1235 * finds the rsb on the keep list and ignores the remove,
1236 * and the former master sends a lookup
1239 log_limit(ls, "%s from master %d flags %x first %x %s",
1240 __func__, from_nodeid, flags, r->res_first_lkid,
1245 *r_nodeid = r->res_master_nodeid;
1247 *result = DLM_LU_MATCH;
1251 * We're the dir node for this res and another node wants to know the
1252 * master nodeid. During normal operation (non recovery) this is only
1253 * called from receive_lookup(); master lookups when the local node is
1254 * the dir node are done by find_rsb().
1256 * normal operation, we are the dir node for a resource
1261 * . dlm_master_lookup flags 0
1263 * recover directory, we are rebuilding dir for all resources
1264 * . dlm_recover_directory
1266 * remote node sends back the rsb names it is master of and we are dir of
1267 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1268 * we either create new rsb setting remote node as master, or find existing
1269 * rsb and set master to be the remote node.
1271 * recover masters, we are finding the new master for resources
1272 * . dlm_recover_masters
1274 * . dlm_send_rcom_lookup
1275 * . receive_rcom_lookup
1276 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1279 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1280 int len, unsigned int flags, int *r_nodeid, int *result)
1282 struct dlm_rsb *r = NULL;
1284 int our_nodeid = dlm_our_nodeid();
1285 int dir_nodeid, error;
1287 if (len > DLM_RESNAME_MAXLEN)
1290 if (from_nodeid == our_nodeid) {
1291 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1296 hash = jhash(name, len, 0);
1297 dir_nodeid = dlm_hash2nodeid(ls, hash);
1298 if (dir_nodeid != our_nodeid) {
1299 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1300 from_nodeid, dir_nodeid, our_nodeid, hash,
1307 error = pre_rsb_struct(ls);
1313 /* check if the rsb is in keep state under read lock - likely path */
1314 read_lock_bh(&ls->ls_rsbtbl_lock);
1315 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1317 if (rsb_flag(r, RSB_TOSS)) {
1318 read_unlock_bh(&ls->ls_rsbtbl_lock);
 1322 /* because the rsb is active, we need to lock_rsb before
 1323  * checking/changing res_master_nodeid
1327 read_unlock_bh(&ls->ls_rsbtbl_lock);
1330 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1331 flags, r_nodeid, result);
1333 /* the rsb was active */
1339 read_unlock_bh(&ls->ls_rsbtbl_lock);
1344 /* unlikely path - relookup under write */
1345 write_lock_bh(&ls->ls_rsbtbl_lock);
 1347 /* rsb_mod_timer() requires holding ls_rsbtbl_lock for writing.
 1348  * check if the rsb is still in toss state; if not, relookup
1350 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1352 if (!rsb_flag(r, RSB_TOSS)) {
1353 write_unlock_bh(&ls->ls_rsbtbl_lock);
 1354 /* something has changed, very unlikely but
1360 write_unlock_bh(&ls->ls_rsbtbl_lock);
1364 /* because the rsb is inactive (on toss list), it's not refcounted
1365 * and lock_rsb is not used, but is protected by the rsbtbl lock
1368 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1371 rsb_mod_timer(ls, r);
1372 /* the rsb was inactive (on toss list) */
1373 write_unlock_bh(&ls->ls_rsbtbl_lock);
1378 error = get_rsb_struct(ls, name, len, &r);
1379 if (error == -EAGAIN)
1385 r->res_dir_nodeid = our_nodeid;
1386 r->res_master_nodeid = from_nodeid;
1387 r->res_nodeid = from_nodeid;
1388 kref_init(&r->res_ref);
1389 rsb_set_flag(r, RSB_TOSS);
1391 write_lock_bh(&ls->ls_rsbtbl_lock);
1392 error = rsb_insert(r, &ls->ls_rsbtbl);
1393 if (error == -EEXIST) {
1394 /* somebody else was faster and it seems the
1395 * rsb exists now, we do a whole relookup
1397 write_unlock_bh(&ls->ls_rsbtbl_lock);
1401 write_unlock_bh(&ls->ls_rsbtbl_lock);
1402 /* should never happen */
1407 list_add(&r->res_rsbs_list, &ls->ls_toss);
1408 rsb_mod_timer(ls, r);
1409 write_unlock_bh(&ls->ls_rsbtbl_lock);
1412 *result = DLM_LU_ADD;
1413 *r_nodeid = from_nodeid;
1418 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1422 read_lock_bh(&ls->ls_rsbtbl_lock);
1423 list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
1424 if (r->res_hash == hash)
1427 read_unlock_bh(&ls->ls_rsbtbl_lock);
1430 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1432 struct dlm_rsb *r = NULL;
1435 read_lock_bh(&ls->ls_rsbtbl_lock);
1436 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1442 read_unlock_bh(&ls->ls_rsbtbl_lock);
1445 static void toss_rsb(struct kref *kref)
1447 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1448 struct dlm_ls *ls = r->res_ls;
1450 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1451 rsb_set_flag(r, RSB_TOSS);
1452 list_move(&r->res_rsbs_list, &ls->ls_toss);
1453 rsb_mod_timer(ls, r);
1455 if (r->res_lvbptr) {
1456 dlm_free_lvb(r->res_lvbptr);
1457 r->res_lvbptr = NULL;
1461 /* See comment for unhold_lkb */
1463 static void unhold_rsb(struct dlm_rsb *r)
1467 /* rsbs in toss state never get referenced */
1468 WARN_ON(rsb_flag(r, RSB_TOSS));
1469 rv = kref_put(&r->res_ref, toss_rsb);
1470 DLM_ASSERT(!rv, dlm_dump_rsb(r););
1473 void free_toss_rsb(struct dlm_rsb *r)
1475 WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS));
1477 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1478 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1479 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1480 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1481 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1482 DLM_ASSERT(list_empty(&r->res_toss_q_list), dlm_dump_rsb(r););
1483 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1484 DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
1489 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1490 The rsb must exist as long as any lkb's for it do. */
1492 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1495 lkb->lkb_resource = r;
1498 static void detach_lkb(struct dlm_lkb *lkb)
1500 if (lkb->lkb_resource) {
1501 put_rsb(lkb->lkb_resource);
1502 lkb->lkb_resource = NULL;
1506 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1509 struct dlm_lkb *lkb;
1512 lkb = dlm_allocate_lkb(ls);
1516 lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1517 lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1518 lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1519 lkb->lkb_nodeid = -1;
1520 lkb->lkb_grmode = DLM_LOCK_IV;
1521 kref_init(&lkb->lkb_ref);
1522 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1523 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1525 write_lock_bh(&ls->ls_lkbidr_lock);
1526 rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
1529 write_unlock_bh(&ls->ls_lkbidr_lock);
1532 log_error(ls, "create_lkb idr error %d", rv);
1541 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1543 return _create_lkb(ls, lkb_ret, 1, 0);
1546 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1548 struct dlm_lkb *lkb;
1550 read_lock_bh(&ls->ls_lkbidr_lock);
1551 lkb = idr_find(&ls->ls_lkbidr, lkid);
1553 kref_get(&lkb->lkb_ref);
1554 read_unlock_bh(&ls->ls_lkbidr_lock);
1557 return lkb ? 0 : -ENOENT;
1560 static void kill_lkb(struct kref *kref)
1562 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1564 /* All work is done after the return from kref_put() so we
1565 can release the write_lock before the detach_lkb */
1567 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1570 /* __put_lkb() is used when an lkb may not have an rsb attached to
1571 it so we need to provide the lockspace explicitly */
1573 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1575 uint32_t lkid = lkb->lkb_id;
1578 rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1579 &ls->ls_lkbidr_lock);
1581 idr_remove(&ls->ls_lkbidr, lkid);
1582 write_unlock_bh(&ls->ls_lkbidr_lock);
1586 /* for local/process lkbs, lvbptr points to caller's lksb */
1587 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1588 dlm_free_lvb(lkb->lkb_lvbptr);
1595 int dlm_put_lkb(struct dlm_lkb *lkb)
1599 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1600 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1602 ls = lkb->lkb_resource->res_ls;
1603 return __put_lkb(ls, lkb);
1606 /* This is only called to add a reference when the code already holds
1607 a valid reference to the lkb, so there's no need for locking. */
1609 static inline void hold_lkb(struct dlm_lkb *lkb)
1611 kref_get(&lkb->lkb_ref);
1614 static void unhold_lkb_assert(struct kref *kref)
1616 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1618 DLM_ASSERT(false, dlm_print_lkb(lkb););
1621 /* This is called when we need to remove a reference and are certain
1622 it's not the last ref. e.g. del_lkb is always called between a
1623 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1624 put_lkb would work fine, but would involve unnecessary locking */
1626 static inline void unhold_lkb(struct dlm_lkb *lkb)
1628 kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1631 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1634 struct dlm_lkb *lkb = NULL, *iter;
1636 list_for_each_entry(iter, head, lkb_statequeue)
1637 if (iter->lkb_rqmode < mode) {
1639 list_add_tail(new, &iter->lkb_statequeue);
1644 list_add_tail(new, head);
1647 /* add/remove lkb to rsb's grant/convert/wait queue */
1649 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1651 kref_get(&lkb->lkb_ref);
1653 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1655 lkb->lkb_timestamp = ktime_get();
1657 lkb->lkb_status = status;
1660 case DLM_LKSTS_WAITING:
1661 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1662 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1664 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1666 case DLM_LKSTS_GRANTED:
1667 /* convention says granted locks kept in order of grmode */
1668 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1671 case DLM_LKSTS_CONVERT:
1672 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1673 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1675 list_add_tail(&lkb->lkb_statequeue,
1676 &r->res_convertqueue);
1679 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1683 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1685 lkb->lkb_status = 0;
1686 list_del(&lkb->lkb_statequeue);
1690 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1694 add_lkb(r, lkb, sts);
1698 static int msg_reply_type(int mstype)
1701 case DLM_MSG_REQUEST:
1702 return DLM_MSG_REQUEST_REPLY;
1703 case DLM_MSG_CONVERT:
1704 return DLM_MSG_CONVERT_REPLY;
1705 case DLM_MSG_UNLOCK:
1706 return DLM_MSG_UNLOCK_REPLY;
1707 case DLM_MSG_CANCEL:
1708 return DLM_MSG_CANCEL_REPLY;
1709 case DLM_MSG_LOOKUP:
1710 return DLM_MSG_LOOKUP_REPLY;
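
/* For example (illustrative): a DLM_MSG_REQUEST sent via send_request() is
 * answered with a DLM_MSG_REQUEST_REPLY, which is the type later matched
 * against the lkb's wait state when it is removed from the waiters list. */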
1715 /* add/remove lkb from global waiters list of lkb's waiting for
1716 a reply from a remote node */
1718 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1720 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1723 spin_lock_bh(&ls->ls_waiters_lock);
1725 if (is_overlap_unlock(lkb) ||
1726 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1731 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1733 case DLM_MSG_UNLOCK:
1734 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1736 case DLM_MSG_CANCEL:
1737 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1743 lkb->lkb_wait_count++;
1746 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1747 lkb->lkb_id, lkb->lkb_wait_type, mstype,
1748 lkb->lkb_wait_count, dlm_iflags_val(lkb));
1752 DLM_ASSERT(!lkb->lkb_wait_count,
1754 printk("wait_count %d\n", lkb->lkb_wait_count););
1756 lkb->lkb_wait_count++;
1757 lkb->lkb_wait_type = mstype;
1758 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1760 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1763 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1764 lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
1765 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1766 spin_unlock_bh(&ls->ls_waiters_lock);
1770 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1771 list as part of process_requestqueue (e.g. a lookup that has an optimized
1772 request reply on the requestqueue) between dlm_recover_waiters_pre() which
1773 set RESEND and dlm_recover_waiters_post() */
1775 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1776 const struct dlm_message *ms)
1778 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1779 int overlap_done = 0;
1781 if (mstype == DLM_MSG_UNLOCK_REPLY &&
1782 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1783 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1788 if (mstype == DLM_MSG_CANCEL_REPLY &&
1789 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1790 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1795 /* Cancel state was preemptively cleared by a successful convert,
1796 see next comment, nothing to do. */
1798 if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1799 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1800 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1801 lkb->lkb_id, lkb->lkb_wait_type);
 1805 /* Remove for the convert reply, and preemptively remove for the
1806 cancel reply. A convert has been granted while there's still
1807 an outstanding cancel on it (the cancel is moot and the result
1808 in the cancel reply should be 0). We preempt the cancel reply
1809 because the app gets the convert result and then can follow up
1810 with another op, like convert. This subsequent op would see the
1811 lingering state of the cancel and fail with -EBUSY. */
1813 if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1814 (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1815 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1816 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1818 lkb->lkb_wait_type = 0;
1819 lkb->lkb_wait_count--;
1824 /* N.B. type of reply may not always correspond to type of original
1825 msg due to lookup->request optimization, verify others? */
1827 if (lkb->lkb_wait_type) {
1828 lkb->lkb_wait_type = 0;
1832 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1833 lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1834 lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1838 /* the force-unlock/cancel has completed and we haven't recvd a reply
1839 to the op that was in progress prior to the unlock/cancel; we
1840 give up on any reply to the earlier op. FIXME: not sure when/how
1841 this would happen */
1843 if (overlap_done && lkb->lkb_wait_type) {
1844 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1845 lkb->lkb_id, mstype, lkb->lkb_wait_type);
1846 lkb->lkb_wait_count--;
1848 lkb->lkb_wait_type = 0;
1851 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1853 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1854 lkb->lkb_wait_count--;
1855 if (!lkb->lkb_wait_count)
1856 list_del_init(&lkb->lkb_wait_reply);
1861 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1863 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1866 spin_lock_bh(&ls->ls_waiters_lock);
1867 error = _remove_from_waiters(lkb, mstype, NULL);
1868 spin_unlock_bh(&ls->ls_waiters_lock);
 1872 /* Handles situations where we might be processing a "fake" or "local" reply in
 1873  * the recovery context, which stops any locking activity. Only debugfs might
 1874  * change the lockspace waiters, but it will hold the recovery lock to ensure
 1875  * that remove_from_waiters_ms() in the local case is the only user manipulating
 1876  * the lockspace waiters in the recovery context.
1879 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1880 const struct dlm_message *ms, bool local)
1882 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1886 spin_lock_bh(&ls->ls_waiters_lock);
1888 WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1889 !dlm_locking_stopped(ls));
1890 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1892 spin_unlock_bh(&ls->ls_waiters_lock);
1896 /* lkb is master or local copy */
1898 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1900 int b, len = r->res_ls->ls_lvblen;
1902 /* b=1 lvb returned to caller
1903 b=0 lvb written to rsb or invalidated
1906 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1909 if (!lkb->lkb_lvbptr)
1912 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1918 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1919 lkb->lkb_lvbseq = r->res_lvbseq;
1921 } else if (b == 0) {
1922 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1923 rsb_set_flag(r, RSB_VALNOTVALID);
1927 if (!lkb->lkb_lvbptr)
1930 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1934 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1939 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1941 lkb->lkb_lvbseq = r->res_lvbseq;
1942 rsb_clear_flag(r, RSB_VALNOTVALID);
1945 if (rsb_flag(r, RSB_VALNOTVALID))
1946 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1949 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1951 if (lkb->lkb_grmode < DLM_LOCK_PW)
1954 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1955 rsb_set_flag(r, RSB_VALNOTVALID);
1959 if (!lkb->lkb_lvbptr)
1962 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1966 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1971 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1973 rsb_clear_flag(r, RSB_VALNOTVALID);
1976 /* lkb is process copy (pc) */
1978 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1979 const struct dlm_message *ms)
1983 if (!lkb->lkb_lvbptr)
1986 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1989 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1991 int len = receive_extralen(ms);
1992 if (len > r->res_ls->ls_lvblen)
1993 len = r->res_ls->ls_lvblen;
1994 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1995 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1999 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2000 remove_lock -- used for unlock, removes lkb from granted
2001 revert_lock -- used for cancel, moves lkb from convert to granted
2002 grant_lock -- used for request and convert, adds lkb to granted or
2003 moves lkb from convert or waiting to granted
2005 Each of these is used for master or local copy lkb's. There is
2006 also a _pc() variation used to make the corresponding change on
2007 a process copy (pc) lkb. */
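
/* Illustrative pairing (a sketch of how these are used, based on the
 * description above): the master node runs do_unlock() -> remove_lock() on
 * its master copy, while the requesting node, on receiving the unlock reply,
 * applies remove_lock_pc() to its process copy, so both sides make the same
 * queue change. */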
2009 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2012 lkb->lkb_grmode = DLM_LOCK_IV;
2013 /* this unhold undoes the original ref from create_lkb()
2014 so this leads to the lkb being freed */
2018 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2020 set_lvb_unlock(r, lkb);
2021 _remove_lock(r, lkb);
2024 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2026 _remove_lock(r, lkb);
2029 /* returns: 0 did nothing
2030 1 moved lock to granted
2033 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2037 lkb->lkb_rqmode = DLM_LOCK_IV;
2039 switch (lkb->lkb_status) {
2040 case DLM_LKSTS_GRANTED:
2042 case DLM_LKSTS_CONVERT:
2043 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2046 case DLM_LKSTS_WAITING:
2048 lkb->lkb_grmode = DLM_LOCK_IV;
2049 /* this unhold undoes the original ref from create_lkb()
2050 so this leads to the lkb being freed */
2055 log_print("invalid status for revert %d", lkb->lkb_status);
2060 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2062 return revert_lock(r, lkb);
2065 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2067 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2068 lkb->lkb_grmode = lkb->lkb_rqmode;
2069 if (lkb->lkb_status)
2070 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2072 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2075 lkb->lkb_rqmode = DLM_LOCK_IV;
2076 lkb->lkb_highbast = 0;
2079 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2081 set_lvb_lock(r, lkb);
2082 _grant_lock(r, lkb);
2085 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2086 const struct dlm_message *ms)
2088 set_lvb_lock_pc(r, lkb, ms);
2089 _grant_lock(r, lkb);
2092 /* called by grant_pending_locks() which means an async grant message must
2093 be sent to the requesting node in addition to granting the lock if the
2094 lkb belongs to a remote node. */
2096 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2099 if (is_master_copy(lkb))
2102 queue_cast(r, lkb, 0);
2105 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2106 change the granted/requested modes. We're munging things accordingly in
2108 CONVDEADLK: our grmode may have been forced down to NL to resolve a
2110 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2111 compatible with other granted locks */
2113 static void munge_demoted(struct dlm_lkb *lkb)
2115 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2116 log_print("munge_demoted %x invalid modes gr %d rq %d",
2117 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2121 lkb->lkb_grmode = DLM_LOCK_NL;
2124 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2126 if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2127 ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2128 log_print("munge_altmode %x invalid reply type %d",
2129 lkb->lkb_id, le32_to_cpu(ms->m_type));
2133 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2134 lkb->lkb_rqmode = DLM_LOCK_PR;
2135 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2136 lkb->lkb_rqmode = DLM_LOCK_CW;
2138 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2143 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2145 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2147 if (lkb->lkb_id == first->lkb_id)
2153 /* Check if the given lkb conflicts with another lkb on the queue. */
2155 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2157 struct dlm_lkb *this;
2159 list_for_each_entry(this, head, lkb_statequeue) {
2162 if (!modes_compat(this, lkb))
2169 * "A conversion deadlock arises with a pair of lock requests in the converting
2170 * queue for one resource. The granted mode of each lock blocks the requested
2171 * mode of the other lock."
2173 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2174 * convert queue from being granted, then deadlk/demote lkb.
2177 * Granted Queue: empty
2178 * Convert Queue: NL->EX (first lock)
2179 * PR->EX (second lock)
2181 * The first lock can't be granted because of the granted mode of the second
2182 * lock and the second lock can't be granted because it's not first in the
2183 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2184 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2185 * flag set and return DEMOTED in the lksb flags.
2187 * Originally, this function detected conv-deadlk in a more limited scope:
2188 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2189 * - if lkb1 was the first entry in the queue (not just earlier), and was
2190 * blocked by the granted mode of lkb2, and there was nothing on the
2191 * granted queue preventing lkb1 from being granted immediately, i.e.
2192 * lkb2 was the only thing preventing lkb1 from being granted.
2194 * That second condition meant we'd only say there was conv-deadlk if
2195 * resolving it (by demotion) would lead to the first lock on the convert
2196 * queue being granted right away. It allowed conversion deadlocks to exist
2197 * between locks on the convert queue while they couldn't be granted anyway.
2199 * Now, we detect and take action on conversion deadlocks immediately when
2200 * they're created, even if they may not be immediately consequential. If
2201 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2202 * mode that would prevent lkb1's conversion from being granted, we do a
2203 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2204 * I think this means that the lkb_is_ahead condition below should always
2205 * be zero, i.e. there will never be conv-deadlk between two locks that are
2206 * both already on the convert queue.
2209 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2211 struct dlm_lkb *lkb1;
2212 int lkb_is_ahead = 0;
2214 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2220 if (!lkb_is_ahead) {
2221 if (!modes_compat(lkb2, lkb1))
2224 if (!modes_compat(lkb2, lkb1) &&
2225 !modes_compat(lkb1, lkb2))
2233 * Return 1 if the lock can be granted, 0 otherwise.
2234 * Also detect and resolve conversion deadlocks.
2236 * lkb is the lock to be granted
2238 * now is 1 if the function is being called in the context of the
2239 * immediate request, it is 0 if called later, after the lock has been queued.
2242 * recover is 1 if dlm_recover_grant() is trying to grant conversions after recovery.
2245 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2248 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2251 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2254 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2255 * a new request for a NL mode lock being blocked.
2257 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2258 * request, then it would be granted. In essence, the use of this flag
2259 * tells the Lock Manager to expedite this request by not considering
2260 * what may be in the CONVERTING or WAITING queues... As of this
2261 * writing, the EXPEDITE flag can be used only with new requests for NL
2262 * mode locks. This flag is not valid for conversion requests.
2264 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
2265 * conversion or used with a non-NL requested mode. We also know an
2266 * EXPEDITE request is always granted immediately, so now must always
2267 * be 1. The full condition to grant an expedite request: (now &&
2268 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2269 * therefore be shortened to just checking the flag.
2272 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2276 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2277 * added to the remaining conditions.
2280 if (queue_conflict(&r->res_grantqueue, lkb))
2284 * 6-3: By default, a conversion request is immediately granted if the
2285 * requested mode is compatible with the modes of all other granted
2289 if (queue_conflict(&r->res_convertqueue, lkb))
2293 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2294 * locks for a recovered rsb, on which lkb's have been rebuilt.
2295 * The lkb's may have been rebuilt on the queues in a different
2296 * order than they were in on the previous master. So, granting
2297 * queued conversions in order after recovery doesn't make sense
2298 * since the order hasn't been preserved anyway. The new order
2299 * could also have created a new "in place" conversion deadlock.
2300 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2301 * After recovery, there would be no granted locks, and possibly
2302 * NL->EX, PR->EX, an in-place conversion deadlock.) So, after
2303 * recovery, grant conversions without considering order.
2306 if (conv && recover)
2310 * 6-5: But the default algorithm for deciding whether to grant or
2311 * queue conversion requests does not by itself guarantee that such
2312 * requests are serviced on a "first come first serve" basis. This, in
2313 * turn, can lead to a phenomenon known as "indefinite postponement".
2315 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2316 * the system service employed to request a lock conversion. This flag
2317 * forces certain conversion requests to be queued, even if they are
2318 * compatible with the granted modes of other locks on the same
2319 * resource. Thus, the use of this flag results in conversion requests
2320 * being ordered on a "first come first serve" basis.
2322 * DCT: This condition is all about new conversions being able to occur
2323 * "in place" while the lock remains on the granted queue (assuming
2324 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
2325 * doesn't _have_ to go onto the convert queue where it's processed in
2326 * order. The "now" variable is necessary to distinguish converts
2327 * being received and processed for the first time now, because once a
2328 * convert is moved to the conversion queue the condition below applies
2329 * requiring fifo granting.
2332 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2336 * Even if the convert is compat with all granted locks,
2337 * QUECVT forces it behind other locks on the convert queue.
2340 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2341 if (list_empty(&r->res_convertqueue))
2348 * The NOORDER flag is set to avoid the standard vms rules on grant order.
2352 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2356 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2357 * granted until all other conversion requests ahead of it are granted and/or canceled.
2361 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2365 * 6-4: By default, a new request is immediately granted only if all
2366 * three of the following conditions are satisfied when the request is
2368 * - The queue of ungranted conversion requests for the resource is empty.
2370 * - The queue of ungranted new requests for the resource is empty.
2371 * - The mode of the new request is compatible with the most
2372 * restrictive mode of all granted locks on the resource.
2375 if (now && !conv && list_empty(&r->res_convertqueue) &&
2376 list_empty(&r->res_waitqueue))
2380 * 6-4: Once a lock request is in the queue of ungranted new requests,
2381 * it cannot be granted until the queue of ungranted conversion
2382 * requests is empty, all ungranted new requests ahead of it are
2383 * granted and/or canceled, and it is compatible with the granted mode
2384 * of the most restrictive lock granted on the resource.
2387 if (!now && !conv && list_empty(&r->res_convertqueue) &&
2388 first_in_list(lkb, &r->res_waitqueue))
2394 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2395 int recover, int *err)
2398 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2399 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2404 rv = _can_be_granted(r, lkb, now, recover);
2409 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2410 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2411 * cancels one of the locks.
2414 if (is_convert && can_be_queued(lkb) &&
2415 conversion_deadlock_detect(r, lkb)) {
2416 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2417 lkb->lkb_grmode = DLM_LOCK_NL;
2418 set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2422 log_print("can_be_granted deadlock %x now %d",
2430 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2431 * to grant a request in a mode other than the normal rqmode. It's a
2432 simple way to provide a big optimization to applications that can use them.
2436 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2438 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2442 lkb->lkb_rqmode = alt;
2443 rv = _can_be_granted(r, lkb, now, 0);
2445 set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2447 lkb->lkb_rqmode = rqmode;
2453 /* Returns the highest requested mode of all blocked conversions; sets
2454 cw if there's a blocked conversion to DLM_LOCK_CW. */
2456 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2457 unsigned int *count)
2459 struct dlm_lkb *lkb, *s;
2460 int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2461 int hi, demoted, quit, grant_restart, demote_restart;
2470 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2471 demoted = is_demoted(lkb);
2474 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2475 grant_lock_pending(r, lkb);
2482 if (!demoted && is_demoted(lkb)) {
2483 log_print("WARN: pending demoted %x node %d %s",
2484 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2491 * If the DLM_LKF_NODLCKWT flag is set and a conversion
2492 * deadlock is detected, we request a blocking AST so the
2493 * conversion can be down-converted (or canceled).
2495 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2496 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2497 queue_bast(r, lkb, lkb->lkb_rqmode);
2498 lkb->lkb_highbast = lkb->lkb_rqmode;
2501 log_print("WARN: pending deadlock %x node %d %s",
2502 lkb->lkb_id, lkb->lkb_nodeid,
2509 hi = max_t(int, lkb->lkb_rqmode, hi);
2511 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2517 if (demote_restart && !quit) {
2522 return max_t(int, high, hi);
2525 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2526 unsigned int *count)
2528 struct dlm_lkb *lkb, *s;
2530 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2531 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2532 grant_lock_pending(r, lkb);
2536 high = max_t(int, lkb->lkb_rqmode, high);
2537 if (lkb->lkb_rqmode == DLM_LOCK_CW)
2545 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2546 on either the convert or waiting queue.
2547 high is the largest rqmode of all locks blocked on the convert or waiting queue.
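/* Worked example (illustrative, not part of the original source): if an EX
   conversion is blocked, high == DLM_LOCK_EX, so a lock granted in PR mode
   needs a bast because __dlm_compat_matrix[PR+1][EX+1] is 0 and its
   lkb_highbast is still below EX.  The separate cw test covers the case
   where only PR and CW requests are blocked (high == DLM_LOCK_PR): PR is
   compatible with PR but not with CW, so grant_pending_locks() queues a
   bast for DLM_LOCK_CW on granted PR locks. */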
2550 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2552 if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2553 if (gr->lkb_highbast < DLM_LOCK_EX)
2558 if (gr->lkb_highbast < high &&
2559 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2564 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2566 struct dlm_lkb *lkb, *s;
2567 int high = DLM_LOCK_IV;
2570 if (!is_master(r)) {
2571 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2576 high = grant_pending_convert(r, high, &cw, count);
2577 high = grant_pending_wait(r, high, &cw, count);
2579 if (high == DLM_LOCK_IV)
2583 * If there are locks left on the wait/convert queue then send blocking
2584 * ASTs to granted locks based on the largest requested mode (high)
2588 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2589 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2590 if (cw && high == DLM_LOCK_PR &&
2591 lkb->lkb_grmode == DLM_LOCK_PR)
2592 queue_bast(r, lkb, DLM_LOCK_CW);
2594 queue_bast(r, lkb, high);
2595 lkb->lkb_highbast = high;
2600 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2602 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2603 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2604 if (gr->lkb_highbast < DLM_LOCK_EX)
2609 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2614 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2615 struct dlm_lkb *lkb)
2619 list_for_each_entry(gr, head, lkb_statequeue) {
2620 /* skip self when sending basts to convertqueue */
2623 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2624 queue_bast(r, gr, lkb->lkb_rqmode);
2625 gr->lkb_highbast = lkb->lkb_rqmode;
2630 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2632 send_bast_queue(r, &r->res_grantqueue, lkb);
2635 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2637 send_bast_queue(r, &r->res_grantqueue, lkb);
2638 send_bast_queue(r, &r->res_convertqueue, lkb);
2641 /* set_master(r, lkb) -- set the master nodeid of a resource
2643 The purpose of this function is to set the nodeid field in the given
2644 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
2645 known, it can just be copied to the lkb and the function will return
2646 0. If the rsb's nodeid is _not_ known, it needs to be looked up
2647 before it can be copied to the lkb.
2649 When the rsb nodeid is being looked up remotely, the initial lkb
2650 causing the lookup is kept on the ls_waiters list waiting for the
2651 lookup reply. Other lkb's waiting for the same rsb lookup are kept
2652 on the rsb's res_lookup list until the master is verified.
2655 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2656 1: the rsb master is not available and the lkb has been placed on a wait queue
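
   Caller sketch (illustrative, not part of the original comment): stage-3
   code such as _request_lock() does roughly

	error = set_master(r, lkb);
	if (error)
		return 0;

   because a return of 1 means the lookup reply will re-run
   _request_lock() later, either directly in receive_lookup_reply() or
   via process_lookup_list(); a return of 0 means lkb->lkb_nodeid is
   usable and the operation proceeds to do_xxxx() or send_xxxx().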
2660 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2662 int our_nodeid = dlm_our_nodeid();
2664 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2665 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2666 r->res_first_lkid = lkb->lkb_id;
2667 lkb->lkb_nodeid = r->res_nodeid;
2671 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2672 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2676 if (r->res_master_nodeid == our_nodeid) {
2677 lkb->lkb_nodeid = 0;
2681 if (r->res_master_nodeid) {
2682 lkb->lkb_nodeid = r->res_master_nodeid;
2686 if (dlm_dir_nodeid(r) == our_nodeid) {
2687 /* This is a somewhat unusual case; find_rsb will usually
2688 have set res_master_nodeid when dir nodeid is local, but
2689 there are cases where we become the dir node after we've
2690 passed find_rsb and go through _request_lock again.
2691 confirm_master() or process_lookup_list() needs to be
2692 called after this. */
2693 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2694 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2696 r->res_master_nodeid = our_nodeid;
2698 lkb->lkb_nodeid = 0;
2702 r->res_first_lkid = lkb->lkb_id;
2703 send_lookup(r, lkb);
2707 static void process_lookup_list(struct dlm_rsb *r)
2709 struct dlm_lkb *lkb, *safe;
2711 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2712 list_del_init(&lkb->lkb_rsb_lookup);
2713 _request_lock(r, lkb);
2717 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2719 static void confirm_master(struct dlm_rsb *r, int error)
2721 struct dlm_lkb *lkb;
2723 if (!r->res_first_lkid)
2729 r->res_first_lkid = 0;
2730 process_lookup_list(r);
2736 /* the remote request failed and won't be retried (it was
2737 a NOQUEUE, or has been canceled/unlocked); make a waiting
2738 lkb the first_lkid */
2740 r->res_first_lkid = 0;
2742 if (!list_empty(&r->res_lookup)) {
2743 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2745 list_del_init(&lkb->lkb_rsb_lookup);
2746 r->res_first_lkid = lkb->lkb_id;
2747 _request_lock(r, lkb);
2752 log_error(r->res_ls, "confirm_master unknown error %d", error);
2756 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2757 int namelen, void (*ast)(void *astparam),
2759 void (*bast)(void *astparam, int mode),
2760 struct dlm_args *args)
2764 /* check for invalid arg usage */
2766 if (mode < 0 || mode > DLM_LOCK_EX)
2769 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2772 if (flags & DLM_LKF_CANCEL)
2775 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2778 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2781 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2784 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2787 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2790 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2793 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2799 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2802 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2805 /* these args will be copied to the lkb in validate_lock_args,
2806 it cannot be done now because when converting locks, fields in
2807 an active lkb cannot be modified before locking the rsb */
2809 args->flags = flags;
2811 args->astparam = astparam;
2812 args->bastfn = bast;
2820 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2822 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2823 DLM_LKF_FORCEUNLOCK))
2826 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2829 args->flags = flags;
2830 args->astparam = astarg;
2834 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2835 struct dlm_args *args)
2839 if (args->flags & DLM_LKF_CONVERT) {
2840 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2843 /* lock not allowed if there's any op in progress */
2844 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2847 if (is_overlap(lkb))
2851 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2854 if (args->flags & DLM_LKF_QUECVT &&
2855 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2859 lkb->lkb_exflags = args->flags;
2860 dlm_set_sbflags_val(lkb, 0);
2861 lkb->lkb_astfn = args->astfn;
2862 lkb->lkb_astparam = args->astparam;
2863 lkb->lkb_bastfn = args->bastfn;
2864 lkb->lkb_rqmode = args->mode;
2865 lkb->lkb_lksb = args->lksb;
2866 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2867 lkb->lkb_ownpid = (int) current->pid;
2874 /* annoy the user because dlm usage is wrong */
2876 log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
2877 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2878 lkb->lkb_status, lkb->lkb_wait_type,
2879 lkb->lkb_resource->res_name);
2882 log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
2883 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2884 lkb->lkb_status, lkb->lkb_wait_type,
2885 lkb->lkb_resource->res_name);
2892 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0 for success */
2895 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2896 because there may be a lookup in progress and it's valid to do
2897 cancel/force-unlock on it */
2899 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2901 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2904 /* normal unlock not allowed if there's any op in progress */
2905 if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2906 (lkb->lkb_wait_type || lkb->lkb_wait_count))
2909 /* an lkb may be waiting for an rsb lookup to complete where the
2910 lookup was initiated by another lock */
2912 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2913 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2914 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2915 list_del_init(&lkb->lkb_rsb_lookup);
2916 queue_cast(lkb->lkb_resource, lkb,
2917 args->flags & DLM_LKF_CANCEL ?
2918 -DLM_ECANCEL : -DLM_EUNLOCK);
2919 unhold_lkb(lkb); /* undoes create_lkb() */
2921 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2926 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2927 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2932 /* an lkb may still exist even though the lock is EOL'ed due to a
2933 * cancel, unlock or failed noqueue request; an app can't use these
2934 * locks; return same error as if the lkid had not been found at all
2937 if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2938 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2943 /* cancel not allowed with another cancel/unlock in progress */
2945 if (args->flags & DLM_LKF_CANCEL) {
2946 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2949 if (is_overlap(lkb))
2952 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2953 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2958 /* there's nothing to cancel */
2959 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2960 !lkb->lkb_wait_type) {
2965 switch (lkb->lkb_wait_type) {
2966 case DLM_MSG_LOOKUP:
2967 case DLM_MSG_REQUEST:
2968 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2971 case DLM_MSG_UNLOCK:
2972 case DLM_MSG_CANCEL:
2975 /* add_to_waiters() will set OVERLAP_CANCEL */
2979 /* do we need to allow a force-unlock if there's a normal unlock
2980 already in progress? in what conditions could the normal unlock
2981 fail such that we'd want to send a force-unlock to be sure? */
2983 if (args->flags & DLM_LKF_FORCEUNLOCK) {
2984 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2987 if (is_overlap_unlock(lkb))
2990 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2991 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2996 switch (lkb->lkb_wait_type) {
2997 case DLM_MSG_LOOKUP:
2998 case DLM_MSG_REQUEST:
2999 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
3002 case DLM_MSG_UNLOCK:
3005 /* add_to_waiters() will set OVERLAP_UNLOCK */
3009 /* an overlapping op shouldn't blow away exflags from other op */
3010 lkb->lkb_exflags |= args->flags;
3011 dlm_set_sbflags_val(lkb, 0);
3012 lkb->lkb_astparam = args->astparam;
3019 /* annoy the user because dlm usage is wrong */
3021 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3022 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3023 args->flags, lkb->lkb_wait_type,
3024 lkb->lkb_resource->res_name);
3027 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3028 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3029 args->flags, lkb->lkb_wait_type,
3030 lkb->lkb_resource->res_name);
3038 * Four stage 4 varieties:
3039 * do_request(), do_convert(), do_unlock(), do_cancel()
3040 * These are called on the master node for the given lock and
3041 * from the central locking logic.
3044 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3048 if (can_be_granted(r, lkb, 1, 0, NULL)) {
3050 queue_cast(r, lkb, 0);
3054 if (can_be_queued(lkb)) {
3055 error = -EINPROGRESS;
3056 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3061 queue_cast(r, lkb, -EAGAIN);
3066 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3071 if (force_blocking_asts(lkb))
3072 send_blocking_asts_all(r, lkb);
3075 send_blocking_asts(r, lkb);
3080 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3085 /* changing an existing lock may allow others to be granted */
3087 if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3089 queue_cast(r, lkb, 0);
3093 /* can_be_granted() detected that this lock would block in a conversion
3094 deadlock, so we leave it on the granted queue and return EDEADLK in
3095 the ast for the convert. */
3097 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3098 /* it's left on the granted queue */
3099 revert_lock(r, lkb);
3100 queue_cast(r, lkb, -EDEADLK);
3105 /* is_demoted() means the can_be_granted() above set the grmode
3106 to NL, and left us on the granted queue. This auto-demotion
3107 (due to CONVDEADLK) might mean other locks, and/or this lock, are
3108 now grantable. We have to try to grant other converting locks
3109 before we try again to grant this one. */
3111 if (is_demoted(lkb)) {
3112 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3113 if (_can_be_granted(r, lkb, 1, 0)) {
3115 queue_cast(r, lkb, 0);
3118 /* else fall through and move to convert queue */
3121 if (can_be_queued(lkb)) {
3122 error = -EINPROGRESS;
3124 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3129 queue_cast(r, lkb, -EAGAIN);
3134 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3139 grant_pending_locks(r, NULL);
3140 /* grant_pending_locks also sends basts */
3143 if (force_blocking_asts(lkb))
3144 send_blocking_asts_all(r, lkb);
3147 send_blocking_asts(r, lkb);
3152 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3154 remove_lock(r, lkb);
3155 queue_cast(r, lkb, -DLM_EUNLOCK);
3156 return -DLM_EUNLOCK;
3159 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3162 grant_pending_locks(r, NULL);
3165 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3167 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3171 error = revert_lock(r, lkb);
3173 queue_cast(r, lkb, -DLM_ECANCEL);
3174 return -DLM_ECANCEL;
3179 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3183 grant_pending_locks(r, NULL);
3187 * Four stage 3 varieties:
3188 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3191 /* add a new lkb to a possibly new rsb, called by requesting process */
3193 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3197 /* set_master: sets lkb nodeid from r */
3199 error = set_master(r, lkb);
3208 /* receive_request() calls do_request() on remote node */
3209 error = send_request(r, lkb);
3211 error = do_request(r, lkb);
3212 /* for remote locks the request_reply is sent
3213 between do_request and do_request_effects */
3214 do_request_effects(r, lkb, error);
3220 /* change some property of an existing lkb, e.g. mode */
3222 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3227 /* receive_convert() calls do_convert() on remote node */
3228 error = send_convert(r, lkb);
3230 error = do_convert(r, lkb);
3231 /* for remote locks the convert_reply is sent
3232 between do_convert and do_convert_effects */
3233 do_convert_effects(r, lkb, error);
3239 /* remove an existing lkb from the granted queue */
3241 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3246 /* receive_unlock() calls do_unlock() on remote node */
3247 error = send_unlock(r, lkb);
3249 error = do_unlock(r, lkb);
3250 /* for remote locks the unlock_reply is sent
3251 between do_unlock and do_unlock_effects */
3252 do_unlock_effects(r, lkb, error);
3258 /* remove an existing lkb from the convert or wait queue */
3260 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3265 /* receive_cancel() calls do_cancel() on remote node */
3266 error = send_cancel(r, lkb);
3268 error = do_cancel(r, lkb);
3269 /* for remote locks the cancel_reply is sent
3270 between do_cancel and do_cancel_effects */
3271 do_cancel_effects(r, lkb, error);
3278 * Four stage 2 varieties:
3279 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3282 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3283 const void *name, int len,
3284 struct dlm_args *args)
3289 error = validate_lock_args(ls, lkb, args);
3293 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3300 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3302 error = _request_lock(r, lkb);
3309 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3310 struct dlm_args *args)
3315 r = lkb->lkb_resource;
3320 error = validate_lock_args(ls, lkb, args);
3324 error = _convert_lock(r, lkb);
3331 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3332 struct dlm_args *args)
3337 r = lkb->lkb_resource;
3342 error = validate_unlock_args(lkb, args);
3346 error = _unlock_lock(r, lkb);
3353 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3354 struct dlm_args *args)
3359 r = lkb->lkb_resource;
3364 error = validate_unlock_args(lkb, args);
3368 error = _cancel_lock(r, lkb);
3376 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
3379 int dlm_lock(dlm_lockspace_t *lockspace,
3381 struct dlm_lksb *lksb,
3384 unsigned int namelen,
3385 uint32_t parent_lkid,
3386 void (*ast) (void *astarg),
3388 void (*bast) (void *astarg, int mode))
3391 struct dlm_lkb *lkb;
3392 struct dlm_args args;
3393 int error, convert = flags & DLM_LKF_CONVERT;
3395 ls = dlm_find_lockspace_local(lockspace);
3399 dlm_lock_recovery(ls);
3402 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3404 error = create_lkb(ls, &lkb);
3409 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3411 error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3417 error = convert_lock(ls, lkb, &args);
3419 error = request_lock(ls, lkb, name, namelen, &args);
3421 if (error == -EINPROGRESS)
3424 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3426 if (convert || error)
3428 if (error == -EAGAIN || error == -EDEADLK)
3431 dlm_unlock_recovery(ls);
3432 dlm_put_lockspace(ls);
3436 int dlm_unlock(dlm_lockspace_t *lockspace,
3439 struct dlm_lksb *lksb,
3443 struct dlm_lkb *lkb;
3444 struct dlm_args args;
3447 ls = dlm_find_lockspace_local(lockspace);
3451 dlm_lock_recovery(ls);
3453 error = find_lkb(ls, lkid, &lkb);
3457 trace_dlm_unlock_start(ls, lkb, flags);
3459 error = set_unlock_args(flags, astarg, &args);
3463 if (flags & DLM_LKF_CANCEL)
3464 error = cancel_lock(ls, lkb, &args);
3466 error = unlock_lock(ls, lkb, &args);
3468 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3470 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3473 trace_dlm_unlock_end(ls, lkb, flags, error);
3477 dlm_unlock_recovery(ls);
3478 dlm_put_lockspace(ls);
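/*
 * Usage sketch (illustrative only, not part of this file): a kernel caller
 * typically drives these two stage-1 entry points as below.  The names
 * my_ast, my_bast, my_arg and the lockspace handle ls (from
 * dlm_new_lockspace()) are caller-supplied; dlm_lock() reports the queued
 * case (-EINPROGRESS internally) as 0 and delivers the final result later
 * through the ast callback, with the new lock id left in lksb.sb_lkid.
 *
 *	struct dlm_lksb lksb = {};
 *	int error;
 *
 *	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "my_res", 6, 0,
 *			 my_ast, my_arg, my_bast);
 *	...wait for my_ast and check lksb.sb_status...
 *	error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, my_arg);
 */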
3483 * send/receive routines for remote operations and replies
3487 * send_request receive_request
3488 * send_convert receive_convert
3489 * send_unlock receive_unlock
3490 * send_cancel receive_cancel
3491 * send_grant receive_grant
3492 * send_bast receive_bast
3493 * send_lookup receive_lookup
3494 * send_remove receive_remove
3497 * receive_request_reply send_request_reply
3498 * receive_convert_reply send_convert_reply
3499 * receive_unlock_reply send_unlock_reply
3500 * receive_cancel_reply send_cancel_reply
3501 * receive_lookup_reply send_lookup_reply
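 *
 * Illustrative trace of one remote request, pieced together from the
 * functions below (not an addition to them):
 *
 * L: dlm_lock() -> request_lock() -> _request_lock() -> send_request()
 * R: receive_request() -> do_request() -> send_request_reply()
 *                                      -> do_request_effects()
 * L: receive_request_reply() -> grant/queue/fail the lkb based on m_result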
3504 static int _create_message(struct dlm_ls *ls, int mb_len,
3505 int to_nodeid, int mstype,
3506 struct dlm_message **ms_ret,
3507 struct dlm_mhandle **mh_ret)
3509 struct dlm_message *ms;
3510 struct dlm_mhandle *mh;
3513 /* get_buffer gives us a message handle (mh) that we need to
3514 pass into midcomms_commit and a message buffer (mb) that we
3515 write our data into */
3517 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3521 ms = (struct dlm_message *) mb;
3523 ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3524 ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3525 ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3526 ms->m_header.h_length = cpu_to_le16(mb_len);
3527 ms->m_header.h_cmd = DLM_MSG;
3529 ms->m_type = cpu_to_le32(mstype);
3536 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3537 int to_nodeid, int mstype,
3538 struct dlm_message **ms_ret,
3539 struct dlm_mhandle **mh_ret)
3541 int mb_len = sizeof(struct dlm_message);
3544 case DLM_MSG_REQUEST:
3545 case DLM_MSG_LOOKUP:
3546 case DLM_MSG_REMOVE:
3547 mb_len += r->res_length;
3549 case DLM_MSG_CONVERT:
3550 case DLM_MSG_UNLOCK:
3551 case DLM_MSG_REQUEST_REPLY:
3552 case DLM_MSG_CONVERT_REPLY:
3554 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3555 mb_len += r->res_ls->ls_lvblen;
3559 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3563 /* further lowcomms enhancements or alternate implementations may make
3564 the return value from this function useful at some point */
3566 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3567 const void *name, int namelen)
3569 dlm_midcomms_commit_mhandle(mh, name, namelen);
3573 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3574 struct dlm_message *ms)
3576 ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid);
3577 ms->m_pid = cpu_to_le32(lkb->lkb_ownpid);
3578 ms->m_lkid = cpu_to_le32(lkb->lkb_id);
3579 ms->m_remid = cpu_to_le32(lkb->lkb_remid);
3580 ms->m_exflags = cpu_to_le32(lkb->lkb_exflags);
3581 ms->m_sbflags = cpu_to_le32(dlm_sbflags_val(lkb));
3582 ms->m_flags = cpu_to_le32(dlm_dflags_val(lkb));
3583 ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
3584 ms->m_status = cpu_to_le32(lkb->lkb_status);
3585 ms->m_grmode = cpu_to_le32(lkb->lkb_grmode);
3586 ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode);
3587 ms->m_hash = cpu_to_le32(r->res_hash);
3589 /* m_result and m_bastmode are set from function args,
3590 not from lkb fields */
3592 if (lkb->lkb_bastfn)
3593 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3595 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3597 /* compare with switch in create_message; send_remove() doesn't use send_args() */
3600 switch (ms->m_type) {
3601 case cpu_to_le32(DLM_MSG_REQUEST):
3602 case cpu_to_le32(DLM_MSG_LOOKUP):
3603 memcpy(ms->m_extra, r->res_name, r->res_length);
3605 case cpu_to_le32(DLM_MSG_CONVERT):
3606 case cpu_to_le32(DLM_MSG_UNLOCK):
3607 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3608 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3609 case cpu_to_le32(DLM_MSG_GRANT):
3610 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3612 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3617 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3619 struct dlm_message *ms;
3620 struct dlm_mhandle *mh;
3621 int to_nodeid, error;
3623 to_nodeid = r->res_nodeid;
3625 error = add_to_waiters(lkb, mstype, to_nodeid);
3629 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3633 send_args(r, lkb, ms);
3635 error = send_message(mh, ms, r->res_name, r->res_length);
3641 remove_from_waiters(lkb, msg_reply_type(mstype));
3645 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3647 return send_common(r, lkb, DLM_MSG_REQUEST);
3650 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3654 error = send_common(r, lkb, DLM_MSG_CONVERT);
3656 /* down conversions go without a reply from the master */
3657 if (!error && down_conversion(lkb)) {
3658 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3659 r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3660 r->res_ls->ls_local_ms.m_result = 0;
3661 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
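		/* Example (illustrative): a conversion from EX down to NL can
		   never block, so instead of waiting for a convert_reply from
		   the master, the per-lockspace ls_local_ms is filled in above
		   and the reply path runs locally; this assumes
		   down_conversion() covers conversions to a strictly less
		   restrictive mode (such as EX to NL). */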
3667 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3668 MASTER_UNCERTAIN to force the next request on the rsb to confirm
3669 that the master is still correct. */
3671 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3673 return send_common(r, lkb, DLM_MSG_UNLOCK);
3676 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3678 return send_common(r, lkb, DLM_MSG_CANCEL);
3681 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3683 struct dlm_message *ms;
3684 struct dlm_mhandle *mh;
3685 int to_nodeid, error;
3687 to_nodeid = lkb->lkb_nodeid;
3689 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3693 send_args(r, lkb, ms);
3697 error = send_message(mh, ms, r->res_name, r->res_length);
3702 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3704 struct dlm_message *ms;
3705 struct dlm_mhandle *mh;
3706 int to_nodeid, error;
3708 to_nodeid = lkb->lkb_nodeid;
3710 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3714 send_args(r, lkb, ms);
3716 ms->m_bastmode = cpu_to_le32(mode);
3718 error = send_message(mh, ms, r->res_name, r->res_length);
3723 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3725 struct dlm_message *ms;
3726 struct dlm_mhandle *mh;
3727 int to_nodeid, error;
3729 to_nodeid = dlm_dir_nodeid(r);
3731 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3735 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3739 send_args(r, lkb, ms);
3741 error = send_message(mh, ms, r->res_name, r->res_length);
3747 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3751 static int send_remove(struct dlm_rsb *r)
3753 struct dlm_message *ms;
3754 struct dlm_mhandle *mh;
3755 int to_nodeid, error;
3757 to_nodeid = dlm_dir_nodeid(r);
3759 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3763 memcpy(ms->m_extra, r->res_name, r->res_length);
3764 ms->m_hash = cpu_to_le32(r->res_hash);
3766 error = send_message(mh, ms, r->res_name, r->res_length);
3771 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3774 struct dlm_message *ms;
3775 struct dlm_mhandle *mh;
3776 int to_nodeid, error;
3778 to_nodeid = lkb->lkb_nodeid;
3780 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3784 send_args(r, lkb, ms);
3786 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3788 error = send_message(mh, ms, r->res_name, r->res_length);
3793 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3795 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3798 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3800 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3803 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3805 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3808 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3810 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3813 static int send_lookup_reply(struct dlm_ls *ls,
3814 const struct dlm_message *ms_in, int ret_nodeid,
3817 struct dlm_rsb *r = &ls->ls_local_rsb;
3818 struct dlm_message *ms;
3819 struct dlm_mhandle *mh;
3820 int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3822 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3826 ms->m_lkid = ms_in->m_lkid;
3827 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3828 ms->m_nodeid = cpu_to_le32(ret_nodeid);
3830 error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3835 /* which args we save from a received message depends heavily on the type
3836 of message, unlike the send side where we can safely send everything about
3837 the lkb for any type of message */
3839 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3841 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3842 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3843 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3846 static void receive_flags_reply(struct dlm_lkb *lkb,
3847 const struct dlm_message *ms,
3853 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3854 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3857 static int receive_extralen(const struct dlm_message *ms)
3859 return (le16_to_cpu(ms->m_header.h_length) -
3860 sizeof(struct dlm_message));
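/* Example (illustrative): for a DLM_MSG_REQUEST carrying a 9-byte resource
   name, create_message() sized the buffer as sizeof(struct dlm_message) + 9,
   so the subtraction above recovers the 9 bytes of m_extra data. */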
3863 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3864 const struct dlm_message *ms)
3868 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3869 if (!lkb->lkb_lvbptr)
3870 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3871 if (!lkb->lkb_lvbptr)
3873 len = receive_extralen(ms);
3874 if (len > ls->ls_lvblen)
3875 len = ls->ls_lvblen;
3876 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3881 static void fake_bastfn(void *astparam, int mode)
3883 log_print("fake_bastfn should not be called");
3886 static void fake_astfn(void *astparam)
3888 log_print("fake_astfn should not be called");
3891 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3892 const struct dlm_message *ms)
3894 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3895 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3896 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3897 lkb->lkb_grmode = DLM_LOCK_IV;
3898 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3900 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3901 lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3903 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3904 /* lkb was just created so there won't be an lvb yet */
3905 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3906 if (!lkb->lkb_lvbptr)
3913 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3914 const struct dlm_message *ms)
3916 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3919 if (receive_lvb(ls, lkb, ms))
3922 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3923 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3928 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3929 const struct dlm_message *ms)
3931 if (receive_lvb(ls, lkb, ms))
3936 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3937 uses to send a reply and that the remote end uses to process the reply. */
3939 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3941 struct dlm_lkb *lkb = &ls->ls_local_lkb;
3942 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3943 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3946 /* This is called after the rsb is locked so that we can safely inspect
3947 fields in the lkb. */
3949 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3951 int from = le32_to_cpu(ms->m_header.h_nodeid);
3954 /* currently mixing of user/kernel locks is not supported */
3955 if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3956 !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3957 log_error(lkb->lkb_resource->res_ls,
3958 "got user dlm message for a kernel lock");
3963 switch (ms->m_type) {
3964 case cpu_to_le32(DLM_MSG_CONVERT):
3965 case cpu_to_le32(DLM_MSG_UNLOCK):
3966 case cpu_to_le32(DLM_MSG_CANCEL):
3967 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3971 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3972 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3973 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3974 case cpu_to_le32(DLM_MSG_GRANT):
3975 case cpu_to_le32(DLM_MSG_BAST):
3976 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3980 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3981 if (!is_process_copy(lkb))
3983 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3993 log_error(lkb->lkb_resource->res_ls,
3994 "ignore invalid message %d from %d %x %x %x %d",
3995 le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3996 lkb->lkb_remid, dlm_iflags_val(lkb),
4001 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
4003 struct dlm_lkb *lkb;
4006 int error, namelen = 0;
4008 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4010 error = create_lkb(ls, &lkb);
4014 receive_flags(lkb, ms);
4015 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
4016 error = receive_request_args(ls, lkb, ms);
4022 /* The dir node is the authority on whether we are the master
4023 for this rsb or not, so if the dir node sends us a request, we should
4024 recreate the rsb if we've destroyed it. This race happens when we
4025 send a remove message to the dir node at the same time that the dir
4026 node sends us a request for the rsb. */
4028 namelen = receive_extralen(ms);
4030 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4031 R_RECEIVE_REQUEST, &r);
4039 if (r->res_master_nodeid != dlm_our_nodeid()) {
4040 error = validate_master_nodeid(ls, r, from_nodeid);
4050 error = do_request(r, lkb);
4051 send_request_reply(r, lkb, error);
4052 do_request_effects(r, lkb, error);
4057 if (error == -EINPROGRESS)
4064 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4065 and do this receive_request again from process_lookup_list once
4066 we get the lookup reply. This would avoid many repeated
4067 ENOTBLK request failures when the lookup reply designating us
4068 as master is delayed. */
4070 if (error != -ENOTBLK) {
4071 log_limit(ls, "receive_request %x from %d %d",
4072 le32_to_cpu(ms->m_lkid), from_nodeid, error);
4075 setup_local_lkb(ls, ms);
4076 send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4080 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4082 struct dlm_lkb *lkb;
4084 int error, reply = 1;
4086 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4090 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4091 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4092 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4093 (unsigned long long)lkb->lkb_recover_seq,
4094 le32_to_cpu(ms->m_header.h_nodeid),
4095 le32_to_cpu(ms->m_lkid));
4101 r = lkb->lkb_resource;
4106 error = validate_message(lkb, ms);
4110 receive_flags(lkb, ms);
4112 error = receive_convert_args(ls, lkb, ms);
4114 send_convert_reply(r, lkb, error);
4118 reply = !down_conversion(lkb);
4120 error = do_convert(r, lkb);
4122 send_convert_reply(r, lkb, error);
4123 do_convert_effects(r, lkb, error);
4131 setup_local_lkb(ls, ms);
4132 send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4136 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4138 struct dlm_lkb *lkb;
4142 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4146 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4147 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4148 lkb->lkb_id, lkb->lkb_remid,
4149 le32_to_cpu(ms->m_header.h_nodeid),
4150 le32_to_cpu(ms->m_lkid));
4156 r = lkb->lkb_resource;
4161 error = validate_message(lkb, ms);
4165 receive_flags(lkb, ms);
4167 error = receive_unlock_args(ls, lkb, ms);
4169 send_unlock_reply(r, lkb, error);
4173 error = do_unlock(r, lkb);
4174 send_unlock_reply(r, lkb, error);
4175 do_unlock_effects(r, lkb, error);
4183 setup_local_lkb(ls, ms);
4184 send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4188 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4190 struct dlm_lkb *lkb;
4194 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4198 receive_flags(lkb, ms);
4200 r = lkb->lkb_resource;
4205 error = validate_message(lkb, ms);
4209 error = do_cancel(r, lkb);
4210 send_cancel_reply(r, lkb, error);
4211 do_cancel_effects(r, lkb, error);
4219 setup_local_lkb(ls, ms);
4220 send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4224 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4226 struct dlm_lkb *lkb;
4230 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4234 r = lkb->lkb_resource;
4239 error = validate_message(lkb, ms);
4243 receive_flags_reply(lkb, ms, false);
4244 if (is_altmode(lkb))
4245 munge_altmode(lkb, ms);
4246 grant_lock_pc(r, lkb, ms);
4247 queue_cast(r, lkb, 0);
4255 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4257 struct dlm_lkb *lkb;
4261 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4265 r = lkb->lkb_resource;
4270 error = validate_message(lkb, ms);
4274 queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4275 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4283 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4285 int len, error, ret_nodeid, from_nodeid, our_nodeid;
4287 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4288 our_nodeid = dlm_our_nodeid();
4290 len = receive_extralen(ms);
4292 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4295 /* Optimization: we're master so treat lookup as a request */
4296 if (!error && ret_nodeid == our_nodeid) {
4297 receive_request(ls, ms);
4300 send_lookup_reply(ls, ms, ret_nodeid, error);
4303 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4305 char name[DLM_RESNAME_MAXLEN+1];
4307 int rv, len, dir_nodeid, from_nodeid;
4309 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4311 len = receive_extralen(ms);
4313 if (len > DLM_RESNAME_MAXLEN) {
4314 log_error(ls, "receive_remove from %d bad len %d",
4319 dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4320 if (dir_nodeid != dlm_our_nodeid()) {
4321 log_error(ls, "receive_remove from %d bad nodeid %d",
4322 from_nodeid, dir_nodeid);
4326 /* Look for the name in rsb toss state; if it's there, kill it.
4327 * If it's in non-toss state, it's being used, and we should ignore this
4328 * message. This is an expected race between the dir node sending a
4329 * request to the master node at the same time as the master node sends
4330 * a remove to the dir node. The resolution to that race is for the
4331 * dir node to ignore the remove message, and the master node to
4332 * recreate the master rsb when it gets a request from the dir node for
4333 * an rsb it doesn't have.
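 *
 * Illustrative timeline of that race (not part of the original comment):
 *
 *   master M (rsb unused, tossed)  ---- DLM_MSG_REMOVE  ---->  dir node D
 *   dir node D (wants the lock)    ---- DLM_MSG_REQUEST ---->  master M
 *
 * D drops the remove here; M recreates the rsb in receive_request() via
 * find_rsb(..., R_RECEIVE_REQUEST, ...) when D's request arrives.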
4336 memset(name, 0, sizeof(name));
4337 memcpy(name, ms->m_extra, len);
4339 write_lock_bh(&ls->ls_rsbtbl_lock);
4341 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4343 /* should not happen */
4344 log_error(ls, "%s from %d not found %s", __func__,
4346 write_unlock_bh(&ls->ls_rsbtbl_lock);
4350 if (!rsb_flag(r, RSB_TOSS)) {
4351 if (r->res_master_nodeid != from_nodeid) {
4352 /* should not happen */
4353 log_error(ls, "receive_remove keep from %d master %d",
4354 from_nodeid, r->res_master_nodeid);
4356 write_unlock_bh(&ls->ls_rsbtbl_lock);
4360 log_debug(ls, "receive_remove from %d master %d first %x %s",
4361 from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4363 write_unlock_bh(&ls->ls_rsbtbl_lock);
4367 if (r->res_master_nodeid != from_nodeid) {
4368 log_error(ls, "receive_remove toss from %d master %d",
4369 from_nodeid, r->res_master_nodeid);
4371 write_unlock_bh(&ls->ls_rsbtbl_lock);
4375 list_del(&r->res_rsbs_list);
4376 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4377 dlm_rhash_rsb_params);
4378 write_unlock_bh(&ls->ls_rsbtbl_lock);
4383 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4385 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4388 static int receive_request_reply(struct dlm_ls *ls,
4389 const struct dlm_message *ms)
4391 struct dlm_lkb *lkb;
4393 int error, mstype, result;
4394 int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4396 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4400 r = lkb->lkb_resource;
4404 error = validate_message(lkb, ms);
4408 mstype = lkb->lkb_wait_type;
4409 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4411 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4412 lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4413 from_dlm_errno(le32_to_cpu(ms->m_result)));
4418 /* Optimization: the dir node was also the master, so it took our
4419 lookup as a request and sent request reply instead of lookup reply */
4420 if (mstype == DLM_MSG_LOOKUP) {
4421 r->res_master_nodeid = from_nodeid;
4422 r->res_nodeid = from_nodeid;
4423 lkb->lkb_nodeid = from_nodeid;
4426 /* this is the value returned from do_request() on the master */
4427 result = from_dlm_errno(le32_to_cpu(ms->m_result));
4431 /* request would block (be queued) on remote master */
4432 queue_cast(r, lkb, -EAGAIN);
4433 confirm_master(r, -EAGAIN);
4434 unhold_lkb(lkb); /* undoes create_lkb() */
4439 /* request was queued or granted on remote master */
4440 receive_flags_reply(lkb, ms, false);
4441 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4442 if (is_altmode(lkb))
4443 munge_altmode(lkb, ms);
4445 add_lkb(r, lkb, DLM_LKSTS_WAITING);
4447 grant_lock_pc(r, lkb, ms);
4448 queue_cast(r, lkb, 0);
4450 confirm_master(r, result);
4455 /* find_rsb failed to find rsb or rsb wasn't master */
4456 log_limit(ls, "receive_request_reply %x from %d %d "
4457 "master %d dir %d first %x %s", lkb->lkb_id,
4458 from_nodeid, result, r->res_master_nodeid,
4459 r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4461 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4462 r->res_master_nodeid != dlm_our_nodeid()) {
4463 /* cause _request_lock->set_master->send_lookup */
4464 r->res_master_nodeid = 0;
4466 lkb->lkb_nodeid = -1;
4469 if (is_overlap(lkb)) {
4470 /* we'll ignore error in cancel/unlock reply */
4471 queue_cast_overlap(r, lkb);
4472 confirm_master(r, result);
4473 unhold_lkb(lkb); /* undoes create_lkb() */
4475 _request_lock(r, lkb);
4477 if (r->res_master_nodeid == dlm_our_nodeid())
4478 confirm_master(r, 0);
4483 log_error(ls, "receive_request_reply %x error %d",
4484 lkb->lkb_id, result);
4487 if ((result == 0 || result == -EINPROGRESS) &&
4488 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4489 log_debug(ls, "receive_request_reply %x result %d unlock",
4490 lkb->lkb_id, result);
4491 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4492 send_unlock(r, lkb);
4493 } else if ((result == -EINPROGRESS) &&
4494 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4495 &lkb->lkb_iflags)) {
4496 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4497 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4498 send_cancel(r, lkb);
4500 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4501 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4510 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4511 const struct dlm_message *ms, bool local)
4513 /* this is the value returned from do_convert() on the master */
4514 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4516 /* convert would block (be queued) on remote master */
4517 queue_cast(r, lkb, -EAGAIN);
4521 receive_flags_reply(lkb, ms, local);
4522 revert_lock_pc(r, lkb);
4523 queue_cast(r, lkb, -EDEADLK);
4527 /* convert was queued on remote master */
4528 receive_flags_reply(lkb, ms, local);
4529 if (is_demoted(lkb))
4532 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4536 /* convert was granted on remote master */
4537 receive_flags_reply(lkb, ms, local);
4538 if (is_demoted(lkb))
4540 grant_lock_pc(r, lkb, ms);
4541 queue_cast(r, lkb, 0);
4545 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4546 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4547 le32_to_cpu(ms->m_lkid),
4548 from_dlm_errno(le32_to_cpu(ms->m_result)));
4554 static void _receive_convert_reply(struct dlm_lkb *lkb,
4555 const struct dlm_message *ms, bool local)
4557 struct dlm_rsb *r = lkb->lkb_resource;
4563 error = validate_message(lkb, ms);
4567 error = remove_from_waiters_ms(lkb, ms, local);
4571 __receive_convert_reply(r, lkb, ms, local);
4577 static int receive_convert_reply(struct dlm_ls *ls,
4578 const struct dlm_message *ms)
4580 struct dlm_lkb *lkb;
4583 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4587 _receive_convert_reply(lkb, ms, false);
4592 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4593 const struct dlm_message *ms, bool local)
4595 struct dlm_rsb *r = lkb->lkb_resource;
4601 error = validate_message(lkb, ms);
4605 error = remove_from_waiters_ms(lkb, ms, local);
4609 /* this is the value returned from do_unlock() on the master */
4611 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4613 receive_flags_reply(lkb, ms, local);
4614 remove_lock_pc(r, lkb);
4615 queue_cast(r, lkb, -DLM_EUNLOCK);
4620 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4621 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4628 static int receive_unlock_reply(struct dlm_ls *ls,
4629 const struct dlm_message *ms)
4631 struct dlm_lkb *lkb;
4634 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4638 _receive_unlock_reply(lkb, ms, false);
4643 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4644 const struct dlm_message *ms, bool local)
4646 struct dlm_rsb *r = lkb->lkb_resource;
4652 error = validate_message(lkb, ms);
4656 error = remove_from_waiters_ms(lkb, ms, local);
4660 /* this is the value returned from do_cancel() on the master */
4662 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4664 receive_flags_reply(lkb, ms, local);
4665 revert_lock_pc(r, lkb);
4666 queue_cast(r, lkb, -DLM_ECANCEL);
4671 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4673 from_dlm_errno(le32_to_cpu(ms->m_result)));
4680 static int receive_cancel_reply(struct dlm_ls *ls,
4681 const struct dlm_message *ms)
4683 struct dlm_lkb *lkb;
4686 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4690 _receive_cancel_reply(lkb, ms, false);
4695 static void receive_lookup_reply(struct dlm_ls *ls,
4696 const struct dlm_message *ms)
4698 struct dlm_lkb *lkb;
4700 int error, ret_nodeid;
4701 int do_lookup_list = 0;
4703 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4705 log_error(ls, "%s no lkid %x", __func__,
4706 le32_to_cpu(ms->m_lkid));
4710 /* ms->m_result is the value returned by dlm_master_lookup on dir node
4711 FIXME: will a non-zero error ever be returned? */
4713 r = lkb->lkb_resource;
4717 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4721 ret_nodeid = le32_to_cpu(ms->m_nodeid);
4723 /* We sometimes receive a request from the dir node for this
4724 rsb before we've received the dir node's lookup_reply for it.
4725 The request from the dir node implies we're the master, so we set
4726 ourself as master in receive_request_reply, and verify here that
4727 we are indeed the master. */
4729 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4730 /* This should never happen */
4731 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4732 "master %d dir %d our %d first %x %s",
4733 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4734 ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4735 dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4738 if (ret_nodeid == dlm_our_nodeid()) {
4739 r->res_master_nodeid = ret_nodeid;
4742 r->res_first_lkid = 0;
4743 } else if (ret_nodeid == -1) {
4744 /* the remote node doesn't believe it's the dir node */
4745 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4746 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4747 r->res_master_nodeid = 0;
4749 lkb->lkb_nodeid = -1;
4751 /* set_master() will set lkb_nodeid from r */
4752 r->res_master_nodeid = ret_nodeid;
4753 r->res_nodeid = ret_nodeid;
4756 if (is_overlap(lkb)) {
4757 log_debug(ls, "receive_lookup_reply %x unlock %x",
4758 lkb->lkb_id, dlm_iflags_val(lkb));
4759 queue_cast_overlap(r, lkb);
4760 unhold_lkb(lkb); /* undoes create_lkb() */
4764 _request_lock(r, lkb);
4768 process_lookup_list(r);
4775 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4778 int error = 0, noent = 0;
4780 if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4781 log_limit(ls, "receive %d from non-member %d %x %x %d",
4782 le32_to_cpu(ms->m_type),
4783 le32_to_cpu(ms->m_header.h_nodeid),
4784 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4785 from_dlm_errno(le32_to_cpu(ms->m_result)));
4789 switch (ms->m_type) {
4791 /* messages sent to a master node */
4793 case cpu_to_le32(DLM_MSG_REQUEST):
4794 error = receive_request(ls, ms);
4797 case cpu_to_le32(DLM_MSG_CONVERT):
4798 error = receive_convert(ls, ms);
4801 case cpu_to_le32(DLM_MSG_UNLOCK):
4802 error = receive_unlock(ls, ms);
4805 case cpu_to_le32(DLM_MSG_CANCEL):
4807 error = receive_cancel(ls, ms);
4810 /* messages sent from a master node (replies to above) */
4812 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4813 error = receive_request_reply(ls, ms);
4816 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4817 error = receive_convert_reply(ls, ms);
4820 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4821 error = receive_unlock_reply(ls, ms);
4824 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4825 error = receive_cancel_reply(ls, ms);
4828 /* messages sent from a master node (only two types of async msg) */
4830 case cpu_to_le32(DLM_MSG_GRANT):
4832 error = receive_grant(ls, ms);
4835 case cpu_to_le32(DLM_MSG_BAST):
4837 error = receive_bast(ls, ms);
4840 /* messages sent to a dir node */
4842 case cpu_to_le32(DLM_MSG_LOOKUP):
4843 receive_lookup(ls, ms);
4846 case cpu_to_le32(DLM_MSG_REMOVE):
4847 receive_remove(ls, ms);
4850 /* messages sent from a dir node (remove has no reply) */
4852 case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4853 receive_lookup_reply(ls, ms);
4856 /* other messages */
4858 case cpu_to_le32(DLM_MSG_PURGE):
4859 receive_purge(ls, ms);
4863 log_error(ls, "unknown message type %d",
4864 le32_to_cpu(ms->m_type));
4868 * When checking for ENOENT, we're checking the result of
4869 * find_lkb(m_remid):
4871 * The lock id referenced in the message wasn't found. This may
4872 * happen in normal usage for the async messages and cancel, so
4873 * only use log_debug for them.
4875 * Some errors are expected and normal.
4878 if (error == -ENOENT && noent) {
4879 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4880 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4881 le32_to_cpu(ms->m_header.h_nodeid),
4882 le32_to_cpu(ms->m_lkid), saved_seq);
4883 } else if (error == -ENOENT) {
4884 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4885 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4886 le32_to_cpu(ms->m_header.h_nodeid),
4887 le32_to_cpu(ms->m_lkid), saved_seq);
4889 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4890 dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4893 if (error == -EINVAL) {
4894 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4896 le32_to_cpu(ms->m_type),
4897 le32_to_cpu(ms->m_header.h_nodeid),
4898 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4903 /* If the lockspace is in recovery mode (locking stopped), then normal
4904 messages are saved on the requestqueue for processing after recovery is
4905 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4906 messages off the requestqueue before we process new ones. This occurs right
4907 after recovery completes when we transition from saving all messages on
4908 requestqueue, to processing all the saved messages, to processing new
4909 messages as they arrive. */
4911 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4915 read_lock_bh(&ls->ls_requestqueue_lock);
4916 if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4917 /* If we were a member of this lockspace, left, and rejoined,
4918 other nodes may still be sending us messages from the
4919 lockspace generation before we left. */
4920 if (WARN_ON_ONCE(!ls->ls_generation)) {
4921 read_unlock_bh(&ls->ls_requestqueue_lock);
4922 log_limit(ls, "receive %d from %d ignore old gen",
4923 le32_to_cpu(ms->m_type), nodeid);
4927 read_unlock_bh(&ls->ls_requestqueue_lock);
4928 write_lock_bh(&ls->ls_requestqueue_lock);
4929 /* recheck because we hold the write lock now */
4930 if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4931 write_unlock_bh(&ls->ls_requestqueue_lock);
4935 dlm_add_requestqueue(ls, nodeid, ms);
4936 write_unlock_bh(&ls->ls_requestqueue_lock);
4938 _receive_message(ls, ms, 0);
4939 read_unlock_bh(&ls->ls_requestqueue_lock);
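/*
 * Illustrative sketch, not part of the dlm code: the decision made in
 * dlm_receive_message() above reduces to the following, assuming
 * LSFL_RECV_MSG_BLOCKED stays set for as long as recovery wants messages
 * saved:
 *
 *	if (blocked)
 *		dlm_add_requestqueue(ls, nodeid, ms);	// save for later
 *	else
 *		_receive_message(ls, ms, 0);		// process now
 *
 * The blocked case is rechecked under the write lock so that a message is
 * not added to the requestqueue after recovery has stopped saving them;
 * dlm_recoverd replays the saved messages through
 * dlm_receive_message_saved() before new messages are handled directly.
 */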
4943 /* This is called by dlm_recoverd to process messages that were saved on
4944 the requestqueue. */
4946 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4949 _receive_message(ls, ms, saved_seq);
4952 /* This is called by the midcomms layer when something is received for
4953 the lockspace. It could be either a MSG (normal message sent as part of
4954 standard locking activity) or an RCOM (recovery message sent as part of
4955 lockspace recovery). */
4957 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4959 const struct dlm_header *hd = &p->header;
4963 switch (hd->h_cmd) {
4965 type = le32_to_cpu(p->message.m_type);
4968 type = le32_to_cpu(p->rcom.rc_type);
4971 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4975 if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4976 log_print("invalid h_nodeid %d from %d lockspace %x",
4977 le32_to_cpu(hd->h_nodeid), nodeid,
4978 le32_to_cpu(hd->u.h_lockspace));
4982 ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4984 if (dlm_config.ci_log_debug) {
4985 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4986 "%u from %d cmd %d type %d\n",
4987 le32_to_cpu(hd->u.h_lockspace), nodeid,
4991 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4992 dlm_send_ls_not_ready(nodeid, &p->rcom);
4996 /* this rwlock allows dlm_ls_stop() to wait for all dlm_recv threads to
4997 be inactive (in this ls) before transitioning to recovery mode */
4999 read_lock_bh(&ls->ls_recv_active);
5000 if (hd->h_cmd == DLM_MSG)
5001 dlm_receive_message(ls, &p->message, nodeid);
5002 else if (hd->h_cmd == DLM_RCOM)
5003 dlm_receive_rcom(ls, &p->rcom, nodeid);
5005 log_error(ls, "invalid h_cmd %d from %d lockspace %x",
5006 hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
5007 read_unlock_bh(&ls->ls_recv_active);
5009 dlm_put_lockspace(ls);
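/*
 * Hypothetical caller sketch (the real midcomms call site is not part of
 * this file): once a complete packet has been assembled for a lockspace,
 * it is handed to the entry point above together with the sending nodeid:
 *
 *	const union dlm_packet *p = ...;	// assembled packet, assumed
 *	dlm_receive_buffer(p, nodeid);
 *
 * DLM_MSG packets are then routed to dlm_receive_message() and DLM_RCOM
 * packets to dlm_receive_rcom(), under ls_recv_active as above.
 */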
5012 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5013 struct dlm_message *ms_local)
5015 if (middle_conversion(lkb)) {
5017 memset(ms_local, 0, sizeof(struct dlm_message));
5018 ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5019 ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5020 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5021 _receive_convert_reply(lkb, ms_local, true);
5023 /* Same special case as in receive_rcom_lock_args() */
5024 lkb->lkb_grmode = DLM_LOCK_IV;
5025 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5028 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5029 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5032 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5033 conversions are async; there's no reply from the remote master */
5036 /* A waiting lkb needs recovery if the master node has failed, or
5037 the master node is changing (only when no directory is used) */
5039 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5042 if (dlm_no_directory(ls))
5045 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5051 /* Recovery for locks that are waiting for replies from nodes that are now
5052 gone. We can just complete unlocks and cancels by faking a reply from the
5053 dead node. Requests and up-conversions we flag to be resent after
5054 recovery. Down-conversions can just be completed with a fake reply like
5055 unlocks. Conversions between PR and CW need special attention. */
5057 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5059 struct dlm_lkb *lkb, *safe;
5060 struct dlm_message *ms_local;
5061 int wait_type, local_unlock_result, local_cancel_result;
5064 ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
5068 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5070 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5072 /* exclude debug messages about unlocks because there can be so
5073 many and they aren't very interesting */
5075 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5076 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5077 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5081 lkb->lkb_resource->res_nodeid,
5083 lkb->lkb_wait_nodeid,
5087 /* all outstanding lookups, regardless of destination, will be
5088 resent after recovery is done */
5090 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5091 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5095 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5098 wait_type = lkb->lkb_wait_type;
5099 local_unlock_result = -DLM_EUNLOCK;
5100 local_cancel_result = -DLM_ECANCEL;
5102 /* Main reply may have been received, leaving a zero wait_type,
5103 but a reply for the overlapping op may not have been
5104 received. In that case we need to fake the appropriate
5105 reply for the overlap op. */
5108 if (is_overlap_cancel(lkb)) {
5109 wait_type = DLM_MSG_CANCEL;
5110 if (lkb->lkb_grmode == DLM_LOCK_IV)
5111 local_cancel_result = 0;
5113 if (is_overlap_unlock(lkb)) {
5114 wait_type = DLM_MSG_UNLOCK;
5115 if (lkb->lkb_grmode == DLM_LOCK_IV)
5116 local_unlock_result = -ENOENT;
5119 log_debug(ls, "rwpre overlap %x %x %d %d %d",
5120 lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5121 local_cancel_result, local_unlock_result);
5124 switch (wait_type) {
5126 case DLM_MSG_REQUEST:
5127 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5130 case DLM_MSG_CONVERT:
5131 recover_convert_waiter(ls, lkb, ms_local);
5134 case DLM_MSG_UNLOCK:
5136 memset(ms_local, 0, sizeof(struct dlm_message));
5137 ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5138 ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5139 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5140 _receive_unlock_reply(lkb, ms_local, true);
5144 case DLM_MSG_CANCEL:
5146 memset(ms_local, 0, sizeof(struct dlm_message));
5147 ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5148 ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5149 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5150 _receive_cancel_reply(lkb, ms_local, true);
5155 log_error(ls, "invalid lkb wait_type %d %d",
5156 lkb->lkb_wait_type, wait_type);
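/*
 * Summary of the pre-recovery handling above, per wait_type (illustrative
 * only; the switch statement is the authoritative logic):
 *
 *	DLM_MSG_LOOKUP, DLM_MSG_REQUEST -> set DLM_IFL_RESEND_BIT
 *	DLM_MSG_CONVERT                 -> recover_convert_waiter()
 *	DLM_MSG_UNLOCK                  -> fake a local DLM_MSG_UNLOCK_REPLY
 *	DLM_MSG_CANCEL                  -> fake a local DLM_MSG_CANCEL_REPLY
 */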
5163 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5165 struct dlm_lkb *lkb = NULL, *iter;
5167 spin_lock_bh(&ls->ls_waiters_lock);
5168 list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5169 if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5175 spin_unlock_bh(&ls->ls_waiters_lock);
5181 * Forced state reset for locks that were in the middle of remote operations
5182 * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5183 * for a reply from a remote operation.) The lkbs remaining on the waiters
5184 * list need to be reevaluated; some may need resending to a different node
5185 * than previously, and some may now need local handling rather than remote.
5187 * First, the lkb state for the voided remote operation is forcibly reset,
5188 * equivalent to what remove_from_waiters() would normally do:
5189 * . lkb removed from ls_waiters list
5190 * . lkb wait_type cleared
5191 * . lkb waiters_count cleared
5192 * . lkb ref count decremented for each waiters_count (almost always 1,
5193 * but possibly 2 in case of cancel/unlock overlapping, which means
5194 * two remote replies were being expected for the lkb.)
5196 * Second, the lkb is reprocessed like an original operation would be,
5197 * by passing it to _request_lock or _convert_lock, which will either
5198 * process the lkb operation locally, or send it to a remote node again
5199 * and put the lkb back onto the waiters list.
5201 * When reprocessing the lkb, we may find that it's flagged for an overlapping
5202 * force-unlock or cancel, either from before recovery began, or after recovery
5203 * finished. If this is the case, the unlock/cancel is done directly, and the
5204 * original operation is not initiated again (no _request_lock/_convert_lock.)
5207 int dlm_recover_waiters_post(struct dlm_ls *ls)
5209 struct dlm_lkb *lkb;
5211 int error = 0, mstype, err, oc, ou;
5214 if (dlm_locking_stopped(ls)) {
5215 log_debug(ls, "recover_waiters_post aborted");
5221 * Find an lkb from the waiters list that's been affected by
5222 * recovery node changes, and needs to be reprocessed. Does
5223 * hold_lkb(), adding a refcount.
5225 lkb = find_resend_waiter(ls);
5229 r = lkb->lkb_resource;
5234 * If the lkb has been flagged for a force unlock or cancel,
5235 * then the reprocessing below will be replaced by just doing
5236 * the unlock/cancel directly.
5238 mstype = lkb->lkb_wait_type;
5239 oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5241 ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5245 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5246 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5247 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5248 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5249 dlm_dir_nodeid(r), oc, ou);
5252 * No reply to the pre-recovery operation will now be received,
5253 * so a forced equivalent of remove_from_waiters() is needed to
5254 * reset the waiters state that was in place before recovery.
5257 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5259 /* Forcibly clear wait_type */
5260 lkb->lkb_wait_type = 0;
5263 * Forcibly reset wait_count and associated refcount. The
5264 * wait_count will almost always be 1, but in case of an
5265 * overlapping unlock/cancel it could be 2: see where
5266 * add_to_waiters() finds the lkb is already on the waiters
5267 * list and does lkb_wait_count++; hold_lkb().
5269 while (lkb->lkb_wait_count) {
5270 lkb->lkb_wait_count--;
5274 /* Forcibly remove from waiters list */
5275 spin_lock_bh(&ls->ls_waiters_lock);
5276 list_del_init(&lkb->lkb_wait_reply);
5277 spin_unlock_bh(&ls->ls_waiters_lock);
5280 * The lkb is now clear of all prior waiters state and can be
5281 * processed locally, or sent to remote node again, or directly
5282 * cancelled/unlocked.
5286 /* do an unlock or cancel instead of resending */
5288 case DLM_MSG_LOOKUP:
5289 case DLM_MSG_REQUEST:
5290 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5292 unhold_lkb(lkb); /* undoes create_lkb() */
5294 case DLM_MSG_CONVERT:
5296 queue_cast(r, lkb, -DLM_ECANCEL);
5298 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5299 _unlock_lock(r, lkb);
5307 case DLM_MSG_LOOKUP:
5308 case DLM_MSG_REQUEST:
5309 _request_lock(r, lkb);
5311 confirm_master(r, 0);
5313 case DLM_MSG_CONVERT:
5314 _convert_lock(r, lkb);
5322 log_error(ls, "waiter %x msg %d r_nodeid %d "
5323 "dir_nodeid %d overlap %d %d",
5324 lkb->lkb_id, mstype, r->res_nodeid,
5325 dlm_dir_nodeid(r), oc, ou);
5335 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5336 struct list_head *list)
5338 struct dlm_lkb *lkb, *safe;
5340 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5341 if (!is_master_copy(lkb))
5344 /* don't purge lkbs we've added in recover_master_copy for
5345 the current recovery seq */
5347 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5352 /* this put should free the lkb */
5353 if (!dlm_put_lkb(lkb))
5354 log_error(ls, "purged mstcpy lkb not released");
5358 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5360 struct dlm_ls *ls = r->res_ls;
5362 purge_mstcpy_list(ls, r, &r->res_grantqueue);
5363 purge_mstcpy_list(ls, r, &r->res_convertqueue);
5364 purge_mstcpy_list(ls, r, &r->res_waitqueue);
5367 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5368 struct list_head *list,
5369 int nodeid_gone, unsigned int *count)
5371 struct dlm_lkb *lkb, *safe;
5373 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5374 if (!is_master_copy(lkb))
5377 if ((lkb->lkb_nodeid == nodeid_gone) ||
5378 dlm_is_removed(ls, lkb->lkb_nodeid)) {
5380 /* tell recover_lvb to invalidate the lvb
5381 because a node holding EX/PW failed */
5382 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5383 (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5384 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5389 /* this put should free the lkb */
5390 if (!dlm_put_lkb(lkb))
5391 log_error(ls, "purged dead lkb not released");
5393 rsb_set_flag(r, RSB_RECOVER_GRANT);
5400 /* Get rid of locks held by nodes that are gone. */
5402 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5405 struct dlm_member *memb;
5406 int nodes_count = 0;
5407 int nodeid_gone = 0;
5408 unsigned int lkb_count = 0;
5410 /* cache one removed nodeid to optimize the common
5411 case of a single node removed */
5413 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5415 nodeid_gone = memb->nodeid;
5421 list_for_each_entry(r, root_list, res_root_list) {
5425 purge_dead_list(ls, r, &r->res_grantqueue,
5426 nodeid_gone, &lkb_count);
5427 purge_dead_list(ls, r, &r->res_convertqueue,
5428 nodeid_gone, &lkb_count);
5429 purge_dead_list(ls, r, &r->res_waitqueue,
5430 nodeid_gone, &lkb_count);
5438 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5439 lkb_count, nodes_count);
5442 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5446 read_lock_bh(&ls->ls_rsbtbl_lock);
5447 list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
5448 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5450 if (!is_master(r)) {
5451 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5455 read_unlock_bh(&ls->ls_rsbtbl_lock);
5458 read_unlock_bh(&ls->ls_rsbtbl_lock);
5463 * Attempt to grant locks on resources that we are the master of.
5464 * Locks may have become grantable during recovery because locks
5465 * from departed nodes have been purged (or not rebuilt), allowing
5466 * previously blocked locks to now be granted. The subset of rsb's
5467 * we are interested in are those with lkb's on either the convert or waiting queues.
5470 * Simplest would be to go through each master rsb and check for non-empty
5471 * convert or waiting queues, and attempt to grant on those rsbs.
5472 * Checking the queues requires lock_rsb, though, for which we'd need
5473 * to release the rsbtbl lock. This would make iterating through all
5474 * rsb's very inefficient. So, we rely on earlier recovery routines
5475 * to set RECOVER_GRANT on any rsb's that we should attempt to grant locks on.
5479 void dlm_recover_grant(struct dlm_ls *ls)
5482 unsigned int count = 0;
5483 unsigned int rsb_count = 0;
5484 unsigned int lkb_count = 0;
5487 r = find_grant_rsb(ls);
5494 /* the RECOVER_GRANT flag is checked in the grant path */
5495 grant_pending_locks(r, &count);
5496 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5498 confirm_master(r, 0);
5505 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5506 lkb_count, rsb_count);
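/*
 * For reference, two of the recovery paths that set RSB_RECOVER_GRANT
 * appear earlier in this file: purge_dead_list() sets it after removing a
 * dead node's lkb, and dlm_recover_master_copy() sets it when rebuilt
 * locks leave the convert or wait queues non-empty:
 *
 *	rsb_set_flag(r, RSB_RECOVER_GRANT);
 */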
5509 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5512 struct dlm_lkb *lkb;
5514 list_for_each_entry(lkb, head, lkb_statequeue) {
5515 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5521 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5524 struct dlm_lkb *lkb;
5526 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5529 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5532 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5538 /* needs at least dlm_rcom + rcom_lock */
5539 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5540 struct dlm_rsb *r, const struct dlm_rcom *rc)
5542 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5544 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5545 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5546 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5547 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5548 dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5549 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5550 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5551 lkb->lkb_rqmode = rl->rl_rqmode;
5552 lkb->lkb_grmode = rl->rl_grmode;
5553 /* don't set lkb_status because add_lkb wants to set it itself */
5555 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5556 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5558 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5559 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5560 sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5561 if (lvblen > ls->ls_lvblen)
5563 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5564 if (!lkb->lkb_lvbptr)
5566 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5569 /* Conversions between PR and CW (middle modes) need special handling.
5570 The real granted mode of these converting locks cannot be determined
5571 until all locks have been rebuilt on the rsb (recover_conversion) */
5573 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5574 middle_conversion(lkb)) {
5575 rl->rl_status = DLM_LKSTS_CONVERT;
5576 lkb->lkb_grmode = DLM_LOCK_IV;
5577 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5583 /* This lkb may have been recovered in a previous aborted recovery so we need
5584 to check if the rsb already has an lkb with the given remote nodeid/lkid.
5585 If so we just send back a standard reply. If not, we create a new lkb with
5586 the given values and send back our lkid. We send back our lkid by sending
5587 back the rcom_lock struct we got but with the remid field filled in. */
5589 /* needs at least dlm_rcom + rcom_lock */
5590 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5591 __le32 *rl_remid, __le32 *rl_result)
5593 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5595 struct dlm_lkb *lkb;
5597 int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5600 /* init rl_remid with rcom lock rl_remid */
5601 *rl_remid = rl->rl_remid;
5603 if (rl->rl_parent_lkid) {
5604 error = -EOPNOTSUPP;
5608 remid = le32_to_cpu(rl->rl_lkid);
5610 /* In general we expect the rsb returned to be R_MASTER, but we don't
5611 have to require it. Recovery of masters on one node can overlap
5612 recovery of locks on another node, so one node can send us MSTCPY
5613 locks before we've made ourselves master of this rsb. We can still
5614 add new MSTCPY locks that we receive here without any harm; when
5615 we make ourselves master, dlm_recover_masters() won't touch the
5616 MSTCPY locks we've received early. */
5618 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5619 from_nodeid, R_RECEIVE_RECOVER, &r);
5625 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5626 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5627 from_nodeid, remid);
5632 lkb = search_remid(r, from_nodeid, remid);
5638 error = create_lkb(ls, &lkb);
5642 error = receive_rcom_lock_args(ls, lkb, r, rc);
5649 add_lkb(r, lkb, rl->rl_status);
5650 ls->ls_recover_locks_in++;
5652 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5653 rsb_set_flag(r, RSB_RECOVER_GRANT);
5656 /* this is the new value returned to the lock holder for
5657 saving in its process-copy lkb */
5658 *rl_remid = cpu_to_le32(lkb->lkb_id);
5660 lkb->lkb_recover_seq = ls->ls_recover_seq;
5666 if (error && error != -EEXIST)
5667 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5668 from_nodeid, remid, error);
5669 *rl_result = cpu_to_le32(error);
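/*
 * Illustrative view of the remid round trip (normal, non-error path): the
 * master side above returns its own lock id, and the lock holder stores
 * it in dlm_recover_process_copy() below, re-linking the process copy to
 * the new master copy:
 *
 *	*rl_remid = cpu_to_le32(lkb->lkb_id);	// master copy side
 *	lkb->lkb_remid = remid;			// process copy side
 */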
5673 /* needs at least dlm_rcom + rcom_lock */
5674 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5677 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5679 struct dlm_lkb *lkb;
5680 uint32_t lkid, remid;
5683 lkid = le32_to_cpu(rl->rl_lkid);
5684 remid = le32_to_cpu(rl->rl_remid);
5685 result = le32_to_cpu(rl->rl_result);
5687 error = find_lkb(ls, lkid, &lkb);
5689 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5690 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5695 r = lkb->lkb_resource;
5699 if (!is_process_copy(lkb)) {
5700 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5701 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5712 /* There's a chance the new master received our lock before
5713 dlm_recover_master_reply(); this wouldn't happen if we did
5714 a barrier between recover_masters and recover_locks. */
5716 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5717 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5720 dlm_send_rcom_lock(r, lkb, seq);
5724 lkb->lkb_remid = remid;
5727 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5728 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5732 /* an ack for dlm_recover_locks() which waits for replies for
5733 all the locks it sends to new masters */
5734 dlm_recovered_lock(r);
5743 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5744 int mode, uint32_t flags, void *name, unsigned int namelen)
5746 struct dlm_lkb *lkb;
5747 struct dlm_args args;
5751 dlm_lock_recovery(ls);
5753 error = create_lkb(ls, &lkb);
5759 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5761 if (flags & DLM_LKF_VALBLK) {
5762 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5763 if (!ua->lksb.sb_lvbptr) {
5769 error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5770 fake_bastfn, &args);
5772 kfree(ua->lksb.sb_lvbptr);
5773 ua->lksb.sb_lvbptr = NULL;
5778 /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5779 When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5780 lock and that lkb_astparam is the dlm_user_args structure. */
5781 set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5782 error = request_lock(ls, lkb, name, namelen, &args);
5797 /* add this new lkb to the per-process list of locks */
5798 spin_lock_bh(&ua->proc->locks_spin);
5800 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5801 spin_unlock_bh(&ua->proc->locks_spin);
5804 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5808 dlm_unlock_recovery(ls);
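/*
 * Hedged usage sketch for dlm_user_request(); the real caller is the dlm
 * user device code, not this file, and the names below are assumptions:
 *
 *	struct dlm_user_args *ua = kzalloc(sizeof(*ua), GFP_NOFS);
 *	if (!ua)
 *		return -ENOMEM;
 *	ua->proc = proc;		// per-process state, assumed
 *	ua->user_lksb = user_lksb;	// userspace lksb pointer, assumed
 *	error = dlm_user_request(ls, ua, DLM_LOCK_EX, DLM_LKF_VALBLK,
 *				 "myres", 5);
 *
 * On success the lkb takes ownership of ua and is added to
 * ua->proc->locks, as above.
 */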
5812 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5813 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5815 struct dlm_lkb *lkb;
5816 struct dlm_args args;
5817 struct dlm_user_args *ua;
5820 dlm_lock_recovery(ls);
5822 error = find_lkb(ls, lkid, &lkb);
5826 trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5828 /* user can change the params on its lock when it converts it, or
5829 add an lvb that didn't exist before */
5833 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5834 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5835 if (!ua->lksb.sb_lvbptr) {
5840 if (lvb_in && ua->lksb.sb_lvbptr)
5841 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5843 ua->xid = ua_tmp->xid;
5844 ua->castparam = ua_tmp->castparam;
5845 ua->castaddr = ua_tmp->castaddr;
5846 ua->bastparam = ua_tmp->bastparam;
5847 ua->bastaddr = ua_tmp->bastaddr;
5848 ua->user_lksb = ua_tmp->user_lksb;
5850 error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5851 fake_bastfn, &args);
5855 error = convert_lock(ls, lkb, &args);
5857 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5860 trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5863 dlm_unlock_recovery(ls);
5869 * The caller asks for an orphan lock on a given resource with a given mode.
5870 * If a matching lock exists, it's moved to the owner's list of locks and
5871 * the lkid is returned.
5874 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5875 int mode, uint32_t flags, void *name, unsigned int namelen,
5878 struct dlm_lkb *lkb = NULL, *iter;
5879 struct dlm_user_args *ua;
5880 int found_other_mode = 0;
5883 spin_lock_bh(&ls->ls_orphans_lock);
5884 list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5885 if (iter->lkb_resource->res_length != namelen)
5887 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5889 if (iter->lkb_grmode != mode) {
5890 found_other_mode = 1;
5895 list_del_init(&iter->lkb_ownqueue);
5896 clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5897 *lkid = iter->lkb_id;
5900 spin_unlock_bh(&ls->ls_orphans_lock);
5902 if (!lkb && found_other_mode) {
5912 lkb->lkb_exflags = flags;
5913 lkb->lkb_ownpid = (int) current->pid;
5917 ua->proc = ua_tmp->proc;
5918 ua->xid = ua_tmp->xid;
5919 ua->castparam = ua_tmp->castparam;
5920 ua->castaddr = ua_tmp->castaddr;
5921 ua->bastparam = ua_tmp->bastparam;
5922 ua->bastaddr = ua_tmp->bastaddr;
5923 ua->user_lksb = ua_tmp->user_lksb;
5926 * The lkb reference from the ls_orphans list was not
5927 * removed above, and is now considered the reference
5928 * for the proc locks list.
5931 spin_lock_bh(&ua->proc->locks_spin);
5932 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5933 spin_unlock_bh(&ua->proc->locks_spin);
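/*
 * Hedged usage sketch for dlm_user_adopt_orphan(); the trailing argument
 * is assumed to be the uint32_t *lkid that receives the adopted lock id
 * (see the *lkid assignment above):
 *
 *	uint32_t lkid;
 *	error = dlm_user_adopt_orphan(ls, ua_tmp, DLM_LOCK_PR, 0,
 *				      "myres", 5, &lkid);
 *
 * A match requires the same resource name and granted mode; an orphan with
 * a different granted mode is not adopted, it only sets found_other_mode.
 */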
5939 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5940 uint32_t flags, uint32_t lkid, char *lvb_in)
5942 struct dlm_lkb *lkb;
5943 struct dlm_args args;
5944 struct dlm_user_args *ua;
5947 dlm_lock_recovery(ls);
5949 error = find_lkb(ls, lkid, &lkb);
5953 trace_dlm_unlock_start(ls, lkb, flags);
5957 if (lvb_in && ua->lksb.sb_lvbptr)
5958 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5959 if (ua_tmp->castparam)
5960 ua->castparam = ua_tmp->castparam;
5961 ua->user_lksb = ua_tmp->user_lksb;
5963 error = set_unlock_args(flags, ua, &args);
5967 error = unlock_lock(ls, lkb, &args);
5969 if (error == -DLM_EUNLOCK)
5971 /* from validate_unlock_args() */
5972 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5977 spin_lock_bh(&ua->proc->locks_spin);
5978 /* dlm_user_add_cb() may have already taken lkb off the proc list */
5979 if (!list_empty(&lkb->lkb_ownqueue))
5980 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5981 spin_unlock_bh(&ua->proc->locks_spin);
5983 trace_dlm_unlock_end(ls, lkb, flags, error);
5986 dlm_unlock_recovery(ls);
5991 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5992 uint32_t flags, uint32_t lkid)
5994 struct dlm_lkb *lkb;
5995 struct dlm_args args;
5996 struct dlm_user_args *ua;
5999 dlm_lock_recovery(ls);
6001 error = find_lkb(ls, lkid, &lkb);
6005 trace_dlm_unlock_start(ls, lkb, flags);
6008 if (ua_tmp->castparam)
6009 ua->castparam = ua_tmp->castparam;
6010 ua->user_lksb = ua_tmp->user_lksb;
6012 error = set_unlock_args(flags, ua, &args);
6016 error = cancel_lock(ls, lkb, &args);
6018 if (error == -DLM_ECANCEL)
6020 /* from validate_unlock_args() */
6021 if (error == -EBUSY)
6024 trace_dlm_unlock_end(ls, lkb, flags, error);
6027 dlm_unlock_recovery(ls);
6032 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6034 struct dlm_lkb *lkb;
6035 struct dlm_args args;
6036 struct dlm_user_args *ua;
6040 dlm_lock_recovery(ls);
6042 error = find_lkb(ls, lkid, &lkb);
6046 trace_dlm_unlock_start(ls, lkb, flags);
6050 error = set_unlock_args(flags, ua, &args);
6054 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6056 r = lkb->lkb_resource;
6060 error = validate_unlock_args(lkb, &args);
6063 set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6065 error = _cancel_lock(r, lkb);
6070 if (error == -DLM_ECANCEL)
6072 /* from validate_unlock_args() */
6073 if (error == -EBUSY)
6076 trace_dlm_unlock_end(ls, lkb, flags, error);
6079 dlm_unlock_recovery(ls);
6083 /* lkb's that are removed from the waiters list by revert are just left on the
6084 orphans list with the granted orphan locks, to be freed by purge */
6086 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6088 struct dlm_args args;
6091 hold_lkb(lkb); /* reference for the ls_orphans list */
6092 spin_lock_bh(&ls->ls_orphans_lock);
6093 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6094 spin_unlock_bh(&ls->ls_orphans_lock);
6096 set_unlock_args(0, lkb->lkb_ua, &args);
6098 error = cancel_lock(ls, lkb, &args);
6099 if (error == -DLM_ECANCEL)
6104 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6105 granted. Regardless of what rsb queue the lock is on, it's removed and
6106 freed. The IVVALBLK flag causes the lvb on the resource to be invalidated
6107 if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6109 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6111 struct dlm_args args;
6114 set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6115 lkb->lkb_ua, &args);
6117 error = unlock_lock(ls, lkb, &args);
6118 if (error == -DLM_EUNLOCK)
6123 /* We have to release the ls_clear_proc_locks lock before calling unlock_proc_lock()
6124 (which does lock_rsb) due to deadlock with receiving a message that does
6125 lock_rsb followed by dlm_user_add_cb() */
6127 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6128 struct dlm_user_proc *proc)
6130 struct dlm_lkb *lkb = NULL;
6132 spin_lock_bh(&ls->ls_clear_proc_locks);
6133 if (list_empty(&proc->locks))
6136 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6137 list_del_init(&lkb->lkb_ownqueue);
6139 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6140 set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6142 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6144 spin_unlock_bh(&ls->ls_clear_proc_locks);
6148 /* The ls_clear_proc_locks lock protects against dlm_user_add_cb() which
6149 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6150 which we clear here. */
6152 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6153 list, and no more device_writes should add lkb's to proc->locks list; so we
6154 shouldn't need to take asts_spin or locks_spin here. this assumes that
6155 device reads/writes/closes are serialized -- FIXME: we may need to serialize
6158 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6160 struct dlm_callback *cb, *cb_safe;
6161 struct dlm_lkb *lkb, *safe;
6163 dlm_lock_recovery(ls);
6166 lkb = del_proc_lock(ls, proc);
6169 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6170 orphan_proc_lock(ls, lkb);
6172 unlock_proc_lock(ls, lkb);
6174 /* this removes the reference for the proc->locks list
6175 added by dlm_user_request, it may result in the lkb
6181 spin_lock_bh(&ls->ls_clear_proc_locks);
6183 /* in-progress unlocks */
6184 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6185 list_del_init(&lkb->lkb_ownqueue);
6186 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6190 list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6191 list_del(&cb->list);
6195 spin_unlock_bh(&ls->ls_clear_proc_locks);
6196 dlm_unlock_recovery(ls);
6199 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6201 struct dlm_callback *cb, *cb_safe;
6202 struct dlm_lkb *lkb, *safe;
6206 spin_lock_bh(&proc->locks_spin);
6207 if (!list_empty(&proc->locks)) {
6208 lkb = list_entry(proc->locks.next, struct dlm_lkb,
6210 list_del_init(&lkb->lkb_ownqueue);
6212 spin_unlock_bh(&proc->locks_spin);
6217 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6218 unlock_proc_lock(ls, lkb);
6219 dlm_put_lkb(lkb); /* ref from proc->locks list */
6222 spin_lock_bh(&proc->locks_spin);
6223 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6224 list_del_init(&lkb->lkb_ownqueue);
6225 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6228 spin_unlock_bh(&proc->locks_spin);
6230 spin_lock_bh(&proc->asts_spin);
6231 list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6232 list_del(&cb->list);
6235 spin_unlock_bh(&proc->asts_spin);
6238 /* pid of 0 means purge all orphans */
6240 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6242 struct dlm_lkb *lkb, *safe;
6244 spin_lock_bh(&ls->ls_orphans_lock);
6245 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6246 if (pid && lkb->lkb_ownpid != pid)
6248 unlock_proc_lock(ls, lkb);
6249 list_del_init(&lkb->lkb_ownqueue);
6252 spin_unlock_bh(&ls->ls_orphans_lock);
6255 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6257 struct dlm_message *ms;
6258 struct dlm_mhandle *mh;
6261 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6262 DLM_MSG_PURGE, &ms, &mh);
6265 ms->m_nodeid = cpu_to_le32(nodeid);
6266 ms->m_pid = cpu_to_le32(pid);
6268 return send_message(mh, ms, NULL, 0);
6271 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6272 int nodeid, int pid)
6276 if (nodeid && (nodeid != dlm_our_nodeid())) {
6277 error = send_purge(ls, nodeid, pid);
6279 dlm_lock_recovery(ls);
6280 if (pid == current->pid)
6281 purge_proc_locks(ls, proc);
6283 do_purge(ls, nodeid, pid);
6284 dlm_unlock_recovery(ls);
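/*
 * Hedged usage sketch for dlm_user_purge(): a nodeid of 0 (or our own
 * nodeid) purges locally, otherwise a DLM_MSG_PURGE is sent to that node:
 *
 *	error = dlm_user_purge(ls, proc, 0, 1234);	// orphans of pid 1234
 *	error = dlm_user_purge(ls, proc, 0, 0);		// pid 0: all orphans
 *
 * When the pid matches current->pid, the caller's own proc locks are
 * purged via purge_proc_locks() instead of the orphan list.
 */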
6289 /* debug functionality */
6290 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6291 int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6293 struct dlm_lksb *lksb;
6294 struct dlm_lkb *lkb;
6298 /* we currently can't set a valid user lock */
6299 if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6302 lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6306 error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6312 dlm_set_dflags_val(lkb, lkb_dflags);
6313 lkb->lkb_nodeid = lkb_nodeid;
6314 lkb->lkb_lksb = lksb;
6315 /* user specific pointer, just don't have it NULL for kernel locks */
6316 if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6317 lkb->lkb_astparam = (void *)0xDEADBEEF;
6319 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6328 add_lkb(r, lkb, lkb_status);
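/*
 * Hedged debug usage sketch (all names and values are only illustrative):
 * inject a granted lkb onto a resource for testing:
 *
 *	error = dlm_debug_add_lkb(ls, 0x10001, "testres", 7,
 *				  0, 0, DLM_LKSTS_GRANTED);
 *
 * Passing lkb_dflags with DLM_DFL_USER_BIT set is refused above, since a
 * valid user lock cannot be fabricated this way.
 */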
6335 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6336 int mstype, int to_nodeid)
6338 struct dlm_lkb *lkb;
6341 error = find_lkb(ls, lkb_id, &lkb);
6345 error = add_to_waiters(lkb, mstype, to_nodeid);