]> Git Repo - J-linux.git/blob - drivers/md/md-cluster.c
Merge tag 'vfs-6.13-rc7.fixes' of git://git.kernel.org/pub/scm/linux/kernel/git/vfs/vfs
[J-linux.git] / drivers / md / md-cluster.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3  * Copyright (C) 2015, SUSE
4  */
5
6
7 #include <linux/module.h>
8 #include <linux/kthread.h>
9 #include <linux/dlm.h>
10 #include <linux/sched.h>
11 #include <linux/raid/md_p.h>
12 #include "md.h"
13 #include "md-bitmap.h"
14 #include "md-cluster.h"
15
16 #define LVB_SIZE        64
17 #define NEW_DEV_TIMEOUT 5000
18 #define WAIT_DLM_LOCK_TIMEOUT (30 * HZ)
19
20 struct dlm_lock_resource {
21         dlm_lockspace_t *ls;
22         struct dlm_lksb lksb;
23         char *name; /* lock name. */
24         uint32_t flags; /* flags to pass to dlm_lock() */
25         wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
26         bool sync_locking_done;
27         void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
28         struct mddev *mddev; /* pointing back to mddev. */
29         int mode;
30 };
31
32 struct resync_info {
33         __le64 lo;
34         __le64 hi;
35 };
36
37 /* md_cluster_info flags */
38 #define         MD_CLUSTER_WAITING_FOR_NEWDISK          1
39 #define         MD_CLUSTER_SUSPEND_READ_BALANCING       2
40 #define         MD_CLUSTER_BEGIN_JOIN_CLUSTER           3
41
42 /* Lock the send communication. This is done through
43  * bit manipulation as opposed to a mutex in order to
44  * accommodate lock and hold. See next comment.
45  */
46 #define         MD_CLUSTER_SEND_LOCK                    4
47 /* If cluster operations (such as adding a disk) must lock the
48  * communication channel, so as to perform extra operations
49  * (update metadata) and no other operation is allowed on the
50  * MD. Token needs to be locked and held until the operation
51  * completes witha md_update_sb(), which would eventually release
52  * the lock.
53  */
54 #define         MD_CLUSTER_SEND_LOCKED_ALREADY          5
55 /* We should receive message after node joined cluster and
56  * set up all the related infos such as bitmap and personality */
57 #define         MD_CLUSTER_ALREADY_IN_CLUSTER           6
58 #define         MD_CLUSTER_PENDING_RECV_EVENT           7
59 #define         MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD              8
60 #define         MD_CLUSTER_WAITING_FOR_SYNC             9
61
62 struct md_cluster_info {
63         struct mddev *mddev; /* the md device which md_cluster_info belongs to */
64         /* dlm lock space and resources for clustered raid. */
65         dlm_lockspace_t *lockspace;
66         int slot_number;
67         struct completion completion;
68         struct mutex recv_mutex;
69         struct dlm_lock_resource *bitmap_lockres;
70         struct dlm_lock_resource **other_bitmap_lockres;
71         struct dlm_lock_resource *resync_lockres;
72         struct list_head suspend_list;
73
74         spinlock_t suspend_lock;
75         /* record the region which write should be suspended */
76         sector_t suspend_lo;
77         sector_t suspend_hi;
78         int suspend_from; /* the slot which broadcast suspend_lo/hi */
79
80         struct md_thread __rcu *recovery_thread;
81         unsigned long recovery_map;
82         /* communication loc resources */
83         struct dlm_lock_resource *ack_lockres;
84         struct dlm_lock_resource *message_lockres;
85         struct dlm_lock_resource *token_lockres;
86         struct dlm_lock_resource *no_new_dev_lockres;
87         struct md_thread __rcu *recv_thread;
88         struct completion newdisk_completion;
89         wait_queue_head_t wait;
90         unsigned long state;
91         /* record the region in RESYNCING message */
92         sector_t sync_low;
93         sector_t sync_hi;
94 };
95
96 /* For compatibility, add the new msg_type at the end. */
97 enum msg_type {
98         METADATA_UPDATED = 0,
99         RESYNCING,
100         NEWDISK,
101         REMOVE,
102         RE_ADD,
103         BITMAP_NEEDS_SYNC,
104         CHANGE_CAPACITY,
105         BITMAP_RESIZE,
106         RESYNCING_START,
107 };
108
109 struct cluster_msg {
110         __le32 type;
111         __le32 slot;
112         /* TODO: Unionize this for smaller footprint */
113         __le64 low;
114         __le64 high;
115         char uuid[16];
116         __le32 raid_slot;
117 };
118
119 static void sync_ast(void *arg)
120 {
121         struct dlm_lock_resource *res;
122
123         res = arg;
124         res->sync_locking_done = true;
125         wake_up(&res->sync_locking);
126 }
127
128 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
129 {
130         int ret = 0;
131
132         ret = dlm_lock(res->ls, mode, &res->lksb,
133                         res->flags, res->name, strlen(res->name),
134                         0, sync_ast, res, res->bast);
135         if (ret)
136                 return ret;
137         ret = wait_event_timeout(res->sync_locking, res->sync_locking_done,
138                                 WAIT_DLM_LOCK_TIMEOUT);
139         res->sync_locking_done = false;
140         if (!ret) {
141                 pr_err("locking DLM '%s' timeout!\n", res->name);
142                 return -EBUSY;
143         }
144         if (res->lksb.sb_status == 0)
145                 res->mode = mode;
146         return res->lksb.sb_status;
147 }
148
149 static int dlm_unlock_sync(struct dlm_lock_resource *res)
150 {
151         return dlm_lock_sync(res, DLM_LOCK_NL);
152 }
153
154 /*
155  * An variation of dlm_lock_sync, which make lock request could
156  * be interrupted
157  */
158 static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
159                                        struct mddev *mddev)
160 {
161         int ret = 0;
162
163         ret = dlm_lock(res->ls, mode, &res->lksb,
164                         res->flags, res->name, strlen(res->name),
165                         0, sync_ast, res, res->bast);
166         if (ret)
167                 return ret;
168
169         wait_event(res->sync_locking, res->sync_locking_done
170                                       || kthread_should_stop()
171                                       || test_bit(MD_CLOSING, &mddev->flags));
172         if (!res->sync_locking_done) {
173                 /*
174                  * the convert queue contains the lock request when request is
175                  * interrupted, and sync_ast could still be run, so need to
176                  * cancel the request and reset completion
177                  */
178                 ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
179                         &res->lksb, res);
180                 res->sync_locking_done = false;
181                 if (unlikely(ret != 0))
182                         pr_info("failed to cancel previous lock request "
183                                  "%s return %d\n", res->name, ret);
184                 return -EPERM;
185         } else
186                 res->sync_locking_done = false;
187         if (res->lksb.sb_status == 0)
188                 res->mode = mode;
189         return res->lksb.sb_status;
190 }
191
192 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
193                 char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
194 {
195         struct dlm_lock_resource *res = NULL;
196         int ret, namelen;
197         struct md_cluster_info *cinfo = mddev->cluster_info;
198
199         res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
200         if (!res)
201                 return NULL;
202         init_waitqueue_head(&res->sync_locking);
203         res->sync_locking_done = false;
204         res->ls = cinfo->lockspace;
205         res->mddev = mddev;
206         res->mode = DLM_LOCK_IV;
207         namelen = strlen(name);
208         res->name = kzalloc(namelen + 1, GFP_KERNEL);
209         if (!res->name) {
210                 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
211                 goto out_err;
212         }
213         strscpy(res->name, name, namelen + 1);
214         if (with_lvb) {
215                 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
216                 if (!res->lksb.sb_lvbptr) {
217                         pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
218                         goto out_err;
219                 }
220                 res->flags = DLM_LKF_VALBLK;
221         }
222
223         if (bastfn)
224                 res->bast = bastfn;
225
226         res->flags |= DLM_LKF_EXPEDITE;
227
228         ret = dlm_lock_sync(res, DLM_LOCK_NL);
229         if (ret) {
230                 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
231                 goto out_err;
232         }
233         res->flags &= ~DLM_LKF_EXPEDITE;
234         res->flags |= DLM_LKF_CONVERT;
235
236         return res;
237 out_err:
238         kfree(res->lksb.sb_lvbptr);
239         kfree(res->name);
240         kfree(res);
241         return NULL;
242 }
243
244 static void lockres_free(struct dlm_lock_resource *res)
245 {
246         int ret = 0;
247
248         if (!res)
249                 return;
250
251         /*
252          * use FORCEUNLOCK flag, so we can unlock even the lock is on the
253          * waiting or convert queue
254          */
255         ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
256                 &res->lksb, res);
257         if (unlikely(ret != 0))
258                 pr_err("failed to unlock %s return %d\n", res->name, ret);
259         else
260                 wait_event(res->sync_locking, res->sync_locking_done);
261
262         kfree(res->name);
263         kfree(res->lksb.sb_lvbptr);
264         kfree(res);
265 }
266
267 static void add_resync_info(struct dlm_lock_resource *lockres,
268                             sector_t lo, sector_t hi)
269 {
270         struct resync_info *ri;
271
272         ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
273         ri->lo = cpu_to_le64(lo);
274         ri->hi = cpu_to_le64(hi);
275 }
276
277 static int read_resync_info(struct mddev *mddev,
278                             struct dlm_lock_resource *lockres)
279 {
280         struct resync_info ri;
281         struct md_cluster_info *cinfo = mddev->cluster_info;
282         int ret = 0;
283
284         dlm_lock_sync(lockres, DLM_LOCK_CR);
285         memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
286         if (le64_to_cpu(ri.hi) > 0) {
287                 cinfo->suspend_hi = le64_to_cpu(ri.hi);
288                 cinfo->suspend_lo = le64_to_cpu(ri.lo);
289                 ret = 1;
290         }
291         dlm_unlock_sync(lockres);
292         return ret;
293 }
294
295 static void recover_bitmaps(struct md_thread *thread)
296 {
297         struct mddev *mddev = thread->mddev;
298         struct md_cluster_info *cinfo = mddev->cluster_info;
299         struct dlm_lock_resource *bm_lockres;
300         char str[64];
301         int slot, ret;
302         sector_t lo, hi;
303
304         while (cinfo->recovery_map) {
305                 slot = fls64((u64)cinfo->recovery_map) - 1;
306
307                 snprintf(str, 64, "bitmap%04d", slot);
308                 bm_lockres = lockres_init(mddev, str, NULL, 1);
309                 if (!bm_lockres) {
310                         pr_err("md-cluster: Cannot initialize bitmaps\n");
311                         goto clear_bit;
312                 }
313
314                 ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
315                 if (ret) {
316                         pr_err("md-cluster: Could not DLM lock %s: %d\n",
317                                         str, ret);
318                         goto clear_bit;
319                 }
320                 ret = mddev->bitmap_ops->copy_from_slot(mddev, slot, &lo, &hi, true);
321                 if (ret) {
322                         pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
323                         goto clear_bit;
324                 }
325
326                 /* Clear suspend_area associated with the bitmap */
327                 spin_lock_irq(&cinfo->suspend_lock);
328                 cinfo->suspend_hi = 0;
329                 cinfo->suspend_lo = 0;
330                 cinfo->suspend_from = -1;
331                 spin_unlock_irq(&cinfo->suspend_lock);
332
333                 /* Kick off a reshape if needed */
334                 if (test_bit(MD_RESYNCING_REMOTE, &mddev->recovery) &&
335                     test_bit(MD_RECOVERY_RESHAPE, &mddev->recovery) &&
336                     mddev->reshape_position != MaxSector)
337                         md_wakeup_thread(mddev->sync_thread);
338
339                 if (hi > 0) {
340                         if (lo < mddev->recovery_cp)
341                                 mddev->recovery_cp = lo;
342                         /* wake up thread to continue resync in case resync
343                          * is not finished */
344                         if (mddev->recovery_cp != MaxSector) {
345                                 /*
346                                  * clear the REMOTE flag since we will launch
347                                  * resync thread in current node.
348                                  */
349                                 clear_bit(MD_RESYNCING_REMOTE,
350                                           &mddev->recovery);
351                                 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
352                                 md_wakeup_thread(mddev->thread);
353                         }
354                 }
355 clear_bit:
356                 lockres_free(bm_lockres);
357                 clear_bit(slot, &cinfo->recovery_map);
358         }
359 }
360
361 static void recover_prep(void *arg)
362 {
363         struct mddev *mddev = arg;
364         struct md_cluster_info *cinfo = mddev->cluster_info;
365         set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
366 }
367
368 static void __recover_slot(struct mddev *mddev, int slot)
369 {
370         struct md_cluster_info *cinfo = mddev->cluster_info;
371
372         set_bit(slot, &cinfo->recovery_map);
373         if (!cinfo->recovery_thread) {
374                 rcu_assign_pointer(cinfo->recovery_thread,
375                         md_register_thread(recover_bitmaps, mddev, "recover"));
376                 if (!cinfo->recovery_thread) {
377                         pr_warn("md-cluster: Could not create recovery thread\n");
378                         return;
379                 }
380         }
381         md_wakeup_thread(cinfo->recovery_thread);
382 }
383
384 static void recover_slot(void *arg, struct dlm_slot *slot)
385 {
386         struct mddev *mddev = arg;
387         struct md_cluster_info *cinfo = mddev->cluster_info;
388
389         pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
390                         mddev->bitmap_info.cluster_name,
391                         slot->nodeid, slot->slot,
392                         cinfo->slot_number);
393         /* deduct one since dlm slot starts from one while the num of
394          * cluster-md begins with 0 */
395         __recover_slot(mddev, slot->slot - 1);
396 }
397
398 static void recover_done(void *arg, struct dlm_slot *slots,
399                 int num_slots, int our_slot,
400                 uint32_t generation)
401 {
402         struct mddev *mddev = arg;
403         struct md_cluster_info *cinfo = mddev->cluster_info;
404
405         cinfo->slot_number = our_slot;
406         /* completion is only need to be complete when node join cluster,
407          * it doesn't need to run during another node's failure */
408         if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
409                 complete(&cinfo->completion);
410                 clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
411         }
412         clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
413 }
414
415 /* the ops is called when node join the cluster, and do lock recovery
416  * if node failure occurs */
417 static const struct dlm_lockspace_ops md_ls_ops = {
418         .recover_prep = recover_prep,
419         .recover_slot = recover_slot,
420         .recover_done = recover_done,
421 };
422
423 /*
424  * The BAST function for the ack lock resource
425  * This function wakes up the receive thread in
426  * order to receive and process the message.
427  */
428 static void ack_bast(void *arg, int mode)
429 {
430         struct dlm_lock_resource *res = arg;
431         struct md_cluster_info *cinfo = res->mddev->cluster_info;
432
433         if (mode == DLM_LOCK_EX) {
434                 if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
435                         md_wakeup_thread(cinfo->recv_thread);
436                 else
437                         set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
438         }
439 }
440
441 static void remove_suspend_info(struct mddev *mddev, int slot)
442 {
443         struct md_cluster_info *cinfo = mddev->cluster_info;
444         mddev->pers->quiesce(mddev, 1);
445         spin_lock_irq(&cinfo->suspend_lock);
446         cinfo->suspend_hi = 0;
447         cinfo->suspend_lo = 0;
448         spin_unlock_irq(&cinfo->suspend_lock);
449         mddev->pers->quiesce(mddev, 0);
450 }
451
452 static void process_suspend_info(struct mddev *mddev,
453                 int slot, sector_t lo, sector_t hi)
454 {
455         struct md_cluster_info *cinfo = mddev->cluster_info;
456         struct mdp_superblock_1 *sb = NULL;
457         struct md_rdev *rdev;
458
459         if (!hi) {
460                 /*
461                  * clear the REMOTE flag since resync or recovery is finished
462                  * in remote node.
463                  */
464                 clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
465                 remove_suspend_info(mddev, slot);
466                 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
467                 clear_bit(MD_CLUSTER_WAITING_FOR_SYNC, &cinfo->state);
468                 md_wakeup_thread(mddev->thread);
469                 return;
470         }
471
472         rdev_for_each(rdev, mddev)
473                 if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
474                         sb = page_address(rdev->sb_page);
475                         break;
476                 }
477
478         /*
479          * The bitmaps are not same for different nodes
480          * if RESYNCING is happening in one node, then
481          * the node which received the RESYNCING message
482          * probably will perform resync with the region
483          * [lo, hi] again, so we could reduce resync time
484          * a lot if we can ensure that the bitmaps among
485          * different nodes are match up well.
486          *
487          * sync_low/hi is used to record the region which
488          * arrived in the previous RESYNCING message,
489          *
490          * Call md_bitmap_sync_with_cluster to clear NEEDED_MASK
491          * and set RESYNC_MASK since  resync thread is running
492          * in another node, so we don't need to do the resync
493          * again with the same section.
494          *
495          * Skip md_bitmap_sync_with_cluster in case reshape
496          * happening, because reshaping region is small and
497          * we don't want to trigger lots of WARN.
498          */
499         if (sb && !(le32_to_cpu(sb->feature_map) & MD_FEATURE_RESHAPE_ACTIVE))
500                 mddev->bitmap_ops->sync_with_cluster(mddev, cinfo->sync_low,
501                                                      cinfo->sync_hi, lo, hi);
502         cinfo->sync_low = lo;
503         cinfo->sync_hi = hi;
504
505         mddev->pers->quiesce(mddev, 1);
506         spin_lock_irq(&cinfo->suspend_lock);
507         cinfo->suspend_from = slot;
508         cinfo->suspend_lo = lo;
509         cinfo->suspend_hi = hi;
510         spin_unlock_irq(&cinfo->suspend_lock);
511         mddev->pers->quiesce(mddev, 0);
512 }
513
514 static int process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
515 {
516         char disk_uuid[64];
517         struct md_cluster_info *cinfo = mddev->cluster_info;
518         char event_name[] = "EVENT=ADD_DEVICE";
519         char raid_slot[16];
520         char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
521         int len;
522         int res = 0;
523
524         len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
525         sprintf(disk_uuid + len, "%pU", cmsg->uuid);
526         snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
527         pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
528         init_completion(&cinfo->newdisk_completion);
529         set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
530         kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
531         if (!wait_for_completion_timeout(&cinfo->newdisk_completion,
532                                         NEW_DEV_TIMEOUT)) {
533                 pr_err("md-cluster(%s:%d): timeout on a new disk adding\n",
534                         __func__, __LINE__);
535                 res = -1;
536         }
537         clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
538         set_bit(MD_CLUSTER_WAITING_FOR_SYNC, &cinfo->state);
539         return res;
540 }
541
542
543 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
544 {
545         int got_lock = 0;
546         struct md_thread *thread;
547         struct md_cluster_info *cinfo = mddev->cluster_info;
548         mddev->good_device_nr = le32_to_cpu(msg->raid_slot);
549
550         dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
551
552         /* daemaon thread must exist */
553         thread = rcu_dereference_protected(mddev->thread, true);
554         wait_event(thread->wqueue,
555                    (got_lock = mddev_trylock(mddev)) ||
556                     test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
557         md_reload_sb(mddev, mddev->good_device_nr);
558         if (got_lock)
559                 mddev_unlock(mddev);
560 }
561
562 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
563 {
564         struct md_rdev *rdev;
565
566         rcu_read_lock();
567         rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
568         if (rdev) {
569                 set_bit(ClusterRemove, &rdev->flags);
570                 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
571                 md_wakeup_thread(mddev->thread);
572         }
573         else
574                 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
575                         __func__, __LINE__, le32_to_cpu(msg->raid_slot));
576         rcu_read_unlock();
577 }
578
579 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
580 {
581         struct md_rdev *rdev;
582
583         rcu_read_lock();
584         rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
585         if (rdev && test_bit(Faulty, &rdev->flags))
586                 clear_bit(Faulty, &rdev->flags);
587         else
588                 pr_warn("%s: %d Could not find disk(%d) which is faulty",
589                         __func__, __LINE__, le32_to_cpu(msg->raid_slot));
590         rcu_read_unlock();
591 }
592
593 static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
594 {
595         int ret = 0;
596
597         if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
598                 "node %d received its own msg\n", le32_to_cpu(msg->slot)))
599                 return -1;
600         switch (le32_to_cpu(msg->type)) {
601         case METADATA_UPDATED:
602                 process_metadata_update(mddev, msg);
603                 break;
604         case CHANGE_CAPACITY:
605                 set_capacity_and_notify(mddev->gendisk, mddev->array_sectors);
606                 break;
607         case RESYNCING_START:
608                 clear_bit(MD_CLUSTER_WAITING_FOR_SYNC, &mddev->cluster_info->state);
609                 break;
610         case RESYNCING:
611                 set_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
612                 process_suspend_info(mddev, le32_to_cpu(msg->slot),
613                                      le64_to_cpu(msg->low),
614                                      le64_to_cpu(msg->high));
615                 break;
616         case NEWDISK:
617                 if (process_add_new_disk(mddev, msg))
618                         ret = -1;
619                 break;
620         case REMOVE:
621                 process_remove_disk(mddev, msg);
622                 break;
623         case RE_ADD:
624                 process_readd_disk(mddev, msg);
625                 break;
626         case BITMAP_NEEDS_SYNC:
627                 __recover_slot(mddev, le32_to_cpu(msg->slot));
628                 break;
629         case BITMAP_RESIZE:
630                 if (le64_to_cpu(msg->high) != mddev->pers->size(mddev, 0, 0))
631                         ret = mddev->bitmap_ops->resize(mddev,
632                                                         le64_to_cpu(msg->high),
633                                                         0, false);
634                 break;
635         default:
636                 ret = -1;
637                 pr_warn("%s:%d Received unknown message from %d\n",
638                         __func__, __LINE__, msg->slot);
639         }
640         return ret;
641 }
642
643 /*
644  * thread for receiving message
645  */
646 static void recv_daemon(struct md_thread *thread)
647 {
648         struct md_cluster_info *cinfo = thread->mddev->cluster_info;
649         struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
650         struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
651         struct cluster_msg msg;
652         int ret;
653
654         mutex_lock(&cinfo->recv_mutex);
655         /*get CR on Message*/
656         if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
657                 pr_err("md/raid1:failed to get CR on MESSAGE\n");
658                 mutex_unlock(&cinfo->recv_mutex);
659                 return;
660         }
661
662         /* read lvb and wake up thread to process this message_lockres */
663         memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
664         ret = process_recvd_msg(thread->mddev, &msg);
665         if (ret)
666                 goto out;
667
668         /*release CR on ack_lockres*/
669         ret = dlm_unlock_sync(ack_lockres);
670         if (unlikely(ret != 0))
671                 pr_info("unlock ack failed return %d\n", ret);
672         /*up-convert to PR on message_lockres*/
673         ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
674         if (unlikely(ret != 0))
675                 pr_info("lock PR on msg failed return %d\n", ret);
676         /*get CR on ack_lockres again*/
677         ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
678         if (unlikely(ret != 0))
679                 pr_info("lock CR on ack failed return %d\n", ret);
680 out:
681         /*release CR on message_lockres*/
682         ret = dlm_unlock_sync(message_lockres);
683         if (unlikely(ret != 0))
684                 pr_info("unlock msg failed return %d\n", ret);
685         mutex_unlock(&cinfo->recv_mutex);
686 }
687
688 /* lock_token()
689  * Takes the lock on the TOKEN lock resource so no other
690  * node can communicate while the operation is underway.
691  */
692 static int lock_token(struct md_cluster_info *cinfo)
693 {
694         int error;
695
696         error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
697         if (error) {
698                 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
699                                 __func__, __LINE__, error);
700         } else {
701                 /* Lock the receive sequence */
702                 mutex_lock(&cinfo->recv_mutex);
703         }
704         return error;
705 }
706
707 /* lock_comm()
708  * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
709  */
710 static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
711 {
712         int rv, set_bit = 0;
713         struct mddev *mddev = cinfo->mddev;
714
715         /*
716          * If resync thread run after raid1d thread, then process_metadata_update
717          * could not continue if raid1d held reconfig_mutex (and raid1d is blocked
718          * since another node already got EX on Token and waiting the EX of Ack),
719          * so let resync wake up thread in case flag is set.
720          */
721         if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
722                                       &cinfo->state)) {
723                 rv = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
724                                               &cinfo->state);
725                 WARN_ON_ONCE(rv);
726                 md_wakeup_thread(mddev->thread);
727                 set_bit = 1;
728         }
729
730         wait_event(cinfo->wait,
731                    !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
732         rv = lock_token(cinfo);
733         if (set_bit)
734                 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
735         return rv;
736 }
737
738 static void unlock_comm(struct md_cluster_info *cinfo)
739 {
740         WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
741         mutex_unlock(&cinfo->recv_mutex);
742         dlm_unlock_sync(cinfo->token_lockres);
743         clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
744         wake_up(&cinfo->wait);
745 }
746
747 /* __sendmsg()
748  * This function performs the actual sending of the message. This function is
749  * usually called after performing the encompassing operation
750  * The function:
751  * 1. Grabs the message lockresource in EX mode
752  * 2. Copies the message to the message LVB
753  * 3. Downconverts message lockresource to CW
754  * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
755  *    and the other nodes read the message. The thread will wait here until all other
756  *    nodes have released ack lock resource.
757  * 5. Downconvert ack lockresource to CR
758  */
759 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
760 {
761         int error, unlock_error;
762         int slot = cinfo->slot_number - 1;
763
764         cmsg->slot = cpu_to_le32(slot);
765         /*get EX on Message*/
766         error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
767         if (error) {
768                 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
769                 return error;
770         }
771
772         memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
773                         sizeof(struct cluster_msg));
774         /*down-convert EX to CW on Message*/
775         error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
776         if (error) {
777                 pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
778                                 error);
779                 goto failed_ack;
780         }
781
782         /*up-convert CR to EX on Ack*/
783         error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
784         if (error) {
785                 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
786                                 error);
787                 goto failed_ack;
788         }
789
790         /*down-convert EX to CR on Ack*/
791         error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
792         if (error) {
793                 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
794                                 error);
795                 goto failed_ack;
796         }
797
798 failed_ack:
799         while ((unlock_error = dlm_unlock_sync(cinfo->message_lockres)))
800                 pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
801                         unlock_error);
802
803         return error;
804 }
805
806 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
807                    bool mddev_locked)
808 {
809         int ret;
810
811         ret = lock_comm(cinfo, mddev_locked);
812         if (!ret) {
813                 ret = __sendmsg(cinfo, cmsg);
814                 unlock_comm(cinfo);
815         }
816         return ret;
817 }
818
819 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
820 {
821         struct md_cluster_info *cinfo = mddev->cluster_info;
822         int i, ret = 0;
823         struct dlm_lock_resource *bm_lockres;
824         char str[64];
825         sector_t lo, hi;
826
827
828         for (i = 0; i < total_slots; i++) {
829                 memset(str, '\0', 64);
830                 snprintf(str, 64, "bitmap%04d", i);
831                 bm_lockres = lockres_init(mddev, str, NULL, 1);
832                 if (!bm_lockres)
833                         return -ENOMEM;
834                 if (i == (cinfo->slot_number - 1)) {
835                         lockres_free(bm_lockres);
836                         continue;
837                 }
838
839                 bm_lockres->flags |= DLM_LKF_NOQUEUE;
840                 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
841                 if (ret == -EAGAIN) {
842                         if (read_resync_info(mddev, bm_lockres)) {
843                                 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
844                                                 __func__, __LINE__,
845                                         (unsigned long long) cinfo->suspend_lo,
846                                         (unsigned long long) cinfo->suspend_hi,
847                                         i);
848                                 cinfo->suspend_from = i;
849                         }
850                         ret = 0;
851                         lockres_free(bm_lockres);
852                         continue;
853                 }
854                 if (ret) {
855                         lockres_free(bm_lockres);
856                         goto out;
857                 }
858
859                 /* Read the disk bitmap sb and check if it needs recovery */
860                 ret = mddev->bitmap_ops->copy_from_slot(mddev, i, &lo, &hi, false);
861                 if (ret) {
862                         pr_warn("md-cluster: Could not gather bitmaps from slot %d", i);
863                         lockres_free(bm_lockres);
864                         continue;
865                 }
866                 if ((hi > 0) && (lo < mddev->recovery_cp)) {
867                         set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
868                         mddev->recovery_cp = lo;
869                         md_check_recovery(mddev);
870                 }
871
872                 lockres_free(bm_lockres);
873         }
874 out:
875         return ret;
876 }
877
878 static int join(struct mddev *mddev, int nodes)
879 {
880         struct md_cluster_info *cinfo;
881         int ret, ops_rv;
882         char str[64];
883
884         cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
885         if (!cinfo)
886                 return -ENOMEM;
887
888         INIT_LIST_HEAD(&cinfo->suspend_list);
889         spin_lock_init(&cinfo->suspend_lock);
890         init_completion(&cinfo->completion);
891         set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
892         init_waitqueue_head(&cinfo->wait);
893         mutex_init(&cinfo->recv_mutex);
894
895         mddev->cluster_info = cinfo;
896         cinfo->mddev = mddev;
897
898         memset(str, 0, 64);
899         sprintf(str, "%pU", mddev->uuid);
900         ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
901                                 DLM_LSFL_SOFTIRQ, LVB_SIZE, &md_ls_ops, mddev,
902                                 &ops_rv, &cinfo->lockspace);
903         if (ret)
904                 goto err;
905         wait_for_completion(&cinfo->completion);
906         if (nodes < cinfo->slot_number) {
907                 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
908                         cinfo->slot_number, nodes);
909                 ret = -ERANGE;
910                 goto err;
911         }
912         /* Initiate the communication resources */
913         ret = -ENOMEM;
914         rcu_assign_pointer(cinfo->recv_thread,
915                         md_register_thread(recv_daemon, mddev, "cluster_recv"));
916         if (!cinfo->recv_thread) {
917                 pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
918                 goto err;
919         }
920         cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
921         if (!cinfo->message_lockres)
922                 goto err;
923         cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
924         if (!cinfo->token_lockres)
925                 goto err;
926         cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
927         if (!cinfo->no_new_dev_lockres)
928                 goto err;
929
930         ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
931         if (ret) {
932                 ret = -EAGAIN;
933                 pr_err("md-cluster: can't join cluster to avoid lock issue\n");
934                 goto err;
935         }
936         cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
937         if (!cinfo->ack_lockres) {
938                 ret = -ENOMEM;
939                 goto err;
940         }
941         /* get sync CR lock on ACK. */
942         if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
943                 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
944                                 ret);
945         dlm_unlock_sync(cinfo->token_lockres);
946         /* get sync CR lock on no-new-dev. */
947         if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
948                 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
949
950
951         pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
952         snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
953         cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
954         if (!cinfo->bitmap_lockres) {
955                 ret = -ENOMEM;
956                 goto err;
957         }
958         if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
959                 pr_err("Failed to get bitmap lock\n");
960                 ret = -EINVAL;
961                 goto err;
962         }
963
964         cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
965         if (!cinfo->resync_lockres) {
966                 ret = -ENOMEM;
967                 goto err;
968         }
969
970         return 0;
971 err:
972         set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
973         md_unregister_thread(mddev, &cinfo->recovery_thread);
974         md_unregister_thread(mddev, &cinfo->recv_thread);
975         lockres_free(cinfo->message_lockres);
976         lockres_free(cinfo->token_lockres);
977         lockres_free(cinfo->ack_lockres);
978         lockres_free(cinfo->no_new_dev_lockres);
979         lockres_free(cinfo->resync_lockres);
980         lockres_free(cinfo->bitmap_lockres);
981         if (cinfo->lockspace)
982                 dlm_release_lockspace(cinfo->lockspace, 2);
983         mddev->cluster_info = NULL;
984         kfree(cinfo);
985         return ret;
986 }
987
988 static void load_bitmaps(struct mddev *mddev, int total_slots)
989 {
990         struct md_cluster_info *cinfo = mddev->cluster_info;
991
992         /* load all the node's bitmap info for resync */
993         if (gather_all_resync_info(mddev, total_slots))
994                 pr_err("md-cluster: failed to gather all resyn infos\n");
995         set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
996         /* wake up recv thread in case something need to be handled */
997         if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
998                 md_wakeup_thread(cinfo->recv_thread);
999 }
1000
1001 static void resync_bitmap(struct mddev *mddev)
1002 {
1003         struct md_cluster_info *cinfo = mddev->cluster_info;
1004         struct cluster_msg cmsg = {0};
1005         int err;
1006
1007         cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
1008         err = sendmsg(cinfo, &cmsg, 1);
1009         if (err)
1010                 pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
1011                         __func__, __LINE__, err);
1012 }
1013
1014 static void unlock_all_bitmaps(struct mddev *mddev);
1015 static int leave(struct mddev *mddev)
1016 {
1017         struct md_cluster_info *cinfo = mddev->cluster_info;
1018
1019         if (!cinfo)
1020                 return 0;
1021
1022         /*
1023          * BITMAP_NEEDS_SYNC message should be sent when node
1024          * is leaving the cluster with dirty bitmap, also we
1025          * can only deliver it when dlm connection is available.
1026          *
1027          * Also, we should send BITMAP_NEEDS_SYNC message in
1028          * case reshaping is interrupted.
1029          */
1030         if ((cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) ||
1031             (mddev->reshape_position != MaxSector &&
1032              test_bit(MD_CLOSING, &mddev->flags)))
1033                 resync_bitmap(mddev);
1034
1035         set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1036         md_unregister_thread(mddev, &cinfo->recovery_thread);
1037         md_unregister_thread(mddev, &cinfo->recv_thread);
1038         lockres_free(cinfo->message_lockres);
1039         lockres_free(cinfo->token_lockres);
1040         lockres_free(cinfo->ack_lockres);
1041         lockres_free(cinfo->no_new_dev_lockres);
1042         lockres_free(cinfo->resync_lockres);
1043         lockres_free(cinfo->bitmap_lockres);
1044         unlock_all_bitmaps(mddev);
1045         dlm_release_lockspace(cinfo->lockspace, 2);
1046         kfree(cinfo);
1047         return 0;
1048 }
1049
1050 /* slot_number(): Returns the MD slot number to use
1051  * DLM starts the slot numbers from 1, wheras cluster-md
1052  * wants the number to be from zero, so we deduct one
1053  */
1054 static int slot_number(struct mddev *mddev)
1055 {
1056         struct md_cluster_info *cinfo = mddev->cluster_info;
1057
1058         return cinfo->slot_number - 1;
1059 }
1060
1061 /*
1062  * Check if the communication is already locked, else lock the communication
1063  * channel.
1064  * If it is already locked, token is in EX mode, and hence lock_token()
1065  * should not be called.
1066  */
1067 static int metadata_update_start(struct mddev *mddev)
1068 {
1069         struct md_cluster_info *cinfo = mddev->cluster_info;
1070         int ret;
1071
1072         /*
1073          * metadata_update_start is always called with the protection of
1074          * reconfig_mutex, so set WAITING_FOR_TOKEN here.
1075          */
1076         ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
1077                                     &cinfo->state);
1078         WARN_ON_ONCE(ret);
1079         md_wakeup_thread(mddev->thread);
1080
1081         wait_event(cinfo->wait,
1082                    !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
1083                    test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));
1084
1085         /* If token is already locked, return 0 */
1086         if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
1087                 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1088                 return 0;
1089         }
1090
1091         ret = lock_token(cinfo);
1092         clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1093         return ret;
1094 }
1095
1096 static int metadata_update_finish(struct mddev *mddev)
1097 {
1098         struct md_cluster_info *cinfo = mddev->cluster_info;
1099         struct cluster_msg cmsg;
1100         struct md_rdev *rdev;
1101         int ret = 0;
1102         int raid_slot = -1;
1103
1104         memset(&cmsg, 0, sizeof(cmsg));
1105         cmsg.type = cpu_to_le32(METADATA_UPDATED);
1106         /* Pick up a good active device number to send.
1107          */
1108         rdev_for_each(rdev, mddev)
1109                 if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
1110                         raid_slot = rdev->desc_nr;
1111                         break;
1112                 }
1113         if (raid_slot >= 0) {
1114                 cmsg.raid_slot = cpu_to_le32(raid_slot);
1115                 ret = __sendmsg(cinfo, &cmsg);
1116         } else
1117                 pr_warn("md-cluster: No good device id found to send\n");
1118         clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1119         unlock_comm(cinfo);
1120         return ret;
1121 }
1122
1123 static void metadata_update_cancel(struct mddev *mddev)
1124 {
1125         struct md_cluster_info *cinfo = mddev->cluster_info;
1126         clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1127         unlock_comm(cinfo);
1128 }
1129
1130 static int update_bitmap_size(struct mddev *mddev, sector_t size)
1131 {
1132         struct md_cluster_info *cinfo = mddev->cluster_info;
1133         struct cluster_msg cmsg = {0};
1134         int ret;
1135
1136         cmsg.type = cpu_to_le32(BITMAP_RESIZE);
1137         cmsg.high = cpu_to_le64(size);
1138         ret = sendmsg(cinfo, &cmsg, 0);
1139         if (ret)
1140                 pr_err("%s:%d: failed to send BITMAP_RESIZE message (%d)\n",
1141                         __func__, __LINE__, ret);
1142         return ret;
1143 }
1144
1145 static int resize_bitmaps(struct mddev *mddev, sector_t newsize, sector_t oldsize)
1146 {
1147         void *bitmap = mddev->bitmap;
1148         struct md_bitmap_stats stats;
1149         unsigned long my_pages;
1150         int i, rv;
1151
1152         rv = mddev->bitmap_ops->get_stats(bitmap, &stats);
1153         if (rv)
1154                 return rv;
1155
1156         my_pages = stats.pages;
1157         /*
1158          * We need to ensure all the nodes can grow to a larger
1159          * bitmap size before make the reshaping.
1160          */
1161         rv = update_bitmap_size(mddev, newsize);
1162         if (rv)
1163                 return rv;
1164
1165         for (i = 0; i < mddev->bitmap_info.nodes; i++) {
1166                 struct dlm_lock_resource *bm_lockres;
1167                 char str[64];
1168
1169                 if (i == md_cluster_ops->slot_number(mddev))
1170                         continue;
1171
1172                 bitmap = mddev->bitmap_ops->get_from_slot(mddev, i);
1173                 if (IS_ERR(bitmap)) {
1174                         pr_err("can't get bitmap from slot %d\n", i);
1175                         bitmap = NULL;
1176                         goto out;
1177                 }
1178
1179                 rv = mddev->bitmap_ops->get_stats(bitmap, &stats);
1180                 if (rv)
1181                         goto out;
1182                 /*
1183                  * If we can hold the bitmap lock of one node then
1184                  * the slot is not occupied, update the pages.
1185                  */
1186                 snprintf(str, 64, "bitmap%04d", i);
1187                 bm_lockres = lockres_init(mddev, str, NULL, 1);
1188                 if (!bm_lockres) {
1189                         pr_err("Cannot initialize %s lock\n", str);
1190                         goto out;
1191                 }
1192                 bm_lockres->flags |= DLM_LKF_NOQUEUE;
1193                 rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
1194                 if (!rv)
1195                         mddev->bitmap_ops->set_pages(bitmap, my_pages);
1196                 lockres_free(bm_lockres);
1197
1198                 if (my_pages != stats.pages)
1199                         /*
1200                          * Let's revert the bitmap size if one node
1201                          * can't resize bitmap
1202                          */
1203                         goto out;
1204                 mddev->bitmap_ops->free(bitmap);
1205         }
1206
1207         return 0;
1208 out:
1209         mddev->bitmap_ops->free(bitmap);
1210         update_bitmap_size(mddev, oldsize);
1211         return -1;
1212 }
1213
1214 /*
1215  * return 0 if all the bitmaps have the same sync_size
1216  */
1217 static int cluster_check_sync_size(struct mddev *mddev)
1218 {
1219         int current_slot = md_cluster_ops->slot_number(mddev);
1220         int node_num = mddev->bitmap_info.nodes;
1221         struct dlm_lock_resource *bm_lockres;
1222         struct md_bitmap_stats stats;
1223         void *bitmap = mddev->bitmap;
1224         unsigned long sync_size = 0;
1225         unsigned long my_sync_size;
1226         char str[64];
1227         int i, rv;
1228
1229         rv = mddev->bitmap_ops->get_stats(bitmap, &stats);
1230         if (rv)
1231                 return rv;
1232
1233         my_sync_size = stats.sync_size;
1234
1235         for (i = 0; i < node_num; i++) {
1236                 if (i == current_slot)
1237                         continue;
1238
1239                 bitmap = mddev->bitmap_ops->get_from_slot(mddev, i);
1240                 if (IS_ERR(bitmap)) {
1241                         pr_err("can't get bitmap from slot %d\n", i);
1242                         return -1;
1243                 }
1244
1245                 /*
1246                  * If we can hold the bitmap lock of one node then
1247                  * the slot is not occupied, update the sb.
1248                  */
1249                 snprintf(str, 64, "bitmap%04d", i);
1250                 bm_lockres = lockres_init(mddev, str, NULL, 1);
1251                 if (!bm_lockres) {
1252                         pr_err("md-cluster: Cannot initialize %s\n", str);
1253                         mddev->bitmap_ops->free(bitmap);
1254                         return -1;
1255                 }
1256                 bm_lockres->flags |= DLM_LKF_NOQUEUE;
1257                 rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
1258                 if (!rv)
1259                         mddev->bitmap_ops->update_sb(bitmap);
1260                 lockres_free(bm_lockres);
1261
1262                 rv = mddev->bitmap_ops->get_stats(bitmap, &stats);
1263                 if (rv) {
1264                         mddev->bitmap_ops->free(bitmap);
1265                         return rv;
1266                 }
1267
1268                 if (sync_size == 0) {
1269                         sync_size = stats.sync_size;
1270                 } else if (sync_size != stats.sync_size) {
1271                         mddev->bitmap_ops->free(bitmap);
1272                         return -1;
1273                 }
1274                 mddev->bitmap_ops->free(bitmap);
1275         }
1276
1277         return (my_sync_size == sync_size) ? 0 : -1;
1278 }
1279
1280 /*
1281  * Update the size for cluster raid is a little more complex, we perform it
1282  * by the steps:
1283  * 1. hold token lock and update superblock in initiator node.
1284  * 2. send METADATA_UPDATED msg to other nodes.
1285  * 3. The initiator node continues to check each bitmap's sync_size, if all
1286  *    bitmaps have the same value of sync_size, then we can set capacity and
1287  *    let other nodes to perform it. If one node can't update sync_size
1288  *    accordingly, we need to revert to previous value.
1289  */
1290 static void update_size(struct mddev *mddev, sector_t old_dev_sectors)
1291 {
1292         struct md_cluster_info *cinfo = mddev->cluster_info;
1293         struct cluster_msg cmsg;
1294         struct md_rdev *rdev;
1295         int ret = 0;
1296         int raid_slot = -1;
1297
1298         md_update_sb(mddev, 1);
1299         if (lock_comm(cinfo, 1)) {
1300                 pr_err("%s: lock_comm failed\n", __func__);
1301                 return;
1302         }
1303
1304         memset(&cmsg, 0, sizeof(cmsg));
1305         cmsg.type = cpu_to_le32(METADATA_UPDATED);
1306         rdev_for_each(rdev, mddev)
1307                 if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) {
1308                         raid_slot = rdev->desc_nr;
1309                         break;
1310                 }
1311         if (raid_slot >= 0) {
1312                 cmsg.raid_slot = cpu_to_le32(raid_slot);
1313                 /*
1314                  * We can only change capiticy after all the nodes can do it,
1315                  * so need to wait after other nodes already received the msg
1316                  * and handled the change
1317                  */
1318                 ret = __sendmsg(cinfo, &cmsg);
1319                 if (ret) {
1320                         pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
1321                                __func__, __LINE__);
1322                         unlock_comm(cinfo);
1323                         return;
1324                 }
1325         } else {
1326                 pr_err("md-cluster: No good device id found to send\n");
1327                 unlock_comm(cinfo);
1328                 return;
1329         }
1330
1331         /*
1332          * check the sync_size from other node's bitmap, if sync_size
1333          * have already updated in other nodes as expected, send an
1334          * empty metadata msg to permit the change of capacity
1335          */
1336         if (cluster_check_sync_size(mddev) == 0) {
1337                 memset(&cmsg, 0, sizeof(cmsg));
1338                 cmsg.type = cpu_to_le32(CHANGE_CAPACITY);
1339                 ret = __sendmsg(cinfo, &cmsg);
1340                 if (ret)
1341                         pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n",
1342                                __func__, __LINE__);
1343                 set_capacity_and_notify(mddev->gendisk, mddev->array_sectors);
1344         } else {
1345                 /* revert to previous sectors */
1346                 ret = mddev->pers->resize(mddev, old_dev_sectors);
1347                 ret = __sendmsg(cinfo, &cmsg);
1348                 if (ret)
1349                         pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
1350                                __func__, __LINE__);
1351         }
1352         unlock_comm(cinfo);
1353 }
1354
1355 static int resync_start(struct mddev *mddev)
1356 {
1357         struct md_cluster_info *cinfo = mddev->cluster_info;
1358         return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
1359 }
1360
1361 static void resync_info_get(struct mddev *mddev, sector_t *lo, sector_t *hi)
1362 {
1363         struct md_cluster_info *cinfo = mddev->cluster_info;
1364
1365         spin_lock_irq(&cinfo->suspend_lock);
1366         *lo = cinfo->suspend_lo;
1367         *hi = cinfo->suspend_hi;
1368         spin_unlock_irq(&cinfo->suspend_lock);
1369 }
1370
1371 static int resync_status_get(struct mddev *mddev)
1372 {
1373         struct md_cluster_info *cinfo = mddev->cluster_info;
1374
1375         return test_bit(MD_CLUSTER_WAITING_FOR_SYNC, &cinfo->state);
1376 }
1377
1378 static int resync_start_notify(struct mddev *mddev)
1379 {
1380         struct md_cluster_info *cinfo = mddev->cluster_info;
1381         struct cluster_msg cmsg = {0};
1382
1383         cmsg.type = cpu_to_le32(RESYNCING_START);
1384
1385         return sendmsg(cinfo, &cmsg, 0);
1386 }
1387
1388 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
1389 {
1390         struct md_cluster_info *cinfo = mddev->cluster_info;
1391         struct resync_info ri;
1392         struct cluster_msg cmsg = {0};
1393
1394         /* do not send zero again, if we have sent before */
1395         if (hi == 0) {
1396                 memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
1397                 if (le64_to_cpu(ri.hi) == 0)
1398                         return 0;
1399         }
1400
1401         add_resync_info(cinfo->bitmap_lockres, lo, hi);
1402         /* Re-acquire the lock to refresh LVB */
1403         dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
1404         cmsg.type = cpu_to_le32(RESYNCING);
1405         cmsg.low = cpu_to_le64(lo);
1406         cmsg.high = cpu_to_le64(hi);
1407
1408         /*
1409          * mddev_lock is held if resync_info_update is called from
1410          * resync_finish (md_reap_sync_thread -> resync_finish)
1411          */
1412         if (lo == 0 && hi == 0)
1413                 return sendmsg(cinfo, &cmsg, 1);
1414         else
1415                 return sendmsg(cinfo, &cmsg, 0);
1416 }
1417
1418 static int resync_finish(struct mddev *mddev)
1419 {
1420         struct md_cluster_info *cinfo = mddev->cluster_info;
1421         int ret = 0;
1422
1423         clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
1424
1425         /*
1426          * If resync thread is interrupted so we can't say resync is finished,
1427          * another node will launch resync thread to continue.
1428          */
1429         if (!test_bit(MD_CLOSING, &mddev->flags))
1430                 ret = resync_info_update(mddev, 0, 0);
1431         dlm_unlock_sync(cinfo->resync_lockres);
1432         return ret;
1433 }
1434
1435 static int area_resyncing(struct mddev *mddev, int direction,
1436                 sector_t lo, sector_t hi)
1437 {
1438         struct md_cluster_info *cinfo = mddev->cluster_info;
1439         int ret = 0;
1440
1441         if ((direction == READ) &&
1442                 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
1443                 return 1;
1444
1445         spin_lock_irq(&cinfo->suspend_lock);
1446         if (hi > cinfo->suspend_lo && lo < cinfo->suspend_hi)
1447                 ret = 1;
1448         spin_unlock_irq(&cinfo->suspend_lock);
1449         return ret;
1450 }
1451
1452 /* add_new_disk() - initiates a disk add
1453  * However, if this fails before writing md_update_sb(),
1454  * add_new_disk_cancel() must be called to release token lock
1455  */
1456 static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
1457 {
1458         struct md_cluster_info *cinfo = mddev->cluster_info;
1459         struct cluster_msg cmsg;
1460         int ret = 0;
1461         struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
1462         char *uuid = sb->device_uuid;
1463
1464         memset(&cmsg, 0, sizeof(cmsg));
1465         cmsg.type = cpu_to_le32(NEWDISK);
1466         memcpy(cmsg.uuid, uuid, 16);
1467         cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1468         if (lock_comm(cinfo, 1))
1469                 return -EAGAIN;
1470         ret = __sendmsg(cinfo, &cmsg);
1471         if (ret) {
1472                 unlock_comm(cinfo);
1473                 return ret;
1474         }
1475         cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
1476         ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
1477         cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
1478         /* Some node does not "see" the device */
1479         if (ret == -EAGAIN)
1480                 ret = -ENOENT;
1481         if (ret)
1482                 unlock_comm(cinfo);
1483         else {
1484                 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
1485                 /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
1486                  * will run soon after add_new_disk, the below path will be
1487                  * invoked:
1488                  *   md_wakeup_thread(mddev->thread)
1489                  *      -> conf->thread (raid1d)
1490                  *      -> md_check_recovery -> md_update_sb
1491                  *      -> metadata_update_start/finish
1492                  * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
1493                  *
1494                  * For other failure cases, metadata_update_cancel and
1495                  * add_new_disk_cancel also clear below bit as well.
1496                  * */
1497                 set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1498                 wake_up(&cinfo->wait);
1499         }
1500         return ret;
1501 }
1502
1503 static void add_new_disk_cancel(struct mddev *mddev)
1504 {
1505         struct md_cluster_info *cinfo = mddev->cluster_info;
1506         clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1507         unlock_comm(cinfo);
1508 }
1509
1510 static int new_disk_ack(struct mddev *mddev, bool ack)
1511 {
1512         struct md_cluster_info *cinfo = mddev->cluster_info;
1513
1514         if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
1515                 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
1516                 return -EINVAL;
1517         }
1518
1519         if (ack)
1520                 dlm_unlock_sync(cinfo->no_new_dev_lockres);
1521         complete(&cinfo->newdisk_completion);
1522         return 0;
1523 }
1524
1525 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
1526 {
1527         struct cluster_msg cmsg = {0};
1528         struct md_cluster_info *cinfo = mddev->cluster_info;
1529         cmsg.type = cpu_to_le32(REMOVE);
1530         cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1531         return sendmsg(cinfo, &cmsg, 1);
1532 }
1533
1534 static int lock_all_bitmaps(struct mddev *mddev)
1535 {
1536         int slot, my_slot, ret, held = 1, i = 0;
1537         char str[64];
1538         struct md_cluster_info *cinfo = mddev->cluster_info;
1539
1540         cinfo->other_bitmap_lockres =
1541                 kcalloc(mddev->bitmap_info.nodes - 1,
1542                         sizeof(struct dlm_lock_resource *), GFP_KERNEL);
1543         if (!cinfo->other_bitmap_lockres) {
1544                 pr_err("md: can't alloc mem for other bitmap locks\n");
1545                 return 0;
1546         }
1547
1548         my_slot = slot_number(mddev);
1549         for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
1550                 if (slot == my_slot)
1551                         continue;
1552
1553                 memset(str, '\0', 64);
1554                 snprintf(str, 64, "bitmap%04d", slot);
1555                 cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
1556                 if (!cinfo->other_bitmap_lockres[i])
1557                         return -ENOMEM;
1558
1559                 cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
1560                 ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
1561                 if (ret)
1562                         held = -1;
1563                 i++;
1564         }
1565
1566         return held;
1567 }
1568
1569 static void unlock_all_bitmaps(struct mddev *mddev)
1570 {
1571         struct md_cluster_info *cinfo = mddev->cluster_info;
1572         int i;
1573
1574         /* release other node's bitmap lock if they are existed */
1575         if (cinfo->other_bitmap_lockres) {
1576                 for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
1577                         if (cinfo->other_bitmap_lockres[i]) {
1578                                 lockres_free(cinfo->other_bitmap_lockres[i]);
1579                         }
1580                 }
1581                 kfree(cinfo->other_bitmap_lockres);
1582                 cinfo->other_bitmap_lockres = NULL;
1583         }
1584 }
1585
1586 static int gather_bitmaps(struct md_rdev *rdev)
1587 {
1588         int sn, err;
1589         sector_t lo, hi;
1590         struct cluster_msg cmsg = {0};
1591         struct mddev *mddev = rdev->mddev;
1592         struct md_cluster_info *cinfo = mddev->cluster_info;
1593
1594         cmsg.type = cpu_to_le32(RE_ADD);
1595         cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1596         err = sendmsg(cinfo, &cmsg, 1);
1597         if (err)
1598                 goto out;
1599
1600         for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
1601                 if (sn == (cinfo->slot_number - 1))
1602                         continue;
1603                 err = mddev->bitmap_ops->copy_from_slot(mddev, sn, &lo, &hi, false);
1604                 if (err) {
1605                         pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn);
1606                         goto out;
1607                 }
1608                 if ((hi > 0) && (lo < mddev->recovery_cp))
1609                         mddev->recovery_cp = lo;
1610         }
1611 out:
1612         return err;
1613 }
1614
1615 static const struct md_cluster_operations cluster_ops = {
1616         .join   = join,
1617         .leave  = leave,
1618         .slot_number = slot_number,
1619         .resync_start = resync_start,
1620         .resync_finish = resync_finish,
1621         .resync_info_update = resync_info_update,
1622         .resync_start_notify = resync_start_notify,
1623         .resync_status_get = resync_status_get,
1624         .resync_info_get = resync_info_get,
1625         .metadata_update_start = metadata_update_start,
1626         .metadata_update_finish = metadata_update_finish,
1627         .metadata_update_cancel = metadata_update_cancel,
1628         .area_resyncing = area_resyncing,
1629         .add_new_disk = add_new_disk,
1630         .add_new_disk_cancel = add_new_disk_cancel,
1631         .new_disk_ack = new_disk_ack,
1632         .remove_disk = remove_disk,
1633         .load_bitmaps = load_bitmaps,
1634         .gather_bitmaps = gather_bitmaps,
1635         .resize_bitmaps = resize_bitmaps,
1636         .lock_all_bitmaps = lock_all_bitmaps,
1637         .unlock_all_bitmaps = unlock_all_bitmaps,
1638         .update_size = update_size,
1639 };
1640
1641 static int __init cluster_init(void)
1642 {
1643         pr_warn("md-cluster: support raid1 and raid10 (limited support)\n");
1644         pr_info("Registering Cluster MD functions\n");
1645         register_md_cluster_operations(&cluster_ops, THIS_MODULE);
1646         return 0;
1647 }
1648
1649 static void cluster_exit(void)
1650 {
1651         unregister_md_cluster_operations();
1652 }
1653
1654 module_init(cluster_init);
1655 module_exit(cluster_exit);
1656 MODULE_AUTHOR("SUSE");
1657 MODULE_LICENSE("GPL");
1658 MODULE_DESCRIPTION("Clustering support for MD");
This page took 0.122999 seconds and 4 git commands to generate.