// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
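
/* Illustrative sketch (not part of this file): how a kernel-side caller
   would enter stage 1 above.  The lockspace handle, callback and
   resource name are assumptions made up for the example. */
#if 0
static void example_ast(void *astarg)
{
	/* queued by stage 4 via queue_cast() once the request completes */
}

static int example_request(dlm_lockspace_t *ls, struct dlm_lksb *lksb)
{
	/* no DLM_LKF_CONVERT flag, so stage 1 routes this to request_lock() */
	return dlm_lock(ls, DLM_LOCK_EX, lksb, 0, "example_res",
			strlen("example_res"), 0, example_ast, lksb, NULL);
}
#endif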
#include <trace/events/dlm.h>

#include <linux/types.h>
#include <linux/rbtree.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "midcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);
static void toss_rsb(struct kref *kref);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/* UN   NL  CR  CW  PR  PW  EX  PD*/
	{  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
	{  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
	{  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
	{  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
	{  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
	{  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
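
/* Worked reading of dlm_lvb_operations (illustrative): a holder of PR
   (row PR) converting up to EX (column EX) hits 1, so the resource's
   LVB is copied back to the caller; a holder of PW (row PW) converting
   down to NL (column NL) hits 0, so the caller's LVB is written into
   the resource.  Ignoring the UN/PD padding, the 0 (write) entries
   appear only in the PW and EX rows: only down-conversions from those
   modes may modify the LVB. */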

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
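
/* Illustrative check against __dlm_compat_matrix: two PR (protected
   read) locks can be granted together, PR and EX cannot.  Assumes only
   the mode constants from <linux/dlm.h>. */
#if 0
static void example_compat(void)
{
	WARN_ON(!dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR)); /* 1: compatible */
	WARN_ON(dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX));  /* 0: conflict */
}
#endif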

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
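
/* Worked reading of __quecvt_compat_matrix (illustrative): QUECVT is
   only valid for conversions to a more restrictive mode, so NL->EX is
   1 while EX->NL is 0; the incomparable pair CW/PR is permitted in
   both directions (PR->CW and CW->PR are both 1). */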

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
	       (unsigned long long)lkb->lkb_recover_seq);
}

static void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
	       "rlc %d name %s\n",
	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
	       r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
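
/* A sketch of the pattern above (illustrative): request paths bracket
   their work with the read side of ls_in_recovery; recovery is assumed
   to take the rwsem for write, excluding all such work at once. */
#if 0
static int example_locked_op(struct dlm_ls *ls)
{
	if (!dlm_lock_recovery_try(ls))
		return -EAGAIN;	/* recovery in progress, try again later */
	/* ... operate on rsbs and lkbs ... */
	dlm_unlock_recovery(ls);
	return 0;
}
#endif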

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}
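
/* Worked classification (illustrative): PR(3) and CW(2) are
   incomparable, so PR<->CW is a "middle" conversion and never counts
   as a down-conversion even though CW < PR numerically; EX->NL is a
   plain down-conversion, and NL->EX is neither. */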

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

#ifdef CONFIG_DLM_DEPRECATED_API
	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
	   timeout caused the cancel then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}
#endif

	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb)) {
		send_bast(r, lkb, rqmode);
	} else {
		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;
	int rv;

	rv = kref_put_lock(&r->res_ref, toss_rsb,
			   &ls->ls_rsbtbl[bucket].lock);
	if (rv)
		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

static int pre_rsb_struct(struct dlm_ls *ls)
{
	struct dlm_rsb *r1, *r2;
	int count = 0;

	spin_lock(&ls->ls_new_rsb_spin);
	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
		spin_unlock(&ls->ls_new_rsb_spin);
		return 0;
	}
	spin_unlock(&ls->ls_new_rsb_spin);

	r1 = dlm_allocate_rsb(ls);
	r2 = dlm_allocate_rsb(ls);

	spin_lock(&ls->ls_new_rsb_spin);
	if (r1) {
		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	if (r2) {
		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	count = ls->ls_new_rsb_count;
	spin_unlock(&ls->ls_new_rsb_spin);

	if (!count)
		return -ENOMEM;
	return 0;
}

/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
   unlock any spinlocks, go back and call pre_rsb_struct again.
   Otherwise, take an rsb off the list and return it. */

static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
			  struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int count;

	spin_lock(&ls->ls_new_rsb_spin);
	if (list_empty(&ls->ls_new_rsb)) {
		count = ls->ls_new_rsb_count;
		spin_unlock(&ls->ls_new_rsb_spin);
		log_debug(ls, "find_rsb retry %d %d %s",
			  count, dlm_config.ci_new_rsb_count, name);
		return -EAGAIN;
	}

	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
	list_del(&r->res_hashchain);
	/* Convert the empty list_head to a NULL rb_node for tree usage: */
	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
	ls->ls_new_rsb_count--;
	spin_unlock(&ls->ls_new_rsb_spin);

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	*r_ret = r;
	return 0;
}

static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
{
	char maxname[DLM_RESNAME_MAXLEN];

	memset(maxname, 0, DLM_RESNAME_MAXLEN);
	memcpy(maxname, name, nlen);
	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
}
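
/* Why the zero padding above matters (illustrative): resource names
   are variable length, so both sides are compared as fixed
   DLM_RESNAME_MAXLEN buffers padded with zeros.  "foo" then orders
   consistently against "foobar" ('\0' < 'b'), giving the total order
   that the rb-tree search and insert below rely on. */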

int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
			struct dlm_rsb **r_ret)
{
	struct rb_node *node = tree->rb_node;
	struct dlm_rsb *r;
	int rc;

	while (node) {
		r = rb_entry(node, struct dlm_rsb, res_hashnode);
		rc = rsb_cmp(r, name, len);
		if (rc < 0)
			node = node->rb_left;
		else if (rc > 0)
			node = node->rb_right;
		else
			goto found;
	}
	*r_ret = NULL;
	return -EBADR;

 found:
	*r_ret = r;
	return 0;
}

static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
{
	struct rb_node **newn = &tree->rb_node;
	struct rb_node *parent = NULL;
	int rc;

	while (*newn) {
		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
					       res_hashnode);

		parent = *newn;
		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
		if (rc < 0)
			newn = &parent->rb_left;
		else if (rc > 0)
			newn = &parent->rb_right;
		else {
			log_print("rsb_insert match");
			dlm_dump_rsb(rsb);
			dlm_dump_rsb(cur);
			return -EEXIST;
		}
	}

	rb_link_node(&rsb->res_hashnode, parent, newn);
	rb_insert_color(&rsb->res_hashnode, tree);
	return 0;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 *
 * rsb's on the keep list are being used locally and refcounted.
 * rsb's on the toss list are not being used locally, and are not refcounted.
 *
 * The toss list rsb's were either
 * - previously used locally but not any more (were on keep list, then
 *   moved to toss list when last refcount dropped)
 * - created and put on toss list as a directory record for a lookup
 *   (we are the dir node for the res, but are not using the res right now,
 *   but some other node is)
 *
 * The purpose of find_rsb() is to return a refcounted rsb for local use.
 * So, if the given rsb is on the toss list, it is moved to the keep list
 * before being returned.
 *
 * toss_rsb() happens when all local usage of the rsb is done, i.e. no
 * more refcounts exist, so the rsb is moved from the keep list to the
 * toss list.
 *
 * rsb's on both keep and toss lists are used for doing name-to-master
 * lookups.  rsb's that are in use locally (and being refcounted) are on
 * the keep list, rsb's that are not in use locally (not refcounted) and
 * only exist for name/master lookups are on the toss list.
 *
 * rsb's on the toss list whose dir_nodeid is not local can have stale
 * name/master mappings.  So, remote requests on such rsb's can potentially
 * return with an error, which means the mapping is stale and needs to
 * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
 * first_lkid is to keep only a single outstanding request on an rsb
 * while that rsb has a potentially stale master.)
 */
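
/* Lifecycle sketch implied by the comment above (illustrative, not a
 * new state machine):
 *
 *   get_rsb_struct()                -> keep list (refcounted, in use)
 *   last local ref dropped          -> toss_rsb() moves keep -> toss
 *   find_rsb() hit on toss list     -> moved back toss -> keep, ref 1
 *   idle for ci_toss_secs           -> shrink_bucket() removes and frees
 */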

static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
			uint32_t hash, uint32_t b,
			int dir_nodeid, int from_nodeid,
			unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	int our_nodeid = dlm_our_nodeid();
	int from_local = 0;
	int from_other = 0;
	int from_dir = 0;
	int create = 0;
	int error;

	if (flags & R_RECEIVE_REQUEST) {
		if (from_nodeid == dir_nodeid)
			from_dir = 1;
		else
			from_other = 1;
	} else if (flags & R_REQUEST) {
		from_local = 1;
	}

	/*
	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
	 * we're the new master.  Our local recovery may not have set
	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
	 * create the rsb; dlm_recover_process_copy() will handle EBADR
	 * by resending.
	 *
	 * If someone sends us a request, we are the dir node, and we do
	 * not find the rsb anywhere, then recreate it.  This happens if
	 * someone sends us a request after we have removed/freed an rsb
	 * from our toss list.  (They sent a request instead of lookup
	 * because they are using an rsb from their toss list.)
	 */

	if (from_local || from_dir ||
	    (from_other && (dir_nodeid == our_nodeid))) {
		create = 1;
	}

 retry:
	if (create) {
		error = pre_rsb_struct(ls);
		if (error < 0)
			goto out;
	}

	spin_lock(&ls->ls_rsbtbl[b].lock);

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (error)
		goto do_toss;

	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */

	kref_get(&r->res_ref);
	goto out_unlock;


 do_toss:
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto do_new;

	/*
	 * rsb found inactive (master_nodeid may be out of date unless
	 * we are the dir_nodeid or were the master)  No other thread
	 * is using this rsb because it's on the toss list, so we can
	 * look at or update res_master_nodeid without lock_rsb.
	 */

	if ((r->res_master_nodeid != our_nodeid) && from_other) {
		/* our rsb was not master, and another node (not the dir node)
		   has sent us a request */
		log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
			  from_nodeid, r->res_master_nodeid, dir_nodeid,
			  r->res_name);
		error = -ENOTBLK;
		goto out_unlock;
	}

	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
		/* don't think this should ever happen */
		log_error(ls, "find_rsb toss from_dir %d master %d",
			  from_nodeid, r->res_master_nodeid);
		dlm_print_rsb(r);
		/* fix it and go on */
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	}

	if (from_local && (r->res_master_nodeid != our_nodeid)) {
		/* Because we have held no locks on this rsb,
		   res_master_nodeid could have become stale. */
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	}

	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
	goto out_unlock;


 do_new:
	/*
	 * rsb not found
	 */

	if (error == -EBADR && !create)
		goto out_unlock;

	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = b;
	r->res_dir_nodeid = dir_nodeid;
	kref_init(&r->res_ref);

	if (from_dir) {
		/* want to see how often this happens */
		log_debug(ls, "find_rsb new from_dir %d recreate %s",
			  from_nodeid, r->res_name);
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
		goto out_add;
	}

	if (from_other && (dir_nodeid != our_nodeid)) {
		/* should never happen */
		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
		dlm_free_rsb(r);
		r = NULL;
		error = -ENOTBLK;
		goto out_unlock;
	}

	if (from_other) {
		log_debug(ls, "find_rsb new from_other %d dir %d %s",
			  from_nodeid, dir_nodeid, r->res_name);
	}

	if (dir_nodeid == our_nodeid) {
		/* When we are the dir nodeid, we can set the master
		   node immediately */
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
	} else {
		/* set_master will send_lookup to dir_nodeid */
		r->res_master_nodeid = 0;
		r->res_nodeid = -1;
	}

 out_add:
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
 out:
	*r_ret = r;
	return error;
}

/* During recovery, other nodes can send us new MSTCPY locks (from
   dlm_recover_locks) before we've made ourselves master (in
   dlm_recover_masters). */

static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
			  uint32_t hash, uint32_t b,
			  int dir_nodeid, int from_nodeid,
			  unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	int our_nodeid = dlm_our_nodeid();
	int recover = (flags & R_RECEIVE_RECOVER);
	int error;

 retry:
	error = pre_rsb_struct(ls);
	if (error < 0)
		goto out;

	spin_lock(&ls->ls_rsbtbl[b].lock);

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (error)
		goto do_toss;

	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */

	kref_get(&r->res_ref);
	goto out_unlock;


 do_toss:
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto do_new;

	/*
	 * rsb found inactive.  No other thread is using this rsb because
	 * it's on the toss list, so we can look at or update
	 * res_master_nodeid without lock_rsb.
	 */

	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
		/* our rsb is not master, and another node has sent us a
		   request; this should never happen */
		log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
			  from_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		error = -ENOTBLK;
		goto out_unlock;
	}

	if (!recover && (r->res_master_nodeid != our_nodeid) &&
	    (dir_nodeid == our_nodeid)) {
		/* our rsb is not master, and we are dir; may as well fix it;
		   this should never happen */
		log_error(ls, "find_rsb toss our %d master %d dir %d",
			  our_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
	}

	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
	goto out_unlock;


 do_new:
	/*
	 * rsb not found
	 */

	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = b;
	r->res_dir_nodeid = dir_nodeid;
	r->res_master_nodeid = dir_nodeid;
	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
	kref_init(&r->res_ref);

	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
 out:
	*r_ret = r;
	return error;
}

static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	uint32_t hash, b;
	int dir_nodeid;

	if (len > DLM_RESNAME_MAXLEN)
		return -EINVAL;

	hash = jhash(name, len, 0);
	b = hash & (ls->ls_rsbtbl_size - 1);

	dir_nodeid = dlm_hash2nodeid(ls, hash);

	if (dlm_no_directory(ls))
		return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
				      from_nodeid, flags, r_ret);
	else
		return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
				    from_nodeid, flags, r_ret);
}
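
/* The bucket mask above assumes ls_rsbtbl_size is a power of two, so
 * "hash & (size - 1)" keeps the low bits of the jhash value.  Worked
 * example (illustrative): with ls_rsbtbl_size = 1024, mask = 0x3ff,
 * hash 0x9e3779b9 maps to bucket 0x1b9. */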

/* we have received a request and found that res_master_nodeid != our_nodeid,
   so we need to return an error or make ourselves the master */

static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
				  int from_nodeid)
{
	if (dlm_no_directory(ls)) {
		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
			  from_nodeid, r->res_master_nodeid,
			  r->res_dir_nodeid);
		dlm_print_rsb(r);
		return -ENOTBLK;
	}

	if (from_nodeid != r->res_dir_nodeid) {
		/* our rsb is not master, and another node (not the dir node)
		   has sent us a request.  this is much more common when our
		   master_nodeid is zero, so limit debug to non-zero.  */

		if (r->res_master_nodeid) {
			log_debug(ls, "validate master from_other %d master %d "
				  "dir %d first %x %s", from_nodeid,
				  r->res_master_nodeid, r->res_dir_nodeid,
				  r->res_first_lkid, r->res_name);
		}
		return -ENOTBLK;
	} else {
		/* our rsb is not master, but the dir nodeid has sent us a
		   request; this could happen with master 0 / res_nodeid -1 */

		if (r->res_master_nodeid) {
			log_error(ls, "validate master from_dir %d master %d "
				  "first %x %s",
				  from_nodeid, r->res_master_nodeid,
				  r->res_first_lkid, r->res_name);
		}

		r->res_master_nodeid = dlm_our_nodeid();
		r->res_nodeid = 0;
		return 0;
	}
}

static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
				int from_nodeid, bool toss_list, unsigned int flags,
				int *r_nodeid, int *result)
{
	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
	int from_master = (flags & DLM_LU_RECOVER_DIR);

	if (r->res_dir_nodeid != our_nodeid) {
		/* should not happen, but may as well fix it and carry on */
		log_error(ls, "%s res_dir %d our %d %s", __func__,
			  r->res_dir_nodeid, our_nodeid, r->res_name);
		r->res_dir_nodeid = our_nodeid;
	}

	if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
		/* Recovery uses this function to set a new master when
		 * the previous master failed.  Setting NEW_MASTER will
		 * force dlm_recover_masters to call recover_master on this
		 * rsb even though the res_nodeid is no longer removed.
		 */

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);

		if (toss_list) {
			/* I don't think we should ever find it on toss list. */
			log_error(ls, "%s fix_master on toss", __func__);
			dlm_dump_rsb(r);
		}
	}

	if (from_master && (r->res_master_nodeid != from_nodeid)) {
		/* this will happen if from_nodeid became master during
		 * a previous recovery cycle, and we aborted the previous
		 * cycle before recovering this master value
		 */

		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
			  __func__, from_nodeid, r->res_master_nodeid,
			  r->res_nodeid, r->res_first_lkid, r->res_name);

		if (r->res_master_nodeid == our_nodeid) {
			log_error(ls, "from_master %d our_master", from_nodeid);
			dlm_dump_rsb(r);
			goto ret_assign;
		}

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);
	}

	if (!r->res_master_nodeid) {
		/* this will happen if recovery happens while we're looking
		 * up the master for this rsb
		 */

		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
			  from_nodeid, r->res_first_lkid, r->res_name);
		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
	}

	if (!from_master && !fix_master &&
	    (r->res_master_nodeid == from_nodeid)) {
		/* this can happen when the master sends remove, the dir node
		 * finds the rsb on the keep list and ignores the remove,
		 * and the former master sends a lookup
		 */

		log_limit(ls, "%s from master %d flags %x first %x %s",
			  __func__, from_nodeid, flags, r->res_first_lkid,
			  r->res_name);
	}

 ret_assign:
	*r_nodeid = r->res_master_nodeid;
	if (result)
		*result = DLM_LU_MATCH;
}

/*
 * We're the dir node for this res and another node wants to know the
 * master nodeid.  During normal operation (non recovery) this is only
 * called from receive_lookup(); master lookups when the local node is
 * the dir node are done by find_rsb().
 *
 * normal operation, we are the dir node for a resource
 * . _request_lock
 * . set_master
 * . send_lookup
 * . receive_lookup
 * . dlm_master_lookup flags 0
 *
 * recover directory, we are rebuilding dir for all resources
 * . dlm_recover_directory
 * . dlm_rcom_names
 *   remote node sends back the rsb names it is master of and we are dir of
 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
 *   we either create new rsb setting remote node as master, or find existing
 *   rsb and set master to be the remote node.
 *
 * recover masters, we are finding the new master for resources
 * . dlm_recover_masters
 * . recover_master
 * . dlm_send_rcom_lookup
 * . receive_rcom_lookup
 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
 */

int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
		      unsigned int flags, int *r_nodeid, int *result)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash, b;
	int our_nodeid = dlm_our_nodeid();
	int dir_nodeid, error;

	if (len > DLM_RESNAME_MAXLEN)
		return -EINVAL;

	if (from_nodeid == our_nodeid) {
		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
			  our_nodeid, flags);
		return -EINVAL;
	}

	hash = jhash(name, len, 0);
	b = hash & (ls->ls_rsbtbl_size - 1);

	dir_nodeid = dlm_hash2nodeid(ls, hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
			  from_nodeid, dir_nodeid, our_nodeid, hash,
			  ls->ls_num_nodes);
		*r_nodeid = -1;
		return -EINVAL;
	}

 retry:
	error = pre_rsb_struct(ls);
	if (error < 0)
		return error;

	spin_lock(&ls->ls_rsbtbl[b].lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (!error) {
		/* because the rsb is active, we need to lock_rsb before
		 * checking/changing res_master_nodeid
		 */

		hold_rsb(r);
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		lock_rsb(r);

		__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
				    flags, r_nodeid, result);

		/* the rsb was active */
		unlock_rsb(r);
		put_rsb(r);

		return 0;
	}

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto not_found;

	/* because the rsb is inactive (on toss list), it's not refcounted
	 * and lock_rsb is not used, but is protected by the rsbtbl lock
	 */

	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
			    r_nodeid, result);

	r->res_toss_time = jiffies;
	/* the rsb was inactive (on toss list) */
	spin_unlock(&ls->ls_rsbtbl[b].lock);

	return 0;

 not_found:
	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = b;
	r->res_dir_nodeid = our_nodeid;
	r->res_master_nodeid = from_nodeid;
	r->res_nodeid = from_nodeid;
	kref_init(&r->res_ref);
	r->res_toss_time = jiffies;

	error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
	if (error) {
		/* should never happen */
		dlm_free_rsb(r);
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}

	if (result)
		*result = DLM_LU_ADD;
	*r_nodeid = from_nodeid;
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
{
	struct rb_node *n;
	struct dlm_rsb *r;
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		spin_lock(&ls->ls_rsbtbl[i].lock);
		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			if (r->res_hash == hash)
				dlm_dump_rsb(r);
		}
		spin_unlock(&ls->ls_rsbtbl[i].lock);
	}
}

void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash, b;
	int error;

	hash = jhash(name, len, 0);
	b = hash & (ls->ls_rsbtbl_size - 1);

	spin_lock(&ls->ls_rsbtbl[b].lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (!error)
		goto out_dump;

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto out;
 out_dump:
	dlm_dump_rsb(r);
 out:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
		       int start, int end)
{
	struct dlm_lkb *lkb;
	int rv;

	lkb = dlm_allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
#ifdef CONFIG_DLM_DEPRECATED_API
	INIT_LIST_HEAD(&lkb->lkb_time_list);
#endif
	INIT_LIST_HEAD(&lkb->lkb_cb_list);
	mutex_init(&lkb->lkb_cb_mutex);
	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);

	idr_preload(GFP_NOFS);
	spin_lock(&ls->ls_lkbidr_spin);
	rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
	if (rv >= 0)
		lkb->lkb_id = rv;
	spin_unlock(&ls->ls_lkbidr_spin);
	idr_preload_end();

	if (rv < 0) {
		log_error(ls, "create_lkb idr error %d", rv);
		dlm_free_lkb(lkb);
		return rv;
	}

	*lkb_ret = lkb;
	return 0;
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	return _create_lkb(ls, lkb_ret, 1, 0);
}
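
/* Note on the (1, 0) arguments above: idr_alloc() treats end <= 0 as
 * "no upper bound", so lock ids are handed out from 1 upward and id 0
 * is never allocated; a zero lkid therefore never names a valid lock. */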

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;

	spin_lock(&ls->ls_lkbidr_spin);
	lkb = idr_find(&ls->ls_lkbidr, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	spin_unlock(&ls->ls_lkbidr_spin);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint32_t lkid = lkb->lkb_id;
	int rv;

	rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
			   &ls->ls_lkbidr_spin);
	if (rv) {
		idr_remove(&ls->ls_lkbidr, lkid);
		spin_unlock(&ls->ls_lkbidr_spin);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
	}

	return rv;
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

static void unhold_lkb_assert(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	DLM_ASSERT(false, dlm_print_lkb(lkb););
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	kref_put(&lkb->lkb_ref, unhold_lkb_assert);
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL, *iter;

	list_for_each_entry(iter, head, lkb_statequeue)
		if (iter->lkb_rqmode < mode) {
			lkb = iter;
			list_add_tail(new, &iter->lkb_statequeue);
			break;
		}

	if (!lkb)
		list_add_tail(new, head);
}
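
/* Worked example for lkb_add_ordered() (illustrative): with queued
 * modes [EX(5), PW(4), NL(0)], inserting a PR(3) entry stops at the
 * first entry with a lower mode (NL) and, since list_add_tail() on a
 * member inserts before it, yields [EX, PW, PR, NL]; the descending
 * mode order is preserved. */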
1345
1346/* add/remove lkb to rsb's grant/convert/wait queue */
1347
1348static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1349{
1350 kref_get(&lkb->lkb_ref);
1351
1352 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1353
eeda418d
DT
1354 lkb->lkb_timestamp = ktime_get();
1355
e7fd4179
DT
1356 lkb->lkb_status = status;
1357
1358 switch (status) {
1359 case DLM_LKSTS_WAITING:
1360 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1361 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1362 else
1363 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1364 break;
1365 case DLM_LKSTS_GRANTED:
1366 /* convention says granted locks kept in order of grmode */
1367 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1368 lkb->lkb_grmode);
1369 break;
1370 case DLM_LKSTS_CONVERT:
1371 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1372 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1373 else
1374 list_add_tail(&lkb->lkb_statequeue,
1375 &r->res_convertqueue);
1376 break;
1377 default:
1378 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1379 }
1380}
1381
1382static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1383{
1384 lkb->lkb_status = 0;
1385 list_del(&lkb->lkb_statequeue);
1386 unhold_lkb(lkb);
1387}
1388
1389static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1390{
1391 hold_lkb(lkb);
1392 del_lkb(r, lkb);
1393 add_lkb(r, lkb, sts);
1394 unhold_lkb(lkb);
1395}
1396
ef0c2bb0
DT
1397static int msg_reply_type(int mstype)
1398{
1399 switch (mstype) {
1400 case DLM_MSG_REQUEST:
1401 return DLM_MSG_REQUEST_REPLY;
1402 case DLM_MSG_CONVERT:
1403 return DLM_MSG_CONVERT_REPLY;
1404 case DLM_MSG_UNLOCK:
1405 return DLM_MSG_UNLOCK_REPLY;
1406 case DLM_MSG_CANCEL:
1407 return DLM_MSG_CANCEL_REPLY;
1408 case DLM_MSG_LOOKUP:
1409 return DLM_MSG_LOOKUP_REPLY;
1410 }
1411 return -1;
1412}
1413
e7fd4179
DT
1414/* add/remove lkb from global waiters list of lkb's waiting for
1415 a reply from a remote node */
1416
c6ff669b 1417static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
e7fd4179
DT
1418{
1419 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
ef0c2bb0 1420 int error = 0;
e7fd4179 1421
90135925 1422 mutex_lock(&ls->ls_waiters_mutex);
ef0c2bb0
DT
1423
1424 if (is_overlap_unlock(lkb) ||
1425 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1426 error = -EINVAL;
1427 goto out;
1428 }
1429
1430 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1431 switch (mstype) {
1432 case DLM_MSG_UNLOCK:
1433 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1434 break;
1435 case DLM_MSG_CANCEL:
1436 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1437 break;
1438 default:
1439 error = -EBUSY;
1440 goto out;
1441 }
1442 lkb->lkb_wait_count++;
1443 hold_lkb(lkb);
1444
43279e53 1445 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
ef0c2bb0
DT
1446 lkb->lkb_id, lkb->lkb_wait_type, mstype,
1447 lkb->lkb_wait_count, lkb->lkb_flags);
e7fd4179
DT
1448 goto out;
1449 }
ef0c2bb0
DT
1450
1451 DLM_ASSERT(!lkb->lkb_wait_count,
1452 dlm_print_lkb(lkb);
1453 printk("wait_count %d\n", lkb->lkb_wait_count););
1454
1455 lkb->lkb_wait_count++;
e7fd4179 1456 lkb->lkb_wait_type = mstype;
c6ff669b 1457 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
ef0c2bb0 1458 hold_lkb(lkb);
e7fd4179
DT
1459 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1460 out:
ef0c2bb0 1461 if (error)
43279e53 1462 log_error(ls, "addwait error %x %d flags %x %d %d %s",
ef0c2bb0
DT
1463 lkb->lkb_id, error, lkb->lkb_flags, mstype,
1464 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
90135925 1465 mutex_unlock(&ls->ls_waiters_mutex);
ef0c2bb0 1466 return error;
e7fd4179
DT
1467}
1468
b790c3b7
DT
1469/* We clear the RESEND flag because we might be taking an lkb off the waiters
1470 list as part of process_requestqueue (e.g. a lookup that has an optimized
1471 request reply on the requestqueue) between dlm_recover_waiters_pre() which
1472 set RESEND and dlm_recover_waiters_post() */
1473
43279e53
DT
1474static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1475 struct dlm_message *ms)
e7fd4179 1476{
ef0c2bb0
DT
1477 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1478 int overlap_done = 0;
e7fd4179 1479
ef0c2bb0 1480 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
43279e53 1481 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
ef0c2bb0
DT
1482 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1483 overlap_done = 1;
1484 goto out_del;
e7fd4179 1485 }
ef0c2bb0
DT
1486
1487 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
43279e53 1488 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
ef0c2bb0
DT
1489 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1490 overlap_done = 1;
1491 goto out_del;
1492 }
1493
43279e53
DT
1494 /* Cancel state was preemptively cleared by a successful convert,
1495 see next comment, nothing to do. */
1496
1497 if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1498 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1499 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1500 lkb->lkb_id, lkb->lkb_wait_type);
1501 return -1;
1502 }
1503
1504 /* Remove for the convert reply, and premptively remove for the
1505 cancel reply. A convert has been granted while there's still
1506 an outstanding cancel on it (the cancel is moot and the result
1507 in the cancel reply should be 0). We preempt the cancel reply
1508 because the app gets the convert result and then can follow up
1509 with another op, like convert. This subsequent op would see the
1510 lingering state of the cancel and fail with -EBUSY. */
1511
1512 if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1513 (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1514 is_overlap_cancel(lkb) && ms && !ms->m_result) {
1515 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1516 lkb->lkb_id);
1517 lkb->lkb_wait_type = 0;
1518 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1519 lkb->lkb_wait_count--;
1689c169 1520 unhold_lkb(lkb);
43279e53
DT
1521 goto out_del;
1522 }
1523
ef0c2bb0
DT
1524 /* N.B. type of reply may not always correspond to type of original
1525 msg due to lookup->request optimization, verify others? */
1526
1527 if (lkb->lkb_wait_type) {
1528 lkb->lkb_wait_type = 0;
1529 goto out_del;
1530 }
1531
6d40c4a7 1532 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
3428785a
AA
1533 lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1534 lkb->lkb_remid, mstype, lkb->lkb_flags);
ef0c2bb0
DT
1535 return -1;
1536
1537 out_del:
1538 /* the force-unlock/cancel has completed and we haven't recvd a reply
1539 to the op that was in progress prior to the unlock/cancel; we
1540 give up on any reply to the earlier op. FIXME: not sure when/how
1541 this would happen */
1542
1543 if (overlap_done && lkb->lkb_wait_type) {
43279e53 1544 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
ef0c2bb0
DT
1545 lkb->lkb_id, mstype, lkb->lkb_wait_type);
1546 lkb->lkb_wait_count--;
1689c169 1547 unhold_lkb(lkb);
ef0c2bb0
DT
1548 lkb->lkb_wait_type = 0;
1549 }
1550
1551 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1552
b790c3b7 1553 lkb->lkb_flags &= ~DLM_IFL_RESEND;
ef0c2bb0
DT
1554 lkb->lkb_wait_count--;
1555 if (!lkb->lkb_wait_count)
1556 list_del_init(&lkb->lkb_wait_reply);
e7fd4179 1557 unhold_lkb(lkb);
ef0c2bb0 1558 return 0;
e7fd4179
DT
1559}
1560
ef0c2bb0 1561static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
e7fd4179
DT
1562{
1563 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1564 int error;
1565
90135925 1566 mutex_lock(&ls->ls_waiters_mutex);
43279e53 1567 error = _remove_from_waiters(lkb, mstype, NULL);
90135925 1568 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
1569 return error;
1570}
1571
ef0c2bb0
DT
1572/* Handles situations where we might be processing a "fake" or "stub" reply in
1573 which we can't try to take waiters_mutex again. */
1574
1575static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1576{
1577 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1578 int error;
1579
00e99ccd 1580 if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS))
ef0c2bb0 1581 mutex_lock(&ls->ls_waiters_mutex);
00e99ccd
AA
1582 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1583 if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS))
ef0c2bb0
DT
1584 mutex_unlock(&ls->ls_waiters_mutex);
1585 return error;
1586}
1587
05c32f47 1588/* If there's an rsb for the same resource being removed, ensure
21d9ac1a
AA
1589 * that the remove message is sent before the new lookup message.
1590 */
1591
1592#define DLM_WAIT_PENDING_COND(ls, r) \
1593 (ls->ls_remove_len && \
1594 !rsb_cmp(r, ls->ls_remove_name, \
1595 ls->ls_remove_len))
e7fd4179 1596
05c32f47 1597static void wait_pending_remove(struct dlm_rsb *r)
e7fd4179 1598{
05c32f47
DT
1599 struct dlm_ls *ls = r->res_ls;
1600 restart:
1601 spin_lock(&ls->ls_remove_spin);
21d9ac1a 1602 if (DLM_WAIT_PENDING_COND(ls, r)) {
05c32f47 1603 log_debug(ls, "delay lookup for remove dir %d %s",
21d9ac1a 1604 r->res_dir_nodeid, r->res_name);
05c32f47 1605 spin_unlock(&ls->ls_remove_spin);
21d9ac1a 1606 wait_event(ls->ls_remove_wait, !DLM_WAIT_PENDING_COND(ls, r));
05c32f47
DT
1607 goto restart;
1608 }
1609 spin_unlock(&ls->ls_remove_spin);
1610}
e7fd4179 1611
05c32f47
DT
1612/*
1613 * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1614 * read by other threads in wait_pending_remove. ls_remove_names
1615 * and ls_remove_lens are only used by the scan thread, so they do
1616 * not need protection.
1617 */
c04fecb4 1618
05c32f47
DT
1619static void shrink_bucket(struct dlm_ls *ls, int b)
1620{
1621 struct rb_node *n, *next;
1622 struct dlm_rsb *r;
1623 char *name;
1624 int our_nodeid = dlm_our_nodeid();
1625 int remote_count = 0;
f1172283 1626 int need_shrink = 0;
05c32f47 1627 int i, len, rv;
c04fecb4 1628
05c32f47 1629 memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
c04fecb4 1630
05c32f47 1631 spin_lock(&ls->ls_rsbtbl[b].lock);
f1172283
DT
1632
1633 if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) {
1634 spin_unlock(&ls->ls_rsbtbl[b].lock);
1635 return;
1636 }
1637
05c32f47
DT
1638 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1639 next = rb_next(n);
1640 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1641
1642 /* If we're the directory record for this rsb, and
1643 we're not the master of it, then we need to wait
1644 for the master node to send us a dir remove for
1645 before removing the dir record. */
1646
1647 if (!dlm_no_directory(ls) &&
1648 (r->res_master_nodeid != our_nodeid) &&
1649 (dlm_dir_nodeid(r) == our_nodeid)) {
1650 continue;
e7fd4179
DT
1651 }
1652
f1172283
DT
1653 need_shrink = 1;
1654
05c32f47
DT
1655 if (!time_after_eq(jiffies, r->res_toss_time +
1656 dlm_config.ci_toss_secs * HZ)) {
1657 continue;
e7fd4179
DT
1658 }
1659
05c32f47
DT
1660 if (!dlm_no_directory(ls) &&
1661 (r->res_master_nodeid == our_nodeid) &&
1662 (dlm_dir_nodeid(r) != our_nodeid)) {
e7fd4179 1663
c04fecb4
DT
1664 /* We're the master of this rsb but we're not
1665 the directory record, so we need to tell the
1666 dir node to remove the dir record. */
1667
05c32f47
DT
1668 ls->ls_remove_lens[remote_count] = r->res_length;
1669 memcpy(ls->ls_remove_names[remote_count], r->res_name,
1670 DLM_RESNAME_MAXLEN);
1671 remote_count++;
c04fecb4 1672
05c32f47
DT
1673 if (remote_count >= DLM_REMOVE_NAMES_MAX)
1674 break;
1675 continue;
1676 }
1677
1678 if (!kref_put(&r->res_ref, kill_rsb)) {
e7fd4179 1679 log_error(ls, "tossed rsb in use %s", r->res_name);
05c32f47 1680 continue;
e7fd4179 1681 }
05c32f47
DT
1682
1683 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1684 dlm_free_rsb(r);
e7fd4179 1685 }
f1172283
DT
1686
1687 if (need_shrink)
1688 ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK;
1689 else
1690 ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK;
05c32f47 1691 spin_unlock(&ls->ls_rsbtbl[b].lock);
e7fd4179 1692
05c32f47
DT
1693 /*
1694 * While searching for rsb's to free, we found some that require
1695 * remote removal. We leave them in place and find them again here
1696 * so there is a very small gap between removing them from the toss
1697 * list and sending the removal. Keeping this gap small is
1698 * important to keep us (the master node) from being out of sync
1699 * with the remote dir node for very long.
1700 *
1701 * From the time the rsb is removed from toss until just after
1702 * send_remove, the rsb name is saved in ls_remove_name. A new
1703 * lookup checks this to ensure that a new lookup message for the
1704 * same resource name is not sent just before the remove message.
1705 */
1706
1707 for (i = 0; i < remote_count; i++) {
1708 name = ls->ls_remove_names[i];
1709 len = ls->ls_remove_lens[i];
1710
1711 spin_lock(&ls->ls_rsbtbl[b].lock);
1712 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1713 if (rv) {
1714 spin_unlock(&ls->ls_rsbtbl[b].lock);
1715 log_debug(ls, "remove_name not toss %s", name);
1716 continue;
1717 }
1718
1719 if (r->res_master_nodeid != our_nodeid) {
1720 spin_unlock(&ls->ls_rsbtbl[b].lock);
1721 log_debug(ls, "remove_name master %d dir %d our %d %s",
1722 r->res_master_nodeid, r->res_dir_nodeid,
1723 our_nodeid, name);
1724 continue;
1725 }
1726
1727 if (r->res_dir_nodeid == our_nodeid) {
1728 /* should never happen */
1729 spin_unlock(&ls->ls_rsbtbl[b].lock);
1730 log_error(ls, "remove_name dir %d master %d our %d %s",
1731 r->res_dir_nodeid, r->res_master_nodeid,
1732 our_nodeid, name);
1733 continue;
1734 }
1735
1736 if (!time_after_eq(jiffies, r->res_toss_time +
1737 dlm_config.ci_toss_secs * HZ)) {
1738 spin_unlock(&ls->ls_rsbtbl[b].lock);
1739 log_debug(ls, "remove_name toss_time %lu now %lu %s",
1740 r->res_toss_time, jiffies, name);
1741 continue;
1742 }
1743
1744 if (!kref_put(&r->res_ref, kill_rsb)) {
1745 spin_unlock(&ls->ls_rsbtbl[b].lock);
1746 log_error(ls, "remove_name in use %s", name);
1747 continue;
1748 }
1749
1750 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1751
1752 /* block lookup of same name until we've sent remove */
1753 spin_lock(&ls->ls_remove_spin);
1754 ls->ls_remove_len = len;
1755 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1756 spin_unlock(&ls->ls_remove_spin);
1757 spin_unlock(&ls->ls_rsbtbl[b].lock);
1758
1759 send_remove(r);
1760
1761 /* allow lookup of name again */
1762 spin_lock(&ls->ls_remove_spin);
1763 ls->ls_remove_len = 0;
1764 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1765 spin_unlock(&ls->ls_remove_spin);
f6f74183 1766 wake_up(&ls->ls_remove_wait);
05c32f47
DT
1767
1768 dlm_free_rsb(r);
1769 }
e7fd4179
DT
1770}
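
/* A standalone sketch, not dlm code: both expiry checks in shrink_bucket()
   above use the kernel's time_after_eq(), which stays correct when jiffies
   wraps because it compares via a signed difference. hz, toss_secs and the
   tick values below are invented stand-ins for HZ, dlm_config.ci_toss_secs
   and res_toss_time. */

#include <stdio.h>

/* wraparound-safe "now >= deadline" for free-running tick counters */
static int tick_after_eq(unsigned long a, unsigned long b)
{
	return (long)(a - b) >= 0;
}

int main(void)
{
	unsigned long hz = 100, toss_secs = 10;
	unsigned long toss_time = (unsigned long)-50;	/* just before wrap */
	unsigned long now = 1050;			/* ticks after wrap */

	if (tick_after_eq(now, toss_time + toss_secs * hz))
		printf("rsb expired, safe to free\n");
	else
		printf("rsb still young, keep it on toss\n");
	return 0;
}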
1771
1772void dlm_scan_rsbs(struct dlm_ls *ls)
1773{
1774 int i;
1775
e7fd4179
DT
1776 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1777 shrink_bucket(ls, i);
85e86edf
DT
1778 if (dlm_locking_stopped(ls))
1779 break;
e7fd4179
DT
1780 cond_resched();
1781 }
1782}
1783
6b0afc0c 1784#ifdef CONFIG_DLM_DEPRECATED_API
3ae1acf9
DT
1785static void add_timeout(struct dlm_lkb *lkb)
1786{
1787 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1788
eeda418d 1789 if (is_master_copy(lkb))
3ae1acf9 1790 return;
3ae1acf9
DT
1791
1792 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1793 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1794 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1795 goto add_it;
1796 }
84d8cd69
DT
1797 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1798 goto add_it;
3ae1acf9
DT
1799 return;
1800
1801 add_it:
1802 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1803 mutex_lock(&ls->ls_timeout_mutex);
1804 hold_lkb(lkb);
3ae1acf9
DT
1805 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1806 mutex_unlock(&ls->ls_timeout_mutex);
1807}
1808
1809static void del_timeout(struct dlm_lkb *lkb)
1810{
1811 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1812
1813 mutex_lock(&ls->ls_timeout_mutex);
1814 if (!list_empty(&lkb->lkb_time_list)) {
1815 list_del_init(&lkb->lkb_time_list);
1816 unhold_lkb(lkb);
1817 }
1818 mutex_unlock(&ls->ls_timeout_mutex);
1819}
1820
1821/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1822 lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
1823 and then lock rsb because of lock ordering in add_timeout. We may need
1824 to specify some special timeout-related bits in the lkb that are just to
1825 be accessed under the timeout_mutex. */
1826
1827void dlm_scan_timeout(struct dlm_ls *ls)
1828{
1829 struct dlm_rsb *r;
dc1acd5c 1830 struct dlm_lkb *lkb = NULL, *iter;
3ae1acf9 1831 int do_cancel, do_warn;
eeda418d 1832 s64 wait_us;
3ae1acf9
DT
1833
1834 for (;;) {
1835 if (dlm_locking_stopped(ls))
1836 break;
1837
1838 do_cancel = 0;
1839 do_warn = 0;
1840 mutex_lock(&ls->ls_timeout_mutex);
dc1acd5c 1841 list_for_each_entry(iter, &ls->ls_timeout, lkb_time_list) {
3ae1acf9 1842
eeda418d 1843 wait_us = ktime_to_us(ktime_sub(ktime_get(),
dc1acd5c 1844 iter->lkb_timestamp));
eeda418d 1845
dc1acd5c
JK
1846 if ((iter->lkb_exflags & DLM_LKF_TIMEOUT) &&
1847 wait_us >= (iter->lkb_timeout_cs * 10000))
3ae1acf9
DT
1848 do_cancel = 1;
1849
dc1acd5c 1850 if ((iter->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
eeda418d 1851 wait_us >= dlm_config.ci_timewarn_cs * 10000)
3ae1acf9
DT
1852 do_warn = 1;
1853
1854 if (!do_cancel && !do_warn)
1855 continue;
dc1acd5c
JK
1856 hold_lkb(iter);
1857 lkb = iter;
3ae1acf9
DT
1858 break;
1859 }
1860 mutex_unlock(&ls->ls_timeout_mutex);
1861
dc1acd5c 1862 if (!lkb)
3ae1acf9
DT
1863 break;
1864
1865 r = lkb->lkb_resource;
1866 hold_rsb(r);
1867 lock_rsb(r);
1868
1869 if (do_warn) {
1870 /* clear flag so we only warn once */
1871 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1872 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1873 del_timeout(lkb);
1874 dlm_timeout_warn(lkb);
1875 }
1876
1877 if (do_cancel) {
b3cab7b9 1878 log_debug(ls, "timeout cancel %x node %d %s",
639aca41 1879 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
3ae1acf9
DT
1880 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1881 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1882 del_timeout(lkb);
1883 _cancel_lock(r, lkb);
1884 }
1885
1886 unlock_rsb(r);
1887 unhold_rsb(r);
1888 dlm_put_lkb(lkb);
1889 }
1890}
1891
1892/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1893 dlm_recoverd before checking/setting ls_recover_begin. */
1894
1895void dlm_adjust_timeouts(struct dlm_ls *ls)
1896{
1897 struct dlm_lkb *lkb;
eeda418d 1898 u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
3ae1acf9
DT
1899
1900 ls->ls_recover_begin = 0;
1901 mutex_lock(&ls->ls_timeout_mutex);
1902 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
eeda418d 1903 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
3ae1acf9
DT
1904 mutex_unlock(&ls->ls_timeout_mutex);
1905}
6b0afc0c
AA
1906#else
1907static void add_timeout(struct dlm_lkb *lkb) { }
1908static void del_timeout(struct dlm_lkb *lkb) { }
1909#endif
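
/* A standalone sketch, not dlm code: the deprecated timeout scan above
   compares a wait measured in microseconds against limits configured in
   centiseconds, hence the * 10000 factors. The values are arbitrary. */

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	uint32_t timeout_cs = 500;	/* 5 seconds, like lkb_timeout_cs */
	int64_t wait_us = 5000001;	/* elapsed wait in microseconds */

	if (wait_us >= (int64_t)timeout_cs * 10000)
		printf("timed out: %lld us >= %u cs\n",
		       (long long)wait_us, (unsigned)timeout_cs);
	return 0;
}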
3ae1acf9 1910
e7fd4179
DT
1911/* lkb is master or local copy */
1912
1913static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1914{
1915 int b, len = r->res_ls->ls_lvblen;
1916
1917 /* b=1 lvb returned to caller
1918 b=0 lvb written to rsb or invalidated
1919 b=-1 do nothing */
1920
1921 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1922
1923 if (b == 1) {
1924 if (!lkb->lkb_lvbptr)
1925 return;
1926
1927 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1928 return;
1929
1930 if (!r->res_lvbptr)
1931 return;
1932
1933 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1934 lkb->lkb_lvbseq = r->res_lvbseq;
1935
1936 } else if (b == 0) {
1937 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1938 rsb_set_flag(r, RSB_VALNOTVALID);
1939 return;
1940 }
1941
1942 if (!lkb->lkb_lvbptr)
1943 return;
1944
1945 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1946 return;
1947
1948 if (!r->res_lvbptr)
52bda2b5 1949 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
e7fd4179
DT
1950
1951 if (!r->res_lvbptr)
1952 return;
1953
1954 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1955 r->res_lvbseq++;
1956 lkb->lkb_lvbseq = r->res_lvbseq;
1957 rsb_clear_flag(r, RSB_VALNOTVALID);
1958 }
1959
1960 if (rsb_flag(r, RSB_VALNOTVALID))
1961 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1962}
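
/* A standalone sketch, not dlm code: dlm_lvb_operations (defined elsewhere
   in the dlm source) is indexed by [grmode + 1][rqmode + 1] because
   DLM_LOCK_IV is -1. The two entries below are illustrative placeholders
   showing the lookup shape, not a copy of the real table. */

#include <stdio.h>

enum { LK_IV = -1, LK_NL, LK_CR, LK_CW, LK_PR, LK_PW, LK_EX };

static const int lvb_ops[7][7] = {
	[LK_IV + 1][LK_EX + 1] = 1,	/* new EX request: return lvb (b=1) */
	[LK_EX + 1][LK_NL + 1] = 0,	/* EX->NL convert: write lvb (b=0) */
};

int main(void)
{
	int grmode = LK_IV, rqmode = LK_EX;

	printf("op for gr %d rq %d: %d\n", grmode, rqmode,
	       lvb_ops[grmode + 1][rqmode + 1]);
	return 0;
}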
1963
1964static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1965{
1966 if (lkb->lkb_grmode < DLM_LOCK_PW)
1967 return;
1968
1969 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1970 rsb_set_flag(r, RSB_VALNOTVALID);
1971 return;
1972 }
1973
1974 if (!lkb->lkb_lvbptr)
1975 return;
1976
1977 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1978 return;
1979
1980 if (!r->res_lvbptr)
52bda2b5 1981 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
e7fd4179
DT
1982
1983 if (!r->res_lvbptr)
1984 return;
1985
1986 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1987 r->res_lvbseq++;
1988 rsb_clear_flag(r, RSB_VALNOTVALID);
1989}
1990
1991/* lkb is process copy (pc) */
1992
1993static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1994 struct dlm_message *ms)
1995{
1996 int b;
1997
1998 if (!lkb->lkb_lvbptr)
1999 return;
2000
2001 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2002 return;
2003
597d0cae 2004 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
e7fd4179
DT
2005 if (b == 1) {
2006 int len = receive_extralen(ms);
cfa805f6
BVA
2007 if (len > r->res_ls->ls_lvblen)
2008 len = r->res_ls->ls_lvblen;
e7fd4179 2009 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
00e99ccd 2010 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
e7fd4179
DT
2011 }
2012}
2013
2014/* Manipulate lkb's on rsb's convert/granted/waiting queues
2015 remove_lock -- used for unlock, removes lkb from granted
2016 revert_lock -- used for cancel, moves lkb from convert to granted
2017 grant_lock -- used for request and convert, adds lkb to granted or
2018 moves lkb from convert or waiting to granted
2019
2020 Each of these is used for master or local copy lkb's. There is
2021 also a _pc() variation used to make the corresponding change on
2022 a process copy (pc) lkb. */
2023
2024static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2025{
2026 del_lkb(r, lkb);
2027 lkb->lkb_grmode = DLM_LOCK_IV;
2028 /* this unhold undoes the original ref from create_lkb()
2029 so this leads to the lkb being freed */
2030 unhold_lkb(lkb);
2031}
2032
2033static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2034{
2035 set_lvb_unlock(r, lkb);
2036 _remove_lock(r, lkb);
2037}
2038
2039static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2040{
2041 _remove_lock(r, lkb);
2042}
2043
ef0c2bb0
DT
2044/* returns: 0 did nothing
2045 1 moved lock to granted
2046 -1 removed lock */
2047
2048static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
e7fd4179 2049{
ef0c2bb0
DT
2050 int rv = 0;
2051
e7fd4179
DT
2052 lkb->lkb_rqmode = DLM_LOCK_IV;
2053
2054 switch (lkb->lkb_status) {
597d0cae
DT
2055 case DLM_LKSTS_GRANTED:
2056 break;
e7fd4179
DT
2057 case DLM_LKSTS_CONVERT:
2058 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
ef0c2bb0 2059 rv = 1;
e7fd4179
DT
2060 break;
2061 case DLM_LKSTS_WAITING:
2062 del_lkb(r, lkb);
2063 lkb->lkb_grmode = DLM_LOCK_IV;
2064 /* this unhold undoes the original ref from create_lkb()
2065 so this leads to the lkb being freed */
2066 unhold_lkb(lkb);
ef0c2bb0 2067 rv = -1;
e7fd4179
DT
2068 break;
2069 default:
2070 log_print("invalid status for revert %d", lkb->lkb_status);
2071 }
ef0c2bb0 2072 return rv;
e7fd4179
DT
2073}
2074
ef0c2bb0 2075static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
e7fd4179 2076{
ef0c2bb0 2077 return revert_lock(r, lkb);
e7fd4179
DT
2078}
2079
2080static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2081{
2082 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2083 lkb->lkb_grmode = lkb->lkb_rqmode;
2084 if (lkb->lkb_status)
2085 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2086 else
2087 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2088 }
2089
2090 lkb->lkb_rqmode = DLM_LOCK_IV;
4875647a 2091 lkb->lkb_highbast = 0;
e7fd4179
DT
2092}
2093
2094static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2095{
2096 set_lvb_lock(r, lkb);
2097 _grant_lock(r, lkb);
e7fd4179
DT
2098}
2099
2100static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2101 struct dlm_message *ms)
2102{
2103 set_lvb_lock_pc(r, lkb, ms);
2104 _grant_lock(r, lkb);
2105}
2106
2107/* called by grant_pending_locks() which means an async grant message must
2108 be sent to the requesting node in addition to granting the lock if the
2109 lkb belongs to a remote node. */
2110
2111static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2112{
2113 grant_lock(r, lkb);
2114 if (is_master_copy(lkb))
2115 send_grant(r, lkb);
2116 else
2117 queue_cast(r, lkb, 0);
2118}
2119
7d3c1feb
DT
2120/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2121 change the granted/requested modes. We're munging things accordingly in
2122 the process copy.
2123 CONVDEADLK: our grmode may have been forced down to NL to resolve a
2124 conversion deadlock
2125 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2126 compatible with other granted locks */
2127
2a7ce0ed 2128static void munge_demoted(struct dlm_lkb *lkb)
7d3c1feb 2129{
7d3c1feb
DT
2130 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2131 log_print("munge_demoted %x invalid modes gr %d rq %d",
2132 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2133 return;
2134 }
2135
2136 lkb->lkb_grmode = DLM_LOCK_NL;
2137}
2138
2139static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2140{
00e99ccd
AA
2141 if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2142 ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
7d3c1feb 2143 log_print("munge_altmode %x invalid reply type %d",
00e99ccd 2144 lkb->lkb_id, le32_to_cpu(ms->m_type));
7d3c1feb
DT
2145 return;
2146 }
2147
2148 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2149 lkb->lkb_rqmode = DLM_LOCK_PR;
2150 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2151 lkb->lkb_rqmode = DLM_LOCK_CW;
2152 else {
2153 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2154 dlm_print_lkb(lkb);
2155 }
2156}
2157
e7fd4179
DT
2158static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2159{
2160 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2161 lkb_statequeue);
2162 if (lkb->lkb_id == first->lkb_id)
90135925 2163 return 1;
e7fd4179 2164
90135925 2165 return 0;
e7fd4179
DT
2166}
2167
e7fd4179
DT
2168/* Check if the given lkb conflicts with another lkb on the queue. */
2169
2170static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2171{
2172 struct dlm_lkb *this;
2173
2174 list_for_each_entry(this, head, lkb_statequeue) {
2175 if (this == lkb)
2176 continue;
3bcd3687 2177 if (!modes_compat(this, lkb))
90135925 2178 return 1;
e7fd4179 2179 }
90135925 2180 return 0;
e7fd4179
DT
2181}
2182
2183/*
2184 * "A conversion deadlock arises with a pair of lock requests in the converting
2185 * queue for one resource. The granted mode of each lock blocks the requested
2186 * mode of the other lock."
2187 *
c85d65e9
DT
2188 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2189 * convert queue from being granted, then deadlk/demote lkb.
e7fd4179
DT
2190 *
2191 * Example:
2192 * Granted Queue: empty
2193 * Convert Queue: NL->EX (first lock)
2194 * PR->EX (second lock)
2195 *
2196 * The first lock can't be granted because of the granted mode of the second
2197 * lock and the second lock can't be granted because it's not first in the
c85d65e9
DT
2198 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2199 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2200 * flag set and return DEMOTED in the lksb flags.
e7fd4179 2201 *
c85d65e9
DT
2202 * Originally, this function detected conv-deadlk in a more limited scope:
2203 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2204 * - if lkb1 was the first entry in the queue (not just earlier), and was
2205 * blocked by the granted mode of lkb2, and there was nothing on the
2206 * granted queue preventing lkb1 from being granted immediately, i.e.
2207 * lkb2 was the only thing preventing lkb1 from being granted.
2208 *
2209 * That second condition meant we'd only say there was conv-deadlk if
2210 * resolving it (by demotion) would lead to the first lock on the convert
2211 * queue being granted right away. It allowed conversion deadlocks to exist
2212 * between locks on the convert queue while they couldn't be granted anyway.
2213 *
2214 * Now, we detect and take action on conversion deadlocks immediately when
2215 * they're created, even if they may not be immediately consequential. If
2216 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2217 * mode that would prevent lkb1's conversion from being granted, we do a
2218 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2219 * I think this means that the lkb_is_ahead condition below should always
2220 * be zero, i.e. there will never be conv-deadlk between two locks that are
2221 * both already on the convert queue.
e7fd4179
DT
2222 */
2223
c85d65e9 2224static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
e7fd4179 2225{
c85d65e9
DT
2226 struct dlm_lkb *lkb1;
2227 int lkb_is_ahead = 0;
e7fd4179 2228
c85d65e9
DT
2229 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2230 if (lkb1 == lkb2) {
2231 lkb_is_ahead = 1;
e7fd4179
DT
2232 continue;
2233 }
2234
c85d65e9
DT
2235 if (!lkb_is_ahead) {
2236 if (!modes_compat(lkb2, lkb1))
2237 return 1;
2238 } else {
2239 if (!modes_compat(lkb2, lkb1) &&
2240 !modes_compat(lkb1, lkb2))
2241 return 1;
2242 }
e7fd4179 2243 }
90135925 2244 return 0;
e7fd4179
DT
2245}
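
/* A standalone sketch, not dlm code: the NL->EX / PR->EX example from the
   comment above, played out against the standard VMS compatibility table
   (assumed here to match __dlm_compat_matrix without its IV row). */

#include <stdio.h>

enum { NL, CR, CW, PR, PW, EX };

static const int compat[6][6] = {
	/*        NL CR CW PR PW EX */
	/* NL */ { 1, 1, 1, 1, 1, 1 },
	/* CR */ { 1, 1, 1, 1, 1, 0 },
	/* CW */ { 1, 1, 1, 0, 0, 0 },
	/* PR */ { 1, 1, 0, 1, 0, 0 },
	/* PW */ { 1, 1, 0, 0, 0, 0 },
	/* EX */ { 1, 0, 0, 0, 0, 0 },
};

int main(void)
{
	/* convert queue: first NL->EX, then PR->EX */
	printf("first blocked by second's granted PR: %d\n", !compat[PR][EX]);
	printf("second blocked by first's granted NL: %d\n", !compat[NL][EX]);
	printf("second waits only on queue order => conversion deadlock\n");
	return 0;
}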
2246
2247/*
2248 * Return 1 if the lock can be granted, 0 otherwise.
2249 * Also detect and resolve conversion deadlocks.
2250 *
2251 * lkb is the lock to be granted
2252 *
2253 * now is 1 if the function is being called in the context of the
2254 * immediate request, it is 0 if called later, after the lock has been
2255 * queued.
2256 *
c503a621
DT
2257 * recover is 1 if dlm_recover_grant() is trying to grant conversions
2258 * after recovery.
2259 *
e7fd4179
DT
2260 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2261 */
2262
c503a621
DT
2263static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2264 int recover)
e7fd4179
DT
2265{
2266 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2267
2268 /*
2269 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2270 * a new request for a NL mode lock being blocked.
2271 *
2272 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2273 * request, then it would be granted. In essence, the use of this flag
2274	 * tells the Lock Manager to expedite this request by not considering
2275 * what may be in the CONVERTING or WAITING queues... As of this
2276 * writing, the EXPEDITE flag can be used only with new requests for NL
2277 * mode locks. This flag is not valid for conversion requests.
2278 *
2279 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
2280 * conversion or used with a non-NL requested mode. We also know an
2281 * EXPEDITE request is always granted immediately, so now must always
2282 * be 1. The full condition to grant an expedite request: (now &&
2283 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2284 * therefore be shortened to just checking the flag.
2285 */
2286
2287 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
90135925 2288 return 1;
e7fd4179
DT
2289
2290 /*
2291 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2292 * added to the remaining conditions.
2293 */
2294
2295 if (queue_conflict(&r->res_grantqueue, lkb))
c503a621 2296 return 0;
e7fd4179
DT
2297
2298 /*
2299 * 6-3: By default, a conversion request is immediately granted if the
2300 * requested mode is compatible with the modes of all other granted
2301 * locks
2302 */
2303
2304 if (queue_conflict(&r->res_convertqueue, lkb))
c503a621
DT
2305 return 0;
2306
2307 /*
2308 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2309 * locks for a recovered rsb, on which lkb's have been rebuilt.
2310 * The lkb's may have been rebuilt on the queues in a different
2311 * order than they were in on the previous master. So, granting
2312 * queued conversions in order after recovery doesn't make sense
2313 * since the order hasn't been preserved anyway. The new order
2314 * could also have created a new "in place" conversion deadlock.
2315 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2316 * After recovery, there would be no granted locks, and possibly
2317 * NL->EX, PR->EX, an in-place conversion deadlock.) So, after
2318 * recovery, grant conversions without considering order.
2319 */
2320
2321 if (conv && recover)
2322 return 1;
e7fd4179
DT
2323
2324 /*
2325 * 6-5: But the default algorithm for deciding whether to grant or
2326 * queue conversion requests does not by itself guarantee that such
2327 * requests are serviced on a "first come first serve" basis. This, in
2328	 * turn, can lead to a phenomenon known as "indefinite postponement".
2329 *
2330 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2331 * the system service employed to request a lock conversion. This flag
2332 * forces certain conversion requests to be queued, even if they are
2333 * compatible with the granted modes of other locks on the same
2334 * resource. Thus, the use of this flag results in conversion requests
2335	 * being ordered on a "first come first serve" basis.
2336 *
2337 * DCT: This condition is all about new conversions being able to occur
2338 * "in place" while the lock remains on the granted queue (assuming
2339 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
2340 * doesn't _have_ to go onto the convert queue where it's processed in
2341 * order. The "now" variable is necessary to distinguish converts
2342 * being received and processed for the first time now, because once a
2343 * convert is moved to the conversion queue the condition below applies
2344 * requiring fifo granting.
2345 */
2346
2347 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
90135925 2348 return 1;
e7fd4179 2349
53ad1c98
DT
2350 /*
2351 * Even if the convert is compat with all granted locks,
2352 * QUECVT forces it behind other locks on the convert queue.
2353 */
2354
2355 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2356 if (list_empty(&r->res_convertqueue))
2357 return 1;
2358 else
c503a621 2359 return 0;
53ad1c98
DT
2360 }
2361
e7fd4179 2362 /*
3bcd3687
DT
2363 * The NOORDER flag is set to avoid the standard vms rules on grant
2364 * order.
e7fd4179
DT
2365 */
2366
2367 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
90135925 2368 return 1;
e7fd4179
DT
2369
2370 /*
2371 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2372 * granted until all other conversion requests ahead of it are granted
2373 * and/or canceled.
2374 */
2375
2376 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
90135925 2377 return 1;
e7fd4179
DT
2378
2379 /*
2380 * 6-4: By default, a new request is immediately granted only if all
2381 * three of the following conditions are satisfied when the request is
2382 * issued:
2383 * - The queue of ungranted conversion requests for the resource is
2384 * empty.
2385 * - The queue of ungranted new requests for the resource is empty.
2386 * - The mode of the new request is compatible with the most
2387 * restrictive mode of all granted locks on the resource.
2388 */
2389
2390 if (now && !conv && list_empty(&r->res_convertqueue) &&
2391 list_empty(&r->res_waitqueue))
90135925 2392 return 1;
e7fd4179
DT
2393
2394 /*
2395 * 6-4: Once a lock request is in the queue of ungranted new requests,
2396 * it cannot be granted until the queue of ungranted conversion
2397 * requests is empty, all ungranted new requests ahead of it are
2398 * granted and/or canceled, and it is compatible with the granted mode
2399 * of the most restrictive lock granted on the resource.
2400 */
2401
2402 if (!now && !conv && list_empty(&r->res_convertqueue) &&
2403 first_in_list(lkb, &r->res_waitqueue))
90135925 2404 return 1;
c503a621 2405
90135925 2406 return 0;
e7fd4179
DT
2407}
2408
c85d65e9 2409static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
c503a621 2410 int recover, int *err)
e7fd4179 2411{
e7fd4179
DT
2412 int rv;
2413 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
c85d65e9
DT
2414 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2415
2416 if (err)
2417 *err = 0;
e7fd4179 2418
c503a621 2419 rv = _can_be_granted(r, lkb, now, recover);
e7fd4179
DT
2420 if (rv)
2421 goto out;
2422
c85d65e9
DT
2423 /*
2424 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2425 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2426 * cancels one of the locks.
2427 */
2428
2429 if (is_convert && can_be_queued(lkb) &&
2430 conversion_deadlock_detect(r, lkb)) {
2431 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2432 lkb->lkb_grmode = DLM_LOCK_NL;
2433 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
294e7e45 2434 } else if (err) {
2435 *err = -EDEADLK;
2436 } else {
2437 log_print("can_be_granted deadlock %x now %d",
2438 lkb->lkb_id, now);
2439 dlm_dump_rsb(r);
c85d65e9 2440 }
e7fd4179 2441 goto out;
c85d65e9 2442 }
e7fd4179 2443
c85d65e9
DT
2444 /*
2445 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2446 * to grant a request in a mode other than the normal rqmode. It's a
2447 * simple way to provide a big optimization to applications that can
2448 * use them.
2449 */
2450
2451 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
e7fd4179 2452 alt = DLM_LOCK_PR;
c85d65e9 2453 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
e7fd4179
DT
2454 alt = DLM_LOCK_CW;
2455
2456 if (alt) {
2457 lkb->lkb_rqmode = alt;
c503a621 2458 rv = _can_be_granted(r, lkb, now, 0);
e7fd4179
DT
2459 if (rv)
2460 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2461 else
2462 lkb->lkb_rqmode = rqmode;
2463 }
2464 out:
2465 return rv;
2466}
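
/* A standalone sketch, not dlm code: the ALTPR/ALTCW fallback at the tail
   of can_be_granted() above, reduced to its shape. The flag bits and mode
   values are local placeholders for DLM_LKF_ALT* and DLM_LOCK_*, and
   grantable() stands in for _can_be_granted(). */

#include <stdio.h>

enum { M_NL, M_CR, M_CW, M_PR, M_PW, M_EX };
#define F_ALTPR 0x1
#define F_ALTCW 0x2

static int grantable(int mode)
{
	return mode <= M_PR;	/* pretend anything up to PR fits now */
}

static int try_grant(int *rqmode, unsigned int exflags, int *altmode)
{
	int alt = 0;

	*altmode = 0;
	if (grantable(*rqmode))
		return 1;
	if (*rqmode != M_PR && (exflags & F_ALTPR))
		alt = M_PR;
	else if (*rqmode != M_CW && (exflags & F_ALTCW))
		alt = M_CW;
	if (alt && grantable(alt)) {
		*rqmode = alt;	/* granted in the alternate mode */
		*altmode = 1;	/* reported via DLM_SBF_ALTMODE */
		return 1;
	}
	return 0;
}

int main(void)
{
	int rq = M_EX, altmode;
	int ok = try_grant(&rq, F_ALTPR, &altmode);

	printf("granted=%d mode=%d altmode=%d\n", ok, rq, altmode);
	return 0;
}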
2467
36509258
DT
2468/* Returns the highest requested mode of all blocked conversions; sets
2469 cw if there's a blocked conversion to DLM_LOCK_CW. */
c85d65e9 2470
4875647a
DT
2471static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2472 unsigned int *count)
e7fd4179
DT
2473{
2474 struct dlm_lkb *lkb, *s;
c503a621 2475 int recover = rsb_flag(r, RSB_RECOVER_GRANT);
e7fd4179 2476 int hi, demoted, quit, grant_restart, demote_restart;
c85d65e9 2477 int deadlk;
e7fd4179
DT
2478
2479 quit = 0;
2480 restart:
2481 grant_restart = 0;
2482 demote_restart = 0;
2483 hi = DLM_LOCK_IV;
2484
2485 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2486 demoted = is_demoted(lkb);
c85d65e9
DT
2487 deadlk = 0;
2488
c503a621 2489 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
e7fd4179
DT
2490 grant_lock_pending(r, lkb);
2491 grant_restart = 1;
4875647a
DT
2492 if (count)
2493 (*count)++;
c85d65e9 2494 continue;
e7fd4179 2495 }
c85d65e9
DT
2496
2497 if (!demoted && is_demoted(lkb)) {
2498 log_print("WARN: pending demoted %x node %d %s",
2499 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2500 demote_restart = 1;
2501 continue;
2502 }
2503
2504 if (deadlk) {
294e7e45 2505 /*
2506			 * If the DLM_LKF_NODLCKWT flag is set and conversion
2507 * deadlock is detected, we request blocking AST and
2508 * down (or cancel) conversion.
2509 */
2510 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2511 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2512 queue_bast(r, lkb, lkb->lkb_rqmode);
2513 lkb->lkb_highbast = lkb->lkb_rqmode;
2514 }
2515 } else {
2516 log_print("WARN: pending deadlock %x node %d %s",
2517 lkb->lkb_id, lkb->lkb_nodeid,
2518 r->res_name);
2519 dlm_dump_rsb(r);
2520 }
c85d65e9
DT
2521 continue;
2522 }
2523
2524 hi = max_t(int, lkb->lkb_rqmode, hi);
36509258
DT
2525
2526 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2527 *cw = 1;
e7fd4179
DT
2528 }
2529
2530 if (grant_restart)
2531 goto restart;
2532 if (demote_restart && !quit) {
2533 quit = 1;
2534 goto restart;
2535 }
2536
2537 return max_t(int, high, hi);
2538}
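
/* A standalone sketch, not dlm code: the grant_restart/demote_restart
   discipline above re-scans the queue until a full pass makes no progress,
   because each grant (or demotion) can unblock earlier entries. The
   example data is invented. */

#include <stdio.h>

int main(void)
{
	/* entry i becomes grantable once grantable_after[i] others hold */
	int grantable_after[3] = { 2, 1, 0 };
	int n = 3, granted = 0, progress = 1;

	while (progress) {
		progress = 0;
		for (int i = 0; i < n; i++) {
			if (grantable_after[i] >= 0 &&
			    grantable_after[i] <= granted) {
				grantable_after[i] = -1;	/* grant it */
				granted++;
				progress = 1;	/* rescan from the top */
			}
		}
	}
	printf("granted %d of %d\n", granted, n);
	return 0;
}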
2539
4875647a
DT
2540static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2541 unsigned int *count)
e7fd4179
DT
2542{
2543 struct dlm_lkb *lkb, *s;
2544
2545 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
c503a621 2546 if (can_be_granted(r, lkb, 0, 0, NULL)) {
e7fd4179 2547 grant_lock_pending(r, lkb);
4875647a
DT
2548 if (count)
2549 (*count)++;
2550 } else {
e7fd4179 2551 high = max_t(int, lkb->lkb_rqmode, high);
36509258
DT
2552 if (lkb->lkb_rqmode == DLM_LOCK_CW)
2553 *cw = 1;
2554 }
e7fd4179
DT
2555 }
2556
2557 return high;
2558}
2559
36509258
DT
2560/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2561 on either the convert or waiting queue.
2562 high is the largest rqmode of all locks blocked on the convert or
2563 waiting queue. */
2564
2565static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2566{
2567 if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2568 if (gr->lkb_highbast < DLM_LOCK_EX)
2569 return 1;
2570 return 0;
2571 }
2572
2573 if (gr->lkb_highbast < high &&
2574 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2575 return 1;
2576 return 0;
2577}
2578
4875647a 2579static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
e7fd4179
DT
2580{
2581 struct dlm_lkb *lkb, *s;
2582 int high = DLM_LOCK_IV;
36509258 2583 int cw = 0;
e7fd4179 2584
4875647a
DT
2585 if (!is_master(r)) {
2586 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2587 dlm_dump_rsb(r);
2588 return;
2589 }
e7fd4179 2590
4875647a
DT
2591 high = grant_pending_convert(r, high, &cw, count);
2592 high = grant_pending_wait(r, high, &cw, count);
e7fd4179
DT
2593
2594 if (high == DLM_LOCK_IV)
2595 return;
2596
2597 /*
2598 * If there are locks left on the wait/convert queue then send blocking
2599 * ASTs to granted locks based on the largest requested mode (high)
36509258 2600 * found above.
e7fd4179
DT
2601 */
2602
2603 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
e5dae548 2604 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
329fc4c3
DT
2605 if (cw && high == DLM_LOCK_PR &&
2606 lkb->lkb_grmode == DLM_LOCK_PR)
36509258
DT
2607 queue_bast(r, lkb, DLM_LOCK_CW);
2608 else
2609 queue_bast(r, lkb, high);
e7fd4179
DT
2610 lkb->lkb_highbast = high;
2611 }
2612 }
2613}
2614
36509258
DT
2615static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2616{
2617 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2618 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2619 if (gr->lkb_highbast < DLM_LOCK_EX)
2620 return 1;
2621 return 0;
2622 }
2623
2624 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2625 return 1;
2626 return 0;
2627}
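
/* A standalone sketch, not dlm code: the PR/CW branches in both bast
   helpers above exist because PR and CW are mutually incompatible yet sit
   next to each other numerically, so the plain "highbast < mode" shortcut
   can wrongly suppress a needed bast. The values mirror DLM_LOCK_CW == 2
   and DLM_LOCK_PR == 3. */

#include <stdio.h>

int main(void)
{
	int highbast = 3;	/* granted PR lock already basted for PR */
	int rq_cw = 2;		/* newly blocked request wants CW */

	/* the naive shortcut says no bast is needed... */
	printf("naive says bast needed: %d\n", highbast < rq_cw);
	/* ...but CW conflicts with PR, so the explicit PR/CW branch
	   requests one anyway, guarded only by highbast < EX */
	return 0;
}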
2628
e7fd4179
DT
2629static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2630 struct dlm_lkb *lkb)
2631{
2632 struct dlm_lkb *gr;
2633
2634 list_for_each_entry(gr, head, lkb_statequeue) {
314dd2a0
SW
2635 /* skip self when sending basts to convertqueue */
2636 if (gr == lkb)
2637 continue;
e5dae548 2638 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
e7fd4179
DT
2639 queue_bast(r, gr, lkb->lkb_rqmode);
2640 gr->lkb_highbast = lkb->lkb_rqmode;
2641 }
2642 }
2643}
2644
2645static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2646{
2647 send_bast_queue(r, &r->res_grantqueue, lkb);
2648}
2649
2650static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2651{
2652 send_bast_queue(r, &r->res_grantqueue, lkb);
2653 send_bast_queue(r, &r->res_convertqueue, lkb);
2654}
2655
2656/* set_master(r, lkb) -- set the master nodeid of a resource
2657
2658 The purpose of this function is to set the nodeid field in the given
2659 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
2660 known, it can just be copied to the lkb and the function will return
2661 0. If the rsb's nodeid is _not_ known, it needs to be looked up
2662 before it can be copied to the lkb.
2663
2664 When the rsb nodeid is being looked up remotely, the initial lkb
2665 causing the lookup is kept on the ls_waiters list waiting for the
2666 lookup reply. Other lkb's waiting for the same rsb lookup are kept
2667 on the rsb's res_lookup list until the master is verified.
2668
2669 Return values:
2670 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2671 1: the rsb master is not available and the lkb has been placed on
2672 a wait queue
2673*/
2674
2675static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2676{
c04fecb4 2677 int our_nodeid = dlm_our_nodeid();
e7fd4179
DT
2678
2679 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2680 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2681 r->res_first_lkid = lkb->lkb_id;
2682 lkb->lkb_nodeid = r->res_nodeid;
2683 return 0;
2684 }
2685
2686 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2687 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2688 return 1;
2689 }
2690
c04fecb4 2691 if (r->res_master_nodeid == our_nodeid) {
e7fd4179
DT
2692 lkb->lkb_nodeid = 0;
2693 return 0;
2694 }
2695
c04fecb4
DT
2696 if (r->res_master_nodeid) {
2697 lkb->lkb_nodeid = r->res_master_nodeid;
e7fd4179
DT
2698 return 0;
2699 }
2700
c04fecb4
DT
2701 if (dlm_dir_nodeid(r) == our_nodeid) {
2702 /* This is a somewhat unusual case; find_rsb will usually
2703 have set res_master_nodeid when dir nodeid is local, but
2704 there are cases where we become the dir node after we've
2705		 passed find_rsb and gone through _request_lock again.
2706 confirm_master() or process_lookup_list() needs to be
2707 called after this. */
2708 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2709 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2710 r->res_name);
2711 r->res_master_nodeid = our_nodeid;
e7fd4179
DT
2712 r->res_nodeid = 0;
2713 lkb->lkb_nodeid = 0;
c04fecb4 2714 return 0;
e7fd4179 2715 }
c04fecb4 2716
05c32f47
DT
2717 wait_pending_remove(r);
2718
c04fecb4
DT
2719 r->res_first_lkid = lkb->lkb_id;
2720 send_lookup(r, lkb);
2721 return 1;
e7fd4179
DT
2722}
2723
2724static void process_lookup_list(struct dlm_rsb *r)
2725{
2726 struct dlm_lkb *lkb, *safe;
2727
2728 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
ef0c2bb0 2729 list_del_init(&lkb->lkb_rsb_lookup);
e7fd4179
DT
2730 _request_lock(r, lkb);
2731 schedule();
2732 }
2733}
2734
2735/* confirm_master -- confirm (or deny) an rsb's master nodeid */
2736
2737static void confirm_master(struct dlm_rsb *r, int error)
2738{
2739 struct dlm_lkb *lkb;
2740
2741 if (!r->res_first_lkid)
2742 return;
2743
2744 switch (error) {
2745 case 0:
2746 case -EINPROGRESS:
2747 r->res_first_lkid = 0;
2748 process_lookup_list(r);
2749 break;
2750
2751 case -EAGAIN:
aec64e1b
DT
2752 case -EBADR:
2753 case -ENOTBLK:
2754 /* the remote request failed and won't be retried (it was
2755 a NOQUEUE, or has been canceled/unlocked); make a waiting
2756 lkb the first_lkid */
e7fd4179
DT
2757
2758 r->res_first_lkid = 0;
2759
2760 if (!list_empty(&r->res_lookup)) {
2761 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2762 lkb_rsb_lookup);
ef0c2bb0 2763 list_del_init(&lkb->lkb_rsb_lookup);
e7fd4179
DT
2764 r->res_first_lkid = lkb->lkb_id;
2765 _request_lock(r, lkb);
761b9d3f 2766 }
e7fd4179
DT
2767 break;
2768
2769 default:
2770 log_error(r->res_ls, "confirm_master unknown error %d", error);
2771 }
2772}
2773
6b0afc0c 2774#ifdef CONFIG_DLM_DEPRECATED_API
e7fd4179 2775static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
e5dae548
DT
2776 int namelen, unsigned long timeout_cs,
2777 void (*ast) (void *astparam),
2778 void *astparam,
2779 void (*bast) (void *astparam, int mode),
2780 struct dlm_args *args)
6b0afc0c
AA
2781#else
2782static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2783 int namelen, void (*ast)(void *astparam),
2784 void *astparam,
2785 void (*bast)(void *astparam, int mode),
2786 struct dlm_args *args)
2787#endif
e7fd4179
DT
2788{
2789 int rv = -EINVAL;
2790
2791 /* check for invalid arg usage */
2792
2793 if (mode < 0 || mode > DLM_LOCK_EX)
2794 goto out;
2795
2796 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2797 goto out;
2798
2799 if (flags & DLM_LKF_CANCEL)
2800 goto out;
2801
2802 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2803 goto out;
2804
2805 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2806 goto out;
2807
2808 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2809 goto out;
2810
2811 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2812 goto out;
2813
2814 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2815 goto out;
2816
2817 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2818 goto out;
2819
2820 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2821 goto out;
2822
2823 if (!ast || !lksb)
2824 goto out;
2825
2826 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2827 goto out;
2828
e7fd4179
DT
2829 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2830 goto out;
2831
2832 /* these args will be copied to the lkb in validate_lock_args,
2833 it cannot be done now because when converting locks, fields in
2834 an active lkb cannot be modified before locking the rsb */
2835
2836 args->flags = flags;
e5dae548
DT
2837 args->astfn = ast;
2838 args->astparam = astparam;
2839 args->bastfn = bast;
6b0afc0c 2840#ifdef CONFIG_DLM_DEPRECATED_API
d7db923e 2841 args->timeout = timeout_cs;
6b0afc0c 2842#endif
e7fd4179
DT
2843 args->mode = mode;
2844 args->lksb = lksb;
e7fd4179
DT
2845 rv = 0;
2846 out:
2847 return rv;
2848}
2849
2850static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2851{
2852 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2853 DLM_LKF_FORCEUNLOCK))
2854 return -EINVAL;
2855
ef0c2bb0
DT
2856 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2857 return -EINVAL;
2858
e7fd4179 2859 args->flags = flags;
e5dae548 2860 args->astparam = astarg;
e7fd4179
DT
2861 return 0;
2862}
2863
2864static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2865 struct dlm_args *args)
2866{
44637ca4 2867 int rv = -EBUSY;
e7fd4179
DT
2868
2869 if (args->flags & DLM_LKF_CONVERT) {
e7fd4179
DT
2870 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2871 goto out;
2872
67e4d8c5
AA
2873 /* lock not allowed if there's any op in progress */
2874 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
e7fd4179 2875 goto out;
ef0c2bb0
DT
2876
2877 if (is_overlap(lkb))
2878 goto out;
44637ca4
AA
2879
2880 rv = -EINVAL;
2881 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2882 goto out;
2883
2884 if (args->flags & DLM_LKF_QUECVT &&
2885 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2886 goto out;
e7fd4179
DT
2887 }
2888
2889 lkb->lkb_exflags = args->flags;
2890 lkb->lkb_sbflags = 0;
e5dae548 2891 lkb->lkb_astfn = args->astfn;
e7fd4179 2892 lkb->lkb_astparam = args->astparam;
e5dae548 2893 lkb->lkb_bastfn = args->bastfn;
e7fd4179
DT
2894 lkb->lkb_rqmode = args->mode;
2895 lkb->lkb_lksb = args->lksb;
2896 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2897 lkb->lkb_ownpid = (int) current->pid;
6b0afc0c 2898#ifdef CONFIG_DLM_DEPRECATED_API
d7db923e 2899 lkb->lkb_timeout_cs = args->timeout;
6b0afc0c 2900#endif
e7fd4179
DT
2901 rv = 0;
2902 out:
9ac8ba46
AA
2903 switch (rv) {
2904 case 0:
2905 break;
2906 case -EINVAL:
2907 /* annoy the user because dlm usage is wrong */
2908 WARN_ON(1);
2909 log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
2910 rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2911 lkb->lkb_status, lkb->lkb_wait_type,
2912 lkb->lkb_resource->res_name);
2913 break;
2914 default:
c2d76a62 2915 log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
43279e53
DT
2916 rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2917 lkb->lkb_status, lkb->lkb_wait_type,
2918 lkb->lkb_resource->res_name);
9ac8ba46
AA
2919 break;
2920 }
2921
e7fd4179
DT
2922 return rv;
2923}
2924
ef0c2bb0
DT
2925/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2926 for success */
2927
2928/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2929 because there may be a lookup in progress and it's valid to do
2930 cancel/unlockf on it */
2931
e7fd4179
DT
2932static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2933{
ef0c2bb0 2934 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
420ba3cd 2935 int rv = -EBUSY;
e7fd4179 2936
420ba3cd
AA
2937 /* normal unlock not allowed if there's any op in progress */
2938 if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2939 (lkb->lkb_wait_type || lkb->lkb_wait_count))
e7fd4179
DT
2940 goto out;
2941
ef0c2bb0
DT
2942 /* an lkb may be waiting for an rsb lookup to complete where the
2943 lookup was initiated by another lock */
2944
42dc1601
DT
2945 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2946 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
ef0c2bb0
DT
2947 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2948 list_del_init(&lkb->lkb_rsb_lookup);
2949 queue_cast(lkb->lkb_resource, lkb,
2950 args->flags & DLM_LKF_CANCEL ?
2951 -DLM_ECANCEL : -DLM_EUNLOCK);
2952 unhold_lkb(lkb); /* undoes create_lkb() */
ef0c2bb0 2953 }
42dc1601 2954 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
420ba3cd
AA
2955 goto out;
2956 }
2957
2958 rv = -EINVAL;
2959 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2960 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2961 dlm_print_lkb(lkb);
2962 goto out;
2963 }
2964
2965 /* an lkb may still exist even though the lock is EOL'ed due to a
2966 * cancel, unlock or failed noqueue request; an app can't use these
2967 * locks; return same error as if the lkid had not been found at all
2968 */
2969
2970 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2971 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2972 rv = -ENOENT;
42dc1601 2973 goto out;
ef0c2bb0
DT
2974 }
2975
2976 /* cancel not allowed with another cancel/unlock in progress */
2977
2978 if (args->flags & DLM_LKF_CANCEL) {
2979 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2980 goto out;
2981
2982 if (is_overlap(lkb))
2983 goto out;
2984
3ae1acf9
DT
2985 /* don't let scand try to do a cancel */
2986 del_timeout(lkb);
2987
ef0c2bb0
DT
2988 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2989 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2990 rv = -EBUSY;
2991 goto out;
2992 }
2993
a536e381
DT
2994 /* there's nothing to cancel */
2995 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2996 !lkb->lkb_wait_type) {
2997 rv = -EBUSY;
2998 goto out;
2999 }
3000
ef0c2bb0
DT
3001 switch (lkb->lkb_wait_type) {
3002 case DLM_MSG_LOOKUP:
3003 case DLM_MSG_REQUEST:
3004 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
3005 rv = -EBUSY;
3006 goto out;
3007 case DLM_MSG_UNLOCK:
3008 case DLM_MSG_CANCEL:
3009 goto out;
3010 }
3011 /* add_to_waiters() will set OVERLAP_CANCEL */
3012 goto out_ok;
3013 }
3014
3015 /* do we need to allow a force-unlock if there's a normal unlock
3016 already in progress? in what conditions could the normal unlock
3017 fail such that we'd want to send a force-unlock to be sure? */
3018
3019 if (args->flags & DLM_LKF_FORCEUNLOCK) {
3020 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3021 goto out;
3022
3023 if (is_overlap_unlock(lkb))
3024 goto out;
e7fd4179 3025
3ae1acf9
DT
3026 /* don't let scand try to do a cancel */
3027 del_timeout(lkb);
3028
ef0c2bb0
DT
3029 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3030 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3031 rv = -EBUSY;
3032 goto out;
3033 }
3034
3035 switch (lkb->lkb_wait_type) {
3036 case DLM_MSG_LOOKUP:
3037 case DLM_MSG_REQUEST:
3038 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3039 rv = -EBUSY;
3040 goto out;
3041 case DLM_MSG_UNLOCK:
3042 goto out;
3043 }
3044 /* add_to_waiters() will set OVERLAP_UNLOCK */
ef0c2bb0
DT
3045 }
3046
e7fd4179 3047 out_ok:
ef0c2bb0
DT
3048 /* an overlapping op shouldn't blow away exflags from other op */
3049 lkb->lkb_exflags |= args->flags;
e7fd4179
DT
3050 lkb->lkb_sbflags = 0;
3051 lkb->lkb_astparam = args->astparam;
e7fd4179
DT
3052 rv = 0;
3053 out:
9ac8ba46
AA
3054 switch (rv) {
3055 case 0:
3056 break;
3057 case -EINVAL:
3058 /* annoy the user because dlm usage is wrong */
3059 WARN_ON(1);
3060 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3061 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3062 args->flags, lkb->lkb_wait_type,
3063 lkb->lkb_resource->res_name);
3064 break;
3065 default:
c2d76a62 3066 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
ef0c2bb0
DT
3067 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3068 args->flags, lkb->lkb_wait_type,
3069 lkb->lkb_resource->res_name);
9ac8ba46
AA
3070 break;
3071 }
3072
e7fd4179
DT
3073 return rv;
3074}
3075
3076/*
3077 * Four stage 4 varieties:
3078 * do_request(), do_convert(), do_unlock(), do_cancel()
3079 * These are called on the master node for the given lock and
3080 * from the central locking logic.
3081 */
3082
3083static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3084{
3085 int error = 0;
3086
c503a621 3087 if (can_be_granted(r, lkb, 1, 0, NULL)) {
e7fd4179
DT
3088 grant_lock(r, lkb);
3089 queue_cast(r, lkb, 0);
3090 goto out;
3091 }
3092
3093 if (can_be_queued(lkb)) {
3094 error = -EINPROGRESS;
3095 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3ae1acf9 3096 add_timeout(lkb);
e7fd4179
DT
3097 goto out;
3098 }
3099
3100 error = -EAGAIN;
e7fd4179 3101 queue_cast(r, lkb, -EAGAIN);
e7fd4179
DT
3102 out:
3103 return error;
3104}
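
/* A standalone sketch, not dlm code: the three outcomes of do_request()
   above, modeled with plain ints. LKF_NOQUEUE is a placeholder bit for
   DLM_LKF_NOQUEUE; can_be_queued() in this file is just its inverse. */

#include <errno.h>
#include <stdio.h>

#define LKF_NOQUEUE 0x1

static int do_request_model(int grantable, unsigned int exflags)
{
	if (grantable)
		return 0;		/* granted, completion ast queued */
	if (!(exflags & LKF_NOQUEUE))
		return -EINPROGRESS;	/* parked on the wait queue */
	return -EAGAIN;			/* blocked and may not queue */
}

int main(void)
{
	printf("%d %d %d\n", do_request_model(1, 0),
	       do_request_model(0, 0),
	       do_request_model(0, LKF_NOQUEUE));
	return 0;
}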
3105
cf6620ac
DT
3106static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3107 int error)
3108{
3109 switch (error) {
3110 case -EAGAIN:
3111 if (force_blocking_asts(lkb))
3112 send_blocking_asts_all(r, lkb);
3113 break;
3114 case -EINPROGRESS:
3115 send_blocking_asts(r, lkb);
3116 break;
3117 }
3118}
3119
e7fd4179
DT
3120static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3121{
3122 int error = 0;
c85d65e9 3123 int deadlk = 0;
e7fd4179
DT
3124
3125 /* changing an existing lock may allow others to be granted */
3126
c503a621 3127 if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
e7fd4179
DT
3128 grant_lock(r, lkb);
3129 queue_cast(r, lkb, 0);
e7fd4179
DT
3130 goto out;
3131 }
3132
c85d65e9
DT
3133 /* can_be_granted() detected that this lock would block in a conversion
3134 deadlock, so we leave it on the granted queue and return EDEADLK in
3135 the ast for the convert. */
3136
294e7e45 3137 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
c85d65e9 3138 /* it's left on the granted queue */
c85d65e9
DT
3139 revert_lock(r, lkb);
3140 queue_cast(r, lkb, -EDEADLK);
3141 error = -EDEADLK;
3142 goto out;
3143 }
3144
7d3c1feb
DT
3145 /* is_demoted() means the can_be_granted() above set the grmode
3146 to NL, and left us on the granted queue. This auto-demotion
3147 (due to CONVDEADLK) might mean other locks, and/or this lock, are
3148 now grantable. We have to try to grant other converting locks
3149 before we try again to grant this one. */
3150
3151 if (is_demoted(lkb)) {
4875647a 3152 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
c503a621 3153 if (_can_be_granted(r, lkb, 1, 0)) {
7d3c1feb
DT
3154 grant_lock(r, lkb);
3155 queue_cast(r, lkb, 0);
7d3c1feb
DT
3156 goto out;
3157 }
3158 /* else fall through and move to convert queue */
3159 }
3160
3161 if (can_be_queued(lkb)) {
e7fd4179
DT
3162 error = -EINPROGRESS;
3163 del_lkb(r, lkb);
3164 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3ae1acf9 3165 add_timeout(lkb);
e7fd4179
DT
3166 goto out;
3167 }
3168
3169 error = -EAGAIN;
e7fd4179 3170 queue_cast(r, lkb, -EAGAIN);
e7fd4179
DT
3171 out:
3172 return error;
3173}
3174
cf6620ac
DT
3175static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3176 int error)
3177{
3178 switch (error) {
3179 case 0:
4875647a 3180 grant_pending_locks(r, NULL);
cf6620ac
DT
3181 /* grant_pending_locks also sends basts */
3182 break;
3183 case -EAGAIN:
3184 if (force_blocking_asts(lkb))
3185 send_blocking_asts_all(r, lkb);
3186 break;
3187 case -EINPROGRESS:
3188 send_blocking_asts(r, lkb);
3189 break;
3190 }
3191}
3192
e7fd4179
DT
3193static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3194{
3195 remove_lock(r, lkb);
3196 queue_cast(r, lkb, -DLM_EUNLOCK);
e7fd4179
DT
3197 return -DLM_EUNLOCK;
3198}
3199
cf6620ac
DT
3200static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3201 int error)
3202{
4875647a 3203 grant_pending_locks(r, NULL);
cf6620ac
DT
3204}
3205
ef0c2bb0 3206/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
c04fecb4 3207
e7fd4179
DT
3208static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3209{
ef0c2bb0
DT
3210 int error;
3211
3212 error = revert_lock(r, lkb);
3213 if (error) {
3214 queue_cast(r, lkb, -DLM_ECANCEL);
ef0c2bb0
DT
3215 return -DLM_ECANCEL;
3216 }
3217 return 0;
e7fd4179
DT
3218}
3219
cf6620ac
DT
3220static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3221 int error)
3222{
3223 if (error)
4875647a 3224 grant_pending_locks(r, NULL);
cf6620ac
DT
3225}
3226
e7fd4179
DT
3227/*
3228 * Four stage 3 varieties:
3229 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3230 */
3231
3232/* add a new lkb to a possibly new rsb, called by requesting process */
3233
3234static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3235{
3236 int error;
3237
3238 /* set_master: sets lkb nodeid from r */
3239
3240 error = set_master(r, lkb);
3241 if (error < 0)
3242 goto out;
3243 if (error) {
3244 error = 0;
3245 goto out;
3246 }
3247
cf6620ac 3248 if (is_remote(r)) {
e7fd4179
DT
3249 /* receive_request() calls do_request() on remote node */
3250 error = send_request(r, lkb);
cf6620ac 3251 } else {
e7fd4179 3252 error = do_request(r, lkb);
cf6620ac
DT
3253 /* for remote locks the request_reply is sent
3254 between do_request and do_request_effects */
3255 do_request_effects(r, lkb, error);
3256 }
e7fd4179
DT
3257 out:
3258 return error;
3259}
3260
3bcd3687 3261/* change some property of an existing lkb, e.g. mode */
e7fd4179
DT
3262
3263static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3264{
3265 int error;
3266
cf6620ac 3267 if (is_remote(r)) {
e7fd4179
DT
3268 /* receive_convert() calls do_convert() on remote node */
3269 error = send_convert(r, lkb);
cf6620ac 3270 } else {
e7fd4179 3271 error = do_convert(r, lkb);
cf6620ac
DT
3272 /* for remote locks the convert_reply is sent
3273 between do_convert and do_convert_effects */
3274 do_convert_effects(r, lkb, error);
3275 }
e7fd4179
DT
3276
3277 return error;
3278}
3279
3280/* remove an existing lkb from the granted queue */
3281
3282static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3283{
3284 int error;
3285
cf6620ac 3286 if (is_remote(r)) {
e7fd4179
DT
3287 /* receive_unlock() calls do_unlock() on remote node */
3288 error = send_unlock(r, lkb);
cf6620ac 3289 } else {
e7fd4179 3290 error = do_unlock(r, lkb);
cf6620ac
DT
3291 /* for remote locks the unlock_reply is sent
3292 between do_unlock and do_unlock_effects */
3293 do_unlock_effects(r, lkb, error);
3294 }
e7fd4179
DT
3295
3296 return error;
3297}
3298
3299/* remove an existing lkb from the convert or wait queue */
3300
3301static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3302{
3303 int error;
3304
cf6620ac 3305 if (is_remote(r)) {
e7fd4179
DT
3306 /* receive_cancel() calls do_cancel() on remote node */
3307 error = send_cancel(r, lkb);
cf6620ac 3308 } else {
e7fd4179 3309 error = do_cancel(r, lkb);
cf6620ac
DT
3310 /* for remote locks the cancel_reply is sent
3311 between do_cancel and do_cancel_effects */
3312 do_cancel_effects(r, lkb, error);
3313 }
e7fd4179
DT
3314
3315 return error;
3316}
3317
3318/*
3319 * Four stage 2 varieties:
3320 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3321 */
3322
3323static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3324 int len, struct dlm_args *args)
3325{
3326 struct dlm_rsb *r;
3327 int error;
3328
3329 error = validate_lock_args(ls, lkb, args);
3330 if (error)
c04fecb4 3331 return error;
e7fd4179 3332
c04fecb4 3333 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
e7fd4179 3334 if (error)
c04fecb4 3335 return error;
e7fd4179
DT
3336
3337 lock_rsb(r);
3338
3339 attach_lkb(r, lkb);
3340 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3341
3342 error = _request_lock(r, lkb);
3343
3344 unlock_rsb(r);
3345 put_rsb(r);
e7fd4179
DT
3346 return error;
3347}
3348
3349static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3350 struct dlm_args *args)
3351{
3352 struct dlm_rsb *r;
3353 int error;
3354
3355 r = lkb->lkb_resource;
3356
3357 hold_rsb(r);
3358 lock_rsb(r);
3359
3360 error = validate_lock_args(ls, lkb, args);
3361 if (error)
3362 goto out;
3363
3364 error = _convert_lock(r, lkb);
3365 out:
3366 unlock_rsb(r);
3367 put_rsb(r);
3368 return error;
3369}
3370
3371static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3372 struct dlm_args *args)
3373{
3374 struct dlm_rsb *r;
3375 int error;
3376
3377 r = lkb->lkb_resource;
3378
3379 hold_rsb(r);
3380 lock_rsb(r);
3381
3382 error = validate_unlock_args(lkb, args);
3383 if (error)
3384 goto out;
3385
3386 error = _unlock_lock(r, lkb);
3387 out:
3388 unlock_rsb(r);
3389 put_rsb(r);
3390 return error;
3391}
3392
3393static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3394 struct dlm_args *args)
3395{
3396 struct dlm_rsb *r;
3397 int error;
3398
3399 r = lkb->lkb_resource;
3400
3401 hold_rsb(r);
3402 lock_rsb(r);
3403
3404 error = validate_unlock_args(lkb, args);
3405 if (error)
3406 goto out;
3407
3408 error = _cancel_lock(r, lkb);
3409 out:
3410 unlock_rsb(r);
3411 put_rsb(r);
3412 return error;
3413}
3414
3415/*
3416 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
3417 */
3418
3419int dlm_lock(dlm_lockspace_t *lockspace,
3420 int mode,
3421 struct dlm_lksb *lksb,
3422 uint32_t flags,
3423 void *name,
3424 unsigned int namelen,
3425 uint32_t parent_lkid,
3426 void (*ast) (void *astarg),
3427 void *astarg,
3bcd3687 3428 void (*bast) (void *astarg, int mode))
e7fd4179
DT
3429{
3430 struct dlm_ls *ls;
3431 struct dlm_lkb *lkb;
3432 struct dlm_args args;
3433 int error, convert = flags & DLM_LKF_CONVERT;
3434
3435 ls = dlm_find_lockspace_local(lockspace);
3436 if (!ls)
3437 return -EINVAL;
3438
85e86edf 3439 dlm_lock_recovery(ls);
e7fd4179
DT
3440
3441 if (convert)
3442 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3443 else
3444 error = create_lkb(ls, &lkb);
3445
3446 if (error)
3447 goto out;
3448
5d92a30e 3449 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
f1d3b8f9 3450
6b0afc0c 3451#ifdef CONFIG_DLM_DEPRECATED_API
d7db923e 3452 error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3bcd3687 3453 astarg, bast, &args);
6b0afc0c
AA
3454#else
3455 error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3456 &args);
3457#endif
e7fd4179
DT
3458 if (error)
3459 goto out_put;
3460
3461 if (convert)
3462 error = convert_lock(ls, lkb, &args);
3463 else
3464 error = request_lock(ls, lkb, name, namelen, &args);
3465
3466 if (error == -EINPROGRESS)
3467 error = 0;
3468 out_put:
5d92a30e 3469 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error);
f1d3b8f9 3470
e7fd4179 3471 if (convert || error)
b3f58d8f 3472 __put_lkb(ls, lkb);
c85d65e9 3473 if (error == -EAGAIN || error == -EDEADLK)
e7fd4179
DT
3474 error = 0;
3475 out:
85e86edf 3476 dlm_unlock_recovery(ls);
e7fd4179
DT
3477 dlm_put_lockspace(ls);
3478 return error;
3479}
3480
3481int dlm_unlock(dlm_lockspace_t *lockspace,
3482 uint32_t lkid,
3483 uint32_t flags,
3484 struct dlm_lksb *lksb,
3485 void *astarg)
3486{
3487 struct dlm_ls *ls;
3488 struct dlm_lkb *lkb;
3489 struct dlm_args args;
3490 int error;
3491
3492 ls = dlm_find_lockspace_local(lockspace);
3493 if (!ls)
3494 return -EINVAL;
3495
85e86edf 3496 dlm_lock_recovery(ls);
e7fd4179
DT
3497
3498 error = find_lkb(ls, lkid, &lkb);
3499 if (error)
3500 goto out;
3501
f1d3b8f9
AA
3502 trace_dlm_unlock_start(ls, lkb, flags);
3503
e7fd4179
DT
3504 error = set_unlock_args(flags, astarg, &args);
3505 if (error)
3506 goto out_put;
3507
3508 if (flags & DLM_LKF_CANCEL)
3509 error = cancel_lock(ls, lkb, &args);
3510 else
3511 error = unlock_lock(ls, lkb, &args);
3512
3513 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3514 error = 0;
ef0c2bb0
DT
3515 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3516 error = 0;
e7fd4179 3517 out_put:
f1d3b8f9
AA
3518 trace_dlm_unlock_end(ls, lkb, flags, error);
3519
b3f58d8f 3520 dlm_put_lkb(lkb);
e7fd4179 3521 out:
85e86edf 3522 dlm_unlock_recovery(ls);
e7fd4179
DT
3523 dlm_put_lockspace(ls);
3524 return error;
3525}
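
/* A caller sketch for the two entry points above, using only their
   signatures as shown here. It assumes a lockspace handle obtained
   elsewhere (e.g. from dlm_new_lockspace()); the resource name is
   arbitrary and error paths are trimmed. The completion ast fires for
   both the lock and the later unlock (the unlock completes with
   sb_status set to -DLM_EUNLOCK). */

#include <linux/completion.h>
#include <linux/dlm.h>
#include <linux/string.h>

static struct dlm_lksb demo_lksb;
static DECLARE_COMPLETION(demo_done);

static void demo_ast(void *astarg)
{
	complete(&demo_done);
}

static void demo_bast(void *astarg, int mode)
{
	/* another node is blocked on our EX lock; release it soon */
}

static int demo_lock_unlock(dlm_lockspace_t *ls)
{
	int error;

	error = dlm_lock(ls, DLM_LOCK_EX, &demo_lksb, 0, "demo-res",
			 strlen("demo-res"), 0, demo_ast, NULL, demo_bast);
	if (error)
		return error;
	wait_for_completion(&demo_done);
	if (demo_lksb.sb_status)
		return demo_lksb.sb_status;

	reinit_completion(&demo_done);
	error = dlm_unlock(ls, demo_lksb.sb_lkid, 0, &demo_lksb, NULL);
	if (error)
		return error;
	wait_for_completion(&demo_done);
	return 0;
}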
3526
3527/*
3528 * send/receive routines for remote operations and replies
3529 *
3530 * send_args
3531 * send_common
3532 * send_request receive_request
3533 * send_convert receive_convert
3534 * send_unlock receive_unlock
3535 * send_cancel receive_cancel
3536 * send_grant receive_grant
3537 * send_bast receive_bast
3538 * send_lookup receive_lookup
3539 * send_remove receive_remove
3540 *
3541 * send_common_reply
3542 * receive_request_reply send_request_reply
3543 * receive_convert_reply send_convert_reply
3544 * receive_unlock_reply send_unlock_reply
3545 * receive_cancel_reply send_cancel_reply
3546 * receive_lookup_reply send_lookup_reply
3547 */
3548
7e4dac33
DT
3549static int _create_message(struct dlm_ls *ls, int mb_len,
3550 int to_nodeid, int mstype,
3551 struct dlm_message **ms_ret,
3552 struct dlm_mhandle **mh_ret)
e7fd4179
DT
3553{
3554 struct dlm_message *ms;
3555 struct dlm_mhandle *mh;
3556 char *mb;
e7fd4179
DT
3557
3558 /* get_buffer gives us a message handle (mh) that we need to
a070a91c 3559 pass into midcomms_commit and a message buffer (mb) that we
e7fd4179
DT
3560 write our data into */
3561
a070a91c 3562 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
e7fd4179
DT
3563 if (!mh)
3564 return -ENOBUFS;
3565
e7fd4179
DT
3566 ms = (struct dlm_message *) mb;
3567
3428785a
AA
3568 ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3569 ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3570 ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3571 ms->m_header.h_length = cpu_to_le16(mb_len);
e7fd4179
DT
3572 ms->m_header.h_cmd = DLM_MSG;
3573
00e99ccd 3574 ms->m_type = cpu_to_le32(mstype);
e7fd4179
DT
3575
3576 *mh_ret = mh;
3577 *ms_ret = ms;
3578 return 0;
3579}
3580
7e4dac33
DT
3581static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3582 int to_nodeid, int mstype,
3583 struct dlm_message **ms_ret,
3584 struct dlm_mhandle **mh_ret)
3585{
3586 int mb_len = sizeof(struct dlm_message);
3587
3588 switch (mstype) {
3589 case DLM_MSG_REQUEST:
3590 case DLM_MSG_LOOKUP:
3591 case DLM_MSG_REMOVE:
3592 mb_len += r->res_length;
3593 break;
3594 case DLM_MSG_CONVERT:
3595 case DLM_MSG_UNLOCK:
3596 case DLM_MSG_REQUEST_REPLY:
3597 case DLM_MSG_CONVERT_REPLY:
3598 case DLM_MSG_GRANT:
3599 if (lkb && lkb->lkb_lvbptr)
3600 mb_len += r->res_ls->ls_lvblen;
3601 break;
3602 }
3603
3604 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3605 ms_ret, mh_ret);
3606}
3607
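/* Worked example of the sizing above, for illustration: a DLM_MSG_REQUEST
   for a resource with a 16-byte name is allocated as
   sizeof(struct dlm_message) + 16 and send_args() copies the name into
   m_extra; a DLM_MSG_GRANT carrying an lvb instead reserves ls_lvblen
   bytes of m_extra for the lvb contents. */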
e7fd4179
DT
3608/* further lowcomms enhancements or alternate implementations may make
3609 the return value from this function useful at some point */
3610
3611static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3612{
a070a91c 3613 dlm_midcomms_commit_mhandle(mh);
e7fd4179
DT
3614 return 0;
3615}
3616
3617static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3618 struct dlm_message *ms)
3619{
00e99ccd
AA
3620 ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid);
3621 ms->m_pid = cpu_to_le32(lkb->lkb_ownpid);
3622 ms->m_lkid = cpu_to_le32(lkb->lkb_id);
3623 ms->m_remid = cpu_to_le32(lkb->lkb_remid);
3624 ms->m_exflags = cpu_to_le32(lkb->lkb_exflags);
3625 ms->m_sbflags = cpu_to_le32(lkb->lkb_sbflags);
3626 ms->m_flags = cpu_to_le32(lkb->lkb_flags);
3627 ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
3628 ms->m_status = cpu_to_le32(lkb->lkb_status);
3629 ms->m_grmode = cpu_to_le32(lkb->lkb_grmode);
3630 ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode);
3631 ms->m_hash = cpu_to_le32(r->res_hash);
e7fd4179
DT
3632
3633 /* m_result and m_bastmode are set from function args,
3634 not from lkb fields */
3635
e5dae548 3636 if (lkb->lkb_bastfn)
00e99ccd 3637 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
e5dae548 3638 if (lkb->lkb_astfn)
00e99ccd 3639 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
e7fd4179 3640
da49f36f
DT
3641 /* compare with switch in create_message; send_remove() doesn't
3642 use send_args() */
e7fd4179 3643
da49f36f 3644 switch (ms->m_type) {
00e99ccd
AA
3645 case cpu_to_le32(DLM_MSG_REQUEST):
3646 case cpu_to_le32(DLM_MSG_LOOKUP):
da49f36f
DT
3647 memcpy(ms->m_extra, r->res_name, r->res_length);
3648 break;
00e99ccd
AA
3649 case cpu_to_le32(DLM_MSG_CONVERT):
3650 case cpu_to_le32(DLM_MSG_UNLOCK):
3651 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3652 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3653 case cpu_to_le32(DLM_MSG_GRANT):
7175e131 3654 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
da49f36f 3655 break;
e7fd4179 3656 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
da49f36f
DT
3657 break;
3658 }
e7fd4179
DT
3659}
3660
3661static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3662{
3663 struct dlm_message *ms;
3664 struct dlm_mhandle *mh;
3665 int to_nodeid, error;
3666
c6ff669b
DT
3667 to_nodeid = r->res_nodeid;
3668
3669 error = add_to_waiters(lkb, mstype, to_nodeid);
ef0c2bb0
DT
3670 if (error)
3671 return error;
e7fd4179 3672
e7fd4179
DT
3673 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3674 if (error)
3675 goto fail;
3676
3677 send_args(r, lkb, ms);
3678
3679 error = send_message(mh, ms);
3680 if (error)
3681 goto fail;
3682 return 0;
3683
3684 fail:
ef0c2bb0 3685 remove_from_waiters(lkb, msg_reply_type(mstype));
e7fd4179
DT
3686 return error;
3687}
3688
3689static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3690{
3691 return send_common(r, lkb, DLM_MSG_REQUEST);
3692}
3693
3694static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3695{
3696 int error;
3697
3698 error = send_common(r, lkb, DLM_MSG_CONVERT);
3699
3700 /* down conversions go without a reply from the master */
3701 if (!error && down_conversion(lkb)) {
ef0c2bb0 3702 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
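		/* fake the reply locally: set only the stub-message fields
		   that __receive_convert_reply() reads, with m_result 0
		   meaning the down conversion was granted */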
00e99ccd
AA
3703 r->res_ls->ls_stub_ms.m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
3704 r->res_ls->ls_stub_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
e7fd4179
DT
3705 r->res_ls->ls_stub_ms.m_result = 0;
3706 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3707 }
3708
3709 return error;
3710}
3711
3712/* FIXME: if this lkb is the only lock we hold on the rsb, then set
3713 MASTER_UNCERTAIN to force the next request on the rsb to confirm
3714 that the master is still correct. */
3715
3716static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3717{
3718 return send_common(r, lkb, DLM_MSG_UNLOCK);
3719}
3720
3721static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3722{
3723 return send_common(r, lkb, DLM_MSG_CANCEL);
3724}
3725
3726static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3727{
3728 struct dlm_message *ms;
3729 struct dlm_mhandle *mh;
3730 int to_nodeid, error;
3731
3732 to_nodeid = lkb->lkb_nodeid;
3733
3734 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3735 if (error)
3736 goto out;
3737
3738 send_args(r, lkb, ms);
3739
3740 ms->m_result = 0;
3741
3742 error = send_message(mh, ms);
3743 out:
3744 return error;
3745}
3746
3747static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3748{
3749 struct dlm_message *ms;
3750 struct dlm_mhandle *mh;
3751 int to_nodeid, error;
3752
3753 to_nodeid = lkb->lkb_nodeid;
3754
3755 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3756 if (error)
3757 goto out;
3758
3759 send_args(r, lkb, ms);
3760
00e99ccd 3761 ms->m_bastmode = cpu_to_le32(mode);
e7fd4179
DT
3762
3763 error = send_message(mh, ms);
3764 out:
3765 return error;
3766}
3767
3768static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3769{
3770 struct dlm_message *ms;
3771 struct dlm_mhandle *mh;
3772 int to_nodeid, error;
3773
c6ff669b
DT
3774 to_nodeid = dlm_dir_nodeid(r);
3775
3776 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
ef0c2bb0
DT
3777 if (error)
3778 return error;
e7fd4179 3779
e7fd4179
DT
3780 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3781 if (error)
3782 goto fail;
3783
3784 send_args(r, lkb, ms);
3785
3786 error = send_message(mh, ms);
3787 if (error)
3788 goto fail;
3789 return 0;
3790
3791 fail:
ef0c2bb0 3792 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
e7fd4179
DT
3793 return error;
3794}
3795
3796static int send_remove(struct dlm_rsb *r)
3797{
3798 struct dlm_message *ms;
3799 struct dlm_mhandle *mh;
3800 int to_nodeid, error;
3801
3802 to_nodeid = dlm_dir_nodeid(r);
3803
3804 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3805 if (error)
3806 goto out;
3807
3808 memcpy(ms->m_extra, r->res_name, r->res_length);
00e99ccd 3809 ms->m_hash = cpu_to_le32(r->res_hash);
e7fd4179
DT
3810
3811 error = send_message(mh, ms);
3812 out:
3813 return error;
3814}
3815
3816static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3817 int mstype, int rv)
3818{
3819 struct dlm_message *ms;
3820 struct dlm_mhandle *mh;
3821 int to_nodeid, error;
3822
3823 to_nodeid = lkb->lkb_nodeid;
3824
3825 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3826 if (error)
3827 goto out;
3828
3829 send_args(r, lkb, ms);
3830
00e99ccd 3831 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
e7fd4179
DT
3832
3833 error = send_message(mh, ms);
3834 out:
3835 return error;
3836}
3837
3838static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3839{
3840 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3841}
3842
3843static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3844{
3845 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3846}
3847
3848static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3849{
3850 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3851}
3852
3853static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3854{
3855 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3856}
3857
3858static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3859 int ret_nodeid, int rv)
3860{
3861 struct dlm_rsb *r = &ls->ls_stub_rsb;
3862 struct dlm_message *ms;
3863 struct dlm_mhandle *mh;
3428785a 3864 int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
e7fd4179
DT
3865
3866 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3867 if (error)
3868 goto out;
3869
3870 ms->m_lkid = ms_in->m_lkid;
00e99ccd
AA
3871 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3872 ms->m_nodeid = cpu_to_le32(ret_nodeid);
e7fd4179
DT
3873
3874 error = send_message(mh, ms);
3875 out:
3876 return error;
3877}
3878
3879/* which args we save from a received message depends heavily on the type
3880 of message, unlike the send side where we can safely send everything about
3881 the lkb for any type of message */
3882
3883static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3884{
00e99ccd
AA
3885 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3886 lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags);
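	/* only the low 16 bits of lkb_flags travel in messages; the upper
	   16 bits are node-local state and are preserved here */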
e7fd4179 3887 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
00e99ccd 3888 (le32_to_cpu(ms->m_flags) & 0x0000FFFF);
e7fd4179
DT
3889}
3890
3891static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3892{
00e99ccd 3893 if (ms->m_flags == cpu_to_le32(DLM_IFL_STUB_MS))
2a7ce0ed
DT
3894 return;
3895
00e99ccd 3896 lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags);
e7fd4179 3897 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
00e99ccd 3898 (le32_to_cpu(ms->m_flags) & 0x0000FFFF);
e7fd4179
DT
3899}
3900
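/* length of the variable-sized payload (resource name or lvb) that
   follows the fixed-size message in m_extra */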
3901static int receive_extralen(struct dlm_message *ms)
3902{
3428785a
AA
3903 return (le16_to_cpu(ms->m_header.h_length) -
3904 sizeof(struct dlm_message));
e7fd4179
DT
3905}
3906
e7fd4179
DT
3907static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3908 struct dlm_message *ms)
3909{
3910 int len;
3911
3912 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3913 if (!lkb->lkb_lvbptr)
52bda2b5 3914 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
e7fd4179
DT
3915 if (!lkb->lkb_lvbptr)
3916 return -ENOMEM;
3917 len = receive_extralen(ms);
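		/* copy no more than this lockspace's lvb length, in case a
		   remote node sent a larger lvb */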
cfa805f6
BVA
3918 if (len > ls->ls_lvblen)
3919 len = ls->ls_lvblen;
e7fd4179
DT
3920 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3921 }
3922 return 0;
3923}
3924
e5dae548
DT
3925static void fake_bastfn(void *astparam, int mode)
3926{
3927 log_print("fake_bastfn should not be called");
3928}
3929
3930static void fake_astfn(void *astparam)
3931{
3932 log_print("fake_astfn should not be called");
3933}
3934
e7fd4179
DT
3935static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3936 struct dlm_message *ms)
3937{
3428785a 3938 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
00e99ccd
AA
3939 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3940 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
e7fd4179 3941 lkb->lkb_grmode = DLM_LOCK_IV;
00e99ccd 3942 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
e5dae548 3943
00e99ccd
AA
3944 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3945 lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
e7fd4179 3946
8d07fd50
DT
3947 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3948 /* lkb was just created so there won't be an lvb yet */
52bda2b5 3949 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
8d07fd50
DT
3950 if (!lkb->lkb_lvbptr)
3951 return -ENOMEM;
3952 }
e7fd4179
DT
3953
3954 return 0;
3955}
3956
3957static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3958 struct dlm_message *ms)
3959{
e7fd4179
DT
3960 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3961 return -EBUSY;
3962
e7fd4179
DT
3963 if (receive_lvb(ls, lkb, ms))
3964 return -ENOMEM;
3965
00e99ccd
AA
3966 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3967 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
e7fd4179
DT
3968
3969 return 0;
3970}
3971
3972static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3973 struct dlm_message *ms)
3974{
e7fd4179
DT
3975 if (receive_lvb(ls, lkb, ms))
3976 return -ENOMEM;
3977 return 0;
3978}
3979
3980/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3981 uses to send a reply and that the remote end uses to process the reply. */
3982
3983static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3984{
3985 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3428785a 3986 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
00e99ccd 3987 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
e7fd4179
DT
3988}
3989
c54e04b0
DT
3990/* This is called after the rsb is locked so that we can safely inspect
3991 fields in the lkb. */
3992
3993static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3994{
3428785a 3995 int from = le32_to_cpu(ms->m_header.h_nodeid);
c54e04b0
DT
3996 int error = 0;
3997
6c2e3bf6 3998 /* mixing user and kernel locks is currently not supported */
00e99ccd
AA
3999 if (ms->m_flags & cpu_to_le32(DLM_IFL_USER) &&
4000 ~lkb->lkb_flags & DLM_IFL_USER) {
6c2e3bf6
AA
4001 log_error(lkb->lkb_resource->res_ls,
4002 "got user dlm message for a kernel lock");
4003 error = -EINVAL;
4004 goto out;
4005 }
4006
c54e04b0 4007 switch (ms->m_type) {
00e99ccd
AA
4008 case cpu_to_le32(DLM_MSG_CONVERT):
4009 case cpu_to_le32(DLM_MSG_UNLOCK):
4010 case cpu_to_le32(DLM_MSG_CANCEL):
c54e04b0
DT
4011 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
4012 error = -EINVAL;
4013 break;
4014
00e99ccd
AA
4015 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4016 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4017 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4018 case cpu_to_le32(DLM_MSG_GRANT):
4019 case cpu_to_le32(DLM_MSG_BAST):
c54e04b0
DT
4020 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
4021 error = -EINVAL;
4022 break;
4023
00e99ccd 4024 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
c54e04b0
DT
4025 if (!is_process_copy(lkb))
4026 error = -EINVAL;
4027 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
4028 error = -EINVAL;
4029 break;
4030
4031 default:
4032 error = -EINVAL;
4033 }
4034
6c2e3bf6 4035out:
c54e04b0
DT
4036 if (error)
4037 log_error(lkb->lkb_resource->res_ls,
4038 "ignore invalid message %d from %d %x %x %x %d",
00e99ccd
AA
4039 le32_to_cpu(ms->m_type), from, lkb->lkb_id,
4040 lkb->lkb_remid, lkb->lkb_flags, lkb->lkb_nodeid);
c54e04b0
DT
4041 return error;
4042}
4043
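/* Summarizing the checks above: operations (convert/unlock/cancel) must
   arrive at the master copy of an lkb from the node holding the process
   copy, while replies, grants and basts must arrive at the process copy
   from the master; a request reply is also accepted while the master is
   still unknown (lkb_nodeid == -1). */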
96006ea6
DT
4044static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4045{
4046 char name[DLM_RESNAME_MAXLEN + 1];
4047 struct dlm_message *ms;
4048 struct dlm_mhandle *mh;
4049 struct dlm_rsb *r;
4050 uint32_t hash, b;
4051 int rv, dir_nodeid;
4052
4053 memset(name, 0, sizeof(name));
4054 memcpy(name, ms_name, len);
4055
4056 hash = jhash(name, len, 0);
4057 b = hash & (ls->ls_rsbtbl_size - 1);
4058
4059 dir_nodeid = dlm_hash2nodeid(ls, hash);
4060
4061 log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4062
4063 spin_lock(&ls->ls_rsbtbl[b].lock);
4064 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4065 if (!rv) {
4066 spin_unlock(&ls->ls_rsbtbl[b].lock);
4067 log_error(ls, "repeat_remove on keep %s", name);
4068 return;
4069 }
4070
4071 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4072 if (!rv) {
4073 spin_unlock(&ls->ls_rsbtbl[b].lock);
4074 log_error(ls, "repeat_remove on toss %s", name);
4075 return;
4076 }
4077
4078 /* use ls->remove_name2 to avoid conflict with shrink? */
4079
4080 spin_lock(&ls->ls_remove_spin);
4081 ls->ls_remove_len = len;
4082 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4083 spin_unlock(&ls->ls_remove_spin);
4084 spin_unlock(&ls->ls_rsbtbl[b].lock);
4085
4086 rv = _create_message(ls, sizeof(struct dlm_message) + len,
4087 dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4088 if (rv)
ba589959 4089 goto out;
96006ea6
DT
4090
4091 memcpy(ms->m_extra, name, len);
00e99ccd 4092 ms->m_hash = cpu_to_le32(hash);
96006ea6
DT
4093
4094 send_message(mh, ms);
4095
ba589959 4096out:
96006ea6
DT
4097 spin_lock(&ls->ls_remove_spin);
4098 ls->ls_remove_len = 0;
4099 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4100 spin_unlock(&ls->ls_remove_spin);
f6f74183 4101 wake_up(&ls->ls_remove_wait);
96006ea6
DT
4102}
4103
6d40c4a7 4104static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179
DT
4105{
4106 struct dlm_lkb *lkb;
4107 struct dlm_rsb *r;
c04fecb4 4108 int from_nodeid;
96006ea6 4109 int error, namelen = 0;
e7fd4179 4110
3428785a 4111 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
c04fecb4 4112
e7fd4179
DT
4113 error = create_lkb(ls, &lkb);
4114 if (error)
4115 goto fail;
4116
4117 receive_flags(lkb, ms);
4118 lkb->lkb_flags |= DLM_IFL_MSTCPY;
4119 error = receive_request_args(ls, lkb, ms);
4120 if (error) {
b3f58d8f 4121 __put_lkb(ls, lkb);
e7fd4179
DT
4122 goto fail;
4123 }
4124
c04fecb4
DT
4125 /* The dir node is the authority on whether we are the master
4126 for this rsb or not, so if another node sends us a request, we should
4127 recreate the rsb if we've destroyed it. This race happens when we
4128 send a remove message to the dir node at the same time that the dir
4129 node sends us a request for the rsb. */
4130
e7fd4179
DT
4131 namelen = receive_extralen(ms);
4132
c04fecb4
DT
4133 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4134 R_RECEIVE_REQUEST, &r);
e7fd4179 4135 if (error) {
b3f58d8f 4136 __put_lkb(ls, lkb);
e7fd4179
DT
4137 goto fail;
4138 }
4139
4140 lock_rsb(r);
4141
c04fecb4
DT
4142 if (r->res_master_nodeid != dlm_our_nodeid()) {
4143 error = validate_master_nodeid(ls, r, from_nodeid);
4144 if (error) {
4145 unlock_rsb(r);
4146 put_rsb(r);
4147 __put_lkb(ls, lkb);
4148 goto fail;
4149 }
4150 }
4151
e7fd4179
DT
4152 attach_lkb(r, lkb);
4153 error = do_request(r, lkb);
4154 send_request_reply(r, lkb, error);
cf6620ac 4155 do_request_effects(r, lkb, error);
e7fd4179
DT
4156
4157 unlock_rsb(r);
4158 put_rsb(r);
4159
4160 if (error == -EINPROGRESS)
4161 error = 0;
4162 if (error)
b3f58d8f 4163 dlm_put_lkb(lkb);
6d40c4a7 4164 return 0;
e7fd4179
DT
4165
4166 fail:
c04fecb4
DT
4167 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4168 and do this receive_request again from process_lookup_list once
4169 we get the lookup reply. This would avoid a many repeated
4170 ENOTBLK request failures when the lookup reply designating us
4171 as master is delayed. */
4172
4173 /* We could repeatedly return -EBADR here if our send_remove() is
4174 delayed in being sent/arriving/being processed on the dir node.
4175 Another node would repeatedly lookup up the master, and the dir
4176 node would continue returning our nodeid until our send_remove
96006ea6
DT
4177 took effect.
4178
4179 We send another remove message in case our previous send_remove
4180 was lost/ignored/missed somehow. */
c04fecb4
DT
4181
4182 if (error != -ENOTBLK) {
4183 log_limit(ls, "receive_request %x from %d %d",
00e99ccd 4184 le32_to_cpu(ms->m_lkid), from_nodeid, error);
c04fecb4
DT
4185 }
4186
96006ea6
DT
4187 if (namelen && error == -EBADR) {
4188 send_repeat_remove(ls, ms->m_extra, namelen);
4189 msleep(1000);
4190 }
4191
e7fd4179
DT
4192 setup_stub_lkb(ls, ms);
4193 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
6d40c4a7 4194 return error;
e7fd4179
DT
4195}
4196
6d40c4a7 4197static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179
DT
4198{
4199 struct dlm_lkb *lkb;
4200 struct dlm_rsb *r;
90135925 4201 int error, reply = 1;
e7fd4179 4202
00e99ccd 4203 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
e7fd4179
DT
4204 if (error)
4205 goto fail;
4206
00e99ccd 4207 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4875647a
DT
4208 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4209 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4210 (unsigned long long)lkb->lkb_recover_seq,
00e99ccd
AA
4211 le32_to_cpu(ms->m_header.h_nodeid),
4212 le32_to_cpu(ms->m_lkid));
6d40c4a7 4213 error = -ENOENT;
c0174726 4214 dlm_put_lkb(lkb);
6d40c4a7
DT
4215 goto fail;
4216 }
4217
e7fd4179
DT
4218 r = lkb->lkb_resource;
4219
4220 hold_rsb(r);
4221 lock_rsb(r);
4222
c54e04b0
DT
4223 error = validate_message(lkb, ms);
4224 if (error)
4225 goto out;
4226
e7fd4179 4227 receive_flags(lkb, ms);
cf6620ac 4228
e7fd4179 4229 error = receive_convert_args(ls, lkb, ms);
cf6620ac
DT
4230 if (error) {
4231 send_convert_reply(r, lkb, error);
4232 goto out;
4233 }
4234
e7fd4179
DT
4235 reply = !down_conversion(lkb);
4236
4237 error = do_convert(r, lkb);
e7fd4179
DT
4238 if (reply)
4239 send_convert_reply(r, lkb, error);
cf6620ac 4240 do_convert_effects(r, lkb, error);
c54e04b0 4241 out:
e7fd4179
DT
4242 unlock_rsb(r);
4243 put_rsb(r);
b3f58d8f 4244 dlm_put_lkb(lkb);
6d40c4a7 4245 return 0;
e7fd4179
DT
4246
4247 fail:
4248 setup_stub_lkb(ls, ms);
4249 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
6d40c4a7 4250 return error;
e7fd4179
DT
4251}
4252
6d40c4a7 4253static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179
DT
4254{
4255 struct dlm_lkb *lkb;
4256 struct dlm_rsb *r;
4257 int error;
4258
00e99ccd 4259 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
e7fd4179
DT
4260 if (error)
4261 goto fail;
4262
00e99ccd 4263 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
6d40c4a7
DT
4264 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4265 lkb->lkb_id, lkb->lkb_remid,
00e99ccd
AA
4266 le32_to_cpu(ms->m_header.h_nodeid),
4267 le32_to_cpu(ms->m_lkid));
6d40c4a7 4268 error = -ENOENT;
c0174726 4269 dlm_put_lkb(lkb);
6d40c4a7
DT
4270 goto fail;
4271 }
4272
e7fd4179
DT
4273 r = lkb->lkb_resource;
4274
4275 hold_rsb(r);
4276 lock_rsb(r);
4277
c54e04b0
DT
4278 error = validate_message(lkb, ms);
4279 if (error)
4280 goto out;
4281
e7fd4179 4282 receive_flags(lkb, ms);
cf6620ac 4283
e7fd4179 4284 error = receive_unlock_args(ls, lkb, ms);
cf6620ac
DT
4285 if (error) {
4286 send_unlock_reply(r, lkb, error);
4287 goto out;
4288 }
e7fd4179
DT
4289
4290 error = do_unlock(r, lkb);
e7fd4179 4291 send_unlock_reply(r, lkb, error);
cf6620ac 4292 do_unlock_effects(r, lkb, error);
c54e04b0 4293 out:
e7fd4179
DT
4294 unlock_rsb(r);
4295 put_rsb(r);
b3f58d8f 4296 dlm_put_lkb(lkb);
6d40c4a7 4297 return 0;
e7fd4179
DT
4298
4299 fail:
4300 setup_stub_lkb(ls, ms);
4301 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
6d40c4a7 4302 return error;
e7fd4179
DT
4303}
4304
6d40c4a7 4305static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179
DT
4306{
4307 struct dlm_lkb *lkb;
4308 struct dlm_rsb *r;
4309 int error;
4310
00e99ccd 4311 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
e7fd4179
DT
4312 if (error)
4313 goto fail;
4314
4315 receive_flags(lkb, ms);
4316
4317 r = lkb->lkb_resource;
4318
4319 hold_rsb(r);
4320 lock_rsb(r);
4321
c54e04b0
DT
4322 error = validate_message(lkb, ms);
4323 if (error)
4324 goto out;
4325
e7fd4179
DT
4326 error = do_cancel(r, lkb);
4327 send_cancel_reply(r, lkb, error);
cf6620ac 4328 do_cancel_effects(r, lkb, error);
c54e04b0 4329 out:
e7fd4179
DT
4330 unlock_rsb(r);
4331 put_rsb(r);
b3f58d8f 4332 dlm_put_lkb(lkb);
6d40c4a7 4333 return 0;
e7fd4179
DT
4334
4335 fail:
4336 setup_stub_lkb(ls, ms);
4337 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
6d40c4a7 4338 return error;
e7fd4179
DT
4339}
4340
6d40c4a7 4341static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179
DT
4342{
4343 struct dlm_lkb *lkb;
4344 struct dlm_rsb *r;
4345 int error;
4346
00e99ccd 4347 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
6d40c4a7
DT
4348 if (error)
4349 return error;
e7fd4179
DT
4350
4351 r = lkb->lkb_resource;
4352
4353 hold_rsb(r);
4354 lock_rsb(r);
4355
c54e04b0
DT
4356 error = validate_message(lkb, ms);
4357 if (error)
4358 goto out;
4359
e7fd4179 4360 receive_flags_reply(lkb, ms);
7d3c1feb
DT
4361 if (is_altmode(lkb))
4362 munge_altmode(lkb, ms);
e7fd4179
DT
4363 grant_lock_pc(r, lkb, ms);
4364 queue_cast(r, lkb, 0);
c54e04b0 4365 out:
e7fd4179
DT
4366 unlock_rsb(r);
4367 put_rsb(r);
b3f58d8f 4368 dlm_put_lkb(lkb);
6d40c4a7 4369 return 0;
e7fd4179
DT
4370}
4371
6d40c4a7 4372static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179
DT
4373{
4374 struct dlm_lkb *lkb;
4375 struct dlm_rsb *r;
4376 int error;
4377
00e99ccd 4378 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
6d40c4a7
DT
4379 if (error)
4380 return error;
e7fd4179
DT
4381
4382 r = lkb->lkb_resource;
4383
4384 hold_rsb(r);
4385 lock_rsb(r);
4386
c54e04b0
DT
4387 error = validate_message(lkb, ms);
4388 if (error)
4389 goto out;
e7fd4179 4390
00e99ccd
AA
4391 queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4392 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
c54e04b0 4393 out:
e7fd4179
DT
4394 unlock_rsb(r);
4395 put_rsb(r);
b3f58d8f 4396 dlm_put_lkb(lkb);
6d40c4a7 4397 return 0;
e7fd4179
DT
4398}
4399
4400static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4401{
c04fecb4 4402 int len, error, ret_nodeid, from_nodeid, our_nodeid;
e7fd4179 4403
3428785a 4404 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
e7fd4179
DT
4405 our_nodeid = dlm_our_nodeid();
4406
4407 len = receive_extralen(ms);
4408
c04fecb4
DT
4409 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4410 &ret_nodeid, NULL);
e7fd4179
DT
4411
4412 /* Optimization: we're master so treat lookup as a request */
4413 if (!error && ret_nodeid == our_nodeid) {
4414 receive_request(ls, ms);
4415 return;
4416 }
e7fd4179
DT
4417 send_lookup_reply(ls, ms, ret_nodeid, error);
4418}
4419
4420static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4421{
c04fecb4
DT
4422 char name[DLM_RESNAME_MAXLEN+1];
4423 struct dlm_rsb *r;
4424 uint32_t hash, b;
4425 int rv, len, dir_nodeid, from_nodeid;
e7fd4179 4426
3428785a 4427 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
e7fd4179
DT
4428
4429 len = receive_extralen(ms);
4430
c04fecb4
DT
4431 if (len > DLM_RESNAME_MAXLEN) {
4432 log_error(ls, "receive_remove from %d bad len %d",
4433 from_nodeid, len);
4434 return;
4435 }
4436
00e99ccd 4437 dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
e7fd4179 4438 if (dir_nodeid != dlm_our_nodeid()) {
c04fecb4
DT
4439 log_error(ls, "receive_remove from %d bad nodeid %d",
4440 from_nodeid, dir_nodeid);
e7fd4179
DT
4441 return;
4442 }
4443
c04fecb4
DT
4444 /* Look for name on rsbtbl.toss, if it's there, kill it.
4445 If it's on rsbtbl.keep, it's being used, and we should ignore this
4446 message. This is an expected race between the dir node sending a
4447 request to the master node at the same time as the master node sends
4448 a remove to the dir node. The resolution to that race is for the
4449 dir node to ignore the remove message, and the master node to
4450 recreate the master rsb when it gets a request from the dir node for
4451 an rsb it doesn't have. */
4452
4453 memset(name, 0, sizeof(name));
4454 memcpy(name, ms->m_extra, len);
4455
4456 hash = jhash(name, len, 0);
4457 b = hash & (ls->ls_rsbtbl_size - 1);
4458
4459 spin_lock(&ls->ls_rsbtbl[b].lock);
4460
4461 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4462 if (rv) {
4463 /* verify the rsb is on keep list per comment above */
4464 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4465 if (rv) {
4466 /* should not happen */
4467 log_error(ls, "receive_remove from %d not found %s",
4468 from_nodeid, name);
4469 spin_unlock(&ls->ls_rsbtbl[b].lock);
4470 return;
4471 }
4472 if (r->res_master_nodeid != from_nodeid) {
4473 /* should not happen */
4474 log_error(ls, "receive_remove keep from %d master %d",
4475 from_nodeid, r->res_master_nodeid);
4476 dlm_print_rsb(r);
4477 spin_unlock(&ls->ls_rsbtbl[b].lock);
4478 return;
4479 }
4480
4481 log_debug(ls, "receive_remove from %d master %d first %x %s",
4482 from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4483 name);
4484 spin_unlock(&ls->ls_rsbtbl[b].lock);
4485 return;
4486 }
4487
4488 if (r->res_master_nodeid != from_nodeid) {
4489 log_error(ls, "receive_remove toss from %d master %d",
4490 from_nodeid, r->res_master_nodeid);
4491 dlm_print_rsb(r);
4492 spin_unlock(&ls->ls_rsbtbl[b].lock);
4493 return;
4494 }
4495
4496 if (kref_put(&r->res_ref, kill_rsb)) {
4497 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4498 spin_unlock(&ls->ls_rsbtbl[b].lock);
4499 dlm_free_rsb(r);
4500 } else {
4501 log_error(ls, "receive_remove from %d rsb ref error",
4502 from_nodeid);
4503 dlm_print_rsb(r);
4504 spin_unlock(&ls->ls_rsbtbl[b].lock);
4505 }
e7fd4179
DT
4506}
4507
8499137d
DT
4508static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4509{
00e99ccd 4510 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
8499137d
DT
4511}
4512
6d40c4a7 4513static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179
DT
4514{
4515 struct dlm_lkb *lkb;
4516 struct dlm_rsb *r;
ef0c2bb0 4517 int error, mstype, result;
3428785a 4518 int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
e7fd4179 4519
00e99ccd 4520 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
6d40c4a7
DT
4521 if (error)
4522 return error;
e7fd4179 4523
e7fd4179
DT
4524 r = lkb->lkb_resource;
4525 hold_rsb(r);
4526 lock_rsb(r);
4527
c54e04b0
DT
4528 error = validate_message(lkb, ms);
4529 if (error)
4530 goto out;
4531
ef0c2bb0
DT
4532 mstype = lkb->lkb_wait_type;
4533 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4875647a
DT
4534 if (error) {
4535 log_error(ls, "receive_request_reply %x remote %d %x result %d",
00e99ccd
AA
4536 lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4537 from_dlm_errno(le32_to_cpu(ms->m_result)));
4875647a 4538 dlm_dump_rsb(r);
ef0c2bb0 4539 goto out;
4875647a 4540 }
ef0c2bb0 4541
e7fd4179
DT
4542 /* Optimization: the dir node was also the master, so it took our
4543 lookup as a request and sent request reply instead of lookup reply */
4544 if (mstype == DLM_MSG_LOOKUP) {
c04fecb4
DT
4545 r->res_master_nodeid = from_nodeid;
4546 r->res_nodeid = from_nodeid;
4547 lkb->lkb_nodeid = from_nodeid;
e7fd4179
DT
4548 }
4549
ef0c2bb0 4550 /* this is the value returned from do_request() on the master */
00e99ccd 4551 result = from_dlm_errno(le32_to_cpu(ms->m_result));
ef0c2bb0
DT
4552
4553 switch (result) {
e7fd4179 4554 case -EAGAIN:
ef0c2bb0 4555 /* request would block (be queued) on remote master */
e7fd4179
DT
4556 queue_cast(r, lkb, -EAGAIN);
4557 confirm_master(r, -EAGAIN);
ef0c2bb0 4558 unhold_lkb(lkb); /* undoes create_lkb() */
e7fd4179
DT
4559 break;
4560
4561 case -EINPROGRESS:
4562 case 0:
4563 /* request was queued or granted on remote master */
4564 receive_flags_reply(lkb, ms);
00e99ccd 4565 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
7d3c1feb
DT
4566 if (is_altmode(lkb))
4567 munge_altmode(lkb, ms);
3ae1acf9 4568 if (result) {
e7fd4179 4569 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3ae1acf9
DT
4570 add_timeout(lkb);
4571 } else {
e7fd4179
DT
4572 grant_lock_pc(r, lkb, ms);
4573 queue_cast(r, lkb, 0);
4574 }
ef0c2bb0 4575 confirm_master(r, result);
e7fd4179
DT
4576 break;
4577
597d0cae 4578 case -EBADR:
e7fd4179
DT
4579 case -ENOTBLK:
4580 /* find_rsb failed to find rsb or rsb wasn't master */
c04fecb4
DT
4581 log_limit(ls, "receive_request_reply %x from %d %d "
4582 "master %d dir %d first %x %s", lkb->lkb_id,
4583 from_nodeid, result, r->res_master_nodeid,
4584 r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4585
4586 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4587 r->res_master_nodeid != dlm_our_nodeid()) {
4588 /* cause _request_lock->set_master->send_lookup */
4589 r->res_master_nodeid = 0;
4590 r->res_nodeid = -1;
4591 lkb->lkb_nodeid = -1;
4592 }
ef0c2bb0
DT
4593
4594 if (is_overlap(lkb)) {
4595 /* we'll ignore error in cancel/unlock reply */
4596 queue_cast_overlap(r, lkb);
aec64e1b 4597 confirm_master(r, result);
ef0c2bb0 4598 unhold_lkb(lkb); /* undoes create_lkb() */
c04fecb4 4599 } else {
ef0c2bb0 4600 _request_lock(r, lkb);
c04fecb4
DT
4601
4602 if (r->res_master_nodeid == dlm_our_nodeid())
4603 confirm_master(r, 0);
4604 }
e7fd4179
DT
4605 break;
4606
4607 default:
ef0c2bb0
DT
4608 log_error(ls, "receive_request_reply %x error %d",
4609 lkb->lkb_id, result);
e7fd4179
DT
4610 }
4611
ef0c2bb0
DT
4612 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4613 log_debug(ls, "receive_request_reply %x result %d unlock",
4614 lkb->lkb_id, result);
4615 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4616 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4617 send_unlock(r, lkb);
4618 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4619 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4620 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4621 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4622 send_cancel(r, lkb);
4623 } else {
4624 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4625 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4626 }
4627 out:
e7fd4179
DT
4628 unlock_rsb(r);
4629 put_rsb(r);
b3f58d8f 4630 dlm_put_lkb(lkb);
6d40c4a7 4631 return 0;
e7fd4179
DT
4632}
4633
4634static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4635 struct dlm_message *ms)
4636{
e7fd4179 4637 /* this is the value returned from do_convert() on the master */
00e99ccd 4638 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
e7fd4179
DT
4639 case -EAGAIN:
4640 /* convert would block (be queued) on remote master */
4641 queue_cast(r, lkb, -EAGAIN);
4642 break;
4643
c85d65e9
DT
4644 case -EDEADLK:
4645 receive_flags_reply(lkb, ms);
4646 revert_lock_pc(r, lkb);
4647 queue_cast(r, lkb, -EDEADLK);
4648 break;
4649
e7fd4179
DT
4650 case -EINPROGRESS:
4651 /* convert was queued on remote master */
7d3c1feb
DT
4652 receive_flags_reply(lkb, ms);
4653 if (is_demoted(lkb))
2a7ce0ed 4654 munge_demoted(lkb);
e7fd4179
DT
4655 del_lkb(r, lkb);
4656 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3ae1acf9 4657 add_timeout(lkb);
e7fd4179
DT
4658 break;
4659
4660 case 0:
4661 /* convert was granted on remote master */
4662 receive_flags_reply(lkb, ms);
7d3c1feb 4663 if (is_demoted(lkb))
2a7ce0ed 4664 munge_demoted(lkb);
e7fd4179
DT
4665 grant_lock_pc(r, lkb, ms);
4666 queue_cast(r, lkb, 0);
4667 break;
4668
4669 default:
6d40c4a7 4670 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
3428785a 4671 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
00e99ccd
AA
4672 le32_to_cpu(ms->m_lkid),
4673 from_dlm_errno(le32_to_cpu(ms->m_result)));
6d40c4a7
DT
4674 dlm_print_rsb(r);
4675 dlm_print_lkb(lkb);
e7fd4179
DT
4676 }
4677}
4678
4679static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4680{
4681 struct dlm_rsb *r = lkb->lkb_resource;
ef0c2bb0 4682 int error;
e7fd4179
DT
4683
4684 hold_rsb(r);
4685 lock_rsb(r);
4686
c54e04b0
DT
4687 error = validate_message(lkb, ms);
4688 if (error)
4689 goto out;
4690
ef0c2bb0
DT
4691 /* stub reply can happen with waiters_mutex held */
4692 error = remove_from_waiters_ms(lkb, ms);
4693 if (error)
4694 goto out;
e7fd4179 4695
ef0c2bb0
DT
4696 __receive_convert_reply(r, lkb, ms);
4697 out:
e7fd4179
DT
4698 unlock_rsb(r);
4699 put_rsb(r);
4700}
4701
6d40c4a7 4702static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179
DT
4703{
4704 struct dlm_lkb *lkb;
4705 int error;
4706
00e99ccd 4707 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
6d40c4a7
DT
4708 if (error)
4709 return error;
e7fd4179 4710
e7fd4179 4711 _receive_convert_reply(lkb, ms);
b3f58d8f 4712 dlm_put_lkb(lkb);
6d40c4a7 4713 return 0;
e7fd4179
DT
4714}
4715
4716static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4717{
4718 struct dlm_rsb *r = lkb->lkb_resource;
ef0c2bb0 4719 int error;
e7fd4179
DT
4720
4721 hold_rsb(r);
4722 lock_rsb(r);
4723
c54e04b0
DT
4724 error = validate_message(lkb, ms);
4725 if (error)
4726 goto out;
4727
ef0c2bb0
DT
4728 /* stub reply can happen with waiters_mutex held */
4729 error = remove_from_waiters_ms(lkb, ms);
4730 if (error)
4731 goto out;
4732
e7fd4179
DT
4733 /* this is the value returned from do_unlock() on the master */
4734
00e99ccd 4735 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
e7fd4179
DT
4736 case -DLM_EUNLOCK:
4737 receive_flags_reply(lkb, ms);
4738 remove_lock_pc(r, lkb);
4739 queue_cast(r, lkb, -DLM_EUNLOCK);
4740 break;
ef0c2bb0
DT
4741 case -ENOENT:
4742 break;
e7fd4179 4743 default:
ef0c2bb0 4744 log_error(r->res_ls, "receive_unlock_reply %x error %d",
00e99ccd 4745 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
e7fd4179 4746 }
ef0c2bb0 4747 out:
e7fd4179
DT
4748 unlock_rsb(r);
4749 put_rsb(r);
4750}
4751
6d40c4a7 4752static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179
DT
4753{
4754 struct dlm_lkb *lkb;
4755 int error;
4756
00e99ccd 4757 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
6d40c4a7
DT
4758 if (error)
4759 return error;
e7fd4179 4760
e7fd4179 4761 _receive_unlock_reply(lkb, ms);
b3f58d8f 4762 dlm_put_lkb(lkb);
6d40c4a7 4763 return 0;
e7fd4179
DT
4764}
4765
4766static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4767{
4768 struct dlm_rsb *r = lkb->lkb_resource;
ef0c2bb0 4769 int error;
e7fd4179
DT
4770
4771 hold_rsb(r);
4772 lock_rsb(r);
4773
c54e04b0
DT
4774 error = validate_message(lkb, ms);
4775 if (error)
4776 goto out;
4777
ef0c2bb0
DT
4778 /* stub reply can happen with waiters_mutex held */
4779 error = remove_from_waiters_ms(lkb, ms);
4780 if (error)
4781 goto out;
4782
e7fd4179
DT
4783 /* this is the value returned from do_cancel() on the master */
4784
00e99ccd 4785 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
e7fd4179
DT
4786 case -DLM_ECANCEL:
4787 receive_flags_reply(lkb, ms);
4788 revert_lock_pc(r, lkb);
84d8cd69 4789 queue_cast(r, lkb, -DLM_ECANCEL);
ef0c2bb0
DT
4790 break;
4791 case 0:
e7fd4179
DT
4792 break;
4793 default:
ef0c2bb0 4794 log_error(r->res_ls, "receive_cancel_reply %x error %d",
00e99ccd
AA
4795 lkb->lkb_id,
4796 from_dlm_errno(le32_to_cpu(ms->m_result)));
e7fd4179 4797 }
ef0c2bb0 4798 out:
e7fd4179
DT
4799 unlock_rsb(r);
4800 put_rsb(r);
4801}
4802
6d40c4a7 4803static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179
DT
4804{
4805 struct dlm_lkb *lkb;
4806 int error;
4807
00e99ccd 4808 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
6d40c4a7
DT
4809 if (error)
4810 return error;
e7fd4179 4811
e7fd4179 4812 _receive_cancel_reply(lkb, ms);
b3f58d8f 4813 dlm_put_lkb(lkb);
6d40c4a7 4814 return 0;
e7fd4179
DT
4815}
4816
4817static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4818{
4819 struct dlm_lkb *lkb;
4820 struct dlm_rsb *r;
4821 int error, ret_nodeid;
c04fecb4 4822 int do_lookup_list = 0;
e7fd4179 4823
00e99ccd 4824 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
e7fd4179 4825 if (error) {
00e99ccd
AA
4826 log_error(ls, "%s no lkid %x", __func__,
4827 le32_to_cpu(ms->m_lkid));
e7fd4179
DT
4828 return;
4829 }
4830
c04fecb4 4831 /* ms->m_result is the value returned by dlm_master_lookup on dir node
e7fd4179 4832 FIXME: will a non-zero error ever be returned? */
e7fd4179
DT
4833
4834 r = lkb->lkb_resource;
4835 hold_rsb(r);
4836 lock_rsb(r);
4837
ef0c2bb0
DT
4838 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4839 if (error)
4840 goto out;
4841
00e99ccd 4842 ret_nodeid = le32_to_cpu(ms->m_nodeid);
c04fecb4
DT
4843
4844 /* We sometimes receive a request from the dir node for this
4845 rsb before we've received the dir node's lookup_reply for it.
4846 The request from the dir node implies we're the master, so we set
4847 ourselves as master in receive_request_reply, and verify here that
4848 we are indeed the master. */
4849
4850 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4851 /* This should never happen */
4852 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4853 "master %d dir %d our %d first %x %s",
3428785a
AA
4854 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4855 ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
c04fecb4
DT
4856 dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4857 }
4858
e7fd4179 4859 if (ret_nodeid == dlm_our_nodeid()) {
c04fecb4 4860 r->res_master_nodeid = ret_nodeid;
e7fd4179 4861 r->res_nodeid = 0;
c04fecb4 4862 do_lookup_list = 1;
e7fd4179 4863 r->res_first_lkid = 0;
c04fecb4
DT
4864 } else if (ret_nodeid == -1) {
4865 /* the remote node doesn't believe it's the dir node */
4866 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
3428785a 4867 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
c04fecb4
DT
4868 r->res_master_nodeid = 0;
4869 r->res_nodeid = -1;
4870 lkb->lkb_nodeid = -1;
e7fd4179 4871 } else {
c04fecb4
DT
4872 /* set_master() will set lkb_nodeid from r */
4873 r->res_master_nodeid = ret_nodeid;
e7fd4179
DT
4874 r->res_nodeid = ret_nodeid;
4875 }
4876
ef0c2bb0
DT
4877 if (is_overlap(lkb)) {
4878 log_debug(ls, "receive_lookup_reply %x unlock %x",
4879 lkb->lkb_id, lkb->lkb_flags);
4880 queue_cast_overlap(r, lkb);
4881 unhold_lkb(lkb); /* undoes create_lkb() */
4882 goto out_list;
4883 }
4884
e7fd4179
DT
4885 _request_lock(r, lkb);
4886
ef0c2bb0 4887 out_list:
c04fecb4 4888 if (do_lookup_list)
e7fd4179 4889 process_lookup_list(r);
ef0c2bb0 4890 out:
e7fd4179
DT
4891 unlock_rsb(r);
4892 put_rsb(r);
b3f58d8f 4893 dlm_put_lkb(lkb);
e7fd4179
DT
4894}
4895
6d40c4a7
DT
4896static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4897 uint32_t saved_seq)
e7fd4179 4898{
6d40c4a7
DT
4899 int error = 0, noent = 0;
4900
3428785a 4901 if (!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid))) {
c04fecb4 4902 log_limit(ls, "receive %d from non-member %d %x %x %d",
00e99ccd
AA
4903 le32_to_cpu(ms->m_type),
4904 le32_to_cpu(ms->m_header.h_nodeid),
4905 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4906 from_dlm_errno(le32_to_cpu(ms->m_result)));
46b43eed
DT
4907 return;
4908 }
4909
e7fd4179
DT
4910 switch (ms->m_type) {
4911
4912 /* messages sent to a master node */
4913
00e99ccd 4914 case cpu_to_le32(DLM_MSG_REQUEST):
6d40c4a7 4915 error = receive_request(ls, ms);
e7fd4179
DT
4916 break;
4917
00e99ccd 4918 case cpu_to_le32(DLM_MSG_CONVERT):
6d40c4a7 4919 error = receive_convert(ls, ms);
e7fd4179
DT
4920 break;
4921
00e99ccd 4922 case cpu_to_le32(DLM_MSG_UNLOCK):
6d40c4a7 4923 error = receive_unlock(ls, ms);
e7fd4179
DT
4924 break;
4925
00e99ccd 4926 case cpu_to_le32(DLM_MSG_CANCEL):
6d40c4a7
DT
4927 noent = 1;
4928 error = receive_cancel(ls, ms);
e7fd4179
DT
4929 break;
4930
4931 /* messages sent from a master node (replies to above) */
4932
00e99ccd 4933 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
6d40c4a7 4934 error = receive_request_reply(ls, ms);
e7fd4179
DT
4935 break;
4936
00e99ccd 4937 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
6d40c4a7 4938 error = receive_convert_reply(ls, ms);
e7fd4179
DT
4939 break;
4940
00e99ccd 4941 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
6d40c4a7 4942 error = receive_unlock_reply(ls, ms);
e7fd4179
DT
4943 break;
4944
00e99ccd 4945 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
6d40c4a7 4946 error = receive_cancel_reply(ls, ms);
e7fd4179
DT
4947 break;
4948
4949 /* messages sent from a master node (only two types of async msg) */
4950
00e99ccd 4951 case cpu_to_le32(DLM_MSG_GRANT):
6d40c4a7
DT
4952 noent = 1;
4953 error = receive_grant(ls, ms);
e7fd4179
DT
4954 break;
4955
00e99ccd 4956 case cpu_to_le32(DLM_MSG_BAST):
6d40c4a7
DT
4957 noent = 1;
4958 error = receive_bast(ls, ms);
e7fd4179
DT
4959 break;
4960
4961 /* messages sent to a dir node */
4962
00e99ccd 4963 case cpu_to_le32(DLM_MSG_LOOKUP):
e7fd4179
DT
4964 receive_lookup(ls, ms);
4965 break;
4966
00e99ccd 4967 case cpu_to_le32(DLM_MSG_REMOVE):
e7fd4179
DT
4968 receive_remove(ls, ms);
4969 break;
4970
4971 /* messages sent from a dir node (remove has no reply) */
4972
00e99ccd 4973 case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
e7fd4179
DT
4974 receive_lookup_reply(ls, ms);
4975 break;
4976
8499137d
DT
4977 /* other messages */
4978
00e99ccd 4979 case cpu_to_le32(DLM_MSG_PURGE):
8499137d
DT
4980 receive_purge(ls, ms);
4981 break;
4982
e7fd4179 4983 default:
00e99ccd
AA
4984 log_error(ls, "unknown message type %d",
4985 le32_to_cpu(ms->m_type));
e7fd4179 4986 }
6d40c4a7
DT
4987
4988 /*
4989 * When checking for ENOENT, we're checking the result of
4990 * find_lkb(m_remid):
4991 *
4992 * The lock id referenced in the message wasn't found. This may
4993 * happen in normal usage for the async messages and cancel, so
4994 * only use log_debug for them.
4995 *
4875647a 4996 * Some errors are expected and normal.
6d40c4a7
DT
4997 */
4998
4999 if (error == -ENOENT && noent) {
4875647a 5000 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
00e99ccd 5001 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
3428785a 5002 le32_to_cpu(ms->m_header.h_nodeid),
00e99ccd 5003 le32_to_cpu(ms->m_lkid), saved_seq);
6d40c4a7 5004 } else if (error == -ENOENT) {
4875647a 5005 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
00e99ccd 5006 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
3428785a 5007 le32_to_cpu(ms->m_header.h_nodeid),
00e99ccd 5008 le32_to_cpu(ms->m_lkid), saved_seq);
6d40c4a7 5009
00e99ccd
AA
5010 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
5011 dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
6d40c4a7 5012 }
4875647a
DT
5013
5014 if (error == -EINVAL) {
5015 log_error(ls, "receive %d inval from %d lkid %x remid %x "
5016 "saved_seq %u",
00e99ccd
AA
5017 le32_to_cpu(ms->m_type),
5018 le32_to_cpu(ms->m_header.h_nodeid),
5019 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
5020 saved_seq);
4875647a 5021 }
e7fd4179
DT
5022}
5023
c36258b5
DT
5024/* If the lockspace is in recovery mode (locking stopped), then normal
5025 messages are saved on the requestqueue for processing after recovery is
5026 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
5027 messages off the requestqueue before we process new ones. This occurs right
5028 after recovery completes when we transition from saving all messages on
5029 requestqueue, to processing all the saved messages, to processing new
5030 messages as they arrive. */
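/* In short: while locking is stopped, messages are parked via
   dlm_add_requestqueue(); dlm_recoverd later replays them through
   dlm_receive_message_saved(); only after that drain are new arrivals
   processed directly. */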
e7fd4179 5031
c36258b5
DT
5032static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
5033 int nodeid)
5034{
5035 if (dlm_locking_stopped(ls)) {
c04fecb4
DT
5036 /* If we were a member of this lockspace, left, and rejoined,
5037 other nodes may still be sending us messages from the
5038 lockspace generation before we left. */
5039 if (!ls->ls_generation) {
5040 log_limit(ls, "receive %d from %d ignore old gen",
00e99ccd 5041 le32_to_cpu(ms->m_type), nodeid);
c04fecb4
DT
5042 return;
5043 }
5044
8b0d8e03 5045 dlm_add_requestqueue(ls, nodeid, ms);
c36258b5
DT
5046 } else {
5047 dlm_wait_requestqueue(ls);
6d40c4a7 5048 _receive_message(ls, ms, 0);
c36258b5
DT
5049 }
5050}
5051
5052/* This is called by dlm_recoverd to process messages that were saved on
5053 the requestqueue. */
5054
6d40c4a7
DT
5055void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
5056 uint32_t saved_seq)
c36258b5 5057{
6d40c4a7 5058 _receive_message(ls, ms, saved_seq);
c36258b5
DT
5059}
5060
5061/* This is called by the midcomms layer when something is received for
5062 the lockspace. It could be either a MSG (normal message sent as part of
5063 standard locking activity) or an RCOM (recovery message sent as part of
5064 lockspace recovery). */
5065
eef7d739 5066void dlm_receive_buffer(union dlm_packet *p, int nodeid)
c36258b5 5067{
eef7d739 5068 struct dlm_header *hd = &p->header;
c36258b5
DT
5069 struct dlm_ls *ls;
5070 int type = 0;
5071
5072 switch (hd->h_cmd) {
5073 case DLM_MSG:
00e99ccd 5074 type = le32_to_cpu(p->message.m_type);
c36258b5
DT
5075 break;
5076 case DLM_RCOM:
2f9dbeda 5077 type = le32_to_cpu(p->rcom.rc_type);
c36258b5
DT
5078 break;
5079 default:
5080 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
5081 return;
5082 }
5083
3428785a 5084 if (le32_to_cpu(hd->h_nodeid) != nodeid) {
c36258b5 5085 log_print("invalid h_nodeid %d from %d lockspace %x",
3428785a
AA
5086 le32_to_cpu(hd->h_nodeid), nodeid,
5087 le32_to_cpu(hd->u.h_lockspace));
c36258b5
DT
5088 return;
5089 }
5090
3428785a 5091 ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
c36258b5 5092 if (!ls) {
4875647a
DT
5093 if (dlm_config.ci_log_debug) {
5094 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5095 "%u from %d cmd %d type %d\n",
3428785a
AA
5096 le32_to_cpu(hd->u.h_lockspace), nodeid,
5097 hd->h_cmd, type);
4875647a 5098 }
c36258b5
DT
5099
5100 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
eef7d739 5101 dlm_send_ls_not_ready(nodeid, &p->rcom);
c36258b5
DT
5102 return;
5103 }
5104
5105 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5106 be inactive (in this ls) before transitioning to recovery mode */
5107
5108 down_read(&ls->ls_recv_active);
5109 if (hd->h_cmd == DLM_MSG)
eef7d739 5110 dlm_receive_message(ls, &p->message, nodeid);
c36258b5 5111 else
eef7d739 5112 dlm_receive_rcom(ls, &p->rcom, nodeid);
c36258b5
DT
5113 up_read(&ls->ls_recv_active);
5114
5115 dlm_put_lockspace(ls);
5116}
e7fd4179 5117
2a7ce0ed
DT
5118static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5119 struct dlm_message *ms_stub)
e7fd4179
DT
5120{
5121 if (middle_conversion(lkb)) {
5122 hold_lkb(lkb);
2a7ce0ed 5123 memset(ms_stub, 0, sizeof(struct dlm_message));
00e99ccd
AA
5124 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5125 ms_stub->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5126 ms_stub->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
3428785a 5127 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
2a7ce0ed 5128 _receive_convert_reply(lkb, ms_stub);
e7fd4179
DT
5129
5130 /* Same special case as in receive_rcom_lock_args() */
5131 lkb->lkb_grmode = DLM_LOCK_IV;
5132 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5133 unhold_lkb(lkb);
5134
5135 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5136 lkb->lkb_flags |= DLM_IFL_RESEND;
5137 }
5138
5139 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5140 conversions are async; there's no reply from the remote master */
5141}
5142
5143/* A waiting lkb needs recovery if the master node has failed, or
5144 the master node is changing (only when no directory is used) */
5145
13ef1111
DT
5146static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5147 int dir_nodeid)
e7fd4179 5148{
4875647a 5149 if (dlm_no_directory(ls))
13ef1111
DT
5150 return 1;
5151
4875647a 5152 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
e7fd4179
DT
5153 return 1;
5154
5155 return 0;
5156}
5157
5158/* Recovery for locks that are waiting for replies from nodes that are now
5159 gone. We can just complete unlocks and cancels by faking a reply from the
5160 dead node. Requests and up-conversions we flag to be resent after
5161 recovery. Down-conversions can just be completed with a fake reply like
5162 unlocks. Conversions between PR and CW need special attention. */
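/* (PR and CW are incompatible with each other, so a conversion between
   them may be caught half-done on the failed master and cannot simply be
   granted or reverted locally; see recover_convert_waiter above.) */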
5163
5164void dlm_recover_waiters_pre(struct dlm_ls *ls)
5165{
5166 struct dlm_lkb *lkb, *safe;
2a7ce0ed 5167 struct dlm_message *ms_stub;
601342ce 5168 int wait_type, stub_unlock_result, stub_cancel_result;
13ef1111 5169 int dir_nodeid;
e7fd4179 5170
102e67d4 5171 ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL);
0d37eca7 5172 if (!ms_stub)
2a7ce0ed 5173 return;
2a7ce0ed 5174
90135925 5175 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179
DT
5176
5177 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
2a7ce0ed 5178
13ef1111
DT
5179 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5180
2a7ce0ed
DT
5181 /* exclude debug messages about unlocks because there can be so
5182 many and they aren't very interesting */
5183
5184 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
13ef1111
DT
5185 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5186 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5187 lkb->lkb_id,
5188 lkb->lkb_remid,
5189 lkb->lkb_wait_type,
5190 lkb->lkb_resource->res_nodeid,
5191 lkb->lkb_nodeid,
5192 lkb->lkb_wait_nodeid,
5193 dir_nodeid);
2a7ce0ed 5194 }
e7fd4179
DT
5195
5196 /* all outstanding lookups, regardless of destination will be
5197 resent after recovery is done */
5198
5199 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5200 lkb->lkb_flags |= DLM_IFL_RESEND;
5201 continue;
5202 }
5203
13ef1111 5204 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
e7fd4179
DT
5205 continue;
5206
601342ce
DT
5207 wait_type = lkb->lkb_wait_type;
5208 stub_unlock_result = -DLM_EUNLOCK;
5209 stub_cancel_result = -DLM_ECANCEL;
5210
5211 /* Main reply may have been received leaving a zero wait_type,
5212 but a reply for the overlapping op may not have been
5213 received. In that case we need to fake the appropriate
5214 reply for the overlap op. */
5215
5216 if (!wait_type) {
5217 if (is_overlap_cancel(lkb)) {
5218 wait_type = DLM_MSG_CANCEL;
5219 if (lkb->lkb_grmode == DLM_LOCK_IV)
5220 stub_cancel_result = 0;
5221 }
5222 if (is_overlap_unlock(lkb)) {
5223 wait_type = DLM_MSG_UNLOCK;
5224 if (lkb->lkb_grmode == DLM_LOCK_IV)
5225 stub_unlock_result = -ENOENT;
5226 }
5227
5228 log_debug(ls, "rwpre overlap %x %x %d %d %d",
5229 lkb->lkb_id, lkb->lkb_flags, wait_type,
5230 stub_cancel_result, stub_unlock_result);
5231 }
5232
5233 switch (wait_type) {
e7fd4179
DT
5234
5235 case DLM_MSG_REQUEST:
5236 lkb->lkb_flags |= DLM_IFL_RESEND;
5237 break;
5238
5239 case DLM_MSG_CONVERT:
2a7ce0ed 5240 recover_convert_waiter(ls, lkb, ms_stub);
e7fd4179
DT
5241 break;
5242
5243 case DLM_MSG_UNLOCK:
5244 hold_lkb(lkb);
2a7ce0ed 5245 memset(ms_stub, 0, sizeof(struct dlm_message));
00e99ccd
AA
5246 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5247 ms_stub->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5248 ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_unlock_result));
3428785a 5249 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
2a7ce0ed 5250 _receive_unlock_reply(lkb, ms_stub);
b3f58d8f 5251 dlm_put_lkb(lkb);
e7fd4179
DT
5252 break;
5253
5254 case DLM_MSG_CANCEL:
5255 hold_lkb(lkb);
2a7ce0ed 5256 memset(ms_stub, 0, sizeof(struct dlm_message));
00e99ccd
AA
5257 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5258 ms_stub->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5259 ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_cancel_result));
3428785a 5260 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
2a7ce0ed 5261 _receive_cancel_reply(lkb, ms_stub);
b3f58d8f 5262 dlm_put_lkb(lkb);
e7fd4179
DT
5263 break;
5264
5265 default:
601342ce
DT
5266 log_error(ls, "invalid lkb wait_type %d %d",
5267 lkb->lkb_wait_type, wait_type);
e7fd4179 5268 }
81456807 5269 schedule();
e7fd4179 5270 }
90135925 5271 mutex_unlock(&ls->ls_waiters_mutex);
2a7ce0ed 5272 kfree(ms_stub);
e7fd4179
DT
5273}
5274
ef0c2bb0 5275static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
e7fd4179 5276{
dc1acd5c 5277 struct dlm_lkb *lkb = NULL, *iter;
e7fd4179 5278
90135925 5279 mutex_lock(&ls->ls_waiters_mutex);
dc1acd5c
JK
5280 list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5281 if (iter->lkb_flags & DLM_IFL_RESEND) {
5282 hold_lkb(iter);
5283 lkb = iter;
e7fd4179
DT
5284 break;
5285 }
5286 }
90135925 5287 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179 5288
ef0c2bb0 5289 return lkb;
e7fd4179
DT
5290}
5291
5292/* Deal with lookups and lkbs marked RESEND from _pre. We may now be the
5293 master or dir-node for r. Processing the lkb may result in it being placed
5294 back on waiters. */
5295
ef0c2bb0
DT
5296/* We do this after normal locking has been enabled and any saved messages
5297 (in requestqueue) have been processed. We should be confident that at
5298 this point we won't get or process a reply to any of these waiting
5299 operations. But, new ops may be coming in on the rsbs/locks here from
5300 userspace or remotely. */
5301
5302/* there may have been an overlap unlock/cancel prior to recovery or after
5303 recovery. if before, the lkb may still have a positive wait_count; if after,
5304 the overlap flag would just have been set and nothing new sent. we can be
5305 confident here that any replies to either the initial op or overlap ops
5306 prior to recovery have been received. */
5307
e7fd4179
DT
5308int dlm_recover_waiters_post(struct dlm_ls *ls)
5309{
5310 struct dlm_lkb *lkb;
5311 struct dlm_rsb *r;
ef0c2bb0 5312 int error = 0, mstype, err, oc, ou;
e7fd4179
DT
5313
5314 while (1) {
5315 if (dlm_locking_stopped(ls)) {
5316 log_debug(ls, "recover_waiters_post aborted");
5317 error = -EINTR;
5318 break;
5319 }
5320
ef0c2bb0
DT
5321 lkb = find_resend_waiter(ls);
5322 if (!lkb)
e7fd4179
DT
5323 break;
5324
5325 r = lkb->lkb_resource;
ef0c2bb0
DT
5326 hold_rsb(r);
5327 lock_rsb(r);
5328
5329 mstype = lkb->lkb_wait_type;
5330 oc = is_overlap_cancel(lkb);
5331 ou = is_overlap_unlock(lkb);
5332 err = 0;
e7fd4179 5333
13ef1111
DT
5334 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5335 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5336 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5337 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5338 dlm_dir_nodeid(r), oc, ou);
e7fd4179 5339
ef0c2bb0
DT
5340 /* At this point we assume that we won't get a reply to any
5341 previous op or overlap op on this lock. First, do a big
5342 remove_from_waiters() for all previous ops. */
5343
5344 lkb->lkb_flags &= ~DLM_IFL_RESEND;
5345 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5346 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5347 lkb->lkb_wait_type = 0;
1689c169
AA
5348 /* drop all wait_count references; we still
5349 * hold a reference for this iteration.
5350 */
5351 while (lkb->lkb_wait_count) {
5352 lkb->lkb_wait_count--;
5353 unhold_lkb(lkb);
5354 }
ef0c2bb0
DT
5355 mutex_lock(&ls->ls_waiters_mutex);
5356 list_del_init(&lkb->lkb_wait_reply);
5357 mutex_unlock(&ls->ls_waiters_mutex);
ef0c2bb0
DT
5358
5359 if (oc || ou) {
5360 /* do an unlock or cancel instead of resending */
5361 switch (mstype) {
5362 case DLM_MSG_LOOKUP:
5363 case DLM_MSG_REQUEST:
5364 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5365 -DLM_ECANCEL);
5366 unhold_lkb(lkb); /* undoes create_lkb() */
5367 break;
5368 case DLM_MSG_CONVERT:
5369 if (oc) {
5370 queue_cast(r, lkb, -DLM_ECANCEL);
5371 } else {
5372 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5373 _unlock_lock(r, lkb);
5374 }
5375 break;
5376 default:
5377 err = 1;
5378 }
5379 } else {
5380 switch (mstype) {
5381 case DLM_MSG_LOOKUP:
5382 case DLM_MSG_REQUEST:
5383 _request_lock(r, lkb);
5384 if (is_master(r))
5385 confirm_master(r, 0);
5386 break;
5387 case DLM_MSG_CONVERT:
5388 _convert_lock(r, lkb);
5389 break;
5390 default:
5391 err = 1;
5392 }
e7fd4179 5393 }
ef0c2bb0 5394
13ef1111
DT
5395 if (err) {
5396 log_error(ls, "waiter %x msg %d r_nodeid %d "
5397 "dir_nodeid %d overlap %d %d",
5398 lkb->lkb_id, mstype, r->res_nodeid,
5399 dlm_dir_nodeid(r), oc, ou);
5400 }
ef0c2bb0
DT
5401 unlock_rsb(r);
5402 put_rsb(r);
5403 dlm_put_lkb(lkb);
e7fd4179
DT
5404 }
5405
5406 return error;
5407}
5408
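/*
 * A condensed restatement of the overlap routing in
 * dlm_recover_waiters_post() above, with LOOKUP/REQUEST collapsed into
 * one class. The enum and function are illustrative only, not part of
 * dlm.
 */
enum post_action { ACT_CAST_RESULT, ACT_FORCE_UNLOCK, ACT_RESEND, ACT_BAD_TYPE };

static enum post_action waiters_post_action(int known_type, int is_convert,
					    int overlap_cancel, int overlap_unlock)
{
	if (!known_type)
		return ACT_BAD_TYPE;	/* logged as an error above */
	if (!(overlap_cancel || overlap_unlock))
		return ACT_RESEND;	/* redo _request_lock()/_convert_lock() */
	if (!is_convert)
		return ACT_CAST_RESULT;	/* cast -DLM_EUNLOCK or -DLM_ECANCEL */
	return overlap_cancel ? ACT_CAST_RESULT   /* cast -DLM_ECANCEL */
			      : ACT_FORCE_UNLOCK; /* FORCEUNLOCK + _unlock_lock() */
}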
4875647a
DT
5409static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5410 struct list_head *list)
e7fd4179 5411{
e7fd4179
DT
5412 struct dlm_lkb *lkb, *safe;
5413
4875647a
DT
5414 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5415 if (!is_master_copy(lkb))
5416 continue;
5417
5418 /* don't purge lkbs we've added in recover_master_copy for
5419 the current recovery seq */
5420
5421 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5422 continue;
5423
5424 del_lkb(r, lkb);
5425
5426 /* this put should free the lkb */
5427 if (!dlm_put_lkb(lkb))
5428 log_error(ls, "purged mstcpy lkb not released");
e7fd4179
DT
5429 }
5430}
5431
4875647a 5432void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
e7fd4179 5433{
4875647a 5434 struct dlm_ls *ls = r->res_ls;
e7fd4179 5435
4875647a
DT
5436 purge_mstcpy_list(ls, r, &r->res_grantqueue);
5437 purge_mstcpy_list(ls, r, &r->res_convertqueue);
5438 purge_mstcpy_list(ls, r, &r->res_waitqueue);
e7fd4179
DT
5439}
5440
4875647a
DT
5441static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5442 struct list_head *list,
5443 int nodeid_gone, unsigned int *count)
e7fd4179 5444{
4875647a 5445 struct dlm_lkb *lkb, *safe;
e7fd4179 5446
4875647a
DT
5447 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5448 if (!is_master_copy(lkb))
5449 continue;
5450
5451 if ((lkb->lkb_nodeid == nodeid_gone) ||
5452 dlm_is_removed(ls, lkb->lkb_nodeid)) {
5453
da8c6663
DT
5454 /* tell recover_lvb to invalidate the lvb
5455 because a node holding EX/PW failed */
5456 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5457 (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5458 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5459 }
5460
4875647a
DT
5461 del_lkb(r, lkb);
5462
5463 /* this put should free the lkb */
5464 if (!dlm_put_lkb(lkb))
5465 log_error(ls, "purged dead lkb not released");
5466
5467 rsb_set_flag(r, RSB_RECOVER_GRANT);
5468
5469 (*count)++;
5470 }
5471 }
e7fd4179
DT
5472}
5473
5474/* Get rid of locks held by nodes that are gone. */
5475
4875647a 5476void dlm_recover_purge(struct dlm_ls *ls)
e7fd4179
DT
5477{
5478 struct dlm_rsb *r;
4875647a
DT
5479 struct dlm_member *memb;
5480 int nodes_count = 0;
5481 int nodeid_gone = 0;
5482 unsigned int lkb_count = 0;
5483
5484 /* cache one removed nodeid to optimize the common
5485 case of a single node removed */
5486
5487 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5488 nodes_count++;
5489 nodeid_gone = memb->nodeid;
5490 }
e7fd4179 5491
4875647a
DT
5492 if (!nodes_count)
5493 return;
e7fd4179
DT
5494
5495 down_write(&ls->ls_root_sem);
5496 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5497 hold_rsb(r);
5498 lock_rsb(r);
4875647a
DT
5499 if (is_master(r)) {
5500 purge_dead_list(ls, r, &r->res_grantqueue,
5501 nodeid_gone, &lkb_count);
5502 purge_dead_list(ls, r, &r->res_convertqueue,
5503 nodeid_gone, &lkb_count);
5504 purge_dead_list(ls, r, &r->res_waitqueue,
5505 nodeid_gone, &lkb_count);
5506 }
e7fd4179
DT
5507 unlock_rsb(r);
5508 unhold_rsb(r);
4875647a 5509 cond_resched();
e7fd4179
DT
5510 }
5511 up_write(&ls->ls_root_sem);
5512
4875647a 5513 if (lkb_count)
075f0177 5514 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
4875647a 5515 lkb_count, nodes_count);
e7fd4179
DT
5516}
5517
4875647a 5518static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
97a35d1e 5519{
9beb3bf5 5520 struct rb_node *n;
4875647a 5521 struct dlm_rsb *r;
97a35d1e 5522
c7be761a 5523 spin_lock(&ls->ls_rsbtbl[bucket].lock);
9beb3bf5
BP
5524 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5525 r = rb_entry(n, struct dlm_rsb, res_hashnode);
4875647a
DT
5526
5527 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5528 continue;
c503a621
DT
5529 if (!is_master(r)) {
5530 rsb_clear_flag(r, RSB_RECOVER_GRANT);
97a35d1e 5531 continue;
c503a621 5532 }
97a35d1e 5533 hold_rsb(r);
4875647a
DT
5534 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5535 return r;
97a35d1e 5536 }
c7be761a 5537 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4875647a 5538 return NULL;
97a35d1e
DT
5539}
5540
4875647a
DT
5541/*
5542 * Attempt to grant locks on resources that we are the master of.
5543 * Locks may have become grantable during recovery because locks
5544 * from departed nodes have been purged (or not rebuilt), allowing
 5545 * previously blocked locks to now be granted. The rsb's we are
 5546 * interested in are those with lkb's on either the convert or
5547 * waiting queues.
5548 *
5549 * Simplest would be to go through each master rsb and check for non-empty
5550 * convert or waiting queues, and attempt to grant on those rsbs.
5551 * Checking the queues requires lock_rsb, though, for which we'd need
5552 * to release the rsbtbl lock. This would make iterating through all
5553 * rsb's very inefficient. So, we rely on earlier recovery routines
5554 * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5555 * locks for.
5556 */
5557
5558void dlm_recover_grant(struct dlm_ls *ls)
e7fd4179
DT
5559{
5560 struct dlm_rsb *r;
2b4e926a 5561 int bucket = 0;
4875647a
DT
5562 unsigned int count = 0;
5563 unsigned int rsb_count = 0;
5564 unsigned int lkb_count = 0;
e7fd4179 5565
2b4e926a 5566 while (1) {
4875647a 5567 r = find_grant_rsb(ls, bucket);
2b4e926a
DT
5568 if (!r) {
5569 if (bucket == ls->ls_rsbtbl_size - 1)
5570 break;
5571 bucket++;
97a35d1e 5572 continue;
2b4e926a 5573 }
4875647a
DT
5574 rsb_count++;
5575 count = 0;
97a35d1e 5576 lock_rsb(r);
c503a621 5577 /* the RECOVER_GRANT flag is checked in the grant path */
4875647a 5578 grant_pending_locks(r, &count);
c503a621 5579 rsb_clear_flag(r, RSB_RECOVER_GRANT);
4875647a
DT
5580 lkb_count += count;
5581 confirm_master(r, 0);
97a35d1e
DT
5582 unlock_rsb(r);
5583 put_rsb(r);
4875647a 5584 cond_resched();
e7fd4179 5585 }
4875647a
DT
5586
5587 if (lkb_count)
075f0177 5588 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
4875647a 5589 lkb_count, rsb_count);
e7fd4179
DT
5590}
5591
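/*
 * A generic sketch of the scan loop dlm_recover_grant() uses above: fetch
 * one flagged, already-referenced entry per call, process it outside the
 * bucket lock, and advance to the next bucket only when the current one
 * is exhausted. Everything below is illustrative, not dlm code; the
 * reference counting and bucket locking are assumed to live inside the
 * helpers.
 */
struct bnode;

/* assumed: returns a referenced flagged entry from 'bucket', or NULL */
struct bnode *find_flagged(int bucket);
/* assumed: processes the entry and drops the reference taken above */
void process_and_put(struct bnode *n);

static void scan_buckets(int nbuckets)
{
	int bucket = 0;

	while (1) {
		struct bnode *n = find_flagged(bucket);
		if (!n) {
			if (bucket == nbuckets - 1)
				break;	/* all buckets scanned */
			bucket++;
			continue;
		}
		process_and_put(n);
	}
}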
5592static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5593 uint32_t remid)
5594{
5595 struct dlm_lkb *lkb;
5596
5597 list_for_each_entry(lkb, head, lkb_statequeue) {
5598 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5599 return lkb;
5600 }
5601 return NULL;
5602}
5603
5604static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5605 uint32_t remid)
5606{
5607 struct dlm_lkb *lkb;
5608
5609 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5610 if (lkb)
5611 return lkb;
5612 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5613 if (lkb)
5614 return lkb;
5615 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5616 if (lkb)
5617 return lkb;
5618 return NULL;
5619}
5620
ae773d0b 5621/* needs at least dlm_rcom + rcom_lock */
e7fd4179
DT
5622static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5623 struct dlm_rsb *r, struct dlm_rcom *rc)
5624{
5625 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
e7fd4179 5626
3428785a 5627 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
163a1859
AV
5628 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5629 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5630 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5631 lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
e7fd4179 5632 lkb->lkb_flags |= DLM_IFL_MSTCPY;
163a1859 5633 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
e7fd4179
DT
5634 lkb->lkb_rqmode = rl->rl_rqmode;
5635 lkb->lkb_grmode = rl->rl_grmode;
5636 /* don't set lkb_status because add_lkb wants to itself */
5637
8304d6f2
DT
5638 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5639 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
e7fd4179 5640
e7fd4179 5641 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3428785a
AA
5642 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5643 sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
a5dd0631
AV
5644 if (lvblen > ls->ls_lvblen)
5645 return -EINVAL;
52bda2b5 5646 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
e7fd4179
DT
5647 if (!lkb->lkb_lvbptr)
5648 return -ENOMEM;
e7fd4179
DT
5649 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5650 }
5651
5652 /* Conversions between PR and CW (middle modes) need special handling.
5653 The real granted mode of these converting locks cannot be determined
5654 until all locks have been rebuilt on the rsb (recover_conversion) */
5655
163a1859
AV
5656 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5657 middle_conversion(lkb)) {
e7fd4179
DT
5658 rl->rl_status = DLM_LKSTS_CONVERT;
5659 lkb->lkb_grmode = DLM_LOCK_IV;
5660 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5661 }
5662
5663 return 0;
5664}
5665
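/*
 * A hedged restatement of the lvb length check in
 * receive_rcom_lock_args() above: the lvb length is whatever remains of
 * the rcom message after the fixed structs, and it must not exceed the
 * lockspace's configured lvblen. The minimum-size precondition is the
 * "needs at least dlm_rcom + rcom_lock" check noted earlier; the
 * standalone helper below (illustrative only) makes both bounds explicit.
 */
#include <stddef.h>

static int lvblen_from_header(size_t h_length, size_t fixed_len,
			      size_t max_lvblen, size_t *lvblen)
{
	if (h_length < fixed_len)
		return -1;	/* guaranteed by earlier validation */
	*lvblen = h_length - fixed_len;
	if (*lvblen > max_lvblen)
		return -1;	/* mirrors the -EINVAL above */
	return 0;
}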
5666/* This lkb may have been recovered in a previous aborted recovery so we need
5667 to check if the rsb already has an lkb with the given remote nodeid/lkid.
5668 If so we just send back a standard reply. If not, we create a new lkb with
5669 the given values and send back our lkid. We send back our lkid by sending
5670 back the rcom_lock struct we got but with the remid field filled in. */
5671
ae773d0b 5672/* needs at least dlm_rcom + rcom_lock */
e7fd4179
DT
5673int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5674{
5675 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5676 struct dlm_rsb *r;
5677 struct dlm_lkb *lkb;
6d40c4a7 5678 uint32_t remid = 0;
3428785a 5679 int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
e7fd4179
DT
5680 int error;
5681
5682 if (rl->rl_parent_lkid) {
5683 error = -EOPNOTSUPP;
5684 goto out;
5685 }
5686
6d40c4a7
DT
5687 remid = le32_to_cpu(rl->rl_lkid);
5688
4875647a
DT
5689 /* In general we expect the rsb returned to be R_MASTER, but we don't
5690 have to require it. Recovery of masters on one node can overlap
5691 recovery of locks on another node, so one node can send us MSTCPY
5692 locks before we've made ourselves master of this rsb. We can still
5693 add new MSTCPY locks that we receive here without any harm; when
5694 we make ourselves master, dlm_recover_masters() won't touch the
5695 MSTCPY locks we've received early. */
5696
c04fecb4
DT
5697 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5698 from_nodeid, R_RECEIVE_RECOVER, &r);
e7fd4179
DT
5699 if (error)
5700 goto out;
5701
c04fecb4
DT
5702 lock_rsb(r);
5703
4875647a
DT
5704 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5705 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
c04fecb4 5706 from_nodeid, remid);
4875647a 5707 error = -EBADR;
c04fecb4 5708 goto out_unlock;
4875647a
DT
5709 }
5710
c04fecb4 5711 lkb = search_remid(r, from_nodeid, remid);
e7fd4179
DT
5712 if (lkb) {
5713 error = -EEXIST;
5714 goto out_remid;
5715 }
5716
5717 error = create_lkb(ls, &lkb);
5718 if (error)
5719 goto out_unlock;
5720
5721 error = receive_rcom_lock_args(ls, lkb, r, rc);
5722 if (error) {
b3f58d8f 5723 __put_lkb(ls, lkb);
e7fd4179
DT
5724 goto out_unlock;
5725 }
5726
5727 attach_lkb(r, lkb);
5728 add_lkb(r, lkb, rl->rl_status);
4875647a
DT
5729 ls->ls_recover_locks_in++;
5730
5731 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5732 rsb_set_flag(r, RSB_RECOVER_GRANT);
e7fd4179
DT
5733
5734 out_remid:
5735 /* this is the new value returned to the lock holder for
5736 saving in its process-copy lkb */
163a1859 5737 rl->rl_remid = cpu_to_le32(lkb->lkb_id);
e7fd4179 5738
4875647a
DT
5739 lkb->lkb_recover_seq = ls->ls_recover_seq;
5740
e7fd4179
DT
5741 out_unlock:
5742 unlock_rsb(r);
5743 put_rsb(r);
5744 out:
6d40c4a7 5745 if (error && error != -EEXIST)
075f0177 5746 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
c04fecb4 5747 from_nodeid, remid, error);
163a1859 5748 rl->rl_result = cpu_to_le32(error);
e7fd4179
DT
5749 return error;
5750}
5751
ae773d0b 5752/* needs at least dlm_rcom + rcom_lock */
e7fd4179
DT
5753int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5754{
5755 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5756 struct dlm_rsb *r;
5757 struct dlm_lkb *lkb;
6d40c4a7
DT
5758 uint32_t lkid, remid;
5759 int error, result;
5760
5761 lkid = le32_to_cpu(rl->rl_lkid);
5762 remid = le32_to_cpu(rl->rl_remid);
5763 result = le32_to_cpu(rl->rl_result);
e7fd4179 5764
6d40c4a7 5765 error = find_lkb(ls, lkid, &lkb);
e7fd4179 5766 if (error) {
6d40c4a7 5767 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
3428785a
AA
5768 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5769 result);
e7fd4179
DT
5770 return error;
5771 }
5772
4875647a
DT
5773 r = lkb->lkb_resource;
5774 hold_rsb(r);
5775 lock_rsb(r);
5776
6d40c4a7
DT
5777 if (!is_process_copy(lkb)) {
5778 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
3428785a
AA
5779 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5780 result);
4875647a
DT
5781 dlm_dump_rsb(r);
5782 unlock_rsb(r);
5783 put_rsb(r);
5784 dlm_put_lkb(lkb);
6d40c4a7
DT
5785 return -EINVAL;
5786 }
e7fd4179 5787
6d40c4a7 5788 switch (result) {
dc200a88
DT
5789 case -EBADR:
5790 /* There's a chance the new master received our lock before
 5791 dlm_recover_master_reply(); this wouldn't happen if we did
5792 a barrier between recover_masters and recover_locks. */
6d40c4a7
DT
5793
5794 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
3428785a
AA
5795 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5796 result);
6d40c4a7 5797
dc200a88
DT
5798 dlm_send_rcom_lock(r, lkb);
5799 goto out;
e7fd4179 5800 case -EEXIST:
e7fd4179 5801 case 0:
6d40c4a7 5802 lkb->lkb_remid = remid;
e7fd4179
DT
5803 break;
5804 default:
6d40c4a7 5805 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
3428785a
AA
5806 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5807 result);
e7fd4179
DT
5808 }
5809
5810 /* an ack for dlm_recover_locks() which waits for replies from
5811 all the locks it sends to new masters */
5812 dlm_recovered_lock(r);
dc200a88 5813 out:
e7fd4179
DT
5814 unlock_rsb(r);
5815 put_rsb(r);
b3f58d8f 5816 dlm_put_lkb(lkb);
e7fd4179
DT
5817
5818 return 0;
5819}
5820
6b0afc0c 5821#ifdef CONFIG_DLM_DEPRECATED_API
597d0cae
DT
5822int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5823 int mode, uint32_t flags, void *name, unsigned int namelen,
d7db923e 5824 unsigned long timeout_cs)
6b0afc0c
AA
5825#else
5826int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5827 int mode, uint32_t flags, void *name, unsigned int namelen)
5828#endif
597d0cae
DT
5829{
5830 struct dlm_lkb *lkb;
5831 struct dlm_args args;
5832 int error;
5833
85e86edf 5834 dlm_lock_recovery(ls);
597d0cae
DT
5835
5836 error = create_lkb(ls, &lkb);
5837 if (error) {
5838 kfree(ua);
5839 goto out;
5840 }
5841
5842 if (flags & DLM_LKF_VALBLK) {
573c24c4 5843 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
597d0cae
DT
5844 if (!ua->lksb.sb_lvbptr) {
5845 kfree(ua);
5846 __put_lkb(ls, lkb);
5847 error = -ENOMEM;
5848 goto out;
5849 }
5850 }
6b0afc0c 5851#ifdef CONFIG_DLM_DEPRECATED_API
d7db923e 5852 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
e5dae548 5853 fake_astfn, ua, fake_bastfn, &args);
6b0afc0c
AA
5854#else
5855 error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5856 fake_bastfn, &args);
5857#endif
597d0cae 5858 if (error) {
d47b41ac
VA
5859 kfree(ua->lksb.sb_lvbptr);
5860 ua->lksb.sb_lvbptr = NULL;
5861 kfree(ua);
597d0cae
DT
5862 __put_lkb(ls, lkb);
5863 goto out;
5864 }
5865
d47b41ac
VA
5866 /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5867 When DLM_IFL_USER is set, the dlm knows that this is a userspace
5868 lock and that lkb_astparam is the dlm_user_args structure. */
5869 lkb->lkb_flags |= DLM_IFL_USER;
597d0cae
DT
5870 error = request_lock(ls, lkb, name, namelen, &args);
5871
5872 switch (error) {
5873 case 0:
5874 break;
5875 case -EINPROGRESS:
5876 error = 0;
5877 break;
5878 case -EAGAIN:
5879 error = 0;
df561f66 5880 fallthrough;
597d0cae
DT
5881 default:
5882 __put_lkb(ls, lkb);
5883 goto out;
5884 }
5885
5886 /* add this new lkb to the per-process list of locks */
5887 spin_lock(&ua->proc->locks_spin);
ef0c2bb0 5888 hold_lkb(lkb);
597d0cae
DT
5889 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5890 spin_unlock(&ua->proc->locks_spin);
5891 out:
85e86edf 5892 dlm_unlock_recovery(ls);
597d0cae
DT
5893 return error;
5894}
5895
6b0afc0c 5896#ifdef CONFIG_DLM_DEPRECATED_API
597d0cae 5897int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
d7db923e
DT
5898 int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5899 unsigned long timeout_cs)
6b0afc0c
AA
5900#else
5901int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5902 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5903#endif
597d0cae
DT
5904{
5905 struct dlm_lkb *lkb;
5906 struct dlm_args args;
5907 struct dlm_user_args *ua;
5908 int error;
5909
85e86edf 5910 dlm_lock_recovery(ls);
597d0cae
DT
5911
5912 error = find_lkb(ls, lkid, &lkb);
5913 if (error)
5914 goto out;
5915
5916 /* user can change the params on its lock when it converts it, or
5917 add an lvb that didn't exist before */
5918
d292c0cc 5919 ua = lkb->lkb_ua;
597d0cae
DT
5920
5921 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
573c24c4 5922 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
597d0cae
DT
5923 if (!ua->lksb.sb_lvbptr) {
5924 error = -ENOMEM;
5925 goto out_put;
5926 }
5927 }
5928 if (lvb_in && ua->lksb.sb_lvbptr)
5929 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5930
d7db923e 5931 ua->xid = ua_tmp->xid;
597d0cae
DT
5932 ua->castparam = ua_tmp->castparam;
5933 ua->castaddr = ua_tmp->castaddr;
5934 ua->bastparam = ua_tmp->bastparam;
5935 ua->bastaddr = ua_tmp->bastaddr;
10948eb4 5936 ua->user_lksb = ua_tmp->user_lksb;
597d0cae 5937
6b0afc0c 5938#ifdef CONFIG_DLM_DEPRECATED_API
d7db923e 5939 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
e5dae548 5940 fake_astfn, ua, fake_bastfn, &args);
6b0afc0c
AA
5941#else
5942 error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5943 fake_bastfn, &args);
5944#endif
597d0cae
DT
5945 if (error)
5946 goto out_put;
5947
5948 error = convert_lock(ls, lkb, &args);
5949
c85d65e9 5950 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
597d0cae
DT
5951 error = 0;
5952 out_put:
5953 dlm_put_lkb(lkb);
5954 out:
85e86edf 5955 dlm_unlock_recovery(ls);
597d0cae
DT
5956 kfree(ua_tmp);
5957 return error;
5958}
5959
2ab4bd8e
DT
5960/*
5961 * The caller asks for an orphan lock on a given resource with a given mode.
5962 * If a matching lock exists, it's moved to the owner's list of locks and
5963 * the lkid is returned.
5964 */
5965
5966int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5967 int mode, uint32_t flags, void *name, unsigned int namelen,
8d614a44 5968 uint32_t *lkid)
2ab4bd8e 5969{
dc1acd5c 5970 struct dlm_lkb *lkb = NULL, *iter;
2ab4bd8e
DT
5971 struct dlm_user_args *ua;
5972 int found_other_mode = 0;
2ab4bd8e
DT
5973 int rv = 0;
5974
5975 mutex_lock(&ls->ls_orphans_mutex);
dc1acd5c
JK
5976 list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5977 if (iter->lkb_resource->res_length != namelen)
2ab4bd8e 5978 continue;
dc1acd5c 5979 if (memcmp(iter->lkb_resource->res_name, name, namelen))
2ab4bd8e 5980 continue;
dc1acd5c 5981 if (iter->lkb_grmode != mode) {
2ab4bd8e
DT
5982 found_other_mode = 1;
5983 continue;
5984 }
5985
dc1acd5c
JK
5986 lkb = iter;
5987 list_del_init(&iter->lkb_ownqueue);
5988 iter->lkb_flags &= ~DLM_IFL_ORPHAN;
5989 *lkid = iter->lkb_id;
2ab4bd8e
DT
5990 break;
5991 }
5992 mutex_unlock(&ls->ls_orphans_mutex);
5993
dc1acd5c 5994 if (!lkb && found_other_mode) {
2ab4bd8e
DT
5995 rv = -EAGAIN;
5996 goto out;
5997 }
5998
dc1acd5c 5999 if (!lkb) {
2ab4bd8e
DT
6000 rv = -ENOENT;
6001 goto out;
6002 }
6003
6004 lkb->lkb_exflags = flags;
6005 lkb->lkb_ownpid = (int) current->pid;
6006
6007 ua = lkb->lkb_ua;
6008
6009 ua->proc = ua_tmp->proc;
6010 ua->xid = ua_tmp->xid;
6011 ua->castparam = ua_tmp->castparam;
6012 ua->castaddr = ua_tmp->castaddr;
6013 ua->bastparam = ua_tmp->bastparam;
6014 ua->bastaddr = ua_tmp->bastaddr;
6015 ua->user_lksb = ua_tmp->user_lksb;
6016
6017 /*
6018 * The lkb reference from the ls_orphans list was not
6019 * removed above, and is now considered the reference
6020 * for the proc locks list.
6021 */
6022
6023 spin_lock(&ua->proc->locks_spin);
6024 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
6025 spin_unlock(&ua->proc->locks_spin);
6026 out:
6027 kfree(ua_tmp);
6028 return rv;
6029}
6030
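/*
 * The match rule in the dlm_user_adopt_orphan() loop above, restated as
 * a standalone predicate: an orphan is adopted only when both the
 * resource name and the granted mode match; a name-only match is
 * remembered so the caller can return -EAGAIN instead of -ENOENT. This
 * helper is hypothetical, not dlm code.
 */
#include <string.h>

static int orphan_matches(const void *res_name, unsigned int res_len,
			  int grmode, const void *want_name,
			  unsigned int want_len, int want_mode,
			  int *found_other_mode)
{
	if (res_len != want_len || memcmp(res_name, want_name, want_len))
		return 0;		/* different resource */
	if (grmode != want_mode) {
		*found_other_mode = 1;	/* drives the -EAGAIN result */
		return 0;
	}
	return 1;			/* adopt this lkb */
}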
597d0cae
DT
6031int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6032 uint32_t flags, uint32_t lkid, char *lvb_in)
6033{
6034 struct dlm_lkb *lkb;
6035 struct dlm_args args;
6036 struct dlm_user_args *ua;
6037 int error;
6038
85e86edf 6039 dlm_lock_recovery(ls);
597d0cae
DT
6040
6041 error = find_lkb(ls, lkid, &lkb);
6042 if (error)
6043 goto out;
6044
d292c0cc 6045 ua = lkb->lkb_ua;
597d0cae
DT
6046
6047 if (lvb_in && ua->lksb.sb_lvbptr)
6048 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
b434eda6
PC
6049 if (ua_tmp->castparam)
6050 ua->castparam = ua_tmp->castparam;
cc346d55 6051 ua->user_lksb = ua_tmp->user_lksb;
597d0cae
DT
6052
6053 error = set_unlock_args(flags, ua, &args);
6054 if (error)
6055 goto out_put;
6056
6057 error = unlock_lock(ls, lkb, &args);
6058
6059 if (error == -DLM_EUNLOCK)
6060 error = 0;
ef0c2bb0
DT
6061 /* from validate_unlock_args() */
6062 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
6063 error = 0;
597d0cae
DT
6064 if (error)
6065 goto out_put;
6066
6067 spin_lock(&ua->proc->locks_spin);
23e8e1aa 6068 /* dlm_user_add_cb() may have already taken lkb off the proc list */
a1bc86e6
DT
6069 if (!list_empty(&lkb->lkb_ownqueue))
6070 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
597d0cae 6071 spin_unlock(&ua->proc->locks_spin);
597d0cae
DT
6072 out_put:
6073 dlm_put_lkb(lkb);
6074 out:
85e86edf 6075 dlm_unlock_recovery(ls);
ef0c2bb0 6076 kfree(ua_tmp);
597d0cae
DT
6077 return error;
6078}
6079
6080int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6081 uint32_t flags, uint32_t lkid)
6082{
6083 struct dlm_lkb *lkb;
6084 struct dlm_args args;
6085 struct dlm_user_args *ua;
6086 int error;
6087
85e86edf 6088 dlm_lock_recovery(ls);
597d0cae
DT
6089
6090 error = find_lkb(ls, lkid, &lkb);
6091 if (error)
6092 goto out;
6093
d292c0cc 6094 ua = lkb->lkb_ua;
b434eda6
PC
6095 if (ua_tmp->castparam)
6096 ua->castparam = ua_tmp->castparam;
c059f70e 6097 ua->user_lksb = ua_tmp->user_lksb;
597d0cae
DT
6098
6099 error = set_unlock_args(flags, ua, &args);
6100 if (error)
6101 goto out_put;
6102
6103 error = cancel_lock(ls, lkb, &args);
6104
6105 if (error == -DLM_ECANCEL)
6106 error = 0;
ef0c2bb0
DT
6107 /* from validate_unlock_args() */
6108 if (error == -EBUSY)
6109 error = 0;
597d0cae
DT
6110 out_put:
6111 dlm_put_lkb(lkb);
6112 out:
85e86edf 6113 dlm_unlock_recovery(ls);
ef0c2bb0 6114 kfree(ua_tmp);
597d0cae
DT
6115 return error;
6116}
6117
8b4021fa
DT
6118int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6119{
6120 struct dlm_lkb *lkb;
6121 struct dlm_args args;
6122 struct dlm_user_args *ua;
6123 struct dlm_rsb *r;
6124 int error;
6125
6126 dlm_lock_recovery(ls);
6127
6128 error = find_lkb(ls, lkid, &lkb);
6129 if (error)
6130 goto out;
6131
d292c0cc 6132 ua = lkb->lkb_ua;
8b4021fa
DT
6133
6134 error = set_unlock_args(flags, ua, &args);
6135 if (error)
6136 goto out_put;
6137
6138 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6139
6140 r = lkb->lkb_resource;
6141 hold_rsb(r);
6142 lock_rsb(r);
6143
6144 error = validate_unlock_args(lkb, &args);
6145 if (error)
6146 goto out_r;
6147 lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
6148
6149 error = _cancel_lock(r, lkb);
6150 out_r:
6151 unlock_rsb(r);
6152 put_rsb(r);
6153
6154 if (error == -DLM_ECANCEL)
6155 error = 0;
6156 /* from validate_unlock_args() */
6157 if (error == -EBUSY)
6158 error = 0;
6159 out_put:
6160 dlm_put_lkb(lkb);
6161 out:
6162 dlm_unlock_recovery(ls);
6163 return error;
6164}
6165
ef0c2bb0
DT
6166/* lkb's that are removed from the waiters list by revert are just left on the
6167 orphans list with the granted orphan locks, to be freed by purge */
6168
597d0cae
DT
6169static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6170{
ef0c2bb0
DT
6171 struct dlm_args args;
6172 int error;
597d0cae 6173
2ab4bd8e 6174 hold_lkb(lkb); /* reference for the ls_orphans list */
ef0c2bb0
DT
6175 mutex_lock(&ls->ls_orphans_mutex);
6176 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6177 mutex_unlock(&ls->ls_orphans_mutex);
597d0cae 6178
d292c0cc 6179 set_unlock_args(0, lkb->lkb_ua, &args);
ef0c2bb0
DT
6180
6181 error = cancel_lock(ls, lkb, &args);
6182 if (error == -DLM_ECANCEL)
6183 error = 0;
6184 return error;
597d0cae
DT
6185}
6186
da8c6663
DT
6187/* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6188 granted. Regardless of what rsb queue the lock is on, it's removed and
6189 freed. The IVVALBLK flag causes the lvb on the resource to be invalidated
6190 if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
597d0cae
DT
6191
6192static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6193{
597d0cae
DT
6194 struct dlm_args args;
6195 int error;
6196
da8c6663
DT
6197 set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6198 lkb->lkb_ua, &args);
597d0cae
DT
6199
6200 error = unlock_lock(ls, lkb, &args);
6201 if (error == -DLM_EUNLOCK)
6202 error = 0;
6203 return error;
6204}
6205
ef0c2bb0
DT
6206/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6207 (which does lock_rsb) due to deadlock with receiving a message that does
23e8e1aa 6208 lock_rsb followed by dlm_user_add_cb() */
ef0c2bb0
DT
6209
6210static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6211 struct dlm_user_proc *proc)
6212{
6213 struct dlm_lkb *lkb = NULL;
6214
6215 mutex_lock(&ls->ls_clear_proc_locks);
6216 if (list_empty(&proc->locks))
6217 goto out;
6218
6219 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6220 list_del_init(&lkb->lkb_ownqueue);
6221
6222 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6223 lkb->lkb_flags |= DLM_IFL_ORPHAN;
6224 else
6225 lkb->lkb_flags |= DLM_IFL_DEAD;
6226 out:
6227 mutex_unlock(&ls->ls_clear_proc_locks);
6228 return lkb;
6229}
6230
23e8e1aa 6231/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
597d0cae
DT
6232 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6233 which we clear here. */
6234
6235/* proc CLOSING flag is set so no more device_reads should look at proc->asts
6236 list, and no more device_writes should add lkb's to proc->locks list; so we
6237 shouldn't need to take asts_spin or locks_spin here. this assumes that
6238 device reads/writes/closes are serialized -- FIXME: we may need to serialize
 6239 them ourselves. */
6240
6241void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6242{
6243 struct dlm_lkb *lkb, *safe;
6244
85e86edf 6245 dlm_lock_recovery(ls);
597d0cae 6246
ef0c2bb0
DT
6247 while (1) {
6248 lkb = del_proc_lock(ls, proc);
6249 if (!lkb)
6250 break;
84d8cd69 6251 del_timeout(lkb);
ef0c2bb0 6252 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
597d0cae 6253 orphan_proc_lock(ls, lkb);
ef0c2bb0 6254 else
597d0cae 6255 unlock_proc_lock(ls, lkb);
597d0cae
DT
6256
6257 /* this removes the reference for the proc->locks list
6258 added by dlm_user_request, it may result in the lkb
6259 being freed */
6260
6261 dlm_put_lkb(lkb);
6262 }
a1bc86e6 6263
ef0c2bb0
DT
6264 mutex_lock(&ls->ls_clear_proc_locks);
6265
a1bc86e6
DT
6266 /* in-progress unlocks */
6267 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6268 list_del_init(&lkb->lkb_ownqueue);
6269 lkb->lkb_flags |= DLM_IFL_DEAD;
6270 dlm_put_lkb(lkb);
6271 }
6272
23e8e1aa 6273 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
8304d6f2
DT
6274 memset(&lkb->lkb_callbacks, 0,
6275 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
23e8e1aa 6276 list_del_init(&lkb->lkb_cb_list);
a1bc86e6
DT
6277 dlm_put_lkb(lkb);
6278 }
6279
597d0cae 6280 mutex_unlock(&ls->ls_clear_proc_locks);
85e86edf 6281 dlm_unlock_recovery(ls);
597d0cae 6282}
a1bc86e6 6283
8499137d
DT
6284static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6285{
6286 struct dlm_lkb *lkb, *safe;
6287
6288 while (1) {
6289 lkb = NULL;
6290 spin_lock(&proc->locks_spin);
6291 if (!list_empty(&proc->locks)) {
6292 lkb = list_entry(proc->locks.next, struct dlm_lkb,
6293 lkb_ownqueue);
6294 list_del_init(&lkb->lkb_ownqueue);
6295 }
6296 spin_unlock(&proc->locks_spin);
6297
6298 if (!lkb)
6299 break;
6300
6301 lkb->lkb_flags |= DLM_IFL_DEAD;
6302 unlock_proc_lock(ls, lkb);
6303 dlm_put_lkb(lkb); /* ref from proc->locks list */
6304 }
6305
6306 spin_lock(&proc->locks_spin);
6307 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6308 list_del_init(&lkb->lkb_ownqueue);
6309 lkb->lkb_flags |= DLM_IFL_DEAD;
6310 dlm_put_lkb(lkb);
6311 }
6312 spin_unlock(&proc->locks_spin);
6313
6314 spin_lock(&proc->asts_spin);
23e8e1aa 6315 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
8304d6f2
DT
6316 memset(&lkb->lkb_callbacks, 0,
6317 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
23e8e1aa 6318 list_del_init(&lkb->lkb_cb_list);
8499137d
DT
6319 dlm_put_lkb(lkb);
6320 }
6321 spin_unlock(&proc->asts_spin);
6322}
6323
6324/* pid of 0 means purge all orphans */
6325
6326static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6327{
6328 struct dlm_lkb *lkb, *safe;
6329
6330 mutex_lock(&ls->ls_orphans_mutex);
6331 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6332 if (pid && lkb->lkb_ownpid != pid)
6333 continue;
6334 unlock_proc_lock(ls, lkb);
6335 list_del_init(&lkb->lkb_ownqueue);
6336 dlm_put_lkb(lkb);
6337 }
6338 mutex_unlock(&ls->ls_orphans_mutex);
6339}
6340
6341static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6342{
6343 struct dlm_message *ms;
6344 struct dlm_mhandle *mh;
6345 int error;
6346
6347 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6348 DLM_MSG_PURGE, &ms, &mh);
6349 if (error)
6350 return error;
00e99ccd
AA
6351 ms->m_nodeid = cpu_to_le32(nodeid);
6352 ms->m_pid = cpu_to_le32(pid);
8499137d
DT
6353
6354 return send_message(mh, ms);
6355}
6356
6357int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6358 int nodeid, int pid)
6359{
6360 int error = 0;
6361
2ab4bd8e 6362 if (nodeid && (nodeid != dlm_our_nodeid())) {
8499137d
DT
6363 error = send_purge(ls, nodeid, pid);
6364 } else {
85e86edf 6365 dlm_lock_recovery(ls);
8499137d
DT
6366 if (pid == current->pid)
6367 purge_proc_locks(ls, proc);
6368 else
6369 do_purge(ls, nodeid, pid);
85e86edf 6370 dlm_unlock_recovery(ls);
8499137d
DT
6371 }
6372 return error;
6373}
6374
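/*
 * dlm_user_purge() above routes on two values: a nonzero nodeid other
 * than our own goes over the wire via send_purge(), and everything else
 * is handled locally, with pid == current->pid selecting the caller's
 * own proc. A hedged restatement of that routing; the enum and function
 * are illustrative, not dlm code.
 */
enum purge_route { PURGE_REMOTE, PURGE_OWN_PROC, PURGE_LOCAL_ORPHANS };

static enum purge_route pick_purge_route(int nodeid, int our_nodeid,
					 int pid, int current_pid)
{
	if (nodeid && nodeid != our_nodeid)
		return PURGE_REMOTE;		/* send_purge() to nodeid */
	if (pid == current_pid)
		return PURGE_OWN_PROC;		/* purge_proc_locks() */
	return PURGE_LOCAL_ORPHANS;		/* do_purge(); pid 0 = all orphans */
}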
5054e79d
AA
6375/* debug functionality */
6376int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6377 int lkb_nodeid, unsigned int lkb_flags, int lkb_status)
6378{
6379 struct dlm_lksb *lksb;
6380 struct dlm_lkb *lkb;
6381 struct dlm_rsb *r;
6382 int error;
6383
6384 /* we currently can't set a valid user lock */
6385 if (lkb_flags & DLM_IFL_USER)
6386 return -EOPNOTSUPP;
6387
6388 lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6389 if (!lksb)
6390 return -ENOMEM;
6391
6392 error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6393 if (error) {
6394 kfree(lksb);
6395 return error;
6396 }
6397
6398 lkb->lkb_flags = lkb_flags;
6399 lkb->lkb_nodeid = lkb_nodeid;
6400 lkb->lkb_lksb = lksb;
6401 /* user specific pointer, just don't have it NULL for kernel locks */
6402 if (~lkb_flags & DLM_IFL_USER)
6403 lkb->lkb_astparam = (void *)0xDEADBEEF;
6404
6405 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6406 if (error) {
6407 kfree(lksb);
6408 __put_lkb(ls, lkb);
6409 return error;
6410 }
6411
6412 lock_rsb(r);
6413 attach_lkb(r, lkb);
6414 add_lkb(r, lkb, lkb_status);
6415 unlock_rsb(r);
6416 put_rsb(r);
6417
6418 return 0;
6419}
6420
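/*
 * A sketch of how a test might chain the two debug helpers above to
 * inject a granted lkb and then park a fake request on the waiters
 * list. The lkb id, resource name and message type below are
 * illustrative values, not taken from any existing test.
 */
static int inject_test_lock(struct dlm_ls *ls)
{
	char name[] = "test_resource";
	uint32_t lkb_id = 0x10001;
	int error;

	error = dlm_debug_add_lkb(ls, lkb_id, name, sizeof(name) - 1,
				  0, 0, DLM_LKSTS_GRANTED);
	if (error)
		return error;

	return dlm_debug_add_lkb_to_waiters(ls, lkb_id, DLM_MSG_REQUEST,
					    dlm_our_nodeid());
}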
63eab2b0
AA
6421int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6422 int mstype, int to_nodeid)
6423{
6424 struct dlm_lkb *lkb;
6425 int error;
6426
6427 error = find_lkb(ls, lkb_id, &lkb);
6428 if (error)
6429 return error;
6430
6431 error = add_to_waiters(lkb, mstype, to_nodeid);
6432 dlm_put_lkb(lkb);
6433 return error;
6434}
6435