]> Git Repo - linux.git/blob - fs/dlm/lockspace.c
dlm: use rsbtbl as resource directory
[linux.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n = simple_strtol(buf, NULL, 0);
39
40         ls = dlm_find_lockspace_local(ls->ls_local_handle);
41         if (!ls)
42                 return -EINVAL;
43
44         switch (n) {
45         case 0:
46                 dlm_ls_stop(ls);
47                 break;
48         case 1:
49                 dlm_ls_start(ls);
50                 break;
51         default:
52                 ret = -EINVAL;
53         }
54         dlm_put_lockspace(ls);
55         return ret;
56 }
57
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62         wake_up(&ls->ls_uevent_wait);
63         return len;
64 }
65
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74         return len;
75 }
76
77 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
78 {
79         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
80 }
81
82 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
83 {
84         int val = simple_strtoul(buf, NULL, 0);
85         if (val == 1)
86                 set_bit(LSFL_NODIR, &ls->ls_flags);
87         return len;
88 }
89
90 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
91 {
92         uint32_t status = dlm_recover_status(ls);
93         return snprintf(buf, PAGE_SIZE, "%x\n", status);
94 }
95
96 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
97 {
98         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
99 }
100
101 struct dlm_attr {
102         struct attribute attr;
103         ssize_t (*show)(struct dlm_ls *, char *);
104         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
105 };
106
107 static struct dlm_attr dlm_attr_control = {
108         .attr  = {.name = "control", .mode = S_IWUSR},
109         .store = dlm_control_store
110 };
111
112 static struct dlm_attr dlm_attr_event = {
113         .attr  = {.name = "event_done", .mode = S_IWUSR},
114         .store = dlm_event_store
115 };
116
117 static struct dlm_attr dlm_attr_id = {
118         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
119         .show  = dlm_id_show,
120         .store = dlm_id_store
121 };
122
123 static struct dlm_attr dlm_attr_nodir = {
124         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
125         .show  = dlm_nodir_show,
126         .store = dlm_nodir_store
127 };
128
129 static struct dlm_attr dlm_attr_recover_status = {
130         .attr  = {.name = "recover_status", .mode = S_IRUGO},
131         .show  = dlm_recover_status_show
132 };
133
134 static struct dlm_attr dlm_attr_recover_nodeid = {
135         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
136         .show  = dlm_recover_nodeid_show
137 };
138
139 static struct attribute *dlm_attrs[] = {
140         &dlm_attr_control.attr,
141         &dlm_attr_event.attr,
142         &dlm_attr_id.attr,
143         &dlm_attr_nodir.attr,
144         &dlm_attr_recover_status.attr,
145         &dlm_attr_recover_nodeid.attr,
146         NULL,
147 };
148
149 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
150                              char *buf)
151 {
152         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
153         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
154         return a->show ? a->show(ls, buf) : 0;
155 }
156
157 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
158                               const char *buf, size_t len)
159 {
160         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
161         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
162         return a->store ? a->store(ls, buf, len) : len;
163 }
164
165 static void lockspace_kobj_release(struct kobject *k)
166 {
167         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
168         kfree(ls);
169 }
170
171 static const struct sysfs_ops dlm_attr_ops = {
172         .show  = dlm_attr_show,
173         .store = dlm_attr_store,
174 };
175
176 static struct kobj_type dlm_ktype = {
177         .default_attrs = dlm_attrs,
178         .sysfs_ops     = &dlm_attr_ops,
179         .release       = lockspace_kobj_release,
180 };
181
182 static struct kset *dlm_kset;
183
184 static int do_uevent(struct dlm_ls *ls, int in)
185 {
186         int error;
187
188         if (in)
189                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
190         else
191                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
192
193         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
194
195         /* dlm_controld will see the uevent, do the necessary group management
196            and then write to sysfs to wake us */
197
198         error = wait_event_interruptible(ls->ls_uevent_wait,
199                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
200
201         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
202
203         if (error)
204                 goto out;
205
206         error = ls->ls_uevent_result;
207  out:
208         if (error)
209                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
210                           error, ls->ls_uevent_result);
211         return error;
212 }
213
214 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
215                       struct kobj_uevent_env *env)
216 {
217         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
218
219         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
220         return 0;
221 }
222
223 static struct kset_uevent_ops dlm_uevent_ops = {
224         .uevent = dlm_uevent,
225 };
226
227 int __init dlm_lockspace_init(void)
228 {
229         ls_count = 0;
230         mutex_init(&ls_lock);
231         INIT_LIST_HEAD(&lslist);
232         spin_lock_init(&lslist_lock);
233
234         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
235         if (!dlm_kset) {
236                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
237                 return -ENOMEM;
238         }
239         return 0;
240 }
241
242 void dlm_lockspace_exit(void)
243 {
244         kset_unregister(dlm_kset);
245 }
246
247 static struct dlm_ls *find_ls_to_scan(void)
248 {
249         struct dlm_ls *ls;
250
251         spin_lock(&lslist_lock);
252         list_for_each_entry(ls, &lslist, ls_list) {
253                 if (time_after_eq(jiffies, ls->ls_scan_time +
254                                             dlm_config.ci_scan_secs * HZ)) {
255                         spin_unlock(&lslist_lock);
256                         return ls;
257                 }
258         }
259         spin_unlock(&lslist_lock);
260         return NULL;
261 }
262
263 static int dlm_scand(void *data)
264 {
265         struct dlm_ls *ls;
266
267         while (!kthread_should_stop()) {
268                 ls = find_ls_to_scan();
269                 if (ls) {
270                         if (dlm_lock_recovery_try(ls)) {
271                                 ls->ls_scan_time = jiffies;
272                                 dlm_scan_rsbs(ls);
273                                 dlm_scan_timeout(ls);
274                                 dlm_scan_waiters(ls);
275                                 dlm_unlock_recovery(ls);
276                         } else {
277                                 ls->ls_scan_time += HZ;
278                         }
279                         continue;
280                 }
281                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
282         }
283         return 0;
284 }
285
286 static int dlm_scand_start(void)
287 {
288         struct task_struct *p;
289         int error = 0;
290
291         p = kthread_run(dlm_scand, NULL, "dlm_scand");
292         if (IS_ERR(p))
293                 error = PTR_ERR(p);
294         else
295                 scand_task = p;
296         return error;
297 }
298
299 static void dlm_scand_stop(void)
300 {
301         kthread_stop(scand_task);
302 }
303
304 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
305 {
306         struct dlm_ls *ls;
307
308         spin_lock(&lslist_lock);
309
310         list_for_each_entry(ls, &lslist, ls_list) {
311                 if (ls->ls_global_id == id) {
312                         ls->ls_count++;
313                         goto out;
314                 }
315         }
316         ls = NULL;
317  out:
318         spin_unlock(&lslist_lock);
319         return ls;
320 }
321
322 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
323 {
324         struct dlm_ls *ls;
325
326         spin_lock(&lslist_lock);
327         list_for_each_entry(ls, &lslist, ls_list) {
328                 if (ls->ls_local_handle == lockspace) {
329                         ls->ls_count++;
330                         goto out;
331                 }
332         }
333         ls = NULL;
334  out:
335         spin_unlock(&lslist_lock);
336         return ls;
337 }
338
339 struct dlm_ls *dlm_find_lockspace_device(int minor)
340 {
341         struct dlm_ls *ls;
342
343         spin_lock(&lslist_lock);
344         list_for_each_entry(ls, &lslist, ls_list) {
345                 if (ls->ls_device.minor == minor) {
346                         ls->ls_count++;
347                         goto out;
348                 }
349         }
350         ls = NULL;
351  out:
352         spin_unlock(&lslist_lock);
353         return ls;
354 }
355
356 void dlm_put_lockspace(struct dlm_ls *ls)
357 {
358         spin_lock(&lslist_lock);
359         ls->ls_count--;
360         spin_unlock(&lslist_lock);
361 }
362
363 static void remove_lockspace(struct dlm_ls *ls)
364 {
365         for (;;) {
366                 spin_lock(&lslist_lock);
367                 if (ls->ls_count == 0) {
368                         WARN_ON(ls->ls_create_count != 0);
369                         list_del(&ls->ls_list);
370                         spin_unlock(&lslist_lock);
371                         return;
372                 }
373                 spin_unlock(&lslist_lock);
374                 ssleep(1);
375         }
376 }
377
/*
 * Start dlm_scand and then lowcomms.  If lowcomms fails to start, dlm_scand
 * is stopped again.  Returns 0 on success or the error of the step that
 * failed.
 */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		return error;
	}

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		dlm_scand_stop();
		return error;
	}

	return 0;
}
402
/* Counterpart of threads_start(): stop dlm_scand, then lowcomms. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}
408
409 static int new_lockspace(const char *name, const char *cluster,
410                          uint32_t flags, int lvblen,
411                          const struct dlm_lockspace_ops *ops, void *ops_arg,
412                          int *ops_result, dlm_lockspace_t **lockspace)
413 {
414         struct dlm_ls *ls;
415         int i, size, error;
416         int do_unreg = 0;
417         int namelen = strlen(name);
418
419         if (namelen > DLM_LOCKSPACE_LEN)
420                 return -EINVAL;
421
422         if (!lvblen || (lvblen % 8))
423                 return -EINVAL;
424
425         if (!try_module_get(THIS_MODULE))
426                 return -EINVAL;
427
428         if (!dlm_user_daemon_available()) {
429                 log_print("dlm user daemon not available");
430                 error = -EUNATCH;
431                 goto out;
432         }
433
434         if (ops && ops_result) {
435                 if (!dlm_config.ci_recover_callbacks)
436                         *ops_result = -EOPNOTSUPP;
437                 else
438                         *ops_result = 0;
439         }
440
441         if (dlm_config.ci_recover_callbacks && cluster &&
442             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
443                 log_print("dlm cluster name %s mismatch %s",
444                           dlm_config.ci_cluster_name, cluster);
445                 error = -EBADR;
446                 goto out;
447         }
448
449         error = 0;
450
451         spin_lock(&lslist_lock);
452         list_for_each_entry(ls, &lslist, ls_list) {
453                 WARN_ON(ls->ls_create_count <= 0);
454                 if (ls->ls_namelen != namelen)
455                         continue;
456                 if (memcmp(ls->ls_name, name, namelen))
457                         continue;
458                 if (flags & DLM_LSFL_NEWEXCL) {
459                         error = -EEXIST;
460                         break;
461                 }
462                 ls->ls_create_count++;
463                 *lockspace = ls;
464                 error = 1;
465                 break;
466         }
467         spin_unlock(&lslist_lock);
468
469         if (error)
470                 goto out;
471
472         error = -ENOMEM;
473
474         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
475         if (!ls)
476                 goto out;
477         memcpy(ls->ls_name, name, namelen);
478         ls->ls_namelen = namelen;
479         ls->ls_lvblen = lvblen;
480         ls->ls_count = 0;
481         ls->ls_flags = 0;
482         ls->ls_scan_time = jiffies;
483
484         if (ops && dlm_config.ci_recover_callbacks) {
485                 ls->ls_ops = ops;
486                 ls->ls_ops_arg = ops_arg;
487         }
488
489         if (flags & DLM_LSFL_TIMEWARN)
490                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
491
492         /* ls_exflags are forced to match among nodes, and we don't
493            need to require all nodes to have some flags set */
494         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
495                                     DLM_LSFL_NEWEXCL));
496
497         size = dlm_config.ci_rsbtbl_size;
498         ls->ls_rsbtbl_size = size;
499
500         ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
501         if (!ls->ls_rsbtbl)
502                 goto out_lsfree;
503         for (i = 0; i < size; i++) {
504                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
505                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
506                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
507         }
508
509         idr_init(&ls->ls_lkbidr);
510         spin_lock_init(&ls->ls_lkbidr_spin);
511
512         INIT_LIST_HEAD(&ls->ls_waiters);
513         mutex_init(&ls->ls_waiters_mutex);
514         INIT_LIST_HEAD(&ls->ls_orphans);
515         mutex_init(&ls->ls_orphans_mutex);
516         INIT_LIST_HEAD(&ls->ls_timeout);
517         mutex_init(&ls->ls_timeout_mutex);
518
519         INIT_LIST_HEAD(&ls->ls_new_rsb);
520         spin_lock_init(&ls->ls_new_rsb_spin);
521
522         INIT_LIST_HEAD(&ls->ls_nodes);
523         INIT_LIST_HEAD(&ls->ls_nodes_gone);
524         ls->ls_num_nodes = 0;
525         ls->ls_low_nodeid = 0;
526         ls->ls_total_weight = 0;
527         ls->ls_node_array = NULL;
528
529         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
530         ls->ls_stub_rsb.res_ls = ls;
531
532         ls->ls_debug_rsb_dentry = NULL;
533         ls->ls_debug_waiters_dentry = NULL;
534
535         init_waitqueue_head(&ls->ls_uevent_wait);
536         ls->ls_uevent_result = 0;
537         init_completion(&ls->ls_members_done);
538         ls->ls_members_result = -1;
539
540         mutex_init(&ls->ls_cb_mutex);
541         INIT_LIST_HEAD(&ls->ls_cb_delay);
542
543         ls->ls_recoverd_task = NULL;
544         mutex_init(&ls->ls_recoverd_active);
545         spin_lock_init(&ls->ls_recover_lock);
546         spin_lock_init(&ls->ls_rcom_spin);
547         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
548         ls->ls_recover_status = 0;
549         ls->ls_recover_seq = 0;
550         ls->ls_recover_args = NULL;
551         init_rwsem(&ls->ls_in_recovery);
552         init_rwsem(&ls->ls_recv_active);
553         INIT_LIST_HEAD(&ls->ls_requestqueue);
554         mutex_init(&ls->ls_requestqueue_mutex);
555         mutex_init(&ls->ls_clear_proc_locks);
556
557         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
558         if (!ls->ls_recover_buf)
559                 goto out_lkbfree;
560
561         ls->ls_slot = 0;
562         ls->ls_num_slots = 0;
563         ls->ls_slots_size = 0;
564         ls->ls_slots = NULL;
565
566         INIT_LIST_HEAD(&ls->ls_recover_list);
567         spin_lock_init(&ls->ls_recover_list_lock);
568         ls->ls_recover_list_count = 0;
569         ls->ls_local_handle = ls;
570         init_waitqueue_head(&ls->ls_wait_general);
571         INIT_LIST_HEAD(&ls->ls_root_list);
572         init_rwsem(&ls->ls_root_sem);
573
574         down_write(&ls->ls_in_recovery);
575
576         spin_lock(&lslist_lock);
577         ls->ls_create_count = 1;
578         list_add(&ls->ls_list, &lslist);
579         spin_unlock(&lslist_lock);
580
581         if (flags & DLM_LSFL_FS) {
582                 error = dlm_callback_start(ls);
583                 if (error) {
584                         log_error(ls, "can't start dlm_callback %d", error);
585                         goto out_delist;
586                 }
587         }
588
589         /* needs to find ls in lslist */
590         error = dlm_recoverd_start(ls);
591         if (error) {
592                 log_error(ls, "can't start dlm_recoverd %d", error);
593                 goto out_callback;
594         }
595
596         ls->ls_kobj.kset = dlm_kset;
597         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
598                                      "%s", ls->ls_name);
599         if (error)
600                 goto out_recoverd;
601         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
602
603         /* let kobject handle freeing of ls if there's an error */
604         do_unreg = 1;
605
606         /* This uevent triggers dlm_controld in userspace to add us to the
607            group of nodes that are members of this lockspace (managed by the
608            cluster infrastructure.)  Once it's done that, it tells us who the
609            current lockspace members are (via configfs) and then tells the
610            lockspace to start running (via sysfs) in dlm_ls_start(). */
611
612         error = do_uevent(ls, 1);
613         if (error)
614                 goto out_recoverd;
615
616         wait_for_completion(&ls->ls_members_done);
617         error = ls->ls_members_result;
618         if (error)
619                 goto out_members;
620
621         dlm_create_debug_file(ls);
622
623         log_debug(ls, "join complete");
624         *lockspace = ls;
625         return 0;
626
627  out_members:
628         do_uevent(ls, 0);
629         dlm_clear_members(ls);
630         kfree(ls->ls_node_array);
631  out_recoverd:
632         dlm_recoverd_stop(ls);
633  out_callback:
634         dlm_callback_stop(ls);
635  out_delist:
636         spin_lock(&lslist_lock);
637         list_del(&ls->ls_list);
638         spin_unlock(&lslist_lock);
639         kfree(ls->ls_recover_buf);
640  out_lkbfree:
641         idr_destroy(&ls->ls_lkbidr);
642         vfree(ls->ls_rsbtbl);
643  out_lsfree:
644         if (do_unreg)
645                 kobject_put(&ls->ls_kobj);
646         else
647                 kfree(ls);
648  out:
649         module_put(THIS_MODULE);
650         return error;
651 }
652
653 int dlm_new_lockspace(const char *name, const char *cluster,
654                       uint32_t flags, int lvblen,
655                       const struct dlm_lockspace_ops *ops, void *ops_arg,
656                       int *ops_result, dlm_lockspace_t **lockspace)
657 {
658         int error = 0;
659
660         mutex_lock(&ls_lock);
661         if (!ls_count)
662                 error = threads_start();
663         if (error)
664                 goto out;
665
666         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
667                               ops_result, lockspace);
668         if (!error)
669                 ls_count++;
670         if (error > 0)
671                 error = 0;
672         if (!ls_count)
673                 threads_stop();
674  out:
675         mutex_unlock(&ls_lock);
676         return error;
677 }
678
679 static int lkb_idr_is_local(int id, void *p, void *data)
680 {
681         struct dlm_lkb *lkb = p;
682
683         if (!lkb->lkb_nodeid)
684                 return 1;
685         return 0;
686 }
687
/*
 * idr_for_each() callback that returns 1 for every entry, whatever its
 * arguments are.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
692
693 static int lkb_idr_free(int id, void *p, void *data)
694 {
695         struct dlm_lkb *lkb = p;
696
697         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
698                 dlm_free_lvb(lkb->lkb_lvbptr);
699
700         dlm_free_lkb(lkb);
701         return 0;
702 }
703
704 /* NOTE: We check the lkbidr here rather than the resource table.
705    This is because there may be LKBs queued as ASTs that have been unlinked
706    from their RSBs and are pending deletion once the AST has been delivered */
707
708 static int lockspace_busy(struct dlm_ls *ls, int force)
709 {
710         int rv;
711
712         spin_lock(&ls->ls_lkbidr_spin);
713         if (force == 0) {
714                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
715         } else if (force == 1) {
716                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
717         } else {
718                 rv = 0;
719         }
720         spin_unlock(&ls->ls_lkbidr_spin);
721         return rv;
722 }
723
724 static int release_lockspace(struct dlm_ls *ls, int force)
725 {
726         struct dlm_rsb *rsb;
727         struct rb_node *n;
728         int i, busy, rv;
729
730         busy = lockspace_busy(ls, force);
731
732         spin_lock(&lslist_lock);
733         if (ls->ls_create_count == 1) {
734                 if (busy) {
735                         rv = -EBUSY;
736                 } else {
737                         /* remove_lockspace takes ls off lslist */
738                         ls->ls_create_count = 0;
739                         rv = 0;
740                 }
741         } else if (ls->ls_create_count > 1) {
742                 rv = --ls->ls_create_count;
743         } else {
744                 rv = -EINVAL;
745         }
746         spin_unlock(&lslist_lock);
747
748         if (rv) {
749                 log_debug(ls, "release_lockspace no remove %d", rv);
750                 return rv;
751         }
752
753         dlm_device_deregister(ls);
754
755         if (force < 3 && dlm_user_daemon_available())
756                 do_uevent(ls, 0);
757
758         dlm_recoverd_stop(ls);
759
760         dlm_callback_stop(ls);
761
762         remove_lockspace(ls);
763
764         dlm_delete_debug_file(ls);
765
766         kfree(ls->ls_recover_buf);
767
768         /*
769          * Free all lkb's in idr
770          */
771
772         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
773         idr_remove_all(&ls->ls_lkbidr);
774         idr_destroy(&ls->ls_lkbidr);
775
776         /*
777          * Free all rsb's on rsbtbl[] lists
778          */
779
780         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
781                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
782                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
783                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
784                         dlm_free_rsb(rsb);
785                 }
786
787                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
788                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
789                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
790                         dlm_free_rsb(rsb);
791                 }
792         }
793
794         vfree(ls->ls_rsbtbl);
795
796         while (!list_empty(&ls->ls_new_rsb)) {
797                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
798                                        res_hashchain);
799                 list_del(&rsb->res_hashchain);
800                 dlm_free_rsb(rsb);
801         }
802
803         /*
804          * Free structures on any other lists
805          */
806
807         dlm_purge_requestqueue(ls);
808         kfree(ls->ls_recover_args);
809         dlm_clear_members(ls);
810         dlm_clear_members_gone(ls);
811         kfree(ls->ls_node_array);
812         log_debug(ls, "release_lockspace final free");
813         kobject_put(&ls->ls_kobj);
814         /* The ls structure will be freed when the kobject is done with */
815
816         module_put(THIS_MODULE);
817         return 0;
818 }
819
820 /*
821  * Called when a system has released all its locks and is not going to use the
822  * lockspace any longer.  We free everything we're managing for this lockspace.
823  * Remaining nodes will go through the recovery process as if we'd died.  The
824  * lockspace must continue to function as usual, participating in recoveries,
825  * until this returns.
826  *
827  * Force has 4 possible values:
828  * 0 - don't destroy lockspace if it has any LKBs
829  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
830  * 2 - destroy lockspace regardless of LKBs
831  * 3 - destroy lockspace as part of a forced shutdown
832  */
833
834 int dlm_release_lockspace(void *lockspace, int force)
835 {
836         struct dlm_ls *ls;
837         int error;
838
839         ls = dlm_find_lockspace_local(lockspace);
840         if (!ls)
841                 return -EINVAL;
842         dlm_put_lockspace(ls);
843
844         mutex_lock(&ls_lock);
845         error = release_lockspace(ls, force);
846         if (!error)
847                 ls_count--;
848         if (!ls_count)
849                 threads_stop();
850         mutex_unlock(&ls_lock);
851
852         return error;
853 }
854
855 void dlm_stop_lockspaces(void)
856 {
857         struct dlm_ls *ls;
858
859  restart:
860         spin_lock(&lslist_lock);
861         list_for_each_entry(ls, &lslist, ls_list) {
862                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
863                         continue;
864                 spin_unlock(&lslist_lock);
865                 log_error(ls, "no userland control daemon, stopping lockspace");
866                 dlm_ls_stop(ls);
867                 goto restart;
868         }
869         spin_unlock(&lslist_lock);
870 }
871
This page took 0.087137 seconds and 4 git commands to generate.