]> Git Repo - linux.git/blob - fs/dlm/lockspace.c
mm: memcg/slab: stop setting page->mem_cgroup pointer for slab pages
[linux.git] / fs / dlm / lockspace.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include <linux/module.h>
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n;
39         int rc = kstrtoint(buf, 0, &n);
40
41         if (rc)
42                 return rc;
43         ls = dlm_find_lockspace_local(ls->ls_local_handle);
44         if (!ls)
45                 return -EINVAL;
46
47         switch (n) {
48         case 0:
49                 dlm_ls_stop(ls);
50                 break;
51         case 1:
52                 dlm_ls_start(ls);
53                 break;
54         default:
55                 ret = -EINVAL;
56         }
57         dlm_put_lockspace(ls);
58         return ret;
59 }
60
61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62 {
63         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64
65         if (rc)
66                 return rc;
67         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68         wake_up(&ls->ls_uevent_wait);
69         return len;
70 }
71
72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73 {
74         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
75 }
76
77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78 {
79         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80
81         if (rc)
82                 return rc;
83         return len;
84 }
85
86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87 {
88         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89 }
90
91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92 {
93         int val;
94         int rc = kstrtoint(buf, 0, &val);
95
96         if (rc)
97                 return rc;
98         if (val == 1)
99                 set_bit(LSFL_NODIR, &ls->ls_flags);
100         return len;
101 }
102
103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104 {
105         uint32_t status = dlm_recover_status(ls);
106         return snprintf(buf, PAGE_SIZE, "%x\n", status);
107 }
108
109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110 {
111         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112 }
113
114 struct dlm_attr {
115         struct attribute attr;
116         ssize_t (*show)(struct dlm_ls *, char *);
117         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118 };
119
120 static struct dlm_attr dlm_attr_control = {
121         .attr  = {.name = "control", .mode = S_IWUSR},
122         .store = dlm_control_store
123 };
124
125 static struct dlm_attr dlm_attr_event = {
126         .attr  = {.name = "event_done", .mode = S_IWUSR},
127         .store = dlm_event_store
128 };
129
130 static struct dlm_attr dlm_attr_id = {
131         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132         .show  = dlm_id_show,
133         .store = dlm_id_store
134 };
135
136 static struct dlm_attr dlm_attr_nodir = {
137         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138         .show  = dlm_nodir_show,
139         .store = dlm_nodir_store
140 };
141
142 static struct dlm_attr dlm_attr_recover_status = {
143         .attr  = {.name = "recover_status", .mode = S_IRUGO},
144         .show  = dlm_recover_status_show
145 };
146
147 static struct dlm_attr dlm_attr_recover_nodeid = {
148         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149         .show  = dlm_recover_nodeid_show
150 };
151
152 static struct attribute *dlm_attrs[] = {
153         &dlm_attr_control.attr,
154         &dlm_attr_event.attr,
155         &dlm_attr_id.attr,
156         &dlm_attr_nodir.attr,
157         &dlm_attr_recover_status.attr,
158         &dlm_attr_recover_nodeid.attr,
159         NULL,
160 };
161
162 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
163                              char *buf)
164 {
165         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
166         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
167         return a->show ? a->show(ls, buf) : 0;
168 }
169
170 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
171                               const char *buf, size_t len)
172 {
173         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
174         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
175         return a->store ? a->store(ls, buf, len) : len;
176 }
177
178 static void lockspace_kobj_release(struct kobject *k)
179 {
180         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
181         kfree(ls);
182 }
183
184 static const struct sysfs_ops dlm_attr_ops = {
185         .show  = dlm_attr_show,
186         .store = dlm_attr_store,
187 };
188
189 static struct kobj_type dlm_ktype = {
190         .default_attrs = dlm_attrs,
191         .sysfs_ops     = &dlm_attr_ops,
192         .release       = lockspace_kobj_release,
193 };
194
195 static struct kset *dlm_kset;
196
197 static int do_uevent(struct dlm_ls *ls, int in)
198 {
199         int error;
200
201         if (in)
202                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
203         else
204                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
205
206         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
207
208         /* dlm_controld will see the uevent, do the necessary group management
209            and then write to sysfs to wake us */
210
211         error = wait_event_interruptible(ls->ls_uevent_wait,
212                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
213
214         log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
215
216         if (error)
217                 goto out;
218
219         error = ls->ls_uevent_result;
220  out:
221         if (error)
222                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
223                           error, ls->ls_uevent_result);
224         return error;
225 }
226
227 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
228                       struct kobj_uevent_env *env)
229 {
230         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
231
232         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
233         return 0;
234 }
235
236 static const struct kset_uevent_ops dlm_uevent_ops = {
237         .uevent = dlm_uevent,
238 };
239
240 int __init dlm_lockspace_init(void)
241 {
242         ls_count = 0;
243         mutex_init(&ls_lock);
244         INIT_LIST_HEAD(&lslist);
245         spin_lock_init(&lslist_lock);
246
247         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
248         if (!dlm_kset) {
249                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
250                 return -ENOMEM;
251         }
252         return 0;
253 }
254
255 void dlm_lockspace_exit(void)
256 {
257         kset_unregister(dlm_kset);
258 }
259
260 static struct dlm_ls *find_ls_to_scan(void)
261 {
262         struct dlm_ls *ls;
263
264         spin_lock(&lslist_lock);
265         list_for_each_entry(ls, &lslist, ls_list) {
266                 if (time_after_eq(jiffies, ls->ls_scan_time +
267                                             dlm_config.ci_scan_secs * HZ)) {
268                         spin_unlock(&lslist_lock);
269                         return ls;
270                 }
271         }
272         spin_unlock(&lslist_lock);
273         return NULL;
274 }
275
276 static int dlm_scand(void *data)
277 {
278         struct dlm_ls *ls;
279
280         while (!kthread_should_stop()) {
281                 ls = find_ls_to_scan();
282                 if (ls) {
283                         if (dlm_lock_recovery_try(ls)) {
284                                 ls->ls_scan_time = jiffies;
285                                 dlm_scan_rsbs(ls);
286                                 dlm_scan_timeout(ls);
287                                 dlm_scan_waiters(ls);
288                                 dlm_unlock_recovery(ls);
289                         } else {
290                                 ls->ls_scan_time += HZ;
291                         }
292                         continue;
293                 }
294                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
295         }
296         return 0;
297 }
298
299 static int dlm_scand_start(void)
300 {
301         struct task_struct *p;
302         int error = 0;
303
304         p = kthread_run(dlm_scand, NULL, "dlm_scand");
305         if (IS_ERR(p))
306                 error = PTR_ERR(p);
307         else
308                 scand_task = p;
309         return error;
310 }
311
312 static void dlm_scand_stop(void)
313 {
314         kthread_stop(scand_task);
315 }
316
317 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
318 {
319         struct dlm_ls *ls;
320
321         spin_lock(&lslist_lock);
322
323         list_for_each_entry(ls, &lslist, ls_list) {
324                 if (ls->ls_global_id == id) {
325                         ls->ls_count++;
326                         goto out;
327                 }
328         }
329         ls = NULL;
330  out:
331         spin_unlock(&lslist_lock);
332         return ls;
333 }
334
335 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
336 {
337         struct dlm_ls *ls;
338
339         spin_lock(&lslist_lock);
340         list_for_each_entry(ls, &lslist, ls_list) {
341                 if (ls->ls_local_handle == lockspace) {
342                         ls->ls_count++;
343                         goto out;
344                 }
345         }
346         ls = NULL;
347  out:
348         spin_unlock(&lslist_lock);
349         return ls;
350 }
351
352 struct dlm_ls *dlm_find_lockspace_device(int minor)
353 {
354         struct dlm_ls *ls;
355
356         spin_lock(&lslist_lock);
357         list_for_each_entry(ls, &lslist, ls_list) {
358                 if (ls->ls_device.minor == minor) {
359                         ls->ls_count++;
360                         goto out;
361                 }
362         }
363         ls = NULL;
364  out:
365         spin_unlock(&lslist_lock);
366         return ls;
367 }
368
369 void dlm_put_lockspace(struct dlm_ls *ls)
370 {
371         spin_lock(&lslist_lock);
372         ls->ls_count--;
373         spin_unlock(&lslist_lock);
374 }
375
376 static void remove_lockspace(struct dlm_ls *ls)
377 {
378         for (;;) {
379                 spin_lock(&lslist_lock);
380                 if (ls->ls_count == 0) {
381                         WARN_ON(ls->ls_create_count != 0);
382                         list_del(&ls->ls_list);
383                         spin_unlock(&lslist_lock);
384                         return;
385                 }
386                 spin_unlock(&lslist_lock);
387                 ssleep(1);
388         }
389 }
390
391 static int threads_start(void)
392 {
393         int error;
394
395         error = dlm_scand_start();
396         if (error) {
397                 log_print("cannot start dlm_scand thread %d", error);
398                 goto fail;
399         }
400
401         /* Thread for sending/receiving messages for all lockspace's */
402         error = dlm_lowcomms_start();
403         if (error) {
404                 log_print("cannot start dlm lowcomms %d", error);
405                 goto scand_fail;
406         }
407
408         return 0;
409
410  scand_fail:
411         dlm_scand_stop();
412  fail:
413         return error;
414 }
415
416 static void threads_stop(void)
417 {
418         dlm_scand_stop();
419         dlm_lowcomms_stop();
420 }
421
422 static int new_lockspace(const char *name, const char *cluster,
423                          uint32_t flags, int lvblen,
424                          const struct dlm_lockspace_ops *ops, void *ops_arg,
425                          int *ops_result, dlm_lockspace_t **lockspace)
426 {
427         struct dlm_ls *ls;
428         int i, size, error;
429         int do_unreg = 0;
430         int namelen = strlen(name);
431
432         if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
433                 return -EINVAL;
434
435         if (!lvblen || (lvblen % 8))
436                 return -EINVAL;
437
438         if (!try_module_get(THIS_MODULE))
439                 return -EINVAL;
440
441         if (!dlm_user_daemon_available()) {
442                 log_print("dlm user daemon not available");
443                 error = -EUNATCH;
444                 goto out;
445         }
446
447         if (ops && ops_result) {
448                 if (!dlm_config.ci_recover_callbacks)
449                         *ops_result = -EOPNOTSUPP;
450                 else
451                         *ops_result = 0;
452         }
453
454         if (!cluster)
455                 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
456                           dlm_config.ci_cluster_name);
457
458         if (dlm_config.ci_recover_callbacks && cluster &&
459             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
460                 log_print("dlm cluster name '%s' does not match "
461                           "the application cluster name '%s'",
462                           dlm_config.ci_cluster_name, cluster);
463                 error = -EBADR;
464                 goto out;
465         }
466
467         error = 0;
468
469         spin_lock(&lslist_lock);
470         list_for_each_entry(ls, &lslist, ls_list) {
471                 WARN_ON(ls->ls_create_count <= 0);
472                 if (ls->ls_namelen != namelen)
473                         continue;
474                 if (memcmp(ls->ls_name, name, namelen))
475                         continue;
476                 if (flags & DLM_LSFL_NEWEXCL) {
477                         error = -EEXIST;
478                         break;
479                 }
480                 ls->ls_create_count++;
481                 *lockspace = ls;
482                 error = 1;
483                 break;
484         }
485         spin_unlock(&lslist_lock);
486
487         if (error)
488                 goto out;
489
490         error = -ENOMEM;
491
492         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
493         if (!ls)
494                 goto out;
495         memcpy(ls->ls_name, name, namelen);
496         ls->ls_namelen = namelen;
497         ls->ls_lvblen = lvblen;
498         ls->ls_count = 0;
499         ls->ls_flags = 0;
500         ls->ls_scan_time = jiffies;
501
502         if (ops && dlm_config.ci_recover_callbacks) {
503                 ls->ls_ops = ops;
504                 ls->ls_ops_arg = ops_arg;
505         }
506
507         if (flags & DLM_LSFL_TIMEWARN)
508                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
509
510         /* ls_exflags are forced to match among nodes, and we don't
511            need to require all nodes to have some flags set */
512         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
513                                     DLM_LSFL_NEWEXCL));
514
515         size = dlm_config.ci_rsbtbl_size;
516         ls->ls_rsbtbl_size = size;
517
518         ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
519         if (!ls->ls_rsbtbl)
520                 goto out_lsfree;
521         for (i = 0; i < size; i++) {
522                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
523                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
524                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
525         }
526
527         spin_lock_init(&ls->ls_remove_spin);
528
529         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
530                 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
531                                                  GFP_KERNEL);
532                 if (!ls->ls_remove_names[i])
533                         goto out_rsbtbl;
534         }
535
536         idr_init(&ls->ls_lkbidr);
537         spin_lock_init(&ls->ls_lkbidr_spin);
538
539         INIT_LIST_HEAD(&ls->ls_waiters);
540         mutex_init(&ls->ls_waiters_mutex);
541         INIT_LIST_HEAD(&ls->ls_orphans);
542         mutex_init(&ls->ls_orphans_mutex);
543         INIT_LIST_HEAD(&ls->ls_timeout);
544         mutex_init(&ls->ls_timeout_mutex);
545
546         INIT_LIST_HEAD(&ls->ls_new_rsb);
547         spin_lock_init(&ls->ls_new_rsb_spin);
548
549         INIT_LIST_HEAD(&ls->ls_nodes);
550         INIT_LIST_HEAD(&ls->ls_nodes_gone);
551         ls->ls_num_nodes = 0;
552         ls->ls_low_nodeid = 0;
553         ls->ls_total_weight = 0;
554         ls->ls_node_array = NULL;
555
556         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
557         ls->ls_stub_rsb.res_ls = ls;
558
559         ls->ls_debug_rsb_dentry = NULL;
560         ls->ls_debug_waiters_dentry = NULL;
561
562         init_waitqueue_head(&ls->ls_uevent_wait);
563         ls->ls_uevent_result = 0;
564         init_completion(&ls->ls_members_done);
565         ls->ls_members_result = -1;
566
567         mutex_init(&ls->ls_cb_mutex);
568         INIT_LIST_HEAD(&ls->ls_cb_delay);
569
570         ls->ls_recoverd_task = NULL;
571         mutex_init(&ls->ls_recoverd_active);
572         spin_lock_init(&ls->ls_recover_lock);
573         spin_lock_init(&ls->ls_rcom_spin);
574         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
575         ls->ls_recover_status = 0;
576         ls->ls_recover_seq = 0;
577         ls->ls_recover_args = NULL;
578         init_rwsem(&ls->ls_in_recovery);
579         init_rwsem(&ls->ls_recv_active);
580         INIT_LIST_HEAD(&ls->ls_requestqueue);
581         mutex_init(&ls->ls_requestqueue_mutex);
582         mutex_init(&ls->ls_clear_proc_locks);
583
584         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
585         if (!ls->ls_recover_buf)
586                 goto out_lkbidr;
587
588         ls->ls_slot = 0;
589         ls->ls_num_slots = 0;
590         ls->ls_slots_size = 0;
591         ls->ls_slots = NULL;
592
593         INIT_LIST_HEAD(&ls->ls_recover_list);
594         spin_lock_init(&ls->ls_recover_list_lock);
595         idr_init(&ls->ls_recover_idr);
596         spin_lock_init(&ls->ls_recover_idr_lock);
597         ls->ls_recover_list_count = 0;
598         ls->ls_local_handle = ls;
599         init_waitqueue_head(&ls->ls_wait_general);
600         INIT_LIST_HEAD(&ls->ls_root_list);
601         init_rwsem(&ls->ls_root_sem);
602
603         spin_lock(&lslist_lock);
604         ls->ls_create_count = 1;
605         list_add(&ls->ls_list, &lslist);
606         spin_unlock(&lslist_lock);
607
608         if (flags & DLM_LSFL_FS) {
609                 error = dlm_callback_start(ls);
610                 if (error) {
611                         log_error(ls, "can't start dlm_callback %d", error);
612                         goto out_delist;
613                 }
614         }
615
616         init_waitqueue_head(&ls->ls_recover_lock_wait);
617
618         /*
619          * Once started, dlm_recoverd first looks for ls in lslist, then
620          * initializes ls_in_recovery as locked in "down" mode.  We need
621          * to wait for the wakeup from dlm_recoverd because in_recovery
622          * has to start out in down mode.
623          */
624
625         error = dlm_recoverd_start(ls);
626         if (error) {
627                 log_error(ls, "can't start dlm_recoverd %d", error);
628                 goto out_callback;
629         }
630
631         wait_event(ls->ls_recover_lock_wait,
632                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
633
634         ls->ls_kobj.kset = dlm_kset;
635         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
636                                      "%s", ls->ls_name);
637         if (error)
638                 goto out_recoverd;
639         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
640
641         /* let kobject handle freeing of ls if there's an error */
642         do_unreg = 1;
643
644         /* This uevent triggers dlm_controld in userspace to add us to the
645            group of nodes that are members of this lockspace (managed by the
646            cluster infrastructure.)  Once it's done that, it tells us who the
647            current lockspace members are (via configfs) and then tells the
648            lockspace to start running (via sysfs) in dlm_ls_start(). */
649
650         error = do_uevent(ls, 1);
651         if (error)
652                 goto out_recoverd;
653
654         wait_for_completion(&ls->ls_members_done);
655         error = ls->ls_members_result;
656         if (error)
657                 goto out_members;
658
659         dlm_create_debug_file(ls);
660
661         log_rinfo(ls, "join complete");
662         *lockspace = ls;
663         return 0;
664
665  out_members:
666         do_uevent(ls, 0);
667         dlm_clear_members(ls);
668         kfree(ls->ls_node_array);
669  out_recoverd:
670         dlm_recoverd_stop(ls);
671  out_callback:
672         dlm_callback_stop(ls);
673  out_delist:
674         spin_lock(&lslist_lock);
675         list_del(&ls->ls_list);
676         spin_unlock(&lslist_lock);
677         idr_destroy(&ls->ls_recover_idr);
678         kfree(ls->ls_recover_buf);
679  out_lkbidr:
680         idr_destroy(&ls->ls_lkbidr);
681  out_rsbtbl:
682         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
683                 kfree(ls->ls_remove_names[i]);
684         vfree(ls->ls_rsbtbl);
685  out_lsfree:
686         if (do_unreg)
687                 kobject_put(&ls->ls_kobj);
688         else
689                 kfree(ls);
690  out:
691         module_put(THIS_MODULE);
692         return error;
693 }
694
695 int dlm_new_lockspace(const char *name, const char *cluster,
696                       uint32_t flags, int lvblen,
697                       const struct dlm_lockspace_ops *ops, void *ops_arg,
698                       int *ops_result, dlm_lockspace_t **lockspace)
699 {
700         int error = 0;
701
702         mutex_lock(&ls_lock);
703         if (!ls_count)
704                 error = threads_start();
705         if (error)
706                 goto out;
707
708         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
709                               ops_result, lockspace);
710         if (!error)
711                 ls_count++;
712         if (error > 0)
713                 error = 0;
714         if (!ls_count)
715                 threads_stop();
716  out:
717         mutex_unlock(&ls_lock);
718         return error;
719 }
720
721 static int lkb_idr_is_local(int id, void *p, void *data)
722 {
723         struct dlm_lkb *lkb = p;
724
725         return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
726 }
727
728 static int lkb_idr_is_any(int id, void *p, void *data)
729 {
730         return 1;
731 }
732
733 static int lkb_idr_free(int id, void *p, void *data)
734 {
735         struct dlm_lkb *lkb = p;
736
737         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
738                 dlm_free_lvb(lkb->lkb_lvbptr);
739
740         dlm_free_lkb(lkb);
741         return 0;
742 }
743
744 /* NOTE: We check the lkbidr here rather than the resource table.
745    This is because there may be LKBs queued as ASTs that have been unlinked
746    from their RSBs and are pending deletion once the AST has been delivered */
747
748 static int lockspace_busy(struct dlm_ls *ls, int force)
749 {
750         int rv;
751
752         spin_lock(&ls->ls_lkbidr_spin);
753         if (force == 0) {
754                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
755         } else if (force == 1) {
756                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
757         } else {
758                 rv = 0;
759         }
760         spin_unlock(&ls->ls_lkbidr_spin);
761         return rv;
762 }
763
764 static int release_lockspace(struct dlm_ls *ls, int force)
765 {
766         struct dlm_rsb *rsb;
767         struct rb_node *n;
768         int i, busy, rv;
769
770         busy = lockspace_busy(ls, force);
771
772         spin_lock(&lslist_lock);
773         if (ls->ls_create_count == 1) {
774                 if (busy) {
775                         rv = -EBUSY;
776                 } else {
777                         /* remove_lockspace takes ls off lslist */
778                         ls->ls_create_count = 0;
779                         rv = 0;
780                 }
781         } else if (ls->ls_create_count > 1) {
782                 rv = --ls->ls_create_count;
783         } else {
784                 rv = -EINVAL;
785         }
786         spin_unlock(&lslist_lock);
787
788         if (rv) {
789                 log_debug(ls, "release_lockspace no remove %d", rv);
790                 return rv;
791         }
792
793         dlm_device_deregister(ls);
794
795         if (force < 3 && dlm_user_daemon_available())
796                 do_uevent(ls, 0);
797
798         dlm_recoverd_stop(ls);
799
800         dlm_callback_stop(ls);
801
802         remove_lockspace(ls);
803
804         dlm_delete_debug_file(ls);
805
806         idr_destroy(&ls->ls_recover_idr);
807         kfree(ls->ls_recover_buf);
808
809         /*
810          * Free all lkb's in idr
811          */
812
813         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
814         idr_destroy(&ls->ls_lkbidr);
815
816         /*
817          * Free all rsb's on rsbtbl[] lists
818          */
819
820         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
821                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
822                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
823                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
824                         dlm_free_rsb(rsb);
825                 }
826
827                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
828                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
829                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
830                         dlm_free_rsb(rsb);
831                 }
832         }
833
834         vfree(ls->ls_rsbtbl);
835
836         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
837                 kfree(ls->ls_remove_names[i]);
838
839         while (!list_empty(&ls->ls_new_rsb)) {
840                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
841                                        res_hashchain);
842                 list_del(&rsb->res_hashchain);
843                 dlm_free_rsb(rsb);
844         }
845
846         /*
847          * Free structures on any other lists
848          */
849
850         dlm_purge_requestqueue(ls);
851         kfree(ls->ls_recover_args);
852         dlm_clear_members(ls);
853         dlm_clear_members_gone(ls);
854         kfree(ls->ls_node_array);
855         log_rinfo(ls, "release_lockspace final free");
856         kobject_put(&ls->ls_kobj);
857         /* The ls structure will be freed when the kobject is done with */
858
859         module_put(THIS_MODULE);
860         return 0;
861 }
862
863 /*
864  * Called when a system has released all its locks and is not going to use the
865  * lockspace any longer.  We free everything we're managing for this lockspace.
866  * Remaining nodes will go through the recovery process as if we'd died.  The
867  * lockspace must continue to function as usual, participating in recoveries,
868  * until this returns.
869  *
870  * Force has 4 possible values:
871  * 0 - don't destroy locksapce if it has any LKBs
872  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
873  * 2 - destroy lockspace regardless of LKBs
874  * 3 - destroy lockspace as part of a forced shutdown
875  */
876
877 int dlm_release_lockspace(void *lockspace, int force)
878 {
879         struct dlm_ls *ls;
880         int error;
881
882         ls = dlm_find_lockspace_local(lockspace);
883         if (!ls)
884                 return -EINVAL;
885         dlm_put_lockspace(ls);
886
887         mutex_lock(&ls_lock);
888         error = release_lockspace(ls, force);
889         if (!error)
890                 ls_count--;
891         if (!ls_count)
892                 threads_stop();
893         mutex_unlock(&ls_lock);
894
895         return error;
896 }
897
898 void dlm_stop_lockspaces(void)
899 {
900         struct dlm_ls *ls;
901         int count;
902
903  restart:
904         count = 0;
905         spin_lock(&lslist_lock);
906         list_for_each_entry(ls, &lslist, ls_list) {
907                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
908                         count++;
909                         continue;
910                 }
911                 spin_unlock(&lslist_lock);
912                 log_error(ls, "no userland control daemon, stopping lockspace");
913                 dlm_ls_stop(ls);
914                 goto restart;
915         }
916         spin_unlock(&lslist_lock);
917
918         if (count)
919                 log_print("dlm user daemon left %d lockspaces", count);
920 }
921
This page took 0.08705 seconds and 4 git commands to generate.