linux.git - fs/dlm/lowcomms.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 /*
13  * lowcomms.c
14  *
15  * This is the "low-level" comms layer.
16  *
17  * It is responsible for sending/receiving messages
18  * from other nodes in the cluster.
19  *
20  * Cluster nodes are referred to by their nodeids. nodeids are
21  * simply 32 bit numbers to the locking module - if they need to
22  * be expanded for the cluster infrastructure then that is its
23  * responsibility. It is this layer's
24  * responsibility to resolve these into IP addresses or
25  * whatever it needs for inter-node communication.
26  *
27  * The comms level is two kernel threads that deal mainly with
28  * the receiving of messages from other nodes and passing them
29  * up to the mid-level comms layer (which understands the
30  * message format) for execution by the locking core, and
31  * a send thread which does all the setting up of connections
32  * to remote nodes and the sending of data. Threads are not allowed
33  * to send their own data because it may cause them to wait in times
34  * of high load. Also, this way, the sending thread can collect together
35  * messages bound for one node and send them in one block.
36  *
37  * lowcomms will choose to use either TCP or SCTP as its transport layer
38  * depending on the configuration variable 'protocol'. This should be set
39  * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
40  * cluster-wide mechanism as it must be the same on all nodes of the cluster
41  * for the DLM to function.
42  *
43  */
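/*
 * For illustration: the 'protocol' value is the cluster attribute backing
 * dlm_config.ci_protocol (DLM_PROTO_TCP = 0, DLM_PROTO_SCTP = 1). It is
 * normally written by the cluster manager, typically dlm_controld, through
 * dlm's configfs interface before any lockspace is created; assuming configfs
 * is mounted at /sys/kernel/config, that is roughly:
 *
 *        echo 1 > /sys/kernel/config/dlm/cluster/protocol
 */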
44
45 #include <asm/ioctls.h>
46 #include <net/sock.h>
47 #include <net/tcp.h>
48 #include <linux/pagemap.h>
49 #include <linux/file.h>
50 #include <linux/mutex.h>
51 #include <linux/sctp.h>
52 #include <linux/slab.h>
53 #include <net/sctp/sctp.h>
54 #include <net/ipv6.h>
55
56 #include <trace/events/dlm.h>
57 #include <trace/events/sock.h>
58
59 #include "dlm_internal.h"
60 #include "lowcomms.h"
61 #include "midcomms.h"
62 #include "memory.h"
63 #include "config.h"
64
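/* DLM_SHUTDOWN_WAIT_TIMEOUT bounds how long shutdown_connection() waits for
 * the peer to close its side after our SHUT_WR before we force close.
 * DLM_MAX_PROCESS_BUFFERS is the backpressure threshold: once more than that
 * many buffers sit unprocessed on the processqueue, receive_from_sock()
 * returns DLM_IO_FLUSH and the receive worker waits for the queue to drain.
 * NEEDED_RMEM is the receive buffer size requested when sockets are set up
 * further down in this file.
 */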
65 #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
66 #define DLM_MAX_PROCESS_BUFFERS 24
67 #define NEEDED_RMEM (4*1024*1024)
68
69 struct connection {
70         struct socket *sock;    /* NULL if not connected */
71         uint32_t nodeid;        /* So we know who we are in the list */
72         /* this rw_semaphore allows parallel recv/send while held in read
73          * mode. When we release a sock we need to hold the write lock.
74          *
75          * However, this locking is not nice. Once we remove the othercon
76          * handling we can look into another mechanism to synchronize the
77          * io handling so that sock_release() is called at the right time.
78          */
79         struct rw_semaphore sock_lock;
80         unsigned long flags;
81 #define CF_APP_LIMITED 0        /* sending blocked, waiting for write space */
82 #define CF_RECV_PENDING 1       /* receive work is queued or running */
83 #define CF_SEND_PENDING 2       /* send work is queued or running */
84 #define CF_RECV_INTR 3          /* sk_data_ready fired during a receive, retry */
85 #define CF_IO_STOP 4            /* connection io is being stopped, queue no new work */
86 #define CF_IS_OTHERCON 5        /* this is the accepted "othercon" duplicate */
87         struct list_head writequeue;  /* List of outgoing writequeue_entries */
88         spinlock_t writequeue_lock;
89         int retries;
90         struct hlist_node list;
91         /* due to connect()/accept() races we can currently end up with this
92          * cross over, i.e. a second connection attempt for one node.
93          *
94          * There is a way to avoid the race by introducing a connect rule,
95          * e.g. only the side with our_nodeid > nodeid_to_connect is allowed
96          * to connect. The other side may still connect, but that is then
97          * only treated as a request for a reconnect.
98          *
99          * However, changing to this behaviour would break backwards compatibility.
100          * In a DLM protocol major version upgrade we should remove this!
101          */
102         struct connection *othercon;
103         struct work_struct rwork; /* receive worker */
104         struct work_struct swork; /* send worker */
105         wait_queue_head_t shutdown_wait;
106         unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
107         int rx_leftover;
108         int mark;
109         int addr_count;
110         int curr_addr_index;
111         struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
112         spinlock_t addrs_lock;
113         struct rcu_head rcu;
114 };
115 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
116
117 struct listen_connection {
118         struct socket *sock;
119         struct work_struct rwork;
120 };
121
122 #define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
123 #define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
124
125 /* An entry waiting to be sent */
126 struct writequeue_entry {
127         struct list_head list;
128         struct page *page;
129         int offset;
130         int len;
131         int end;
132         int users;
133         bool dirty;
134         struct connection *con;
135         struct list_head msgs;
136         struct kref ref;
137 };
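/* Rough lifecycle of a writequeue_entry, as used by new_wq_entry() and
 * writequeue_entry_complete() below: each entry owns a single page that
 * several messages may share. 'end' marks how far the page has been handed
 * out to writers, 'offset' and 'len' describe the region still waiting to be
 * sent, 'users' counts writers that reserved space but have not committed
 * yet, and 'dirty' is set once part of the entry went out on the wire so
 * that a reconnect drops the whole entry instead of resuming mid-message.
 */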
138
139 struct dlm_msg {
140         struct writequeue_entry *entry;
141         struct dlm_msg *orig_msg;
142         bool retransmit;
143         void *ppc;
144         int len;
145         int idx; /* new()/commit() idx exchange */
146
147         struct list_head list;
148         struct kref ref;
149 };
150
151 struct processqueue_entry {
152         unsigned char *buf;
153         int nodeid;
154         int buflen;
155
156         struct list_head list;
157 };
158
159 struct dlm_proto_ops {
160         bool try_new_addr;
161         const char *name;
162         int proto;
163
164         int (*connect)(struct connection *con, struct socket *sock,
165                        struct sockaddr *addr, int addr_len);
166         void (*sockopts)(struct socket *sock);
167         int (*bind)(struct socket *sock);
168         int (*listen_validate)(void);
169         void (*listen_sockopts)(struct socket *sock);
170         int (*listen_bind)(struct socket *sock);
171 };
172
173 static struct listen_sock_callbacks {
174         void (*sk_error_report)(struct sock *);
175         void (*sk_data_ready)(struct sock *);
176         void (*sk_state_change)(struct sock *);
177         void (*sk_write_space)(struct sock *);
178 } listen_sock;
179
180 static struct listen_connection listen_con;
181 static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
182 static int dlm_local_count;
183
184 /* Work queues */
185 static struct workqueue_struct *io_workqueue;
186 static struct workqueue_struct *process_workqueue;
187
188 static struct hlist_head connection_hash[CONN_HASH_SIZE];
189 static DEFINE_SPINLOCK(connections_lock);
190 DEFINE_STATIC_SRCU(connections_srcu);
191
192 static const struct dlm_proto_ops *dlm_proto_ops;
193
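/* Return codes of the socket I/O helpers (receive_from_sock(),
 * send_to_sock(), accept_from_sock()), interpreted by the worker functions
 * further down:
 *   DLM_IO_SUCCESS - made progress, call the helper again
 *   DLM_IO_END     - nothing more to do for now (the CF_*_PENDING bit is
 *                    cleared where one exists)
 *   DLM_IO_EOF     - the peer closed the connection (receive path only)
 *   DLM_IO_RESCHED - back off with cond_resched() and requeue the work
 *   DLM_IO_FLUSH   - too many unprocessed messages, wait for the
 *                    processqueue to drain before requeueing (receive only)
 */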
194 #define DLM_IO_SUCCESS 0
195 #define DLM_IO_END 1
196 #define DLM_IO_EOF 2
197 #define DLM_IO_RESCHED 3
198 #define DLM_IO_FLUSH 4
199
200 static void process_recv_sockets(struct work_struct *work);
201 static void process_send_sockets(struct work_struct *work);
202 static void process_dlm_messages(struct work_struct *work);
203
204 static DECLARE_WORK(process_work, process_dlm_messages);
205 static DEFINE_SPINLOCK(processqueue_lock);
206 static bool process_dlm_messages_pending;
207 static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq);
208 static atomic_t processqueue_count;
209 static LIST_HEAD(processqueue);
210
211 bool dlm_lowcomms_is_running(void)
212 {
213         return !!listen_con.sock;
214 }
215
216 static void lowcomms_queue_swork(struct connection *con)
217 {
218         assert_spin_locked(&con->writequeue_lock);
219
220         if (!test_bit(CF_IO_STOP, &con->flags) &&
221             !test_bit(CF_APP_LIMITED, &con->flags) &&
222             !test_and_set_bit(CF_SEND_PENDING, &con->flags))
223                 queue_work(io_workqueue, &con->swork);
224 }
225
226 static void lowcomms_queue_rwork(struct connection *con)
227 {
228 #ifdef CONFIG_LOCKDEP
229         WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
230 #endif
231
232         if (!test_bit(CF_IO_STOP, &con->flags) &&
233             !test_and_set_bit(CF_RECV_PENDING, &con->flags))
234                 queue_work(io_workqueue, &con->rwork);
235 }
236
237 static void writequeue_entry_ctor(void *data)
238 {
239         struct writequeue_entry *entry = data;
240
241         INIT_LIST_HEAD(&entry->msgs);
242 }
243
244 struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
245 {
246         return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
247                                  0, 0, writequeue_entry_ctor);
248 }
249
250 struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
251 {
252         return KMEM_CACHE(dlm_msg, 0);
253 }
254
255 /* must be called with writequeue_lock held */
256 static struct writequeue_entry *con_next_wq(struct connection *con)
257 {
258         struct writequeue_entry *e;
259
260         e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
261                                      list);
262         /* if len is zero there is nothing to send; if there are users still
263          * filling buffers we wait until they are done so we can send more.
264          */
265         if (!e || e->users || e->len == 0)
266                 return NULL;
267
268         return e;
269 }
270
271 static struct connection *__find_con(int nodeid, int r)
272 {
273         struct connection *con;
274
275         hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
276                 if (con->nodeid == nodeid)
277                         return con;
278         }
279
280         return NULL;
281 }
282
283 static void dlm_con_init(struct connection *con, int nodeid)
284 {
285         con->nodeid = nodeid;
286         init_rwsem(&con->sock_lock);
287         INIT_LIST_HEAD(&con->writequeue);
288         spin_lock_init(&con->writequeue_lock);
289         INIT_WORK(&con->swork, process_send_sockets);
290         INIT_WORK(&con->rwork, process_recv_sockets);
291         spin_lock_init(&con->addrs_lock);
292         init_waitqueue_head(&con->shutdown_wait);
293 }
294
295 /*
296  * If 'alloc' is zero then we don't attempt to create a new
297  * connection structure for this node.
298  */
299 static struct connection *nodeid2con(int nodeid, gfp_t alloc)
300 {
301         struct connection *con, *tmp;
302         int r;
303
304         r = nodeid_hash(nodeid);
305         con = __find_con(nodeid, r);
306         if (con || !alloc)
307                 return con;
308
309         con = kzalloc(sizeof(*con), alloc);
310         if (!con)
311                 return NULL;
312
313         dlm_con_init(con, nodeid);
314
315         spin_lock(&connections_lock);
316         /* Because multiple workqueues/threads call this function it can
317          * race on multiple cpus. Instead of locking the hot path
318          * __find_con() we just re-check recently added nodes again under
319          * protection of connections_lock. If this is the case we abort
320          * our connection creation and return the existing connection.
321          */
322         tmp = __find_con(nodeid, r);
323         if (tmp) {
324                 spin_unlock(&connections_lock);
325                 kfree(con);
326                 return tmp;
327         }
328
329         hlist_add_head_rcu(&con->list, &connection_hash[r]);
330         spin_unlock(&connections_lock);
331
332         return con;
333 }
334
335 static int addr_compare(const struct sockaddr_storage *x,
336                         const struct sockaddr_storage *y)
337 {
338         switch (x->ss_family) {
339         case AF_INET: {
340                 struct sockaddr_in *sinx = (struct sockaddr_in *)x;
341                 struct sockaddr_in *siny = (struct sockaddr_in *)y;
342                 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
343                         return 0;
344                 if (sinx->sin_port != siny->sin_port)
345                         return 0;
346                 break;
347         }
348         case AF_INET6: {
349                 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
350                 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
351                 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
352                         return 0;
353                 if (sinx->sin6_port != siny->sin6_port)
354                         return 0;
355                 break;
356         }
357         default:
358                 return 0;
359         }
360         return 1;
361 }
362
363 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
364                           struct sockaddr *sa_out, bool try_new_addr,
365                           unsigned int *mark)
366 {
367         struct sockaddr_storage sas;
368         struct connection *con;
369         int idx;
370
371         if (!dlm_local_count)
372                 return -1;
373
374         idx = srcu_read_lock(&connections_srcu);
375         con = nodeid2con(nodeid, 0);
376         if (!con) {
377                 srcu_read_unlock(&connections_srcu, idx);
378                 return -ENOENT;
379         }
380
381         spin_lock(&con->addrs_lock);
382         if (!con->addr_count) {
383                 spin_unlock(&con->addrs_lock);
384                 srcu_read_unlock(&connections_srcu, idx);
385                 return -ENOENT;
386         }
387
388         memcpy(&sas, &con->addr[con->curr_addr_index],
389                sizeof(struct sockaddr_storage));
390
391         if (try_new_addr) {
392                 con->curr_addr_index++;
393                 if (con->curr_addr_index == con->addr_count)
394                         con->curr_addr_index = 0;
395         }
396
397         *mark = con->mark;
398         spin_unlock(&con->addrs_lock);
399
400         if (sas_out)
401                 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
402
403         if (!sa_out) {
404                 srcu_read_unlock(&connections_srcu, idx);
405                 return 0;
406         }
407
408         if (dlm_local_addr[0].ss_family == AF_INET) {
409                 struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
410                 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
411                 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
412         } else {
413                 struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
414                 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
415                 ret6->sin6_addr = in6->sin6_addr;
416         }
417
418         srcu_read_unlock(&connections_srcu, idx);
419         return 0;
420 }
421
422 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
423                           unsigned int *mark)
424 {
425         struct connection *con;
426         int i, idx, addr_i;
427
428         idx = srcu_read_lock(&connections_srcu);
429         for (i = 0; i < CONN_HASH_SIZE; i++) {
430                 hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
431                         WARN_ON_ONCE(!con->addr_count);
432
433                         spin_lock(&con->addrs_lock);
434                         for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
435                                 if (addr_compare(&con->addr[addr_i], addr)) {
436                                         *nodeid = con->nodeid;
437                                         *mark = con->mark;
438                                         spin_unlock(&con->addrs_lock);
439                                         srcu_read_unlock(&connections_srcu, idx);
440                                         return 0;
441                                 }
442                         }
443                         spin_unlock(&con->addrs_lock);
444                 }
445         }
446         srcu_read_unlock(&connections_srcu, idx);
447
448         return -ENOENT;
449 }
450
451 static bool dlm_lowcomms_con_has_addr(const struct connection *con,
452                                       const struct sockaddr_storage *addr)
453 {
454         int i;
455
456         for (i = 0; i < con->addr_count; i++) {
457                 if (addr_compare(&con->addr[i], addr))
458                         return true;
459         }
460
461         return false;
462 }
463
464 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
465 {
466         struct connection *con;
467         bool ret;
            int idx;
468
469         idx = srcu_read_lock(&connections_srcu);
470         con = nodeid2con(nodeid, GFP_NOFS);
471         if (!con) {
472                 srcu_read_unlock(&connections_srcu, idx);
473                 return -ENOMEM;
474         }
475
476         spin_lock(&con->addrs_lock);
477         if (!con->addr_count) {
478                 memcpy(&con->addr[0], addr, sizeof(*addr));
479                 con->addr_count = 1;
480                 con->mark = dlm_config.ci_mark;
481                 spin_unlock(&con->addrs_lock);
482                 srcu_read_unlock(&connections_srcu, idx);
483                 return 0;
484         }
485
486         ret = dlm_lowcomms_con_has_addr(con, addr);
487         if (ret) {
488                 spin_unlock(&con->addrs_lock);
489                 srcu_read_unlock(&connections_srcu, idx);
490                 return -EEXIST;
491         }
492
493         if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
494                 spin_unlock(&con->addrs_lock);
495                 srcu_read_unlock(&connections_srcu, idx);
496                 return -ENOSPC;
497         }
498
499         memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
500         spin_unlock(&con->addrs_lock);
501         srcu_read_unlock(&connections_srcu, idx);
502         return 0;
503 }
504
505 /* Data available on socket or listen socket received a connect */
506 static void lowcomms_data_ready(struct sock *sk)
507 {
508         struct connection *con = sock2con(sk);
509
510         trace_sk_data_ready(sk);
511
512         set_bit(CF_RECV_INTR, &con->flags);
513         lowcomms_queue_rwork(con);
514 }
515
516 static void lowcomms_write_space(struct sock *sk)
517 {
518         struct connection *con = sock2con(sk);
519
520         clear_bit(SOCK_NOSPACE, &con->sock->flags);
521
522         spin_lock_bh(&con->writequeue_lock);
523         if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
524                 con->sock->sk->sk_write_pending--;
525                 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
526         }
527
528         lowcomms_queue_swork(con);
529         spin_unlock_bh(&con->writequeue_lock);
530 }
531
532 static void lowcomms_state_change(struct sock *sk)
533 {
534         /* The SCTP layer does not call sk_data_ready when the peer has
535          * shut down the connection, so we catch that signal here.
536          */
537         if (sk->sk_shutdown == RCV_SHUTDOWN)
538                 lowcomms_data_ready(sk);
539 }
540
541 static void lowcomms_listen_data_ready(struct sock *sk)
542 {
543         trace_sk_data_ready(sk);
544
545         queue_work(io_workqueue, &listen_con.rwork);
546 }
547
548 int dlm_lowcomms_connect_node(int nodeid)
549 {
550         struct connection *con;
551         int idx;
552
553         idx = srcu_read_lock(&connections_srcu);
554         con = nodeid2con(nodeid, 0);
555         if (WARN_ON_ONCE(!con)) {
556                 srcu_read_unlock(&connections_srcu, idx);
557                 return -ENOENT;
558         }
559
560         down_read(&con->sock_lock);
561         if (!con->sock) {
562                 spin_lock_bh(&con->writequeue_lock);
563                 lowcomms_queue_swork(con);
564                 spin_unlock_bh(&con->writequeue_lock);
565         }
566         up_read(&con->sock_lock);
567         srcu_read_unlock(&connections_srcu, idx);
568
569         cond_resched();
570         return 0;
571 }
572
573 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
574 {
575         struct connection *con;
576         int idx;
577
578         idx = srcu_read_lock(&connections_srcu);
579         con = nodeid2con(nodeid, 0);
580         if (!con) {
581                 srcu_read_unlock(&connections_srcu, idx);
582                 return -ENOENT;
583         }
584
585         spin_lock(&con->addrs_lock);
586         con->mark = mark;
587         spin_unlock(&con->addrs_lock);
588         srcu_read_unlock(&connections_srcu, idx);
589         return 0;
590 }
591
592 static void lowcomms_error_report(struct sock *sk)
593 {
594         struct connection *con = sock2con(sk);
595         struct inet_sock *inet;
596
597         inet = inet_sk(sk);
598         switch (sk->sk_family) {
599         case AF_INET:
600                 printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
601                                    "sending to node %d at %pI4, dport %d, "
602                                    "sk_err=%d/%d\n", dlm_our_nodeid(),
603                                    con->nodeid, &inet->inet_daddr,
604                                    ntohs(inet->inet_dport), sk->sk_err,
605                                    READ_ONCE(sk->sk_err_soft));
606                 break;
607 #if IS_ENABLED(CONFIG_IPV6)
608         case AF_INET6:
609                 printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
610                                    "sending to node %d at %pI6c, "
611                                    "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
612                                    con->nodeid, &sk->sk_v6_daddr,
613                                    ntohs(inet->inet_dport), sk->sk_err,
614                                    READ_ONCE(sk->sk_err_soft));
615                 break;
616 #endif
617         default:
618                 printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
619                                    "invalid socket family %d set, "
620                                    "sk_err=%d/%d\n", dlm_our_nodeid(),
621                                    sk->sk_family, sk->sk_err,
622                                    READ_ONCE(sk->sk_err_soft));
623                 break;
624         }
625
626         dlm_midcomms_unack_msg_resend(con->nodeid);
627
628         listen_sock.sk_error_report(sk);
629 }
630
631 static void restore_callbacks(struct sock *sk)
632 {
633 #ifdef CONFIG_LOCKDEP
634         WARN_ON_ONCE(!lockdep_sock_is_held(sk));
635 #endif
636
637         sk->sk_user_data = NULL;
638         sk->sk_data_ready = listen_sock.sk_data_ready;
639         sk->sk_state_change = listen_sock.sk_state_change;
640         sk->sk_write_space = listen_sock.sk_write_space;
641         sk->sk_error_report = listen_sock.sk_error_report;
642 }
643
644 /* Make a socket active */
645 static void add_sock(struct socket *sock, struct connection *con)
646 {
647         struct sock *sk = sock->sk;
648
649         lock_sock(sk);
650         con->sock = sock;
651
652         sk->sk_user_data = con;
653         sk->sk_data_ready = lowcomms_data_ready;
654         sk->sk_write_space = lowcomms_write_space;
655         if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
656                 sk->sk_state_change = lowcomms_state_change;
657         sk->sk_allocation = GFP_NOFS;
658         sk->sk_use_task_frag = false;
659         sk->sk_error_report = lowcomms_error_report;
660         release_sock(sk);
661 }
662
663 /* Add the port number to an IPv6 or IPv4 sockaddr and write the address
664    length to *addr_len */
665 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
666                           int *addr_len)
667 {
668         saddr->ss_family =  dlm_local_addr[0].ss_family;
669         if (saddr->ss_family == AF_INET) {
670                 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
671                 in4_addr->sin_port = cpu_to_be16(port);
672                 *addr_len = sizeof(struct sockaddr_in);
673                 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
674         } else {
675                 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
676                 in6_addr->sin6_port = cpu_to_be16(port);
677                 *addr_len = sizeof(struct sockaddr_in6);
678         }
679         memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
680 }
681
682 static void dlm_page_release(struct kref *kref)
683 {
684         struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
685                                                   ref);
686
687         __free_page(e->page);
688         dlm_free_writequeue(e);
689 }
690
691 static void dlm_msg_release(struct kref *kref)
692 {
693         struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
694
695         kref_put(&msg->entry->ref, dlm_page_release);
696         dlm_free_msg(msg);
697 }
698
699 static void free_entry(struct writequeue_entry *e)
700 {
701         struct dlm_msg *msg, *tmp;
702
703         list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
704                 if (msg->orig_msg) {
705                         msg->orig_msg->retransmit = false;
706                         kref_put(&msg->orig_msg->ref, dlm_msg_release);
707                 }
708
709                 list_del(&msg->list);
710                 kref_put(&msg->ref, dlm_msg_release);
711         }
712
713         list_del(&e->list);
714         kref_put(&e->ref, dlm_page_release);
715 }
716
717 static void dlm_close_sock(struct socket **sock)
718 {
719         lock_sock((*sock)->sk);
720         restore_callbacks((*sock)->sk);
721         release_sock((*sock)->sk);
722
723         sock_release(*sock);
724         *sock = NULL;
725 }
726
727 static void allow_connection_io(struct connection *con)
728 {
729         if (con->othercon)
730                 clear_bit(CF_IO_STOP, &con->othercon->flags);
731         clear_bit(CF_IO_STOP, &con->flags);
732 }
733
734 static void stop_connection_io(struct connection *con)
735 {
736         if (con->othercon)
737                 stop_connection_io(con->othercon);
738
739         spin_lock_bh(&con->writequeue_lock);
740         set_bit(CF_IO_STOP, &con->flags);
741         spin_unlock_bh(&con->writequeue_lock);
742
743         down_write(&con->sock_lock);
744         if (con->sock) {
745                 lock_sock(con->sock->sk);
746                 restore_callbacks(con->sock->sk);
747                 release_sock(con->sock->sk);
748         }
749         up_write(&con->sock_lock);
750
751         cancel_work_sync(&con->swork);
752         cancel_work_sync(&con->rwork);
753 }
754
755 /* Close a remote connection and tidy up */
756 static void close_connection(struct connection *con, bool and_other)
757 {
758         struct writequeue_entry *e;
759
760         if (con->othercon && and_other)
761                 close_connection(con->othercon, false);
762
763         down_write(&con->sock_lock);
764         if (!con->sock) {
765                 up_write(&con->sock_lock);
766                 return;
767         }
768
769         dlm_close_sock(&con->sock);
770
771         /* if a writequeue entry was only sent half way, we drop the whole
772          * entry, because on reconnection we must not start in the middle
773          * of a msg, which would confuse the other end.
774          *
775          * we can always drop messages because of retransmits, but what we
776          * cannot allow is to transmit half messages which may be processed
777          * at the other side.
778          *
779          * our policy is to start from a clean state on disconnect, as we
780          * don't know what was sent/received on the transport layer then.
781          */
782         spin_lock_bh(&con->writequeue_lock);
783         if (!list_empty(&con->writequeue)) {
784                 e = list_first_entry(&con->writequeue, struct writequeue_entry,
785                                      list);
786                 if (e->dirty)
787                         free_entry(e);
788         }
789         spin_unlock_bh(&con->writequeue_lock);
790
791         con->rx_leftover = 0;
792         con->retries = 0;
793         clear_bit(CF_APP_LIMITED, &con->flags);
794         clear_bit(CF_RECV_PENDING, &con->flags);
795         clear_bit(CF_SEND_PENDING, &con->flags);
796         up_write(&con->sock_lock);
797 }
798
799 static void shutdown_connection(struct connection *con, bool and_other)
800 {
801         int ret;
802
803         if (con->othercon && and_other)
804                 shutdown_connection(con->othercon, false);
805
806         flush_workqueue(io_workqueue);
807         down_read(&con->sock_lock);
808         /* nothing to shutdown */
809         if (!con->sock) {
810                 up_read(&con->sock_lock);
811                 return;
812         }
813
814         ret = kernel_sock_shutdown(con->sock, SHUT_WR);
815         up_read(&con->sock_lock);
816         if (ret) {
817                 log_print("Connection %p failed to shutdown: %d will force close",
818                           con, ret);
819                 goto force_close;
820         } else {
821                 ret = wait_event_timeout(con->shutdown_wait, !con->sock,
822                                          DLM_SHUTDOWN_WAIT_TIMEOUT);
823                 if (ret == 0) {
824                         log_print("Connection %p shutdown timed out, will force close",
825                                   con);
826                         goto force_close;
827                 }
828         }
829
830         return;
831
832 force_close:
833         close_connection(con, false);
834 }
835
836 static struct processqueue_entry *new_processqueue_entry(int nodeid,
837                                                          int buflen)
838 {
839         struct processqueue_entry *pentry;
840
841         pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
842         if (!pentry)
843                 return NULL;
844
845         pentry->buf = kmalloc(buflen, GFP_NOFS);
846         if (!pentry->buf) {
847                 kfree(pentry);
848                 return NULL;
849         }
850
851         pentry->nodeid = nodeid;
852         return pentry;
853 }
854
855 static void free_processqueue_entry(struct processqueue_entry *pentry)
856 {
857         kfree(pentry->buf);
858         kfree(pentry);
859 }
860
861 struct dlm_processed_nodes {
862         int nodeid;
863
864         struct list_head list;
865 };
866
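/* Worker for process_workqueue: drains the global processqueue in FIFO order
 * and hands every buffer to the midcomms layer via
 * dlm_process_incoming_buffer(). process_dlm_messages_pending ensures the
 * work item is not queued again while a run is already pending.
 */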
867 static void process_dlm_messages(struct work_struct *work)
868 {
869         struct processqueue_entry *pentry;
870
871         spin_lock_bh(&processqueue_lock);
872         pentry = list_first_entry_or_null(&processqueue,
873                                           struct processqueue_entry, list);
874         if (WARN_ON_ONCE(!pentry)) {
875                 process_dlm_messages_pending = false;
876                 spin_unlock_bh(&processqueue_lock);
877                 return;
878         }
879
880         list_del(&pentry->list);
881         if (atomic_dec_and_test(&processqueue_count))
882                 wake_up(&processqueue_wq);
883         spin_unlock_bh(&processqueue_lock);
884
885         for (;;) {
886                 dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
887                                             pentry->buflen);
888                 free_processqueue_entry(pentry);
889
890                 spin_lock_bh(&processqueue_lock);
891                 pentry = list_first_entry_or_null(&processqueue,
892                                                   struct processqueue_entry, list);
893                 if (!pentry) {
894                         process_dlm_messages_pending = false;
895                         spin_unlock_bh(&processqueue_lock);
896                         break;
897                 }
898
899                 list_del(&pentry->list);
900                 if (atomic_dec_and_test(&processqueue_count))
901                         wake_up(&processqueue_wq);
902                 spin_unlock_bh(&processqueue_lock);
903         }
904 }
905
906 /* Data received from remote end */
907 static int receive_from_sock(struct connection *con, int buflen)
908 {
909         struct processqueue_entry *pentry;
910         int ret, buflen_real;
911         struct msghdr msg;
912         struct kvec iov;
913
914         pentry = new_processqueue_entry(con->nodeid, buflen);
915         if (!pentry)
916                 return DLM_IO_RESCHED;
917
918         memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);
919
920         /* calculate new buffer parameters, taking the last receive and
921          * possible leftover bytes into account
922          */
923         iov.iov_base = pentry->buf + con->rx_leftover;
924         iov.iov_len = buflen - con->rx_leftover;
925
926         memset(&msg, 0, sizeof(msg));
927         msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
928         clear_bit(CF_RECV_INTR, &con->flags);
929 again:
930         ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
931                              msg.msg_flags);
932         trace_dlm_recv(con->nodeid, ret);
933         if (ret == -EAGAIN) {
934                 lock_sock(con->sock->sk);
935                 if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
936                         release_sock(con->sock->sk);
937                         goto again;
938                 }
939
940                 clear_bit(CF_RECV_PENDING, &con->flags);
941                 release_sock(con->sock->sk);
942                 free_processqueue_entry(pentry);
943                 return DLM_IO_END;
944         } else if (ret == 0) {
945                 /* close will clear CF_RECV_PENDING */
946                 free_processqueue_entry(pentry);
947                 return DLM_IO_EOF;
948         } else if (ret < 0) {
949                 free_processqueue_entry(pentry);
950                 return ret;
951         }
952
953         /* new buflen according to bytes read plus leftover from the last receive */
954         buflen_real = ret + con->rx_leftover;
955         ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
956                                            buflen_real);
957         if (ret < 0) {
958                 free_processqueue_entry(pentry);
959                 return ret;
960         }
961
962         pentry->buflen = ret;
963
964         /* calculate the leftover bytes after processing and move them to
965          * the beginning of the receive buffer, so that the next receive
966          * has the full message at the start address of the receive buffer.
967          */
968         con->rx_leftover = buflen_real - ret;
969         memmove(con->rx_leftover_buf, pentry->buf + ret,
970                 con->rx_leftover);
971
972         spin_lock_bh(&processqueue_lock);
973         ret = atomic_inc_return(&processqueue_count);
974         list_add_tail(&pentry->list, &processqueue);
975         if (!process_dlm_messages_pending) {
976                 process_dlm_messages_pending = true;
977                 queue_work(process_workqueue, &process_work);
978         }
979         spin_unlock_bh(&processqueue_lock);
980
981         if (ret > DLM_MAX_PROCESS_BUFFERS)
982                 return DLM_IO_FLUSH;
983
984         return DLM_IO_SUCCESS;
985 }
986
987 /* Listening socket is busy, accept a connection */
988 static int accept_from_sock(void)
989 {
990         struct sockaddr_storage peeraddr;
991         int len, idx, result, nodeid;
992         struct connection *newcon;
993         struct socket *newsock;
994         unsigned int mark;
995
996         result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
997         if (result == -EAGAIN)
998                 return DLM_IO_END;
999         else if (result < 0)
1000                 goto accept_err;
1001
1002         /* Get the connected socket's peer */
1003         memset(&peeraddr, 0, sizeof(peeraddr));
1004         len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
1005         if (len < 0) {
1006                 result = -ECONNABORTED;
1007                 goto accept_err;
1008         }
1009
1010         /* Get the new node's NODEID */
1011         make_sockaddr(&peeraddr, 0, &len);
1012         if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
1013                 switch (peeraddr.ss_family) {
1014                 case AF_INET: {
1015                         struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;
1016
1017                         log_print("connect from non cluster IPv4 node %pI4",
1018                                   &sin->sin_addr);
1019                         break;
1020                 }
1021 #if IS_ENABLED(CONFIG_IPV6)
1022                 case AF_INET6: {
1023                         struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;
1024
1025                         log_print("connect from non cluster IPv6 node %pI6c",
1026                                   &sin6->sin6_addr);
1027                         break;
1028                 }
1029 #endif
1030                 default:
1031                         log_print("invalid family from non cluster node");
1032                         break;
1033                 }
1034
1035                 sock_release(newsock);
1036                 return -1;
1037         }
1038
1039         log_print("got connection from %d", nodeid);
1040
1041         /*  Check to see if we already have a connection to this node. This
1042          *  could happen if the two nodes initiate a connection at roughly
1043          *  the same time and the connections cross on the wire.
1044          *  In this case we store the incoming one in "othercon"
1045          */
1046         idx = srcu_read_lock(&connections_srcu);
1047         newcon = nodeid2con(nodeid, 0);
1048         if (WARN_ON_ONCE(!newcon)) {
1049                 srcu_read_unlock(&connections_srcu, idx);
1050                 result = -ENOENT;
1051                 goto accept_err;
1052         }
1053
1054         sock_set_mark(newsock->sk, mark);
1055
1056         down_write(&newcon->sock_lock);
1057         if (newcon->sock) {
1058                 struct connection *othercon = newcon->othercon;
1059
1060                 if (!othercon) {
1061                         othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
1062                         if (!othercon) {
1063                                 log_print("failed to allocate incoming socket");
1064                                 up_write(&newcon->sock_lock);
1065                                 srcu_read_unlock(&connections_srcu, idx);
1066                                 result = -ENOMEM;
1067                                 goto accept_err;
1068                         }
1069
1070                         dlm_con_init(othercon, nodeid);
1071                         lockdep_set_subclass(&othercon->sock_lock, 1);
1072                         newcon->othercon = othercon;
1073                         set_bit(CF_IS_OTHERCON, &othercon->flags);
1074                 } else {
1075                         /* close other sock con if we have something new */
1076                         close_connection(othercon, false);
1077                 }
1078
1079                 down_write(&othercon->sock_lock);
1080                 add_sock(newsock, othercon);
1081
1082                 /* check if we received something while adding */
1083                 lock_sock(othercon->sock->sk);
1084                 lowcomms_queue_rwork(othercon);
1085                 release_sock(othercon->sock->sk);
1086                 up_write(&othercon->sock_lock);
1087         }
1088         else {
1089                 /* accept copies the sk after we've saved the callbacks, so we
1090                    don't want to save them a second time or comm errors will
1091                    result in calling sk_error_report recursively. */
1092                 add_sock(newsock, newcon);
1093
1094                 /* check if we received something while adding */
1095                 lock_sock(newcon->sock->sk);
1096                 lowcomms_queue_rwork(newcon);
1097                 release_sock(newcon->sock->sk);
1098         }
1099         up_write(&newcon->sock_lock);
1100         srcu_read_unlock(&connections_srcu, idx);
1101
1102         return DLM_IO_SUCCESS;
1103
1104 accept_err:
1105         if (newsock)
1106                 sock_release(newsock);
1107
1108         return result;
1109 }
1110
1111 /*
1112  * writequeue_entry_complete - try to delete and free write queue entry
1113  * @e: write queue entry to try to delete
1114  * @completed: bytes completed
1115  *
1116  * writequeue_lock must be held.
1117  */
1118 static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1119 {
1120         e->offset += completed;
1121         e->len -= completed;
1122         /* signal that the page was partially transmitted */
1123         e->dirty = true;
1124
1125         if (e->len == 0 && e->users == 0)
1126                 free_entry(e);
1127 }
1128
1129 /*
1130  * sctp_bind_addrs - bind a SCTP socket to all our addresses
1131  */
1132 static int sctp_bind_addrs(struct socket *sock, uint16_t port)
1133 {
1134         struct sockaddr_storage localaddr;
1135         struct sockaddr *addr = (struct sockaddr *)&localaddr;
1136         int i, addr_len, result = 0;
1137
1138         for (i = 0; i < dlm_local_count; i++) {
1139                 memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
1140                 make_sockaddr(&localaddr, port, &addr_len);
1141
1142                 if (!i)
1143                         result = kernel_bind(sock, addr, addr_len);
1144                 else
1145                         result = sock_bind_add(sock->sk, addr, addr_len);
1146
1147                 if (result < 0) {
1148                         log_print("Can't bind to %d addr number %d, %d.\n",
1149                                   port, i + 1, result);
1150                         break;
1151                 }
1152         }
1153         return result;
1154 }
1155
1156 /* Get local addresses */
1157 static void init_local(void)
1158 {
1159         struct sockaddr_storage sas;
1160         int i;
1161
1162         dlm_local_count = 0;
1163         for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1164                 if (dlm_our_addr(&sas, i))
1165                         break;
1166
1167                 memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
1168         }
1169 }
1170
1171 static struct writequeue_entry *new_writequeue_entry(struct connection *con)
1172 {
1173         struct writequeue_entry *entry;
1174
1175         entry = dlm_allocate_writequeue();
1176         if (!entry)
1177                 return NULL;
1178
1179         entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
1180         if (!entry->page) {
1181                 dlm_free_writequeue(entry);
1182                 return NULL;
1183         }
1184
1185         entry->offset = 0;
1186         entry->len = 0;
1187         entry->end = 0;
1188         entry->dirty = false;
1189         entry->con = con;
1190         entry->users = 1;
1191         kref_init(&entry->ref);
1192         return entry;
1193 }
1194
1195 static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
1196                                              char **ppc, void (*cb)(void *data),
1197                                              void *data)
1198 {
1199         struct writequeue_entry *e;
1200
1201         spin_lock_bh(&con->writequeue_lock);
1202         if (!list_empty(&con->writequeue)) {
1203                 e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
1204                 if (DLM_WQ_REMAIN_BYTES(e) >= len) {
1205                         kref_get(&e->ref);
1206
1207                         *ppc = page_address(e->page) + e->end;
1208                         if (cb)
1209                                 cb(data);
1210
1211                         e->end += len;
1212                         e->users++;
1213                         goto out;
1214                 }
1215         }
1216
1217         e = new_writequeue_entry(con);
1218         if (!e)
1219                 goto out;
1220
1221         kref_get(&e->ref);
1222         *ppc = page_address(e->page);
1223         e->end += len;
1224         if (cb)
1225                 cb(data);
1226
1227         list_add_tail(&e->list, &con->writequeue);
1228
1229 out:
1230         spin_unlock_bh(&con->writequeue_lock);
1231         return e;
1232 }
1233
1234 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
1235                                                 char **ppc, void (*cb)(void *data),
1236                                                 void *data)
1237 {
1238         struct writequeue_entry *e;
1239         struct dlm_msg *msg;
1240
1241         msg = dlm_allocate_msg();
1242         if (!msg)
1243                 return NULL;
1244
1245         kref_init(&msg->ref);
1246
1247         e = new_wq_entry(con, len, ppc, cb, data);
1248         if (!e) {
1249                 dlm_free_msg(msg);
1250                 return NULL;
1251         }
1252
1253         msg->retransmit = false;
1254         msg->orig_msg = NULL;
1255         msg->ppc = *ppc;
1256         msg->len = len;
1257         msg->entry = e;
1258
1259         return msg;
1260 }
1261
1262 /* avoid a sparse false positive for connections_srcu; the unlock happens
1263  * in dlm_lowcomms_commit_msg(), which must be called on success
1264  */
1265 #ifndef __CHECKER__
1266 struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
1267                                      void (*cb)(void *data), void *data)
1268 {
1269         struct connection *con;
1270         struct dlm_msg *msg;
1271         int idx;
1272
1273         if (len > DLM_MAX_SOCKET_BUFSIZE ||
1274             len < sizeof(struct dlm_header)) {
1275                 BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
1276                 log_print("failed to allocate a buffer of size %d", len);
1277                 WARN_ON_ONCE(1);
1278                 return NULL;
1279         }
1280
1281         idx = srcu_read_lock(&connections_srcu);
1282         con = nodeid2con(nodeid, 0);
1283         if (WARN_ON_ONCE(!con)) {
1284                 srcu_read_unlock(&connections_srcu, idx);
1285                 return NULL;
1286         }
1287
1288         msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data);
1289         if (!msg) {
1290                 srcu_read_unlock(&connections_srcu, idx);
1291                 return NULL;
1292         }
1293
1294         /* for dlm_lowcomms_commit_msg() */
1295         kref_get(&msg->ref);
1296         /* we assume that on success commit will be called */
1297         msg->idx = idx;
1298         return msg;
1299 }
1300 #endif
1301
1302 static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
1303 {
1304         struct writequeue_entry *e = msg->entry;
1305         struct connection *con = e->con;
1306         int users;
1307
1308         spin_lock_bh(&con->writequeue_lock);
1309         kref_get(&msg->ref);
1310         list_add(&msg->list, &e->msgs);
1311
1312         users = --e->users;
1313         if (users)
1314                 goto out;
1315
1316         e->len = DLM_WQ_LENGTH_BYTES(e);
1317
1318         lowcomms_queue_swork(con);
1319
1320 out:
1321         spin_unlock_bh(&con->writequeue_lock);
1322         return;
1323 }
1324
1325 /* avoid a sparse false positive for connections_srcu; the lock was taken
1326  * in dlm_lowcomms_new_msg
1327  */
1328 #ifndef __CHECKER__
1329 void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
1330 {
1331         _dlm_lowcomms_commit_msg(msg);
1332         srcu_read_unlock(&connections_srcu, msg->idx);
1333         /* because dlm_lowcomms_new_msg() */
1334         kref_put(&msg->ref, dlm_msg_release);
1335 }
1336 #endif
1337
1338 void dlm_lowcomms_put_msg(struct dlm_msg *msg)
1339 {
1340         kref_put(&msg->ref, dlm_msg_release);
1341 }
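/* A rough sketch of how a caller (midcomms) is expected to use the message
 * API above; error handling and the point where the final put happens are
 * simplified:
 *
 *        char *ppc;
 *        struct dlm_msg *msg;
 *
 *        msg = dlm_lowcomms_new_msg(nodeid, len, &ppc, cb, cb_data);
 *        if (!msg)
 *                return -ENOMEM;
 *        // fill the 'len' bytes behind ppc
 *        dlm_lowcomms_commit_msg(msg);   // queues the data, drops the srcu lock
 *        dlm_lowcomms_put_msg(msg);      // drop the caller's reference when done
 */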
1342
1343 /* does not hold connections_srcu; used only from lowcomms_error_report */
1344 int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
1345 {
1346         struct dlm_msg *msg_resend;
1347         char *ppc;
1348
1349         if (msg->retransmit)
1350                 return 1;
1351
1352         msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc,
1353                                               NULL, NULL);
1354         if (!msg_resend)
1355                 return -ENOMEM;
1356
1357         msg->retransmit = true;
1358         kref_get(&msg->ref);
1359         msg_resend->orig_msg = msg;
1360
1361         memcpy(ppc, msg->ppc, msg->len);
1362         _dlm_lowcomms_commit_msg(msg_resend);
1363         dlm_lowcomms_put_msg(msg_resend);
1364
1365         return 0;
1366 }
1367
1368 /* Send a message */
1369 static int send_to_sock(struct connection *con)
1370 {
1371         struct writequeue_entry *e;
1372         struct bio_vec bvec;
1373         struct msghdr msg = {
1374                 .msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL,
1375         };
1376         int len, offset, ret;
1377
1378         spin_lock_bh(&con->writequeue_lock);
1379         e = con_next_wq(con);
1380         if (!e) {
1381                 clear_bit(CF_SEND_PENDING, &con->flags);
1382                 spin_unlock_bh(&con->writequeue_lock);
1383                 return DLM_IO_END;
1384         }
1385
1386         len = e->len;
1387         offset = e->offset;
1388         WARN_ON_ONCE(len == 0 && e->users == 0);
1389         spin_unlock_bh(&con->writequeue_lock);
1390
1391         bvec_set_page(&bvec, e->page, len, offset);
1392         iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len);
1393         ret = sock_sendmsg(con->sock, &msg);
1394         trace_dlm_send(con->nodeid, ret);
1395         if (ret == -EAGAIN || ret == 0) {
1396                 lock_sock(con->sock->sk);
1397                 spin_lock_bh(&con->writequeue_lock);
1398                 if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
1399                     !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1400                         /* Notify TCP that we're limited by the
1401                          * application window size.
1402                          */
1403                         set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
1404                         con->sock->sk->sk_write_pending++;
1405
1406                         clear_bit(CF_SEND_PENDING, &con->flags);
1407                         spin_unlock_bh(&con->writequeue_lock);
1408                         release_sock(con->sock->sk);
1409
1410                         /* wait for write_space() event */
1411                         return DLM_IO_END;
1412                 }
1413                 spin_unlock_bh(&con->writequeue_lock);
1414                 release_sock(con->sock->sk);
1415
1416                 return DLM_IO_RESCHED;
1417         } else if (ret < 0) {
1418                 return ret;
1419         }
1420
1421         spin_lock_bh(&con->writequeue_lock);
1422         writequeue_entry_complete(e, ret);
1423         spin_unlock_bh(&con->writequeue_lock);
1424
1425         return DLM_IO_SUCCESS;
1426 }
1427
1428 static void clean_one_writequeue(struct connection *con)
1429 {
1430         struct writequeue_entry *e, *safe;
1431
1432         spin_lock_bh(&con->writequeue_lock);
1433         list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1434                 free_entry(e);
1435         }
1436         spin_unlock_bh(&con->writequeue_lock);
1437 }
1438
1439 static void connection_release(struct rcu_head *rcu)
1440 {
1441         struct connection *con = container_of(rcu, struct connection, rcu);
1442
1443         WARN_ON_ONCE(!list_empty(&con->writequeue));
1444         WARN_ON_ONCE(con->sock);
1445         kfree(con);
1446 }
1447
1448 /* Called from recovery when it knows that a node has
1449    left the cluster */
1450 int dlm_lowcomms_close(int nodeid)
1451 {
1452         struct connection *con;
1453         int idx;
1454
1455         log_print("closing connection to node %d", nodeid);
1456
1457         idx = srcu_read_lock(&connections_srcu);
1458         con = nodeid2con(nodeid, 0);
1459         if (WARN_ON_ONCE(!con)) {
1460                 srcu_read_unlock(&connections_srcu, idx);
1461                 return -ENOENT;
1462         }
1463
1464         stop_connection_io(con);
1465         log_print("io handling for node: %d stopped", nodeid);
1466         close_connection(con, true);
1467
1468         spin_lock(&connections_lock);
1469         hlist_del_rcu(&con->list);
1470         spin_unlock(&connections_lock);
1471
1472         clean_one_writequeue(con);
1473         call_srcu(&connections_srcu, &con->rcu, connection_release);
1474         if (con->othercon) {
1475                 clean_one_writequeue(con->othercon);
1476                 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
1477         }
1478         srcu_read_unlock(&connections_srcu, idx);
1479
1480         /* for debugging we print when we are done, to compare with other
1481          * messages in between. This function needs to be correctly
1482          * synchronized with the io handling.
1483          */
1484         log_print("closing connection to node %d done", nodeid);
1485
1486         return 0;
1487 }
1488
1489 /* Receive worker function */
1490 static void process_recv_sockets(struct work_struct *work)
1491 {
1492         struct connection *con = container_of(work, struct connection, rwork);
1493         int ret, buflen;
1494
1495         down_read(&con->sock_lock);
1496         if (!con->sock) {
1497                 up_read(&con->sock_lock);
1498                 return;
1499         }
1500
1501         buflen = READ_ONCE(dlm_config.ci_buffer_size);
1502         do {
1503                 ret = receive_from_sock(con, buflen);
1504         } while (ret == DLM_IO_SUCCESS);
1505         up_read(&con->sock_lock);
1506
1507         switch (ret) {
1508         case DLM_IO_END:
1509                 /* CF_RECV_PENDING cleared */
1510                 break;
1511         case DLM_IO_EOF:
1512                 close_connection(con, false);
1513                 wake_up(&con->shutdown_wait);
1514                 /* CF_RECV_PENDING cleared */
1515                 break;
1516         case DLM_IO_FLUSH:
1517                 /* we can't flush the process_workqueue here because
1518                  * flushing a non WQ_MEM_RECLAIM workqueue such as
1519                  * process_workqueue from a WQ_MEM_RECLAIM workqueue can
1520                  * deadlock. Instead we use a waitqueue to wait until all
1521                  * messages are processed.
1522                  *
1523                  * This handling is only necessary to back off the sender
1524                  * and not queue every message from the socket layer into
1525                  * the DLM processqueue. Once DLM can parse multiple messages
1526                  * on e.g. a per socket basis, this handling might be removed.
1527                  * Especially during a message burst we are too slow to
1528                  * process messages and the queue would fill up memory.
1529                  */
1530                 wait_event(processqueue_wq, !atomic_read(&processqueue_count));
1531                 fallthrough;
1532         case DLM_IO_RESCHED:
1533                 cond_resched();
1534                 queue_work(io_workqueue, &con->rwork);
1535                 /* CF_RECV_PENDING not cleared */
1536                 break;
1537         default:
1538                 if (ret < 0) {
1539                         if (test_bit(CF_IS_OTHERCON, &con->flags)) {
1540                                 close_connection(con, false);
1541                         } else {
1542                                 spin_lock_bh(&con->writequeue_lock);
1543                                 lowcomms_queue_swork(con);
1544                                 spin_unlock_bh(&con->writequeue_lock);
1545                         }
1546
1547                         /* CF_RECV_PENDING cleared for othercon
1548                          * we trigger send queue if not already done
1549                          * and process_send_sockets will handle it
1550                          */
1551                         break;
1552                 }
1553
1554                 WARN_ON_ONCE(1);
1555                 break;
1556         }
1557 }
1558
1559 static void process_listen_recv_socket(struct work_struct *work)
1560 {
1561         int ret;
1562
1563         if (WARN_ON_ONCE(!listen_con.sock))
1564                 return;
1565
1566         do {
1567                 ret = accept_from_sock();
1568         } while (ret == DLM_IO_SUCCESS);
1569
1570         if (ret < 0)
1571                 log_print("critical error accepting connection: %d", ret);
1572 }
1573
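/* Establish an outgoing connection: look up the address for con->nodeid,
 * create a kernel socket, apply the per-protocol socket options, bind to
 * our local address and call the protocol's connect op. -EINPROGRESS from
 * a non-blocking TCP connect is not treated as a failure; any other error
 * releases the socket again. Called from process_send_sockets() with
 * con->sock_lock held for write.
 */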
1574 static int dlm_connect(struct connection *con)
1575 {
1576         struct sockaddr_storage addr;
1577         int result, addr_len;
1578         struct socket *sock;
1579         unsigned int mark;
1580
1581         memset(&addr, 0, sizeof(addr));
1582         result = nodeid_to_addr(con->nodeid, &addr, NULL,
1583                                 dlm_proto_ops->try_new_addr, &mark);
1584         if (result < 0) {
1585                 log_print("no address for nodeid %d", con->nodeid);
1586                 return result;
1587         }
1588
1589         /* Create a socket to communicate with */
1590         result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
1591                                   SOCK_STREAM, dlm_proto_ops->proto, &sock);
1592         if (result < 0)
1593                 return result;
1594
1595         sock_set_mark(sock->sk, mark);
1596         dlm_proto_ops->sockopts(sock);
1597
1598         result = dlm_proto_ops->bind(sock);
1599         if (result < 0) {
1600                 sock_release(sock);
1601                 return result;
1602         }
1603
1604         add_sock(sock, con);
1605
1606         log_print_ratelimited("connecting to %d", con->nodeid);
1607         make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
1608         result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
1609                                         addr_len);
1610         switch (result) {
1611         case -EINPROGRESS:
1612                 /* not an error */
1613                 fallthrough;
1614         case 0:
1615                 break;
1616         default:
1617                 if (result < 0)
1618                         dlm_close_sock(&con->sock);
1619
1620                 break;
1621         }
1622
1623         return result;
1624 }
1625
1626 /* Send worker function */
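/* The send worker also runs on io_workqueue. If no socket exists yet it
 * upgrades to the write lock and connects (retrying with a backoff on
 * failure), then downgrades back to the read lock and drains the
 * writequeue via send_to_sock() until it stops returning DLM_IO_SUCCESS.
 */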
1627 static void process_send_sockets(struct work_struct *work)
1628 {
1629         struct connection *con = container_of(work, struct connection, swork);
1630         int ret;
1631
1632         WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));
1633
1634         down_read(&con->sock_lock);
1635         if (!con->sock) {
1636                 up_read(&con->sock_lock);
1637                 down_write(&con->sock_lock);
1638                 if (!con->sock) {
1639                         ret = dlm_connect(con);
1640                         switch (ret) {
1641                         case 0:
1642                                 break;
1643                         case -EINPROGRESS:
1644                                 /* avoid spamming resched on connection;
1645                                  * we might switch to a state_change event
1646                                  * based mechanism once established
1647                                  */
1648                                 msleep(100);
1649                                 break;
1650                         default:
1651                                 /* CF_SEND_PENDING not cleared */
1652                                 up_write(&con->sock_lock);
1653                                 log_print("connect to node %d try %d error %d",
1654                                           con->nodeid, con->retries++, ret);
1655                                 msleep(1000);
1656                                 /* For now we try forever to reconnect. In
1657                                  * the future we should send an event to the
1658                                  * cluster manager to fence this node after a
1659                                  * certain number of retries.
1660                                  */
1661                                 queue_work(io_workqueue, &con->swork);
1662                                 return;
1663                         }
1664                 }
1665                 downgrade_write(&con->sock_lock);
1666         }
1667
1668         do {
1669                 ret = send_to_sock(con);
1670         } while (ret == DLM_IO_SUCCESS);
1671         up_read(&con->sock_lock);
1672
1673         switch (ret) {
1674         case DLM_IO_END:
1675                 /* CF_SEND_PENDING cleared */
1676                 break;
1677         case DLM_IO_RESCHED:
1678                 /* CF_SEND_PENDING not cleared */
1679                 cond_resched();
1680                 queue_work(io_workqueue, &con->swork);
1681                 break;
1682         default:
1683                 if (ret < 0) {
1684                         close_connection(con, false);
1685
1686                         /* CF_SEND_PENDING cleared */
1687                         spin_lock_bh(&con->writequeue_lock);
1688                         lowcomms_queue_swork(con);
1689                         spin_unlock_bh(&con->writequeue_lock);
1690                         break;
1691                 }
1692
1693                 WARN_ON_ONCE(1);
1694                 break;
1695         }
1696 }
1697
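/* Lifetime handling for the two workqueues: io_workqueue runs the
 * per-connection send/receive workers (swork/rwork), while
 * process_workqueue carries the message processing work that
 * dlm_lowcomms_shutdown() flushes and that the DLM_IO_FLUSH backpressure
 * above backs off from via processqueue_wq.
 */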
1698 static void work_stop(void)
1699 {
1700         if (io_workqueue) {
1701                 destroy_workqueue(io_workqueue);
1702                 io_workqueue = NULL;
1703         }
1704
1705         if (process_workqueue) {
1706                 destroy_workqueue(process_workqueue);
1707                 process_workqueue = NULL;
1708         }
1709 }
1710
1711 static int work_start(void)
1712 {
1713         io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM |
1714                                        WQ_UNBOUND, 0);
1715         if (!io_workqueue) {
1716                 log_print("can't start dlm_io");
1717                 return -ENOMEM;
1718         }
1719
1720         process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH, 0);
1721         if (!process_workqueue) {
1722                 log_print("can't start dlm_process");
1723                 destroy_workqueue(io_workqueue);
1724                 io_workqueue = NULL;
1725                 return -ENOMEM;
1726         }
1727
1728         return 0;
1729 }
1730
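/* Shutdown sequence: restore the listen socket's original sk_data_ready
 * callback so no new accept work is queued, cancel any pending listen
 * work and close the listen socket, then shut down every connection,
 * stop its io, flush pending message processing, close the socket and
 * drop anything still sitting on the writequeues.
 */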
1731 void dlm_lowcomms_shutdown(void)
1732 {
1733         struct connection *con;
1734         int i, idx;
1735
1736         /* stop lowcomms_listen_data_ready calls */
1737         lock_sock(listen_con.sock->sk);
1738         listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
1739         release_sock(listen_con.sock->sk);
1740
1741         cancel_work_sync(&listen_con.rwork);
1742         dlm_close_sock(&listen_con.sock);
1743
1744         idx = srcu_read_lock(&connections_srcu);
1745         for (i = 0; i < CONN_HASH_SIZE; i++) {
1746                 hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
1747                         shutdown_connection(con, true);
1748                         stop_connection_io(con);
1749                         flush_workqueue(process_workqueue);
1750                         close_connection(con, true);
1751
1752                         clean_one_writequeue(con);
1753                         if (con->othercon)
1754                                 clean_one_writequeue(con->othercon);
1755                         allow_connection_io(con);
1756                 }
1757         }
1758         srcu_read_unlock(&connections_srcu, idx);
1759 }
1760
1761 void dlm_lowcomms_stop(void)
1762 {
1763         work_stop();
1764         dlm_proto_ops = NULL;
1765 }
1766
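/* Create the listening socket: validate the protocol specific
 * constraints, create and bind the socket, save the original socket
 * callbacks in listen_sock, hook up lowcomms_listen_data_ready and start
 * listening with a backlog of 128.
 */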
1767 static int dlm_listen_for_all(void)
1768 {
1769         struct socket *sock;
1770         int result;
1771
1772         log_print("Using %s for communications",
1773                   dlm_proto_ops->name);
1774
1775         result = dlm_proto_ops->listen_validate();
1776         if (result < 0)
1777                 return result;
1778
1779         result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
1780                                   SOCK_STREAM, dlm_proto_ops->proto, &sock);
1781         if (result < 0) {
1782                 log_print("Can't create comms socket: %d", result);
1783                 return result;
1784         }
1785
1786         sock_set_mark(sock->sk, dlm_config.ci_mark);
1787         dlm_proto_ops->listen_sockopts(sock);
1788
1789         result = dlm_proto_ops->listen_bind(sock);
1790         if (result < 0)
1791                 goto out;
1792
1793         lock_sock(sock->sk);
1794         listen_sock.sk_data_ready = sock->sk->sk_data_ready;
1795         listen_sock.sk_write_space = sock->sk->sk_write_space;
1796         listen_sock.sk_error_report = sock->sk->sk_error_report;
1797         listen_sock.sk_state_change = sock->sk->sk_state_change;
1798
1799         listen_con.sock = sock;
1800
1801         sock->sk->sk_allocation = GFP_NOFS;
1802         sock->sk->sk_use_task_frag = false;
1803         sock->sk->sk_data_ready = lowcomms_listen_data_ready;
1804         release_sock(sock->sk);
1805
1806         result = sock->ops->listen(sock, 128);
1807         if (result < 0) {
1808                 dlm_close_sock(&listen_con.sock);
1809                 return result;
1810         }
1811
1812         return 0;
1813
1814 out:
1815         sock_release(sock);
1816         return result;
1817 }
1818
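/* The dlm_proto_ops tables below provide the TCP and SCTP specific
 * pieces (connect, bind and socket option setup) used by the generic
 * code above; dlm_lowcomms_start() selects one of the two based on
 * dlm_config.ci_protocol.
 */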
1819 static int dlm_tcp_bind(struct socket *sock)
1820 {
1821         struct sockaddr_storage src_addr;
1822         int result, addr_len;
1823
1824         /* Bind to our cluster-known address when connecting,
1825          * to avoid routing problems.
1826          */
1827         memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
1828         make_sockaddr(&src_addr, 0, &addr_len);
1829
1830         result = kernel_bind(sock, (struct sockaddr *)&src_addr,
1831                              addr_len);
1832         if (result < 0) {
1833                 /* This *may* not indicate a critical error */
1834                 log_print("could not bind for connect: %d", result);
1835         }
1836
1837         return 0;
1838 }
1839
1840 static int dlm_tcp_connect(struct connection *con, struct socket *sock,
1841                            struct sockaddr *addr, int addr_len)
1842 {
1843         return kernel_connect(sock, addr, addr_len, O_NONBLOCK);
1844 }
1845
1846 static int dlm_tcp_listen_validate(void)
1847 {
1848         /* We don't support multi-homed hosts */
1849         if (dlm_local_count > 1) {
1850                 log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
1851                 return -EINVAL;
1852         }
1853
1854         return 0;
1855 }
1856
1857 static void dlm_tcp_sockopts(struct socket *sock)
1858 {
1859         /* Turn off Nagle's algorithm */
1860         tcp_sock_set_nodelay(sock->sk);
1861 }
1862
1863 static void dlm_tcp_listen_sockopts(struct socket *sock)
1864 {
1865         dlm_tcp_sockopts(sock);
1866         sock_set_reuseaddr(sock->sk);
1867 }
1868
1869 static int dlm_tcp_listen_bind(struct socket *sock)
1870 {
1871         int addr_len;
1872
1873         /* Bind to our port */
1874         make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
1875         return kernel_bind(sock, (struct sockaddr *)&dlm_local_addr[0],
1876                            addr_len);
1877 }
1878
1879 static const struct dlm_proto_ops dlm_tcp_ops = {
1880         .name = "TCP",
1881         .proto = IPPROTO_TCP,
1882         .connect = dlm_tcp_connect,
1883         .sockopts = dlm_tcp_sockopts,
1884         .bind = dlm_tcp_bind,
1885         .listen_validate = dlm_tcp_listen_validate,
1886         .listen_sockopts = dlm_tcp_listen_sockopts,
1887         .listen_bind = dlm_tcp_listen_bind,
1888 };
1889
1890 static int dlm_sctp_bind(struct socket *sock)
1891 {
1892         return sctp_bind_addrs(sock, 0);
1893 }
1894
1895 static int dlm_sctp_connect(struct connection *con, struct socket *sock,
1896                             struct sockaddr *addr, int addr_len)
1897 {
1898         int ret;
1899
1900         /*
1901          * Make kernel_connect() return within the specified time, since the
1902          * O_NONBLOCK argument to connect() does not work here; afterwards we
1903          * restore the default value of this attribute.
1904          */
1905         sock_set_sndtimeo(sock->sk, 5);
1906         ret = kernel_connect(sock, addr, addr_len, 0);
1907         sock_set_sndtimeo(sock->sk, 0);
1908         return ret;
1909 }
1910
1911 static int dlm_sctp_listen_validate(void)
1912 {
1913         if (!IS_ENABLED(CONFIG_IP_SCTP)) {
1914                 log_print("SCTP is not enabled by this kernel");
1915                 return -EOPNOTSUPP;
1916         }
1917
1918         request_module("sctp");
1919         return 0;
1920 }
1921
1922 static int dlm_sctp_bind_listen(struct socket *sock)
1923 {
1924         return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
1925 }
1926
1927 static void dlm_sctp_sockopts(struct socket *sock)
1928 {
1929         /* Turn off Nagle's algorithm */
1930         sctp_sock_set_nodelay(sock->sk);
1931         sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
1932 }
1933
1934 static const struct dlm_proto_ops dlm_sctp_ops = {
1935         .name = "SCTP",
1936         .proto = IPPROTO_SCTP,
1937         .try_new_addr = true,
1938         .connect = dlm_sctp_connect,
1939         .sockopts = dlm_sctp_sockopts,
1940         .bind = dlm_sctp_bind,
1941         .listen_validate = dlm_sctp_listen_validate,
1942         .listen_sockopts = dlm_sctp_sockopts,
1943         .listen_bind = dlm_sctp_bind_listen,
1944 };
1945
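/* Bring lowcomms up: pick up the configured local addresses, start the
 * workqueues, select the TCP or SCTP ops table according to
 * dlm_config.ci_protocol (DLM_PROTO_TCP or DLM_PROTO_SCTP) and create
 * the listening socket. On failure everything set up so far is torn
 * down again.
 */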
1946 int dlm_lowcomms_start(void)
1947 {
1948         int error;
1949
1950         init_local();
1951         if (!dlm_local_count) {
1952                 error = -ENOTCONN;
1953                 log_print("no local IP address has been set");
1954                 goto fail;
1955         }
1956
1957         error = work_start();
1958         if (error)
1959                 goto fail;
1960
1961         /* Start listening */
1962         switch (dlm_config.ci_protocol) {
1963         case DLM_PROTO_TCP:
1964                 dlm_proto_ops = &dlm_tcp_ops;
1965                 break;
1966         case DLM_PROTO_SCTP:
1967                 dlm_proto_ops = &dlm_sctp_ops;
1968                 break;
1969         default:
1970                 log_print("Invalid protocol identifier %d set",
1971                           dlm_config.ci_protocol);
1972                 error = -EINVAL;
1973                 goto fail_proto_ops;
1974         }
1975
1976         error = dlm_listen_for_all();
1977         if (error)
1978                 goto fail_listen;
1979
1980         return 0;
1981
1982 fail_listen:
1983         dlm_proto_ops = NULL;
1984 fail_proto_ops:
1985         work_stop();
1986 fail:
1987         return error;
1988 }
1989
1990 void dlm_lowcomms_init(void)
1991 {
1992         int i;
1993
1994         for (i = 0; i < CONN_HASH_SIZE; i++)
1995                 INIT_HLIST_HEAD(&connection_hash[i]);
1996
1997         INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
1998 }
1999
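/* Final cleanup at exit: unlink every connection from the hash under
 * connections_lock and free it (and its othercon, if any) via SRCU
 * callbacks once all readers are done.
 */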
2000 void dlm_lowcomms_exit(void)
2001 {
2002         struct connection *con;
2003         int i, idx;
2004
2005         idx = srcu_read_lock(&connections_srcu);
2006         for (i = 0; i < CONN_HASH_SIZE; i++) {
2007                 hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
2008                         spin_lock(&connections_lock);
2009                         hlist_del_rcu(&con->list);
2010                         spin_unlock(&connections_lock);
2011
2012                         if (con->othercon)
2013                                 call_srcu(&connections_srcu, &con->othercon->rcu,
2014                                           connection_release);
2015                         call_srcu(&connections_srcu, &con->rcu, connection_release);
2016                 }
2017         }
2018         srcu_read_unlock(&connections_srcu, idx);
2019 }