// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;

static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls->ls_local_handle);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);

	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);

	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);

	return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);

	kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

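/*
 * Illustration only (not part of this file): each lockspace kobject built
 * with dlm_ktype is added to the "dlm" kset (registered further down in
 * dlm_lockspace_init() and new_lockspace()), so it appears as
 * /sys/kernel/dlm/<lockspace name>/ with the attributes defined above.
 * A rough sketch of how dlm_controld drives a lockspace from userspace,
 * assuming a hypothetical lockspace named "example" and omitting error
 * handling:
 *
 *	int fd = open("/sys/kernel/dlm/example/control", O_WRONLY);
 *	write(fd, "1", 1);	/\* "1" starts, "0" stops (dlm_control_store) *\/
 *	close(fd);
 *
 *	fd = open("/sys/kernel/dlm/example/event_done", O_WRONLY);
 *	write(fd, "0", 1);	/\* report the uevent result (dlm_event_store) *\/
 *	close(fd);
 */
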
static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	wait_event(ls->ls_uevent_wait,
		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

	return ls->ls_uevent_result;
}

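/*
 * A minimal sketch of the handshake pattern used by do_uevent() and
 * dlm_event_store() above: one side sets a flag bit and wakes the waitqueue,
 * the waiter sleeps until test_and_clear_bit() observes the bit.  The names
 * here (MY_BIT, my_flags, my_wait) are hypothetical and only illustrate the
 * pairing, they are not dlm symbols:
 *
 *	#define MY_BIT 0
 *	static unsigned long my_flags;
 *	static DECLARE_WAIT_QUEUE_HEAD(my_wait);
 *
 *	static void done(void)			/\* like dlm_event_store() *\/
 *	{
 *		set_bit(MY_BIT, &my_flags);
 *		wake_up(&my_wait);
 *	}
 *
 *	static void wait_for_done(void)		/\* like do_uevent() *\/
 *	{
 *		wait_event(my_wait, test_and_clear_bit(MY_BIT, &my_flags));
 *	}
 */
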
static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
{
	const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: can not create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock_bh(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock_bh(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock_bh(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	if (atomic_dec_and_test(&ls->ls_count))
		wake_up(&ls->ls_count_wait);
}

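/*
 * Usage sketch (not taken from this file): callers of the lookup helpers
 * above must balance each successful lookup with dlm_put_lockspace(), since
 * the ls_count reference gates removal in remove_lockspace() below.
 *
 *	struct dlm_ls *ls = dlm_find_lockspace_global(id);
 *
 *	if (!ls)
 *		return -EINVAL;
 *	/\* ... use ls ... *\/
 *	dlm_put_lockspace(ls);
 */
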
static void remove_lockspace(struct dlm_ls *ls)
{
retry:
	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

	spin_lock_bh(&lslist_lock);
	if (atomic_read(&ls->ls_count) != 0) {
		spin_unlock_bh(&lslist_lock);
		goto retry;
	}

	WARN_ON(ls->ls_create_count != 0);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
}

static int threads_start(void)
{
	int error;

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error)
		log_print("cannot start dlm midcomms %d", error);
	return error;
}

static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int namelen = strlen(name);
	int error;

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;
	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock_bh(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;
	ls = kzalloc(sizeof(*ls), GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);

	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	/* ls_exflags are forced to match among nodes, and we don't
	 * need to require all nodes to have some flags set
	 */
	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));

	INIT_LIST_HEAD(&ls->ls_toss);
	INIT_LIST_HEAD(&ls->ls_keep);
	rwlock_init(&ls->ls_rsbtbl_lock);

	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);

	idr_init(&ls->ls_lkbidr);
	rwlock_init(&ls->ls_lkbidr_lock);

	INIT_LIST_HEAD(&ls->ls_waiters);
	spin_lock_init(&ls->ls_waiters_lock);
	INIT_LIST_HEAD(&ls->ls_orphans);
	spin_lock_init(&ls->ls_orphans_lock);

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_local_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_recovery_done);
	ls->ls_recovery_result = -1;

	spin_lock_init(&ls->ls_cb_lock);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = get_random_u64();
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	rwlock_init(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	rwlock_init(&ls->ls_requestqueue_lock);
	spin_lock_init(&ls->ls_clear_proc_locks);

	/* Due to backwards compatibility with 3.1 we need to use the maximum
	 * possible dlm message size to be sure the message will fit and
	 * not have out of bounds issues.  However on sending side 3.2
	 * might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf) {
		error = -ENOMEM;

	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_masters_list);
	rwlock_init(&ls->ls_masters_lock);
	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
	rwlock_init(&ls->ls_dir_dump_lock);

	INIT_LIST_HEAD(&ls->ls_toss_q);
	spin_lock_init(&ls->ls_toss_q_lock);
	timer_setup(&ls->ls_timer, dlm_rsb_toss_timer,

	spin_lock_bh(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock_bh(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		log_error(ls, "can't start dlm_callback %d", error);

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */
	error = dlm_recoverd_start(ls);
	log_error(ls, "can't start dlm_recoverd %d", error);

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	/* let kobject handle freeing of ls if there's an error */

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);

	/* wait until recovery is successful or failed */
	wait_for_completion(&ls->ls_recovery_done);
	error = ls->ls_recovery_result;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
	dlm_recoverd_stop(ls);
	dlm_callback_stop(ls);
	spin_lock_bh(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
	idr_destroy(&ls->ls_lkbidr);
	rhashtable_destroy(&ls->ls_rsbtbl);
	kobject_put(&ls->ls_kobj);
 out:
	module_put(THIS_MODULE);
	return error;
}

static int __dlm_new_lockspace(const char *name, const char *cluster,
			       uint32_t flags, int lvblen,
			       const struct dlm_lockspace_ops *ops,
			       void *ops_arg, int *ops_result,
			       dlm_lockspace_t **lockspace)
{
	int error;

	mutex_lock(&ls_lock);
	error = threads_start();

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);

	dlm_midcomms_shutdown();

	mutex_unlock(&ls_lock);

int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
		      int lvblen, const struct dlm_lockspace_ops *ops,
		      void *ops_arg, int *ops_result,
		      dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
				   ops, ops_arg, ops_result, lockspace);
}

int dlm_new_user_lockspace(const char *name, const char *cluster,
			   uint32_t flags, int lvblen,
			   const struct dlm_lockspace_ops *ops,
			   void *ops_arg, int *ops_result,
			   dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
				   ops_arg, ops_result, lockspace);
}

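/*
 * Minimal caller sketch (hypothetical values, not from this file): a kernel
 * user such as a cluster filesystem would typically create a lockspace named
 * "example" with 32-byte LVBs and no recovery callbacks, use it via the dlm
 * locking API, and drop it again with dlm_release_lockspace():
 *
 *	dlm_lockspace_t *ls;
 *	int error;
 *
 *	error = dlm_new_lockspace("example", "mycluster", DLM_LSFL_NEWEXCL,
 *				  32, NULL, NULL, NULL, &ls);
 *	if (error)
 *		return error;
 *	/\* ... dlm_lock()/dlm_unlock() against ls ... *\/
 *	dlm_release_lockspace(ls, 0);
 */
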
static int lkb_idr_is_local(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
	int rv;

	read_lock_bh(&ls->ls_lkbidr_lock);
	if (force == 0) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
	} else if (force == 1) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
	} else {
		rv = 0;
	}
	read_unlock_bh(&ls->ls_lkbidr_lock);
	return rv;
}

static void rhash_free_rsb(void *ptr, void *arg)
{
	struct dlm_rsb *rsb = ptr;

	dlm_free_rsb(rsb);
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	int busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock_bh(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy > force)
			rv = -EBUSY;
		else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock_bh(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_midcomms_version_wait();

	dlm_device_deregister(ls);

	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	/* clear the LSFL_RUNNING flag to speed up
	 * timer_shutdown_sync(); we don't care anymore
	 */
	clear_bit(LSFL_RUNNING, &ls->ls_flags);
	timer_shutdown_sync(&ls->ls_timer);

	dlm_clear_members(ls);
	dlm_midcomms_shutdown();

	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkbs in the idr
	 */
	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsbs on the rsbtbl
	 */
	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);

	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */
	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with it */

	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	mutex_unlock(&ls_lock);

	return error;
}

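/*
 * Illustration of the force values documented above (hypothetical caller,
 * not from this file): a normal teardown asks politely first and only
 * escalates if the lockspace still holds LKBs:
 *
 *	error = dlm_release_lockspace(ls, 0);	/\* keep it if any LKBs remain *\/
 *	if (error == -EBUSY)
 *		error = dlm_release_lockspace(ls, 2);	/\* destroy regardless of LKBs *\/
 */
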
void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock_bh(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock_bh(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}