]> Git Repo - linux.git/blob - fs/dlm/lockspace.c
act_ct: prepare for stolen verdict coming from conntrack and nat engine
[linux.git] / fs / dlm / lockspace.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include <linux/module.h>
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32
33 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
34 {
35         ssize_t ret = len;
36         int n;
37         int rc = kstrtoint(buf, 0, &n);
38
39         if (rc)
40                 return rc;
41         ls = dlm_find_lockspace_local(ls->ls_local_handle);
42         if (!ls)
43                 return -EINVAL;
44
45         switch (n) {
46         case 0:
47                 dlm_ls_stop(ls);
48                 break;
49         case 1:
50                 dlm_ls_start(ls);
51                 break;
52         default:
53                 ret = -EINVAL;
54         }
55         dlm_put_lockspace(ls);
56         return ret;
57 }
58
59 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60 {
61         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
62
63         if (rc)
64                 return rc;
65         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
66         wake_up(&ls->ls_uevent_wait);
67         return len;
68 }
69
70 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
71 {
72         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
73 }
74
75 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
76 {
77         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
78
79         if (rc)
80                 return rc;
81         return len;
82 }
83
84 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
85 {
86         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
87 }
88
89 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
90 {
91         int val;
92         int rc = kstrtoint(buf, 0, &val);
93
94         if (rc)
95                 return rc;
96         if (val == 1)
97                 set_bit(LSFL_NODIR, &ls->ls_flags);
98         return len;
99 }
100
101 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
102 {
103         uint32_t status = dlm_recover_status(ls);
104         return snprintf(buf, PAGE_SIZE, "%x\n", status);
105 }
106
107 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
108 {
109         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
110 }
111
112 struct dlm_attr {
113         struct attribute attr;
114         ssize_t (*show)(struct dlm_ls *, char *);
115         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
116 };
117
118 static struct dlm_attr dlm_attr_control = {
119         .attr  = {.name = "control", .mode = S_IWUSR},
120         .store = dlm_control_store
121 };
122
123 static struct dlm_attr dlm_attr_event = {
124         .attr  = {.name = "event_done", .mode = S_IWUSR},
125         .store = dlm_event_store
126 };
127
128 static struct dlm_attr dlm_attr_id = {
129         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
130         .show  = dlm_id_show,
131         .store = dlm_id_store
132 };
133
134 static struct dlm_attr dlm_attr_nodir = {
135         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
136         .show  = dlm_nodir_show,
137         .store = dlm_nodir_store
138 };
139
140 static struct dlm_attr dlm_attr_recover_status = {
141         .attr  = {.name = "recover_status", .mode = S_IRUGO},
142         .show  = dlm_recover_status_show
143 };
144
145 static struct dlm_attr dlm_attr_recover_nodeid = {
146         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
147         .show  = dlm_recover_nodeid_show
148 };
149
150 static struct attribute *dlm_attrs[] = {
151         &dlm_attr_control.attr,
152         &dlm_attr_event.attr,
153         &dlm_attr_id.attr,
154         &dlm_attr_nodir.attr,
155         &dlm_attr_recover_status.attr,
156         &dlm_attr_recover_nodeid.attr,
157         NULL,
158 };
159 ATTRIBUTE_GROUPS(dlm);
160
161 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
162                              char *buf)
163 {
164         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
165         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
166         return a->show ? a->show(ls, buf) : 0;
167 }
168
169 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
170                               const char *buf, size_t len)
171 {
172         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
173         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
174         return a->store ? a->store(ls, buf, len) : len;
175 }
176
177 static void lockspace_kobj_release(struct kobject *k)
178 {
179         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
180         kfree(ls);
181 }
182
183 static const struct sysfs_ops dlm_attr_ops = {
184         .show  = dlm_attr_show,
185         .store = dlm_attr_store,
186 };
187
188 static struct kobj_type dlm_ktype = {
189         .default_groups = dlm_groups,
190         .sysfs_ops     = &dlm_attr_ops,
191         .release       = lockspace_kobj_release,
192 };
193
194 static struct kset *dlm_kset;
195
196 static int do_uevent(struct dlm_ls *ls, int in)
197 {
198         if (in)
199                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
200         else
201                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
202
203         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
204
205         /* dlm_controld will see the uevent, do the necessary group management
206            and then write to sysfs to wake us */
207
208         wait_event(ls->ls_uevent_wait,
209                    test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
210
211         log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
212
213         return ls->ls_uevent_result;
214 }
215
216 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
217 {
218         const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
219
220         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
221         return 0;
222 }
223
224 static const struct kset_uevent_ops dlm_uevent_ops = {
225         .uevent = dlm_uevent,
226 };
227
228 int __init dlm_lockspace_init(void)
229 {
230         ls_count = 0;
231         mutex_init(&ls_lock);
232         INIT_LIST_HEAD(&lslist);
233         spin_lock_init(&lslist_lock);
234
235         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
236         if (!dlm_kset) {
237                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
238                 return -ENOMEM;
239         }
240         return 0;
241 }
242
243 void dlm_lockspace_exit(void)
244 {
245         kset_unregister(dlm_kset);
246 }
247
248 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
249 {
250         struct dlm_ls *ls;
251
252         spin_lock_bh(&lslist_lock);
253
254         list_for_each_entry(ls, &lslist, ls_list) {
255                 if (ls->ls_global_id == id) {
256                         atomic_inc(&ls->ls_count);
257                         goto out;
258                 }
259         }
260         ls = NULL;
261  out:
262         spin_unlock_bh(&lslist_lock);
263         return ls;
264 }
265
266 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
267 {
268         struct dlm_ls *ls;
269
270         spin_lock_bh(&lslist_lock);
271         list_for_each_entry(ls, &lslist, ls_list) {
272                 if (ls->ls_local_handle == lockspace) {
273                         atomic_inc(&ls->ls_count);
274                         goto out;
275                 }
276         }
277         ls = NULL;
278  out:
279         spin_unlock_bh(&lslist_lock);
280         return ls;
281 }
282
283 struct dlm_ls *dlm_find_lockspace_device(int minor)
284 {
285         struct dlm_ls *ls;
286
287         spin_lock_bh(&lslist_lock);
288         list_for_each_entry(ls, &lslist, ls_list) {
289                 if (ls->ls_device.minor == minor) {
290                         atomic_inc(&ls->ls_count);
291                         goto out;
292                 }
293         }
294         ls = NULL;
295  out:
296         spin_unlock_bh(&lslist_lock);
297         return ls;
298 }
299
300 void dlm_put_lockspace(struct dlm_ls *ls)
301 {
302         if (atomic_dec_and_test(&ls->ls_count))
303                 wake_up(&ls->ls_count_wait);
304 }
305
306 static void remove_lockspace(struct dlm_ls *ls)
307 {
308 retry:
309         wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
310
311         spin_lock_bh(&lslist_lock);
312         if (atomic_read(&ls->ls_count) != 0) {
313                 spin_unlock_bh(&lslist_lock);
314                 goto retry;
315         }
316
317         WARN_ON(ls->ls_create_count != 0);
318         list_del(&ls->ls_list);
319         spin_unlock_bh(&lslist_lock);
320 }
321
322 static int threads_start(void)
323 {
324         int error;
325
326         /* Thread for sending/receiving messages for all lockspace's */
327         error = dlm_midcomms_start();
328         if (error)
329                 log_print("cannot start dlm midcomms %d", error);
330
331         return error;
332 }
333
334 static int new_lockspace(const char *name, const char *cluster,
335                          uint32_t flags, int lvblen,
336                          const struct dlm_lockspace_ops *ops, void *ops_arg,
337                          int *ops_result, dlm_lockspace_t **lockspace)
338 {
339         struct dlm_ls *ls;
340         int do_unreg = 0;
341         int namelen = strlen(name);
342         int error;
343
344         if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
345                 return -EINVAL;
346
347         if (lvblen % 8)
348                 return -EINVAL;
349
350         if (!try_module_get(THIS_MODULE))
351                 return -EINVAL;
352
353         if (!dlm_user_daemon_available()) {
354                 log_print("dlm user daemon not available");
355                 error = -EUNATCH;
356                 goto out;
357         }
358
359         if (ops && ops_result) {
360                 if (!dlm_config.ci_recover_callbacks)
361                         *ops_result = -EOPNOTSUPP;
362                 else
363                         *ops_result = 0;
364         }
365
366         if (!cluster)
367                 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
368                           dlm_config.ci_cluster_name);
369
370         if (dlm_config.ci_recover_callbacks && cluster &&
371             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
372                 log_print("dlm cluster name '%s' does not match "
373                           "the application cluster name '%s'",
374                           dlm_config.ci_cluster_name, cluster);
375                 error = -EBADR;
376                 goto out;
377         }
378
379         error = 0;
380
381         spin_lock_bh(&lslist_lock);
382         list_for_each_entry(ls, &lslist, ls_list) {
383                 WARN_ON(ls->ls_create_count <= 0);
384                 if (ls->ls_namelen != namelen)
385                         continue;
386                 if (memcmp(ls->ls_name, name, namelen))
387                         continue;
388                 if (flags & DLM_LSFL_NEWEXCL) {
389                         error = -EEXIST;
390                         break;
391                 }
392                 ls->ls_create_count++;
393                 *lockspace = ls;
394                 error = 1;
395                 break;
396         }
397         spin_unlock_bh(&lslist_lock);
398
399         if (error)
400                 goto out;
401
402         error = -ENOMEM;
403
404         ls = kzalloc(sizeof(*ls), GFP_NOFS);
405         if (!ls)
406                 goto out;
407         memcpy(ls->ls_name, name, namelen);
408         ls->ls_namelen = namelen;
409         ls->ls_lvblen = lvblen;
410         atomic_set(&ls->ls_count, 0);
411         init_waitqueue_head(&ls->ls_count_wait);
412         ls->ls_flags = 0;
413         ls->ls_scan_time = jiffies;
414
415         if (ops && dlm_config.ci_recover_callbacks) {
416                 ls->ls_ops = ops;
417                 ls->ls_ops_arg = ops_arg;
418         }
419
420         /* ls_exflags are forced to match among nodes, and we don't
421          * need to require all nodes to have some flags set
422          */
423         ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
424
425         INIT_LIST_HEAD(&ls->ls_toss);
426         INIT_LIST_HEAD(&ls->ls_keep);
427         rwlock_init(&ls->ls_rsbtbl_lock);
428
429         error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
430         if (error)
431                 goto out_lsfree;
432
433         idr_init(&ls->ls_lkbidr);
434         rwlock_init(&ls->ls_lkbidr_lock);
435
436         INIT_LIST_HEAD(&ls->ls_waiters);
437         spin_lock_init(&ls->ls_waiters_lock);
438         INIT_LIST_HEAD(&ls->ls_orphans);
439         spin_lock_init(&ls->ls_orphans_lock);
440
441         INIT_LIST_HEAD(&ls->ls_new_rsb);
442         spin_lock_init(&ls->ls_new_rsb_spin);
443
444         INIT_LIST_HEAD(&ls->ls_nodes);
445         INIT_LIST_HEAD(&ls->ls_nodes_gone);
446         ls->ls_num_nodes = 0;
447         ls->ls_low_nodeid = 0;
448         ls->ls_total_weight = 0;
449         ls->ls_node_array = NULL;
450
451         memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
452         ls->ls_local_rsb.res_ls = ls;
453
454         ls->ls_debug_rsb_dentry = NULL;
455         ls->ls_debug_waiters_dentry = NULL;
456
457         init_waitqueue_head(&ls->ls_uevent_wait);
458         ls->ls_uevent_result = 0;
459         init_completion(&ls->ls_recovery_done);
460         ls->ls_recovery_result = -1;
461
462         spin_lock_init(&ls->ls_cb_lock);
463         INIT_LIST_HEAD(&ls->ls_cb_delay);
464
465         ls->ls_recoverd_task = NULL;
466         mutex_init(&ls->ls_recoverd_active);
467         spin_lock_init(&ls->ls_recover_lock);
468         spin_lock_init(&ls->ls_rcom_spin);
469         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
470         ls->ls_recover_status = 0;
471         ls->ls_recover_seq = get_random_u64();
472         ls->ls_recover_args = NULL;
473         init_rwsem(&ls->ls_in_recovery);
474         rwlock_init(&ls->ls_recv_active);
475         INIT_LIST_HEAD(&ls->ls_requestqueue);
476         rwlock_init(&ls->ls_requestqueue_lock);
477         spin_lock_init(&ls->ls_clear_proc_locks);
478
479         /* Due backwards compatibility with 3.1 we need to use maximum
480          * possible dlm message size to be sure the message will fit and
481          * not having out of bounds issues. However on sending side 3.2
482          * might send less.
483          */
484         ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
485         if (!ls->ls_recover_buf) {
486                 error = -ENOMEM;
487                 goto out_lkbidr;
488         }
489
490         ls->ls_slot = 0;
491         ls->ls_num_slots = 0;
492         ls->ls_slots_size = 0;
493         ls->ls_slots = NULL;
494
495         INIT_LIST_HEAD(&ls->ls_recover_list);
496         spin_lock_init(&ls->ls_recover_list_lock);
497         idr_init(&ls->ls_recover_idr);
498         spin_lock_init(&ls->ls_recover_idr_lock);
499         ls->ls_recover_list_count = 0;
500         ls->ls_local_handle = ls;
501         init_waitqueue_head(&ls->ls_wait_general);
502         INIT_LIST_HEAD(&ls->ls_masters_list);
503         rwlock_init(&ls->ls_masters_lock);
504         INIT_LIST_HEAD(&ls->ls_dir_dump_list);
505         rwlock_init(&ls->ls_dir_dump_lock);
506
507         INIT_LIST_HEAD(&ls->ls_toss_q);
508         spin_lock_init(&ls->ls_toss_q_lock);
509         timer_setup(&ls->ls_timer, dlm_rsb_toss_timer,
510                     TIMER_DEFERRABLE);
511
512         spin_lock_bh(&lslist_lock);
513         ls->ls_create_count = 1;
514         list_add(&ls->ls_list, &lslist);
515         spin_unlock_bh(&lslist_lock);
516
517         if (flags & DLM_LSFL_FS) {
518                 error = dlm_callback_start(ls);
519                 if (error) {
520                         log_error(ls, "can't start dlm_callback %d", error);
521                         goto out_delist;
522                 }
523         }
524
525         init_waitqueue_head(&ls->ls_recover_lock_wait);
526
527         /*
528          * Once started, dlm_recoverd first looks for ls in lslist, then
529          * initializes ls_in_recovery as locked in "down" mode.  We need
530          * to wait for the wakeup from dlm_recoverd because in_recovery
531          * has to start out in down mode.
532          */
533
534         error = dlm_recoverd_start(ls);
535         if (error) {
536                 log_error(ls, "can't start dlm_recoverd %d", error);
537                 goto out_callback;
538         }
539
540         wait_event(ls->ls_recover_lock_wait,
541                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
542
543         /* let kobject handle freeing of ls if there's an error */
544         do_unreg = 1;
545
546         ls->ls_kobj.kset = dlm_kset;
547         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
548                                      "%s", ls->ls_name);
549         if (error)
550                 goto out_recoverd;
551         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
552
553         /* This uevent triggers dlm_controld in userspace to add us to the
554            group of nodes that are members of this lockspace (managed by the
555            cluster infrastructure.)  Once it's done that, it tells us who the
556            current lockspace members are (via configfs) and then tells the
557            lockspace to start running (via sysfs) in dlm_ls_start(). */
558
559         error = do_uevent(ls, 1);
560         if (error)
561                 goto out_recoverd;
562
563         /* wait until recovery is successful or failed */
564         wait_for_completion(&ls->ls_recovery_done);
565         error = ls->ls_recovery_result;
566         if (error)
567                 goto out_members;
568
569         dlm_create_debug_file(ls);
570
571         log_rinfo(ls, "join complete");
572         *lockspace = ls;
573         return 0;
574
575  out_members:
576         do_uevent(ls, 0);
577         dlm_clear_members(ls);
578         kfree(ls->ls_node_array);
579  out_recoverd:
580         dlm_recoverd_stop(ls);
581  out_callback:
582         dlm_callback_stop(ls);
583  out_delist:
584         spin_lock_bh(&lslist_lock);
585         list_del(&ls->ls_list);
586         spin_unlock_bh(&lslist_lock);
587         idr_destroy(&ls->ls_recover_idr);
588         kfree(ls->ls_recover_buf);
589  out_lkbidr:
590         idr_destroy(&ls->ls_lkbidr);
591         rhashtable_destroy(&ls->ls_rsbtbl);
592  out_lsfree:
593         if (do_unreg)
594                 kobject_put(&ls->ls_kobj);
595         else
596                 kfree(ls);
597  out:
598         module_put(THIS_MODULE);
599         return error;
600 }
601
602 static int __dlm_new_lockspace(const char *name, const char *cluster,
603                                uint32_t flags, int lvblen,
604                                const struct dlm_lockspace_ops *ops,
605                                void *ops_arg, int *ops_result,
606                                dlm_lockspace_t **lockspace)
607 {
608         int error = 0;
609
610         mutex_lock(&ls_lock);
611         if (!ls_count)
612                 error = threads_start();
613         if (error)
614                 goto out;
615
616         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
617                               ops_result, lockspace);
618         if (!error)
619                 ls_count++;
620         if (error > 0)
621                 error = 0;
622         if (!ls_count) {
623                 dlm_midcomms_shutdown();
624                 dlm_midcomms_stop();
625         }
626  out:
627         mutex_unlock(&ls_lock);
628         return error;
629 }
630
631 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
632                       int lvblen, const struct dlm_lockspace_ops *ops,
633                       void *ops_arg, int *ops_result,
634                       dlm_lockspace_t **lockspace)
635 {
636         return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
637                                    ops, ops_arg, ops_result, lockspace);
638 }
639
640 int dlm_new_user_lockspace(const char *name, const char *cluster,
641                            uint32_t flags, int lvblen,
642                            const struct dlm_lockspace_ops *ops,
643                            void *ops_arg, int *ops_result,
644                            dlm_lockspace_t **lockspace)
645 {
646         return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
647                                    ops_arg, ops_result, lockspace);
648 }
649
650 static int lkb_idr_is_local(int id, void *p, void *data)
651 {
652         struct dlm_lkb *lkb = p;
653
654         return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
655 }
656
657 static int lkb_idr_is_any(int id, void *p, void *data)
658 {
659         return 1;
660 }
661
662 static int lkb_idr_free(int id, void *p, void *data)
663 {
664         struct dlm_lkb *lkb = p;
665
666         if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
667                 dlm_free_lvb(lkb->lkb_lvbptr);
668
669         dlm_free_lkb(lkb);
670         return 0;
671 }
672
673 /* NOTE: We check the lkbidr here rather than the resource table.
674    This is because there may be LKBs queued as ASTs that have been unlinked
675    from their RSBs and are pending deletion once the AST has been delivered */
676
677 static int lockspace_busy(struct dlm_ls *ls, int force)
678 {
679         int rv;
680
681         read_lock_bh(&ls->ls_lkbidr_lock);
682         if (force == 0) {
683                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
684         } else if (force == 1) {
685                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
686         } else {
687                 rv = 0;
688         }
689         read_unlock_bh(&ls->ls_lkbidr_lock);
690         return rv;
691 }
692
693 static void rhash_free_rsb(void *ptr, void *arg)
694 {
695         struct dlm_rsb *rsb = ptr;
696
697         dlm_free_rsb(rsb);
698 }
699
700 static int release_lockspace(struct dlm_ls *ls, int force)
701 {
702         struct dlm_rsb *rsb;
703         int busy, rv;
704
705         busy = lockspace_busy(ls, force);
706
707         spin_lock_bh(&lslist_lock);
708         if (ls->ls_create_count == 1) {
709                 if (busy) {
710                         rv = -EBUSY;
711                 } else {
712                         /* remove_lockspace takes ls off lslist */
713                         ls->ls_create_count = 0;
714                         rv = 0;
715                 }
716         } else if (ls->ls_create_count > 1) {
717                 rv = --ls->ls_create_count;
718         } else {
719                 rv = -EINVAL;
720         }
721         spin_unlock_bh(&lslist_lock);
722
723         if (rv) {
724                 log_debug(ls, "release_lockspace no remove %d", rv);
725                 return rv;
726         }
727
728         if (ls_count == 1)
729                 dlm_midcomms_version_wait();
730
731         dlm_device_deregister(ls);
732
733         if (force < 3 && dlm_user_daemon_available())
734                 do_uevent(ls, 0);
735
736         dlm_recoverd_stop(ls);
737
738         /* clear the LSFL_RUNNING flag to fast up
739          * time_shutdown_sync(), we don't care anymore
740          */
741         clear_bit(LSFL_RUNNING, &ls->ls_flags);
742         timer_shutdown_sync(&ls->ls_timer);
743
744         if (ls_count == 1) {
745                 dlm_clear_members(ls);
746                 dlm_midcomms_shutdown();
747         }
748
749         dlm_callback_stop(ls);
750
751         remove_lockspace(ls);
752
753         dlm_delete_debug_file(ls);
754
755         idr_destroy(&ls->ls_recover_idr);
756         kfree(ls->ls_recover_buf);
757
758         /*
759          * Free all lkb's in idr
760          */
761
762         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
763         idr_destroy(&ls->ls_lkbidr);
764
765         /*
766          * Free all rsb's on rsbtbl
767          */
768         rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
769
770         while (!list_empty(&ls->ls_new_rsb)) {
771                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
772                                        res_hashchain);
773                 list_del(&rsb->res_hashchain);
774                 dlm_free_rsb(rsb);
775         }
776
777         /*
778          * Free structures on any other lists
779          */
780
781         dlm_purge_requestqueue(ls);
782         kfree(ls->ls_recover_args);
783         dlm_clear_members(ls);
784         dlm_clear_members_gone(ls);
785         kfree(ls->ls_node_array);
786         log_rinfo(ls, "release_lockspace final free");
787         kobject_put(&ls->ls_kobj);
788         /* The ls structure will be freed when the kobject is done with */
789
790         module_put(THIS_MODULE);
791         return 0;
792 }
793
794 /*
795  * Called when a system has released all its locks and is not going to use the
796  * lockspace any longer.  We free everything we're managing for this lockspace.
797  * Remaining nodes will go through the recovery process as if we'd died.  The
798  * lockspace must continue to function as usual, participating in recoveries,
799  * until this returns.
800  *
801  * Force has 4 possible values:
802  * 0 - don't destroy lockspace if it has any LKBs
803  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
804  * 2 - destroy lockspace regardless of LKBs
805  * 3 - destroy lockspace as part of a forced shutdown
806  */
807
808 int dlm_release_lockspace(void *lockspace, int force)
809 {
810         struct dlm_ls *ls;
811         int error;
812
813         ls = dlm_find_lockspace_local(lockspace);
814         if (!ls)
815                 return -EINVAL;
816         dlm_put_lockspace(ls);
817
818         mutex_lock(&ls_lock);
819         error = release_lockspace(ls, force);
820         if (!error)
821                 ls_count--;
822         if (!ls_count)
823                 dlm_midcomms_stop();
824         mutex_unlock(&ls_lock);
825
826         return error;
827 }
828
829 void dlm_stop_lockspaces(void)
830 {
831         struct dlm_ls *ls;
832         int count;
833
834  restart:
835         count = 0;
836         spin_lock_bh(&lslist_lock);
837         list_for_each_entry(ls, &lslist, ls_list) {
838                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
839                         count++;
840                         continue;
841                 }
842                 spin_unlock_bh(&lslist_lock);
843                 log_error(ls, "no userland control daemon, stopping lockspace");
844                 dlm_ls_stop(ls);
845                 goto restart;
846         }
847         spin_unlock_bh(&lslist_lock);
848
849         if (count)
850                 log_print("dlm user daemon left %d lockspaces", count);
851 }
This page took 0.079545 seconds and 4 git commands to generate.