// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "dir.h"
#include "ast.h"
#include "recover.h"
#include "lowcomms.h"
#include "lock.h"
#include "requestqueue.h"
#include "recoverd.h"

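/*
 * Build ls_masters_list: a temporary list of all rsbs this node masters
 * (res_nodeid == 0), each held with a reference, so other nodes can pull
 * master names from us during directory recovery.
 */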
static int dlm_create_masters_list(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        int error = 0;

        write_lock_bh(&ls->ls_masters_lock);
        if (!list_empty(&ls->ls_masters_list)) {
                log_error(ls, "root list not empty");
                error = -EINVAL;
                goto out;
        }

        read_lock_bh(&ls->ls_rsbtbl_lock);
        list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
                if (r->res_nodeid)
                        continue;

                list_add(&r->res_masters_list, &ls->ls_masters_list);
                dlm_hold_rsb(r);
        }
        read_unlock_bh(&ls->ls_rsbtbl_lock);
 out:
        write_unlock_bh(&ls->ls_masters_lock);
        return error;
}

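/*
 * Drop the references taken by dlm_create_masters_list() and empty
 * ls_masters_list again.
 */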
static void dlm_release_masters_list(struct dlm_ls *ls)
{
        struct dlm_rsb *r, *safe;

        write_lock_bh(&ls->ls_masters_lock);
        list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) {
                list_del_init(&r->res_masters_list);
                dlm_put_rsb(r);
        }
        write_unlock_bh(&ls->ls_masters_lock);
}

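/*
 * Snapshot every active rsb (the ls_keep list) onto root_list, holding a
 * reference on each; this root list is the working set for most of the
 * recovery routines.
 */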
static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
{
        struct dlm_rsb *r;

        read_lock_bh(&ls->ls_rsbtbl_lock);
        list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
                list_add(&r->res_root_list, root_list);
                dlm_hold_rsb(r);
        }

        WARN_ON_ONCE(!list_empty(&ls->ls_toss));
        read_unlock_bh(&ls->ls_rsbtbl_lock);
}

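/* Undo dlm_create_root_list(): unlink each rsb and drop its reference. */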
static void dlm_release_root_list(struct list_head *root_list)
{
        struct dlm_rsb *r, *safe;

        list_for_each_entry_safe(r, safe, root_list, res_root_list) {
                list_del_init(&r->res_root_list);
                dlm_put_rsb(r);
        }
}

/* If the start for which we're re-enabling locking (seq) has been superseded
   by a newer stop (ls_recover_seq), we need to leave locking disabled.

   We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
   locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
   enables locking and clears the requestqueue between a and b. */

static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
        int error = -EINTR;

        write_lock_bh(&ls->ls_recv_active);

        spin_lock_bh(&ls->ls_recover_lock);
        if (ls->ls_recover_seq == seq) {
                set_bit(LSFL_RUNNING, &ls->ls_flags);
                /* Schedule the next timer if recovery put something on toss.
                 *
                 * Handling of the rsbs that recovery queued on toss has not
                 * started yet, because LSFL_RUNNING was only set just now;
                 * nothing else in recovery has started either, because
                 * ls_in_recovery is still held. So we should not run into
                 * the case that dlm_timer_resume() queues a timer that ends
                 * up being a no-op.
                 */
                dlm_timer_resume(ls);
                /* unblocks processes waiting to enter the dlm */
                up_write(&ls->ls_in_recovery);
                clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
                error = 0;
        }
        spin_unlock_bh(&ls->ls_recover_lock);

        write_unlock_bh(&ls->ls_recv_active);
        return error;
}

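/*
 * Run one recovery cycle for sequence rv->seq: update lockspace membership,
 * rebuild the resource directory, remaster rsbs and locks held by departed
 * nodes, then re-enable locking and replay the requests and waiters that
 * accumulated while locking was stopped.
 */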
static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
{
        LIST_HEAD(root_list);
        unsigned long start;
        int error, neg = 0;

        log_rinfo(ls, "dlm_recover %llu", (unsigned long long)rv->seq);

        mutex_lock(&ls->ls_recoverd_active);

        dlm_callback_suspend(ls);

        dlm_clear_toss(ls);

        /*
         * This list of root rsb's will be the basis of most of the recovery
         * routines.
         */

        dlm_create_root_list(ls, &root_list);

        /*
         * Add or remove nodes from the lockspace's ls_nodes list.
         *
         * Because we must report all membership changes to the lsops or
         * midcomms layer, ls_recover() must not be aborted until this is
         * done.
         */

        error = dlm_recover_members(ls, rv, &neg);
        if (error) {
                log_rinfo(ls, "dlm_recover_members error %d", error);
                goto fail;
        }

        dlm_recover_dir_nodeid(ls, &root_list);

        /* Create a snapshot of all active rsbs of which we are the master.
         * During the barrier between dlm_recover_members_wait() and
         * dlm_recover_directory(), other nodes can retrieve the directory
         * rsbs they are responsible for (r->res_dir_nodeid == nodeid) via
         * the rcom dlm_copy_master_names() handling.
         *
         * TODO: keep a per-lockspace list of the rsbs we master, maintained
         * during normal lock handling, so recovery can use it directly
         * instead of building this list here.
         */
        error = dlm_create_masters_list(ls);
        if (error) {
                log_rinfo(ls, "dlm_create_masters_list error %d", error);
                goto fail_root_list;
        }

        ls->ls_recover_locks_in = 0;

        dlm_set_recover_status(ls, DLM_RS_NODES);

        error = dlm_recover_members_wait(ls, rv->seq);
        if (error) {
                log_rinfo(ls, "dlm_recover_members_wait error %d", error);
                dlm_release_masters_list(ls);
                goto fail_root_list;
        }

        start = jiffies;

        /*
         * Rebuild our own share of the directory by collecting from all other
         * nodes their master rsb names that hash to us.
         */

        error = dlm_recover_directory(ls, rv->seq);
        if (error) {
                log_rinfo(ls, "dlm_recover_directory error %d", error);
                dlm_release_masters_list(ls);
                goto fail_root_list;
        }

        dlm_set_recover_status(ls, DLM_RS_DIR);

        error = dlm_recover_directory_wait(ls, rv->seq);
        if (error) {
                log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
                dlm_release_masters_list(ls);
                goto fail_root_list;
        }

        dlm_release_masters_list(ls);

        /*
         * We may have outstanding operations that are waiting for a reply from
         * a failed node.  Mark these to be resent after recovery.  Unlock and
         * cancel ops can just be completed.
         */

        dlm_recover_waiters_pre(ls);

        if (dlm_recovery_stopped(ls)) {
                error = -EINTR;
                goto fail_root_list;
        }

        if (neg || dlm_no_directory(ls)) {
                /*
                 * Clear lkb's for departed nodes.
                 */

                dlm_recover_purge(ls, &root_list);

                /*
                 * Get new master nodeid's for rsb's that were mastered on
                 * departed nodes.
                 */

                error = dlm_recover_masters(ls, rv->seq, &root_list);
                if (error) {
                        log_rinfo(ls, "dlm_recover_masters error %d", error);
                        goto fail_root_list;
                }

                /*
                 * Send our locks on remastered rsb's to the new masters.
                 */

                error = dlm_recover_locks(ls, rv->seq, &root_list);
                if (error) {
                        log_rinfo(ls, "dlm_recover_locks error %d", error);
                        goto fail_root_list;
                }

                dlm_set_recover_status(ls, DLM_RS_LOCKS);

                error = dlm_recover_locks_wait(ls, rv->seq);
                if (error) {
                        log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
                        goto fail_root_list;
                }

                log_rinfo(ls, "dlm_recover_locks %u in",
                          ls->ls_recover_locks_in);

                /*
                 * Finalize state in master rsb's now that all locks can be
                 * checked.  This includes conversion resolution and lvb
                 * settings.
                 */

                dlm_recover_rsbs(ls, &root_list);
        } else {
                /*
                 * Other lockspace members may be going through the "neg" steps
                 * while also adding us to the lockspace, in which case they'll
                 * be doing the recover_locks (RS_LOCKS) barrier.
                 */
                dlm_set_recover_status(ls, DLM_RS_LOCKS);

                error = dlm_recover_locks_wait(ls, rv->seq);
                if (error) {
                        log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
                        goto fail_root_list;
                }
        }

        dlm_release_root_list(&root_list);

        /*
         * Purge directory-related requests that are saved in requestqueue.
         * All dir requests from before recovery are invalid now due to the dir
         * rebuild and will be resent by the requesting nodes.
         */

        dlm_purge_requestqueue(ls);

        dlm_set_recover_status(ls, DLM_RS_DONE);

        error = dlm_recover_done_wait(ls, rv->seq);
        if (error) {
                log_rinfo(ls, "dlm_recover_done_wait error %d", error);
                goto fail;
        }

        dlm_clear_members_gone(ls);

        dlm_callback_resume(ls);

        error = enable_locking(ls, rv->seq);
        if (error) {
                log_rinfo(ls, "enable_locking error %d", error);
                goto fail;
        }

        error = dlm_process_requestqueue(ls);
        if (error) {
                log_rinfo(ls, "dlm_process_requestqueue error %d", error);
                goto fail;
        }

        error = dlm_recover_waiters_post(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_waiters_post error %d", error);
                goto fail;
        }

        dlm_recover_grant(ls);

        log_rinfo(ls, "dlm_recover %llu generation %u done: %u ms",
                  (unsigned long long)rv->seq, ls->ls_generation,
                  jiffies_to_msecs(jiffies - start));
        mutex_unlock(&ls->ls_recoverd_active);

        return 0;

 fail_root_list:
        dlm_release_root_list(&root_list);
 fail:
        mutex_unlock(&ls->ls_recoverd_active);

        return error;
}

/* The dlm_ls_start() that created the rv we take here may already have been
   stopped via dlm_ls_stop(); in that case we need to leave the RECOVERY_STOP
   flag set. */

static void do_ls_recovery(struct dlm_ls *ls)
{
        struct dlm_recover *rv = NULL;
        int error;

        spin_lock_bh(&ls->ls_recover_lock);
        rv = ls->ls_recover_args;
        ls->ls_recover_args = NULL;
        if (rv && ls->ls_recover_seq == rv->seq)
                clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
        spin_unlock_bh(&ls->ls_recover_lock);

        if (rv) {
                error = ls_recover(ls, rv);
                switch (error) {
                case 0:
                        ls->ls_recovery_result = 0;
                        complete(&ls->ls_recovery_done);

                        dlm_lsop_recover_done(ls);
                        break;
                case -EINTR:
                        /* Recovery was interrupted (-EINTR); wait for the next
                         * ls_recover() iteration, which will hopefully succeed.
                         */
                        log_rinfo(ls, "%s %llu interrupted and should be queued to run again",
                                  __func__, (unsigned long long)rv->seq);
                        break;
                default:
                        log_rinfo(ls, "%s %llu error %d", __func__,
                                  (unsigned long long)rv->seq, error);

                        /* let new_lockspace() know about the critical error */
                        ls->ls_recovery_result = error;
                        complete(&ls->ls_recovery_done);
                        break;
                }

                kfree(rv->nodes);
                kfree(rv);
        }
}

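/*
 * Per-lockspace recovery daemon: sleeps until LSFL_RECOVER_DOWN or
 * LSFL_RECOVER_WORK is set, takes ls_in_recovery to block locking when
 * RECOVER_DOWN is requested, runs do_ls_recovery() for RECOVER_WORK, and
 * exits when the kthread is stopped.
 */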
static int dlm_recoverd(void *arg)
{
        struct dlm_ls *ls;

        ls = dlm_find_lockspace_local(arg);
        if (!ls) {
                log_print("dlm_recoverd: no lockspace %p", arg);
                return -1;
        }

        down_write(&ls->ls_in_recovery);
        set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
        wake_up(&ls->ls_recover_lock_wait);

        while (1) {
                /*
                 * Check kthread_should_stop() after set_current_state() so
                 * that a kthread_stop() issued just before the state change
                 * is not missed.
                 */
                set_current_state(TASK_INTERRUPTIBLE);
                if (kthread_should_stop()) {
                        set_current_state(TASK_RUNNING);
                        break;
                }
                if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
                    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
                        if (kthread_should_stop())
                                break;
                        schedule();
                }
                set_current_state(TASK_RUNNING);

                if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
                        down_write(&ls->ls_in_recovery);
                        set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
                        wake_up(&ls->ls_recover_lock_wait);
                }

                if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
                        do_ls_recovery(ls);
        }

        if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
                up_write(&ls->ls_in_recovery);

        dlm_put_lockspace(ls);
        return 0;
}

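/* Start the per-lockspace "dlm_recoverd" kernel thread. */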
int dlm_recoverd_start(struct dlm_ls *ls)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                ls->ls_recoverd_task = p;
        return error;
}

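/* Stop the recovery thread; kthread_stop() waits for dlm_recoverd() to exit. */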
void dlm_recoverd_stop(struct dlm_ls *ls)
{
        kthread_stop(ls->ls_recoverd_task);
}

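/*
 * dlm_recoverd_suspend() wakes any waiters on ls_wait_general and then holds
 * ls_recoverd_active so no new ls_recover() pass can run until
 * dlm_recoverd_resume() releases it.
 */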
void dlm_recoverd_suspend(struct dlm_ls *ls)
{
        wake_up(&ls->ls_wait_general);
        mutex_lock(&ls->ls_recoverd_active);
}

void dlm_recoverd_resume(struct dlm_ls *ls)
{
        mutex_unlock(&ls->ls_recoverd_active);
}