]> Git Repo - J-linux.git/blobdiff - fs/dlm/lowcomms.c
HID: hid-sensor-custom: Fix big on-stack allocation in hid_sensor_custom_get_known()
[J-linux.git] / fs / dlm / lowcomms.c
index 59f64c596233b822f2b14c48c42800201e5771cf..8b80ca0cd65fdc8d9b38a754ca58185edbbedebd 100644 (file)
 
 #define NEEDED_RMEM (4*1024*1024)
 
-/* Number of messages to send before rescheduling */
-#define MAX_SEND_MSG_COUNT 25
-#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
-
 struct connection {
        struct socket *sock;    /* NULL if not connected */
        uint32_t nodeid;        /* So we know who we are in the list */
-       struct mutex sock_mutex;
+       /* this semaphore is used to allow parallel recv/send in read
+        * lock mode. When we release a sock we need to hold the write lock.
+        *
+        * However this is locking code and not nice. When we remove the
+        * othercon handling we can look into other mechanism to synchronize
+        * io handling to call sock_release() at the right time.
+        */
+       struct rw_semaphore sock_lock;
        unsigned long flags;
-#define CF_READ_PENDING 1
-#define CF_WRITE_PENDING 2
-#define CF_INIT_PENDING 4
+#define CF_APP_LIMITED 0
+#define CF_RECV_PENDING 1
+#define CF_SEND_PENDING 2
+#define CF_RECV_INTR 3
+#define CF_IO_STOP 4
 #define CF_IS_OTHERCON 5
-#define CF_CLOSE 6
-#define CF_APP_LIMITED 7
-#define CF_CLOSING 8
-#define CF_SHUTDOWN 9
-#define CF_CONNECTED 10
-#define CF_RECONNECT 11
-#define CF_DELAY_CONNECT 12
-#define CF_EOF 13
        struct list_head writequeue;  /* List of outgoing writequeue_entries */
        spinlock_t writequeue_lock;
-       atomic_t writequeue_cnt;
        int retries;
-#define MAX_CONNECT_RETRIES 3
        struct hlist_node list;
+       /* Due to some connect()/accept() races we can currently end up with a
+        * cross-over connection attempt: a second connection for one node.
+        *
+        * There is a solution to avoid the race by introducing a connect
+        * rule as e.g. our_nodeid > nodeid_to_connect who is allowed to
+        * connect. The other side can still connect, but that will only be
+        * treated as an indication that it wants a reconnect.
+        *
+        * However changing to this behaviour would break backwards compatibility.
+        * In a DLM protocol major version upgrade we should remove this!
+        */
        struct connection *othercon;
-       struct connection *sendcon;
-       struct work_struct rwork; /* Receive workqueue */
-       struct work_struct swork; /* Send workqueue */
-       wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
-       unsigned char *rx_buf;
-       int rx_buflen;
+       struct work_struct rwork; /* receive worker */
+       struct work_struct swork; /* send worker */
+       unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
        int rx_leftover;
+       int mark;
+       int addr_count;
+       int curr_addr_index;
+       struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
+       spinlock_t addrs_lock;
        struct rcu_head rcu;
 };
 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
@@ -136,13 +144,12 @@ struct dlm_msg {
        struct kref ref;
 };
 
-struct dlm_node_addr {
-       struct list_head list;
+struct processqueue_entry {
+       unsigned char *buf;
        int nodeid;
-       int mark;
-       int addr_count;
-       int curr_addr_index;
-       struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
+       int buflen;
+
+       struct list_head list;
 };
 
 struct dlm_proto_ops {
@@ -157,10 +164,6 @@ struct dlm_proto_ops {
        int (*listen_validate)(void);
        void (*listen_sockopts)(struct socket *sock);
        int (*listen_bind)(struct socket *sock);
-       /* What to do to shutdown */
-       void (*shutdown_action)(struct connection *con);
-       /* What to do to eof check */
-       bool (*eof_condition)(struct connection *con);
 };
 
 static struct listen_sock_callbacks {
@@ -170,17 +173,13 @@ static struct listen_sock_callbacks {
        void (*sk_write_space)(struct sock *);
 } listen_sock;
 
-static LIST_HEAD(dlm_node_addrs);
-static DEFINE_SPINLOCK(dlm_node_addrs_spin);
-
 static struct listen_connection listen_con;
-static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
+static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
 static int dlm_local_count;
-int dlm_allow_conn;
 
 /* Work queues */
-static struct workqueue_struct *recv_workqueue;
-static struct workqueue_struct *send_workqueue;
+static struct workqueue_struct *io_workqueue;
+static struct workqueue_struct *process_workqueue;
 
 static struct hlist_head connection_hash[CONN_HASH_SIZE];
 static DEFINE_SPINLOCK(connections_lock);
@@ -188,8 +187,45 @@ DEFINE_STATIC_SRCU(connections_srcu);
 
 static const struct dlm_proto_ops *dlm_proto_ops;
 
+#define DLM_IO_SUCCESS 0
+#define DLM_IO_END 1
+#define DLM_IO_EOF 2
+#define DLM_IO_RESCHED 3
+
 static void process_recv_sockets(struct work_struct *work);
 static void process_send_sockets(struct work_struct *work);
+static void process_dlm_messages(struct work_struct *work);
+
+static DECLARE_WORK(process_work, process_dlm_messages);
+static DEFINE_SPINLOCK(processqueue_lock);
+static bool process_dlm_messages_pending;
+static LIST_HEAD(processqueue);
+
+bool dlm_lowcomms_is_running(void)
+{
+       return !!listen_con.sock;
+}
+
+static void lowcomms_queue_swork(struct connection *con)
+{
+       assert_spin_locked(&con->writequeue_lock);
+
+       if (!test_bit(CF_IO_STOP, &con->flags) &&
+           !test_bit(CF_APP_LIMITED, &con->flags) &&
+           !test_and_set_bit(CF_SEND_PENDING, &con->flags))
+               queue_work(io_workqueue, &con->swork);
+}
+
+static void lowcomms_queue_rwork(struct connection *con)
+{
+#ifdef CONFIG_LOCKDEP
+       WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
+#endif
+
+       if (!test_bit(CF_IO_STOP, &con->flags) &&
+           !test_and_set_bit(CF_RECV_PENDING, &con->flags))
+               queue_work(io_workqueue, &con->rwork);
+}
 
 static void writequeue_entry_ctor(void *data)
 {
@@ -214,15 +250,12 @@ static struct writequeue_entry *con_next_wq(struct connection *con)
 {
        struct writequeue_entry *e;
 
-       if (list_empty(&con->writequeue))
-               return NULL;
-
-       e = list_first_entry(&con->writequeue, struct writequeue_entry,
-                            list);
+       e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
+                                    list);
        /* if len is zero nothing is to send, if there are users filling
         * buffers we wait until the users are done so we can send more.
         */
-       if (e->users || e->len == 0)
+       if (!e || e->users || e->len == 0)
                return NULL;
 
        return e;
@@ -240,28 +273,15 @@ static struct connection *__find_con(int nodeid, int r)
        return NULL;
 }
 
-static bool tcp_eof_condition(struct connection *con)
-{
-       return atomic_read(&con->writequeue_cnt);
-}
-
-static int dlm_con_init(struct connection *con, int nodeid)
+static void dlm_con_init(struct connection *con, int nodeid)
 {
-       con->rx_buflen = dlm_config.ci_buffer_size;
-       con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
-       if (!con->rx_buf)
-               return -ENOMEM;
-
        con->nodeid = nodeid;
-       mutex_init(&con->sock_mutex);
+       init_rwsem(&con->sock_lock);
        INIT_LIST_HEAD(&con->writequeue);
        spin_lock_init(&con->writequeue_lock);
-       atomic_set(&con->writequeue_cnt, 0);
        INIT_WORK(&con->swork, process_send_sockets);
        INIT_WORK(&con->rwork, process_recv_sockets);
-       init_waitqueue_head(&con->shutdown_wait);
-
-       return 0;
+       spin_lock_init(&con->addrs_lock);
 }
 
 /*
@@ -271,7 +291,7 @@ static int dlm_con_init(struct connection *con, int nodeid)
 static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 {
        struct connection *con, *tmp;
-       int r, ret;
+       int r;
 
        r = nodeid_hash(nodeid);
        con = __find_con(nodeid, r);
@@ -282,11 +302,7 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
        if (!con)
                return NULL;
 
-       ret = dlm_con_init(con, nodeid);
-       if (ret) {
-               kfree(con);
-               return NULL;
-       }
+       dlm_con_init(con, nodeid);
 
        spin_lock(&connections_lock);
        /* Because multiple workqueues/threads calls this function it can
@@ -298,7 +314,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
        tmp = __find_con(nodeid, r);
        if (tmp) {
                spin_unlock(&connections_lock);
-               kfree(con->rx_buf);
                kfree(con);
                return tmp;
        }
@@ -309,29 +324,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
        return con;
 }
 
-/* Loop round all connections */
-static void foreach_conn(void (*conn_func)(struct connection *c))
-{
-       int i;
-       struct connection *con;
-
-       for (i = 0; i < CONN_HASH_SIZE; i++) {
-               hlist_for_each_entry_rcu(con, &connection_hash[i], list)
-                       conn_func(con);
-       }
-}
-
-static struct dlm_node_addr *find_node_addr(int nodeid)
-{
-       struct dlm_node_addr *na;
-
-       list_for_each_entry(na, &dlm_node_addrs, list) {
-               if (na->nodeid == nodeid)
-                       return na;
-       }
-       return NULL;
-}
-
 static int addr_compare(const struct sockaddr_storage *x,
                        const struct sockaddr_storage *y)
 {
@@ -365,40 +357,47 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
                          unsigned int *mark)
 {
        struct sockaddr_storage sas;
-       struct dlm_node_addr *na;
+       struct connection *con;
+       int idx;
 
        if (!dlm_local_count)
                return -1;
 
-       spin_lock(&dlm_node_addrs_spin);
-       na = find_node_addr(nodeid);
-       if (na && na->addr_count) {
-               memcpy(&sas, na->addr[na->curr_addr_index],
-                      sizeof(struct sockaddr_storage));
+       idx = srcu_read_lock(&connections_srcu);
+       con = nodeid2con(nodeid, 0);
+       if (!con) {
+               srcu_read_unlock(&connections_srcu, idx);
+               return -ENOENT;
+       }
 
-               if (try_new_addr) {
-                       na->curr_addr_index++;
-                       if (na->curr_addr_index == na->addr_count)
-                               na->curr_addr_index = 0;
-               }
+       spin_lock(&con->addrs_lock);
+       if (!con->addr_count) {
+               spin_unlock(&con->addrs_lock);
+               srcu_read_unlock(&connections_srcu, idx);
+               return -ENOENT;
        }
-       spin_unlock(&dlm_node_addrs_spin);
 
-       if (!na)
-               return -EEXIST;
+       memcpy(&sas, &con->addr[con->curr_addr_index],
+              sizeof(struct sockaddr_storage));
 
-       if (!na->addr_count)
-               return -ENOENT;
+       if (try_new_addr) {
+               con->curr_addr_index++;
+               if (con->curr_addr_index == con->addr_count)
+                       con->curr_addr_index = 0;
+       }
 
-       *mark = na->mark;
+       *mark = con->mark;
+       spin_unlock(&con->addrs_lock);
 
        if (sas_out)
                memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
 
-       if (!sa_out)
+       if (!sa_out) {
+               srcu_read_unlock(&connections_srcu, idx);
                return 0;
+       }
 
-       if (dlm_local_addr[0]->ss_family == AF_INET) {
+       if (dlm_local_addr[0].ss_family == AF_INET) {
                struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
                struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
                ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
@@ -408,43 +407,46 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
                ret6->sin6_addr = in6->sin6_addr;
        }
 
+       srcu_read_unlock(&connections_srcu, idx);
        return 0;
 }
 
 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
                          unsigned int *mark)
 {
-       struct dlm_node_addr *na;
-       int rv = -EEXIST;
-       int addr_i;
-
-       spin_lock(&dlm_node_addrs_spin);
-       list_for_each_entry(na, &dlm_node_addrs, list) {
-               if (!na->addr_count)
-                       continue;
-
-               for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
-                       if (addr_compare(na->addr[addr_i], addr)) {
-                               *nodeid = na->nodeid;
-                               *mark = na->mark;
-                               rv = 0;
-                               goto unlock;
+       struct connection *con;
+       int i, idx, addr_i;
+
+       idx = srcu_read_lock(&connections_srcu);
+       for (i = 0; i < CONN_HASH_SIZE; i++) {
+               hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
+                       WARN_ON_ONCE(!con->addr_count);
+
+                       spin_lock(&con->addrs_lock);
+                       for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
+                               if (addr_compare(&con->addr[addr_i], addr)) {
+                                       *nodeid = con->nodeid;
+                                       *mark = con->mark;
+                                       spin_unlock(&con->addrs_lock);
+                                       srcu_read_unlock(&connections_srcu, idx);
+                                       return 0;
+                               }
                        }
+                       spin_unlock(&con->addrs_lock);
                }
        }
-unlock:
-       spin_unlock(&dlm_node_addrs_spin);
-       return rv;
+       srcu_read_unlock(&connections_srcu, idx);
+
+       return -ENOENT;
 }
 
-/* caller need to held dlm_node_addrs_spin lock */
-static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na,
-                                    const struct sockaddr_storage *addr)
+static bool dlm_lowcomms_con_has_addr(const struct connection *con,
+                                     const struct sockaddr_storage *addr)
 {
        int i;
 
-       for (i = 0; i < na->addr_count; i++) {
-               if (addr_compare(na->addr[i], addr))
+       for (i = 0; i < con->addr_count; i++) {
+               if (addr_compare(&con->addr[i], addr))
                        return true;
        }
 
@@ -453,118 +455,82 @@ static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na,
 
 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
 {
-       struct sockaddr_storage *new_addr;
-       struct dlm_node_addr *new_node, *na;
-       bool ret;
-
-       new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
-       if (!new_node)
-               return -ENOMEM;
+       struct connection *con;
+       bool ret;
+       int idx;
 
-       new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
-       if (!new_addr) {
-               kfree(new_node);
+       idx = srcu_read_lock(&connections_srcu);
+       con = nodeid2con(nodeid, GFP_NOFS);
+       if (!con) {
+               srcu_read_unlock(&connections_srcu, idx);
                return -ENOMEM;
        }
 
-       memcpy(new_addr, addr, len);
-
-       spin_lock(&dlm_node_addrs_spin);
-       na = find_node_addr(nodeid);
-       if (!na) {
-               new_node->nodeid = nodeid;
-               new_node->addr[0] = new_addr;
-               new_node->addr_count = 1;
-               new_node->mark = dlm_config.ci_mark;
-               list_add(&new_node->list, &dlm_node_addrs);
-               spin_unlock(&dlm_node_addrs_spin);
+       spin_lock(&con->addrs_lock);
+       if (!con->addr_count) {
+               memcpy(&con->addr[0], addr, sizeof(*addr));
+               con->addr_count = 1;
+               con->mark = dlm_config.ci_mark;
+               spin_unlock(&con->addrs_lock);
+               srcu_read_unlock(&connections_srcu, idx);
                return 0;
        }
 
-       ret = dlm_lowcomms_na_has_addr(na, addr);
+       ret = dlm_lowcomms_con_has_addr(con, addr);
        if (ret) {
-               spin_unlock(&dlm_node_addrs_spin);
-               kfree(new_addr);
-               kfree(new_node);
+               spin_unlock(&con->addrs_lock);
+               srcu_read_unlock(&connections_srcu, idx);
                return -EEXIST;
        }
 
-       if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
-               spin_unlock(&dlm_node_addrs_spin);
-               kfree(new_addr);
-               kfree(new_node);
+       if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
+               spin_unlock(&con->addrs_lock);
+               srcu_read_unlock(&connections_srcu, idx);
                return -ENOSPC;
        }
 
-       na->addr[na->addr_count++] = new_addr;
-       spin_unlock(&dlm_node_addrs_spin);
-       kfree(new_node);
+       memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
+       spin_unlock(&con->addrs_lock);
+       srcu_read_unlock(&connections_srcu, idx);
        return 0;
 }
 
 /* Data available on socket or listen socket received a connect */
 static void lowcomms_data_ready(struct sock *sk)
 {
-       struct connection *con;
-
-       con = sock2con(sk);
-       if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
-               queue_work(recv_workqueue, &con->rwork);
-}
-
-static void lowcomms_listen_data_ready(struct sock *sk)
-{
-       if (!dlm_allow_conn)
-               return;
+       struct connection *con = sock2con(sk);
 
-       queue_work(recv_workqueue, &listen_con.rwork);
+       set_bit(CF_RECV_INTR, &con->flags);
+       lowcomms_queue_rwork(con);
 }
 
 static void lowcomms_write_space(struct sock *sk)
 {
-       struct connection *con;
-
-       con = sock2con(sk);
-       if (!con)
-               return;
-
-       if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
-               log_print("connected to node %d", con->nodeid);
-               queue_work(send_workqueue, &con->swork);
-               return;
-       }
+       struct connection *con = sock2con(sk);
 
        clear_bit(SOCK_NOSPACE, &con->sock->flags);
 
+       spin_lock_bh(&con->writequeue_lock);
        if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
                con->sock->sk->sk_write_pending--;
                clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
        }
 
-       queue_work(send_workqueue, &con->swork);
-}
-
-static inline void lowcomms_connect_sock(struct connection *con)
-{
-       if (test_bit(CF_CLOSE, &con->flags))
-               return;
-       queue_work(send_workqueue, &con->swork);
-       cond_resched();
+       lowcomms_queue_swork(con);
+       spin_unlock_bh(&con->writequeue_lock);
 }
 
 static void lowcomms_state_change(struct sock *sk)
 {
        /* SCTP layer is not calling sk_data_ready when the connection
-        * is done, so we catch the signal through here. Also, it
-        * doesn't switch socket state when entering shutdown, so we
-        * skip the write in that case.
+        * is done, so we catch the signal through here.
         */
-       if (sk->sk_shutdown) {
-               if (sk->sk_shutdown == RCV_SHUTDOWN)
-                       lowcomms_data_ready(sk);
-       } else if (sk->sk_state == TCP_ESTABLISHED) {
-               lowcomms_write_space(sk);
-       }
+       if (sk->sk_shutdown == RCV_SHUTDOWN)
+               lowcomms_data_ready(sk);
+}
+
+static void lowcomms_listen_data_ready(struct sock *sk)
+{
+       queue_work(io_workqueue, &listen_con.rwork);
 }
 
 int dlm_lowcomms_connect_node(int nodeid)
@@ -576,47 +542,49 @@ int dlm_lowcomms_connect_node(int nodeid)
                return 0;
 
        idx = srcu_read_lock(&connections_srcu);
-       con = nodeid2con(nodeid, GFP_NOFS);
-       if (!con) {
+       con = nodeid2con(nodeid, 0);
+       if (WARN_ON_ONCE(!con)) {
                srcu_read_unlock(&connections_srcu, idx);
-               return -ENOMEM;
+               return -ENOENT;
        }
 
-       lowcomms_connect_sock(con);
+       down_read(&con->sock_lock);
+       if (!con->sock) {
+               spin_lock_bh(&con->writequeue_lock);
+               lowcomms_queue_swork(con);
+               spin_unlock_bh(&con->writequeue_lock);
+       }
+       up_read(&con->sock_lock);
        srcu_read_unlock(&connections_srcu, idx);
 
+       cond_resched();
        return 0;
 }
 
 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
 {
-       struct dlm_node_addr *na;
+       struct connection *con;
+       int idx;
 
-       spin_lock(&dlm_node_addrs_spin);
-       na = find_node_addr(nodeid);
-       if (!na) {
-               spin_unlock(&dlm_node_addrs_spin);
+       idx = srcu_read_lock(&connections_srcu);
+       con = nodeid2con(nodeid, 0);
+       if (!con) {
+               srcu_read_unlock(&connections_srcu, idx);
                return -ENOENT;
        }
 
-       na->mark = mark;
-       spin_unlock(&dlm_node_addrs_spin);
-
+       spin_lock(&con->addrs_lock);
+       con->mark = mark;
+       spin_unlock(&con->addrs_lock);
+       srcu_read_unlock(&connections_srcu, idx);
        return 0;
 }
 
 static void lowcomms_error_report(struct sock *sk)
 {
-       struct connection *con;
-       void (*orig_report)(struct sock *) = NULL;
+       struct connection *con = sock2con(sk);
        struct inet_sock *inet;
 
-       con = sock2con(sk);
-       if (con == NULL)
-               goto out;
-
-       orig_report = listen_sock.sk_error_report;
-
        inet = inet_sk(sk);
        switch (sk->sk_family) {
        case AF_INET:
@@ -642,66 +610,25 @@ static void lowcomms_error_report(struct sock *sk)
                                   "invalid socket family %d set, "
                                   "sk_err=%d/%d\n", dlm_our_nodeid(),
                                   sk->sk_family, sk->sk_err, sk->sk_err_soft);
-               goto out;
-       }
-
-       /* below sendcon only handling */
-       if (test_bit(CF_IS_OTHERCON, &con->flags))
-               con = con->sendcon;
-
-       switch (sk->sk_err) {
-       case ECONNREFUSED:
-               set_bit(CF_DELAY_CONNECT, &con->flags);
-               break;
-       default:
                break;
        }
 
-       if (!test_and_set_bit(CF_RECONNECT, &con->flags))
-               queue_work(send_workqueue, &con->swork);
+       dlm_midcomms_unack_msg_resend(con->nodeid);
 
-out:
-       if (orig_report)
-               orig_report(sk);
+       listen_sock.sk_error_report(sk);
 }
 
-/* Note: sk_callback_lock must be locked before calling this function. */
-static void save_listen_callbacks(struct socket *sock)
+static void restore_callbacks(struct sock *sk)
 {
-       struct sock *sk = sock->sk;
-
-       listen_sock.sk_data_ready = sk->sk_data_ready;
-       listen_sock.sk_state_change = sk->sk_state_change;
-       listen_sock.sk_write_space = sk->sk_write_space;
-       listen_sock.sk_error_report = sk->sk_error_report;
-}
-
-static void restore_callbacks(struct socket *sock)
-{
-       struct sock *sk = sock->sk;
+#ifdef CONFIG_LOCKDEP
+       WARN_ON_ONCE(!lockdep_sock_is_held(sk));
+#endif
 
-       lock_sock(sk);
        sk->sk_user_data = NULL;
        sk->sk_data_ready = listen_sock.sk_data_ready;
        sk->sk_state_change = listen_sock.sk_state_change;
        sk->sk_write_space = listen_sock.sk_write_space;
        sk->sk_error_report = listen_sock.sk_error_report;
-       release_sock(sk);
-}
-
-static void add_listen_sock(struct socket *sock, struct listen_connection *con)
-{
-       struct sock *sk = sock->sk;
-
-       lock_sock(sk);
-       save_listen_callbacks(sock);
-       con->sock = sock;
-
-       sk->sk_user_data = con;
-       sk->sk_allocation = GFP_NOFS;
-       /* Install a data_ready callback */
-       sk->sk_data_ready = lowcomms_listen_data_ready;
-       release_sock(sk);
 }
 
 /* Make a socket active */
@@ -713,10 +640,10 @@ static void add_sock(struct socket *sock, struct connection *con)
        con->sock = sock;
 
        sk->sk_user_data = con;
-       /* Install a data_ready callback */
        sk->sk_data_ready = lowcomms_data_ready;
        sk->sk_write_space = lowcomms_write_space;
-       sk->sk_state_change = lowcomms_state_change;
+       if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
+               sk->sk_state_change = lowcomms_state_change;
        sk->sk_allocation = GFP_NOFS;
        sk->sk_error_report = lowcomms_error_report;
        release_sock(sk);
@@ -727,7 +654,7 @@ static void add_sock(struct socket *sock, struct connection *con)
 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
                          int *addr_len)
 {
-       saddr->ss_family =  dlm_local_addr[0]->ss_family;
+       saddr->ss_family =  dlm_local_addr[0].ss_family;
        if (saddr->ss_family == AF_INET) {
                struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
                in4_addr->sin_port = cpu_to_be16(port);
@@ -773,43 +700,67 @@ static void free_entry(struct writequeue_entry *e)
        }
 
        list_del(&e->list);
-       atomic_dec(&e->con->writequeue_cnt);
        kref_put(&e->ref, dlm_page_release);
 }
 
 static void dlm_close_sock(struct socket **sock)
 {
-       if (*sock) {
-               restore_callbacks(*sock);
-               sock_release(*sock);
-               *sock = NULL;
+       lock_sock((*sock)->sk);
+       restore_callbacks((*sock)->sk);
+       release_sock((*sock)->sk);
+
+       sock_release(*sock);
+       *sock = NULL;
+}
+
+static void allow_connection_io(struct connection *con)
+{
+       if (con->othercon)
+               clear_bit(CF_IO_STOP, &con->othercon->flags);
+       clear_bit(CF_IO_STOP, &con->flags);
+}
+
+static void stop_connection_io(struct connection *con)
+{
+       if (con->othercon)
+               stop_connection_io(con->othercon);
+
+       down_write(&con->sock_lock);
+       if (con->sock) {
+               lock_sock(con->sock->sk);
+               restore_callbacks(con->sock->sk);
+
+               spin_lock_bh(&con->writequeue_lock);
+               set_bit(CF_IO_STOP, &con->flags);
+               spin_unlock_bh(&con->writequeue_lock);
+               release_sock(con->sock->sk);
+       } else {
+               spin_lock_bh(&con->writequeue_lock);
+               set_bit(CF_IO_STOP, &con->flags);
+               spin_unlock_bh(&con->writequeue_lock);
        }
+       up_write(&con->sock_lock);
+
+       cancel_work_sync(&con->swork);
+       cancel_work_sync(&con->rwork);
 }
 
 /* Close a remote connection and tidy up */
-static void close_connection(struct connection *con, bool and_other,
-                            bool tx, bool rx)
+static void close_connection(struct connection *con, bool and_other)
 {
-       bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
        struct writequeue_entry *e;
 
-       if (tx && !closing && cancel_work_sync(&con->swork)) {
-               log_print("canceled swork for node %d", con->nodeid);
-               clear_bit(CF_WRITE_PENDING, &con->flags);
-       }
-       if (rx && !closing && cancel_work_sync(&con->rwork)) {
-               log_print("canceled rwork for node %d", con->nodeid);
-               clear_bit(CF_READ_PENDING, &con->flags);
+       if (con->othercon && and_other)
+               close_connection(con->othercon, false);
+
+       down_write(&con->sock_lock);
+       if (!con->sock) {
+               up_write(&con->sock_lock);
+               return;
        }
 
-       mutex_lock(&con->sock_mutex);
        dlm_close_sock(&con->sock);
 
-       if (con->othercon && and_other) {
-               /* Will only re-enter once. */
-               close_connection(con->othercon, false, tx, rx);
-       }
-
        /* if we send a writequeue entry only a half way, we drop the
         * whole entry because reconnection and that we not start of the
         * middle of a msg which will confuse the other end.
@@ -821,200 +772,209 @@ static void close_connection(struct connection *con, bool and_other,
         * our policy is to start on a clean state when disconnects, we don't
         * know what's send/received on transport layer in this case.
         */
-       spin_lock(&con->writequeue_lock);
+       spin_lock_bh(&con->writequeue_lock);
        if (!list_empty(&con->writequeue)) {
                e = list_first_entry(&con->writequeue, struct writequeue_entry,
                                     list);
                if (e->dirty)
                        free_entry(e);
        }
-       spin_unlock(&con->writequeue_lock);
+       spin_unlock_bh(&con->writequeue_lock);
 
        con->rx_leftover = 0;
        con->retries = 0;
        clear_bit(CF_APP_LIMITED, &con->flags);
-       clear_bit(CF_CONNECTED, &con->flags);
-       clear_bit(CF_DELAY_CONNECT, &con->flags);
-       clear_bit(CF_RECONNECT, &con->flags);
-       clear_bit(CF_EOF, &con->flags);
-       mutex_unlock(&con->sock_mutex);
-       clear_bit(CF_CLOSING, &con->flags);
+       clear_bit(CF_RECV_PENDING, &con->flags);
+       clear_bit(CF_SEND_PENDING, &con->flags);
+       up_write(&con->sock_lock);
 }
 
-static void shutdown_connection(struct connection *con)
+static struct processqueue_entry *new_processqueue_entry(int nodeid,
+                                                        int buflen)
 {
-       int ret;
-
-       flush_work(&con->swork);
+       struct processqueue_entry *pentry;
 
-       mutex_lock(&con->sock_mutex);
-       /* nothing to shutdown */
-       if (!con->sock) {
-               mutex_unlock(&con->sock_mutex);
-               return;
-       }
+       pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
+       if (!pentry)
+               return NULL;
 
-       set_bit(CF_SHUTDOWN, &con->flags);
-       ret = kernel_sock_shutdown(con->sock, SHUT_WR);
-       mutex_unlock(&con->sock_mutex);
-       if (ret) {
-               log_print("Connection %p failed to shutdown: %d will force close",
-                         con, ret);
-               goto force_close;
-       } else {
-               ret = wait_event_timeout(con->shutdown_wait,
-                                        !test_bit(CF_SHUTDOWN, &con->flags),
-                                        DLM_SHUTDOWN_WAIT_TIMEOUT);
-               if (ret == 0) {
-                       log_print("Connection %p shutdown timed out, will force close",
-                                 con);
-                       goto force_close;
-               }
+       pentry->buf = kmalloc(buflen, GFP_NOFS);
+       if (!pentry->buf) {
+               kfree(pentry);
+               return NULL;
        }
 
-       return;
+       pentry->nodeid = nodeid;
+       return pentry;
+}
 
-force_close:
-       clear_bit(CF_SHUTDOWN, &con->flags);
-       close_connection(con, false, true, true);
+static void free_processqueue_entry(struct processqueue_entry *pentry)
+{
+       kfree(pentry->buf);
+       kfree(pentry);
 }
 
-static void dlm_tcp_shutdown(struct connection *con)
+struct dlm_processed_nodes {
+       int nodeid;
+
+       struct list_head list;
+};
+
+static void add_processed_node(int nodeid, struct list_head *processed_nodes)
 {
-       if (con->othercon)
-               shutdown_connection(con->othercon);
-       shutdown_connection(con);
+       struct dlm_processed_nodes *n;
+
+       list_for_each_entry(n, processed_nodes, list) {
+               /* we already remembered this node */
+               if (n->nodeid == nodeid)
+                       return;
+       }
+
+       /* if it fails, in the worst case we simply don't send an ack back.
+        * We try it again next time.
+        */
+       n = kmalloc(sizeof(*n), GFP_NOFS);
+       if (!n)
+               return;
+
+       n->nodeid = nodeid;
+       list_add(&n->list, processed_nodes);
 }
 
-static int con_realloc_receive_buf(struct connection *con, int newlen)
+static void process_dlm_messages(struct work_struct *work)
 {
-       unsigned char *newbuf;
+       struct dlm_processed_nodes *n, *n_tmp;
+       struct processqueue_entry *pentry;
+       LIST_HEAD(processed_nodes);
 
-       newbuf = kmalloc(newlen, GFP_NOFS);
-       if (!newbuf)
-               return -ENOMEM;
+       spin_lock(&processqueue_lock);
+       pentry = list_first_entry_or_null(&processqueue,
+                                         struct processqueue_entry, list);
+       if (WARN_ON_ONCE(!pentry)) {
+               spin_unlock(&processqueue_lock);
+               return;
+       }
 
-       /* copy any leftover from last receive */
-       if (con->rx_leftover)
-               memmove(newbuf, con->rx_buf, con->rx_leftover);
+       list_del(&pentry->list);
+       spin_unlock(&processqueue_lock);
 
-       /* swap to new buffer space */
-       kfree(con->rx_buf);
-       con->rx_buflen = newlen;
-       con->rx_buf = newbuf;
+       for (;;) {
+               dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
+                                           pentry->buflen);
+               add_processed_node(pentry->nodeid, &processed_nodes);
+               free_processqueue_entry(pentry);
+
+               spin_lock(&processqueue_lock);
+               pentry = list_first_entry_or_null(&processqueue,
+                                                 struct processqueue_entry, list);
+               if (!pentry) {
+                       process_dlm_messages_pending = false;
+                       spin_unlock(&processqueue_lock);
+                       break;
+               }
 
-       return 0;
+               list_del(&pentry->list);
+               spin_unlock(&processqueue_lock);
+       }
+
+       /* send ack back after we processed a couple of messages */
+       list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
+               list_del(&n->list);
+               dlm_midcomms_receive_done(n->nodeid);
+               kfree(n);
+       }
 }
 
 /* Data received from remote end */
-static int receive_from_sock(struct connection *con)
+static int receive_from_sock(struct connection *con, int buflen)
 {
+       struct processqueue_entry *pentry;
+       int ret, buflen_real;
        struct msghdr msg;
        struct kvec iov;
-       int ret, buflen;
 
-       mutex_lock(&con->sock_mutex);
+       pentry = new_processqueue_entry(con->nodeid, buflen);
+       if (!pentry)
+               return DLM_IO_RESCHED;
 
-       if (con->sock == NULL) {
-               ret = -EAGAIN;
-               goto out_close;
-       }
-
-       /* realloc if we get new buffer size to read out */
-       buflen = dlm_config.ci_buffer_size;
-       if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
-               ret = con_realloc_receive_buf(con, buflen);
-               if (ret < 0)
-                       goto out_resched;
-       }
+       memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);
 
-       for (;;) {
-               /* calculate new buffer parameter regarding last receive and
-                * possible leftover bytes
-                */
-               iov.iov_base = con->rx_buf + con->rx_leftover;
-               iov.iov_len = con->rx_buflen - con->rx_leftover;
-
-               memset(&msg, 0, sizeof(msg));
-               msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
-               ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
-                                    msg.msg_flags);
-               trace_dlm_recv(con->nodeid, ret);
-               if (ret == -EAGAIN)
-                       break;
-               else if (ret <= 0)
-                       goto out_close;
-
-               /* new buflen according readed bytes and leftover from last receive */
-               buflen = ret + con->rx_leftover;
-               ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
-               if (ret < 0)
-                       goto out_close;
-
-               /* calculate leftover bytes from process and put it into begin of
-                * the receive buffer, so next receive we have the full message
-                * at the start address of the receive buffer.
-                */
-               con->rx_leftover = buflen - ret;
-               if (con->rx_leftover) {
-                       memmove(con->rx_buf, con->rx_buf + ret,
-                               con->rx_leftover);
+       /* calculate new buffer parameter regarding last receive and
+        * possible leftover bytes
+        */
+       iov.iov_base = pentry->buf + con->rx_leftover;
+       iov.iov_len = buflen - con->rx_leftover;
+
+       memset(&msg, 0, sizeof(msg));
+       msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+       clear_bit(CF_RECV_INTR, &con->flags);
+again:
+       ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
+                            msg.msg_flags);
+       trace_dlm_recv(con->nodeid, ret);
+       if (ret == -EAGAIN) {
+               lock_sock(con->sock->sk);
+               if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
+                       release_sock(con->sock->sk);
+                       goto again;
                }
+
+               clear_bit(CF_RECV_PENDING, &con->flags);
+               release_sock(con->sock->sk);
+               free_processqueue_entry(pentry);
+               return DLM_IO_END;
+       } else if (ret == 0) {
+               /* close will clear CF_RECV_PENDING */
+               free_processqueue_entry(pentry);
+               return DLM_IO_EOF;
+       } else if (ret < 0) {
+               free_processqueue_entry(pentry);
+               return ret;
        }
 
-       dlm_midcomms_receive_done(con->nodeid);
-       mutex_unlock(&con->sock_mutex);
-       return 0;
+       /* new buflen according to the read bytes and leftover from the last receive */
+       buflen_real = ret + con->rx_leftover;
+       ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
+                                          buflen_real);
+       if (ret < 0) {
+               free_processqueue_entry(pentry);
+               return ret;
+       }
 
-out_resched:
-       if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
-               queue_work(recv_workqueue, &con->rwork);
-       mutex_unlock(&con->sock_mutex);
-       return -EAGAIN;
-
-out_close:
-       if (ret == 0) {
-               log_print("connection %p got EOF from %d",
-                         con, con->nodeid);
-
-               if (dlm_proto_ops->eof_condition &&
-                   dlm_proto_ops->eof_condition(con)) {
-                       set_bit(CF_EOF, &con->flags);
-                       mutex_unlock(&con->sock_mutex);
-               } else {
-                       mutex_unlock(&con->sock_mutex);
-                       close_connection(con, false, true, false);
+       pentry->buflen = ret;
 
-                       /* handling for tcp shutdown */
-                       clear_bit(CF_SHUTDOWN, &con->flags);
-                       wake_up(&con->shutdown_wait);
-               }
+       /* calculate leftover bytes after processing and put them at the
+        * beginning of the receive buffer, so on the next receive we have
+        * the full message at the start address of the receive buffer.
+        */
+       con->rx_leftover = buflen_real - ret;
+       memmove(con->rx_leftover_buf, pentry->buf + ret,
+               con->rx_leftover);
 
-               /* signal to breaking receive worker */
-               ret = -1;
-       } else {
-               mutex_unlock(&con->sock_mutex);
+       spin_lock(&processqueue_lock);
+       list_add_tail(&pentry->list, &processqueue);
+       if (!process_dlm_messages_pending) {
+               process_dlm_messages_pending = true;
+               queue_work(process_workqueue, &process_work);
        }
-       return ret;
+       spin_unlock(&processqueue_lock);
+
+       return DLM_IO_SUCCESS;
 }
 
 /* Listening socket is busy, accept a connection */
-static int accept_from_sock(struct listen_connection *con)
+static int accept_from_sock(void)
 {
-       int result;
        struct sockaddr_storage peeraddr;
-       struct socket *newsock;
-       int len, idx;
-       int nodeid;
+       int len, idx, result, nodeid;
        struct connection *newcon;
-       struct connection *addcon;
+       struct socket *newsock;
        unsigned int mark;
 
-       if (!con->sock)
-               return -ENOTCONN;
-
-       result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
-       if (result < 0)
+       result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
+       if (result == -EAGAIN)
+               return DLM_IO_END;
+       else if (result < 0)
                goto accept_err;
 
        /* Get the connected socket's peer */
@@ -1062,16 +1022,16 @@ static int accept_from_sock(struct listen_connection *con)
         *  In this case we store the incoming one in "othercon"
         */
        idx = srcu_read_lock(&connections_srcu);
-       newcon = nodeid2con(nodeid, GFP_NOFS);
-       if (!newcon) {
+       newcon = nodeid2con(nodeid, 0);
+       if (WARN_ON_ONCE(!newcon)) {
                srcu_read_unlock(&connections_srcu, idx);
-               result = -ENOMEM;
+               result = -ENOENT;
                goto accept_err;
        }
 
        sock_set_mark(newsock->sk, mark);
 
-       mutex_lock(&newcon->sock_mutex);
+       down_write(&newcon->sock_lock);
        if (newcon->sock) {
                struct connection *othercon = newcon->othercon;
 
@@ -1079,63 +1039,50 @@ static int accept_from_sock(struct listen_connection *con)
                        othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
                        if (!othercon) {
                                log_print("failed to allocate incoming socket");
-                               mutex_unlock(&newcon->sock_mutex);
+                               up_write(&newcon->sock_lock);
                                srcu_read_unlock(&connections_srcu, idx);
                                result = -ENOMEM;
                                goto accept_err;
                        }
 
-                       result = dlm_con_init(othercon, nodeid);
-                       if (result < 0) {
-                               kfree(othercon);
-                               mutex_unlock(&newcon->sock_mutex);
-                               srcu_read_unlock(&connections_srcu, idx);
-                               goto accept_err;
-                       }
-
-                       lockdep_set_subclass(&othercon->sock_mutex, 1);
-                       set_bit(CF_IS_OTHERCON, &othercon->flags);
+                       dlm_con_init(othercon, nodeid);
+                       lockdep_set_subclass(&othercon->sock_lock, 1);
                        newcon->othercon = othercon;
-                       othercon->sendcon = newcon;
+                       set_bit(CF_IS_OTHERCON, &othercon->flags);
                } else {
                        /* close other sock con if we have something new */
-                       close_connection(othercon, false, true, false);
+                       close_connection(othercon, false);
                }
 
-               mutex_lock(&othercon->sock_mutex);
+               down_write(&othercon->sock_lock);
                add_sock(newsock, othercon);
-               addcon = othercon;
-               mutex_unlock(&othercon->sock_mutex);
+
+       /* check if we received something while adding */
+               lock_sock(othercon->sock->sk);
+               lowcomms_queue_rwork(othercon);
+               release_sock(othercon->sock->sk);
+               up_write(&othercon->sock_lock);
        }
        else {
                /* accept copies the sk after we've saved the callbacks, so we
                   don't want to save them a second time or comm errors will
                   result in calling sk_error_report recursively. */
                add_sock(newsock, newcon);
-               addcon = newcon;
-       }
-
-       set_bit(CF_CONNECTED, &addcon->flags);
-       mutex_unlock(&newcon->sock_mutex);
-
-       /*
-        * Add it to the active queue in case we got data
-        * between processing the accept adding the socket
-        * to the read_sockets list
-        */
-       if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
-               queue_work(recv_workqueue, &addcon->rwork);
 
+       /* check if we received something while adding */
+               lock_sock(newcon->sock->sk);
+               lowcomms_queue_rwork(newcon);
+               release_sock(newcon->sock->sk);
+       }
+       up_write(&newcon->sock_lock);
        srcu_read_unlock(&connections_srcu, idx);
 
-       return 0;
+       return DLM_IO_SUCCESS;
 
 accept_err:
        if (newsock)
                sock_release(newsock);
 
-       if (result != -EAGAIN)
-               log_print("error accepting connection from node: %d", result);
        return result;
 }
 
@@ -1167,7 +1114,7 @@ static int sctp_bind_addrs(struct socket *sock, uint16_t port)
        int i, addr_len, result = 0;
 
        for (i = 0; i < dlm_local_count; i++) {
-               memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
+               memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
                make_sockaddr(&localaddr, port, &addr_len);
 
                if (!i)
@@ -1187,7 +1134,7 @@ static int sctp_bind_addrs(struct socket *sock, uint16_t port)
 /* Get local addresses */
 static void init_local(void)
 {
-       struct sockaddr_storage sas, *addr;
+       struct sockaddr_storage sas;
        int i;
 
        dlm_local_count = 0;
@@ -1195,21 +1142,10 @@ static void init_local(void)
                if (dlm_our_addr(&sas, i))
                        break;
 
-               addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
-               if (!addr)
-                       break;
-               dlm_local_addr[dlm_local_count++] = addr;
+               memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
        }
 }
 
-static void deinit_local(void)
-{
-       int i;
-
-       for (i = 0; i < dlm_local_count; i++)
-               kfree(dlm_local_addr[i]);
-}
-
 static struct writequeue_entry *new_writequeue_entry(struct connection *con)
 {
        struct writequeue_entry *entry;
@@ -1240,7 +1176,7 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
 {
        struct writequeue_entry *e;
 
-       spin_lock(&con->writequeue_lock);
+       spin_lock_bh(&con->writequeue_lock);
        if (!list_empty(&con->writequeue)) {
                e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
                if (DLM_WQ_REMAIN_BYTES(e) >= len) {
@@ -1263,14 +1199,13 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
        kref_get(&e->ref);
        *ppc = page_address(e->page);
        e->end += len;
-       atomic_inc(&con->writequeue_cnt);
        if (cb)
                cb(data);
 
        list_add_tail(&e->list, &con->writequeue);
 
 out:
-       spin_unlock(&con->writequeue_lock);
+       spin_unlock_bh(&con->writequeue_lock);
        return e;
 };
 
@@ -1319,13 +1254,13 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
            len < sizeof(struct dlm_header)) {
                BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
                log_print("failed to allocate a buffer of size %d", len);
-               WARN_ON(1);
+               WARN_ON_ONCE(1);
                return NULL;
        }
 
        idx = srcu_read_lock(&connections_srcu);
-       con = nodeid2con(nodeid, allocation);
-       if (!con) {
+       con = nodeid2con(nodeid, 0);
+       if (WARN_ON_ONCE(!con)) {
                srcu_read_unlock(&connections_srcu, idx);
                return NULL;
        }
@@ -1350,7 +1285,7 @@ static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
        struct connection *con = e->con;
        int users;
 
-       spin_lock(&con->writequeue_lock);
+       spin_lock_bh(&con->writequeue_lock);
        kref_get(&msg->ref);
        list_add(&msg->list, &e->msgs);
 
@@ -1359,13 +1294,11 @@ static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
                goto out;
 
        e->len = DLM_WQ_LENGTH_BYTES(e);
-       spin_unlock(&con->writequeue_lock);
 
-       queue_work(send_workqueue, &con->swork);
-       return;
+       lowcomms_queue_swork(con);
 
 out:
-       spin_unlock(&con->writequeue_lock);
+       spin_unlock_bh(&con->writequeue_lock);
        return;
 }
 
@@ -1387,7 +1320,7 @@ void dlm_lowcomms_put_msg(struct dlm_msg *msg)
        kref_put(&msg->ref, dlm_msg_release);
 }
 
-/* does not held connections_srcu, usage workqueue only */
+/* does not hold connections_srcu, used by lowcomms_error_report only */
 int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
 {
        struct dlm_msg *msg_resend;
@@ -1413,90 +1346,79 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
 }
 
 /* Send a message */
-static void send_to_sock(struct connection *con)
+static int send_to_sock(struct connection *con)
 {
        const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
        struct writequeue_entry *e;
        int len, offset, ret;
-       int count = 0;
 
-       mutex_lock(&con->sock_mutex);
-       if (con->sock == NULL)
-               goto out_connect;
-
-       spin_lock(&con->writequeue_lock);
-       for (;;) {
-               e = con_next_wq(con);
-               if (!e)
-                       break;
+       spin_lock_bh(&con->writequeue_lock);
+       e = con_next_wq(con);
+       if (!e) {
+               clear_bit(CF_SEND_PENDING, &con->flags);
+               spin_unlock_bh(&con->writequeue_lock);
+               return DLM_IO_END;
+       }
 
-               len = e->len;
-               offset = e->offset;
-               BUG_ON(len == 0 && e->users == 0);
-               spin_unlock(&con->writequeue_lock);
-
-               ret = kernel_sendpage(con->sock, e->page, offset, len,
-                                     msg_flags);
-               trace_dlm_send(con->nodeid, ret);
-               if (ret == -EAGAIN || ret == 0) {
-                       if (ret == -EAGAIN &&
-                           test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
-                           !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
-                               /* Notify TCP that we're limited by the
-                                * application window size.
-                                */
-                               set_bit(SOCK_NOSPACE, &con->sock->flags);
-                               con->sock->sk->sk_write_pending++;
-                       }
-                       cond_resched();
-                       goto out;
-               } else if (ret < 0)
-                       goto out;
+       len = e->len;
+       offset = e->offset;
+       WARN_ON_ONCE(len == 0 && e->users == 0);
+       spin_unlock_bh(&con->writequeue_lock);
 
-               /* Don't starve people filling buffers */
-               if (++count >= MAX_SEND_MSG_COUNT) {
-                       cond_resched();
-                       count = 0;
+       ret = kernel_sendpage(con->sock, e->page, offset, len,
+                             msg_flags);
+       trace_dlm_send(con->nodeid, ret);
+       if (ret == -EAGAIN || ret == 0) {
+               lock_sock(con->sock->sk);
+               spin_lock_bh(&con->writequeue_lock);
+               if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
+                   !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
+                       /* Notify TCP that we're limited by the
+                        * application window size.
+                        */
+                       set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
+                       con->sock->sk->sk_write_pending++;
+
+                       clear_bit(CF_SEND_PENDING, &con->flags);
+                       spin_unlock_bh(&con->writequeue_lock);
+                       release_sock(con->sock->sk);
+
+                       /* wait for write_space() event */
+                       return DLM_IO_END;
                }
+               spin_unlock_bh(&con->writequeue_lock);
+               release_sock(con->sock->sk);
 
-               spin_lock(&con->writequeue_lock);
-               writequeue_entry_complete(e, ret);
-       }
-       spin_unlock(&con->writequeue_lock);
-
-       /* close if we got EOF */
-       if (test_and_clear_bit(CF_EOF, &con->flags)) {
-               mutex_unlock(&con->sock_mutex);
-               close_connection(con, false, false, true);
-
-               /* handling for tcp shutdown */
-               clear_bit(CF_SHUTDOWN, &con->flags);
-               wake_up(&con->shutdown_wait);
-       } else {
-               mutex_unlock(&con->sock_mutex);
+               return DLM_IO_RESCHED;
+       } else if (ret < 0) {
+               return ret;
        }
 
-       return;
-
-out:
-       mutex_unlock(&con->sock_mutex);
-       return;
+       spin_lock_bh(&con->writequeue_lock);
+       writequeue_entry_complete(e, ret);
+       spin_unlock_bh(&con->writequeue_lock);
 
-out_connect:
-       mutex_unlock(&con->sock_mutex);
-       queue_work(send_workqueue, &con->swork);
-       cond_resched();
+       return DLM_IO_SUCCESS;
 }
 
 static void clean_one_writequeue(struct connection *con)
 {
        struct writequeue_entry *e, *safe;
 
-       spin_lock(&con->writequeue_lock);
+       spin_lock_bh(&con->writequeue_lock);
        list_for_each_entry_safe(e, safe, &con->writequeue, list) {
                free_entry(e);
        }
-       spin_unlock(&con->writequeue_lock);
+       spin_unlock_bh(&con->writequeue_lock);
+}
+
+static void connection_release(struct rcu_head *rcu)
+{
+       struct connection *con = container_of(rcu, struct connection, rcu);
+
+       WARN_ON_ONCE(!list_empty(&con->writequeue));
+       WARN_ON_ONCE(con->sock);
+       kfree(con);
 }
 
 /* Called from recovery when it knows that a node has
@@ -1504,286 +1426,311 @@ static void clean_one_writequeue(struct connection *con)
 int dlm_lowcomms_close(int nodeid)
 {
        struct connection *con;
-       struct dlm_node_addr *na;
        int idx;
 
        log_print("closing connection to node %d", nodeid);
+
        idx = srcu_read_lock(&connections_srcu);
        con = nodeid2con(nodeid, 0);
-       if (con) {
-               set_bit(CF_CLOSE, &con->flags);
-               close_connection(con, true, true, true);
-               clean_one_writequeue(con);
+       if (WARN_ON_ONCE(!con)) {
+               srcu_read_unlock(&connections_srcu, idx);
+               return -ENOENT;
+       }
+
+       stop_connection_io(con);
+       log_print("io handling for node: %d stopped", nodeid);
+       close_connection(con, true);
+
+       spin_lock(&connections_lock);
+       hlist_del_rcu(&con->list);
+       spin_unlock(&connections_lock);
+
+       clean_one_writequeue(con);
+       call_srcu(&connections_srcu, &con->rcu, connection_release);
+       if (con->othercon) {
+               clean_one_writequeue(con->othercon);
                if (con->othercon)
-                       clean_one_writequeue(con->othercon);
+                       call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
        }
        srcu_read_unlock(&connections_srcu, idx);
 
-       spin_lock(&dlm_node_addrs_spin);
-       na = find_node_addr(nodeid);
-       if (na) {
-               list_del(&na->list);
-               while (na->addr_count--)
-                       kfree(na->addr[na->addr_count]);
-               kfree(na);
-       }
-       spin_unlock(&dlm_node_addrs_spin);
+       /* for debugging we print when we are done to compare with other
+        * messages in between. This function need to be correctly synchronized
+        * with io handling
+        */
+       log_print("closing connection to node %d done", nodeid);
 
        return 0;
 }
 
-/* Receive workqueue function */
+/* Receive worker function */
 static void process_recv_sockets(struct work_struct *work)
 {
        struct connection *con = container_of(work, struct connection, rwork);
+       int ret, buflen;
+
+       down_read(&con->sock_lock);
+       if (!con->sock) {
+               up_read(&con->sock_lock);
+               return;
+       }
+
+       buflen = READ_ONCE(dlm_config.ci_buffer_size);
+       do {
+               ret = receive_from_sock(con, buflen);
+       } while (ret == DLM_IO_SUCCESS);
+       up_read(&con->sock_lock);
 
-       clear_bit(CF_READ_PENDING, &con->flags);
-       receive_from_sock(con);
+       switch (ret) {
+       case DLM_IO_END:
+               /* CF_RECV_PENDING cleared */
+               break;
+       case DLM_IO_EOF:
+               close_connection(con, false);
+               /* CF_RECV_PENDING cleared */
+               break;
+       case DLM_IO_RESCHED:
+               cond_resched();
+               queue_work(io_workqueue, &con->rwork);
+               /* CF_RECV_PENDING not cleared */
+               break;
+       default:
+               if (ret < 0) {
+                       if (test_bit(CF_IS_OTHERCON, &con->flags)) {
+                               close_connection(con, false);
+                       } else {
+                               spin_lock_bh(&con->writequeue_lock);
+                               lowcomms_queue_swork(con);
+                               spin_unlock_bh(&con->writequeue_lock);
+                       }
+
+                       /* CF_RECV_PENDING cleared for othercon
+                        * we trigger send queue if not already done
+                        * and process_send_sockets will handle it
+                        */
+                       break;
+               }
+
+               WARN_ON_ONCE(1);
+               break;
+       }
 }
 
 static void process_listen_recv_socket(struct work_struct *work)
 {
-       accept_from_sock(&listen_con);
+       int ret;
+
+       if (WARN_ON_ONCE(!listen_con.sock))
+               return;
+
+       do {
+               ret = accept_from_sock();
+       } while (ret == DLM_IO_SUCCESS);
+
+       if (ret < 0)
+               log_print("critical error accepting connection: %d", ret);
 }
 
-static void dlm_connect(struct connection *con)
+static int dlm_connect(struct connection *con)
 {
        struct sockaddr_storage addr;
        int result, addr_len;
        struct socket *sock;
        unsigned int mark;
 
-       /* Some odd races can cause double-connects, ignore them */
-       if (con->retries++ > MAX_CONNECT_RETRIES)
-               return;
-
-       if (con->sock) {
-               log_print("node %d already connected.", con->nodeid);
-               return;
-       }
-
        memset(&addr, 0, sizeof(addr));
        result = nodeid_to_addr(con->nodeid, &addr, NULL,
                                dlm_proto_ops->try_new_addr, &mark);
        if (result < 0) {
                log_print("no address for nodeid %d", con->nodeid);
-               return;
+               return result;
        }
 
        /* Create a socket to communicate with */
-       result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
+       result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
                                  SOCK_STREAM, dlm_proto_ops->proto, &sock);
        if (result < 0)
-               goto socket_err;
+               return result;
 
        sock_set_mark(sock->sk, mark);
        dlm_proto_ops->sockopts(sock);
 
-       add_sock(sock, con);
-
        result = dlm_proto_ops->bind(sock);
-       if (result < 0)
-               goto add_sock_err;
+       if (result < 0) {
+               sock_release(sock);
+               return result;
+       }
+
+       add_sock(sock, con);
 
        log_print_ratelimited("connecting to %d", con->nodeid);
        make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
        result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
                                        addr_len);
-       if (result < 0)
-               goto add_sock_err;
-
-       return;
-
-add_sock_err:
-       dlm_close_sock(&con->sock);
+       switch (result) {
+       case -EINPROGRESS:
+               /* not an error */
+               fallthrough;
+       case 0:
+               break;
+       default:
+               if (result < 0)
+                       dlm_close_sock(&con->sock);
 
-socket_err:
-       /*
-        * Some errors are fatal and this list might need adjusting. For other
-        * errors we try again until the max number of retries is reached.
-        */
-       if (result != -EHOSTUNREACH &&
-           result != -ENETUNREACH &&
-           result != -ENETDOWN &&
-           result != -EINVAL &&
-           result != -EPROTONOSUPPORT) {
-               log_print("connect %d try %d error %d", con->nodeid,
-                         con->retries, result);
-               msleep(1000);
-               lowcomms_connect_sock(con);
+               break;
        }
+
+       return result;
 }
 
-/* Send workqueue function */
+/* Send worker function */
 static void process_send_sockets(struct work_struct *work)
 {
        struct connection *con = container_of(work, struct connection, swork);
+       int ret;
 
-       WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));
-
-       clear_bit(CF_WRITE_PENDING, &con->flags);
+       WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));
 
-       if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
-               close_connection(con, false, false, true);
-               dlm_midcomms_unack_msg_resend(con->nodeid);
+       down_read(&con->sock_lock);
+       if (!con->sock) {
+               up_read(&con->sock_lock);
+               down_write(&con->sock_lock);
+               if (!con->sock) {
+                       ret = dlm_connect(con);
+                       switch (ret) {
+                       case 0:
+                               break;
+                       case -EINPROGRESS:
+                               /* avoid spamming resched on connection
+                                * we might can switch to a state_change
+                                * event based mechanism if established
+                                */
+                               msleep(100);
+                               break;
+                       default:
+                               /* CF_SEND_PENDING not cleared */
+                               up_write(&con->sock_lock);
+                               log_print("connect to node %d try %d error %d",
+                                         con->nodeid, con->retries++, ret);
+                               msleep(1000);
+                               /* For now we try forever to reconnect. In
+                                * future we should send a event to cluster
+                                * manager to fence itself after certain amount
+                                * of retries.
+                                */
+                               queue_work(io_workqueue, &con->swork);
+                               return;
+                       }
+               }
+               downgrade_write(&con->sock_lock);
        }
 
-       if (con->sock == NULL) {
-               if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
-                       msleep(1000);
+       do {
+               ret = send_to_sock(con);
+       } while (ret == DLM_IO_SUCCESS);
+       up_read(&con->sock_lock);
 
-               mutex_lock(&con->sock_mutex);
-               dlm_connect(con);
-               mutex_unlock(&con->sock_mutex);
-       }
+       switch (ret) {
+       case DLM_IO_END:
+               /* CF_SEND_PENDING cleared */
+               break;
+       case DLM_IO_RESCHED:
+               /* CF_SEND_PENDING not cleared */
+               cond_resched();
+               queue_work(io_workqueue, &con->swork);
+               break;
+       default:
+               if (ret < 0) {
+                       close_connection(con, false);
+
+                       /* CF_SEND_PENDING cleared */
+                       spin_lock_bh(&con->writequeue_lock);
+                       lowcomms_queue_swork(con);
+                       spin_unlock_bh(&con->writequeue_lock);
+                       break;
+               }
 
-       if (!list_empty(&con->writequeue))
-               send_to_sock(con);
+               WARN_ON_ONCE(1);
+               break;
+       }
 }
 
 static void work_stop(void)
 {
-       if (recv_workqueue) {
-               destroy_workqueue(recv_workqueue);
-               recv_workqueue = NULL;
+       if (io_workqueue) {
+               destroy_workqueue(io_workqueue);
+               io_workqueue = NULL;
        }
 
-       if (send_workqueue) {
-               destroy_workqueue(send_workqueue);
-               send_workqueue = NULL;
+       if (process_workqueue) {
+               destroy_workqueue(process_workqueue);
+               process_workqueue = NULL;
        }
 }
 
 static int work_start(void)
 {
-       recv_workqueue = alloc_ordered_workqueue("dlm_recv", WQ_MEM_RECLAIM);
-       if (!recv_workqueue) {
-               log_print("can't start dlm_recv");
+       io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM,
+                                      0);
+       if (!io_workqueue) {
+               log_print("can't start dlm_io");
                return -ENOMEM;
        }
 
-       send_workqueue = alloc_ordered_workqueue("dlm_send", WQ_MEM_RECLAIM);
-       if (!send_workqueue) {
-               log_print("can't start dlm_send");
-               destroy_workqueue(recv_workqueue);
-               recv_workqueue = NULL;
+       /* ordered dlm message process queue,
+        * should be converted to a tasklet
+        */
+       process_workqueue = alloc_ordered_workqueue("dlm_process",
+                                                   WQ_HIGHPRI | WQ_MEM_RECLAIM);
+       if (!process_workqueue) {
+               log_print("can't start dlm_process");
+               destroy_workqueue(io_workqueue);
+               io_workqueue = NULL;
                return -ENOMEM;
        }
 
        return 0;
 }
 
-static void shutdown_conn(struct connection *con)
-{
-       if (dlm_proto_ops->shutdown_action)
-               dlm_proto_ops->shutdown_action(con);
-}
-
 void dlm_lowcomms_shutdown(void)
 {
-       int idx;
-
-       /* Set all the flags to prevent any
-        * socket activity.
-        */
-       dlm_allow_conn = 0;
-
-       if (recv_workqueue)
-               flush_workqueue(recv_workqueue);
-       if (send_workqueue)
-               flush_workqueue(send_workqueue);
+       /* stop lowcomms_listen_data_ready calls */
+       lock_sock(listen_con.sock->sk);
+       listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
+       release_sock(listen_con.sock->sk);
 
+       cancel_work_sync(&listen_con.rwork);
        dlm_close_sock(&listen_con.sock);
 
-       idx = srcu_read_lock(&connections_srcu);
-       foreach_conn(shutdown_conn);
-       srcu_read_unlock(&connections_srcu, idx);
-}
-
-static void _stop_conn(struct connection *con, bool and_other)
-{
-       mutex_lock(&con->sock_mutex);
-       set_bit(CF_CLOSE, &con->flags);
-       set_bit(CF_READ_PENDING, &con->flags);
-       set_bit(CF_WRITE_PENDING, &con->flags);
-       if (con->sock && con->sock->sk) {
-               lock_sock(con->sock->sk);
-               con->sock->sk->sk_user_data = NULL;
-               release_sock(con->sock->sk);
-       }
-       if (con->othercon && and_other)
-               _stop_conn(con->othercon, false);
-       mutex_unlock(&con->sock_mutex);
-}
-
-static void stop_conn(struct connection *con)
-{
-       _stop_conn(con, true);
+       flush_workqueue(process_workqueue);
 }
 
-static void connection_release(struct rcu_head *rcu)
+void dlm_lowcomms_shutdown_node(int nodeid, bool force)
 {
-       struct connection *con = container_of(rcu, struct connection, rcu);
-
-       kfree(con->rx_buf);
-       kfree(con);
-}
+       struct connection *con;
+       int idx;
 
-static void free_conn(struct connection *con)
-{
-       close_connection(con, true, true, true);
-       spin_lock(&connections_lock);
-       hlist_del_rcu(&con->list);
-       spin_unlock(&connections_lock);
-       if (con->othercon) {
-               clean_one_writequeue(con->othercon);
-               call_srcu(&connections_srcu, &con->othercon->rcu,
-                         connection_release);
+       idx = srcu_read_lock(&connections_srcu);
+       con = nodeid2con(nodeid, 0);
+       if (WARN_ON_ONCE(!con)) {
+               srcu_read_unlock(&connections_srcu, idx);
+               return;
        }
-       clean_one_writequeue(con);
-       call_srcu(&connections_srcu, &con->rcu, connection_release);
-}
 
-static void work_flush(void)
-{
-       int ok;
-       int i;
-       struct connection *con;
-
-       do {
-               ok = 1;
-               foreach_conn(stop_conn);
-               if (recv_workqueue)
-                       flush_workqueue(recv_workqueue);
-               if (send_workqueue)
-                       flush_workqueue(send_workqueue);
-               for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
-                       hlist_for_each_entry_rcu(con, &connection_hash[i],
-                                                list) {
-                               ok &= test_bit(CF_READ_PENDING, &con->flags);
-                               ok &= test_bit(CF_WRITE_PENDING, &con->flags);
-                               if (con->othercon) {
-                                       ok &= test_bit(CF_READ_PENDING,
-                                                      &con->othercon->flags);
-                                       ok &= test_bit(CF_WRITE_PENDING,
-                                                      &con->othercon->flags);
-                               }
-                       }
-               }
-       } while (!ok);
+       flush_work(&con->swork);
+       stop_connection_io(con);
+       WARN_ON_ONCE(!force && !list_empty(&con->writequeue));
+       close_connection(con, true);
+       clean_one_writequeue(con);
+       if (con->othercon)
+               clean_one_writequeue(con->othercon);
+       allow_connection_io(con);
+       srcu_read_unlock(&connections_srcu, idx);
 }
 
 void dlm_lowcomms_stop(void)
 {
-       int idx;
-
-       idx = srcu_read_lock(&connections_srcu);
-       work_flush();
-       foreach_conn(free_conn);
-       srcu_read_unlock(&connections_srcu, idx);
        work_stop();
-       deinit_local();
-
        dlm_proto_ops = NULL;
 }
 
@@ -1799,7 +1746,7 @@ static int dlm_listen_for_all(void)
        if (result < 0)
                return result;
 
-       result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
+       result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
                                  SOCK_STREAM, dlm_proto_ops->proto, &sock);
        if (result < 0) {
                log_print("Can't create comms socket: %d", result);
@@ -1813,14 +1760,22 @@ static int dlm_listen_for_all(void)
        if (result < 0)
                goto out;
 
-       save_listen_callbacks(sock);
-       add_listen_sock(sock, &listen_con);
+       lock_sock(sock->sk);
+       listen_sock.sk_data_ready = sock->sk->sk_data_ready;
+       listen_sock.sk_write_space = sock->sk->sk_write_space;
+       listen_sock.sk_error_report = sock->sk->sk_error_report;
+       listen_sock.sk_state_change = sock->sk->sk_state_change;
+
+       listen_con.sock = sock;
+
+       sock->sk->sk_allocation = GFP_NOFS;
+       sock->sk->sk_data_ready = lowcomms_listen_data_ready;
+       release_sock(sock->sk);
 
-       INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
        result = sock->ops->listen(sock, 5);
        if (result < 0) {
                dlm_close_sock(&listen_con.sock);
-               goto out;
+               return result;
        }
 
        return 0;
@@ -1838,7 +1793,7 @@ static int dlm_tcp_bind(struct socket *sock)
        /* Bind to our cluster-known address connecting to avoid
         * routing problems.
         */
-       memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
+       memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
        make_sockaddr(&src_addr, 0, &addr_len);
 
        result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
@@ -1854,17 +1809,7 @@ static int dlm_tcp_bind(struct socket *sock)
 static int dlm_tcp_connect(struct connection *con, struct socket *sock,
                           struct sockaddr *addr, int addr_len)
 {
-       int ret;
-
-       ret = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
-       switch (ret) {
-       case -EINPROGRESS:
-               fallthrough;
-       case 0:
-               return 0;
-       }
-
-       return ret;
+       return sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
 }
 
 static int dlm_tcp_listen_validate(void)
@@ -1895,8 +1840,8 @@ static int dlm_tcp_listen_bind(struct socket *sock)
        int addr_len;
 
        /* Bind to our port */
-       make_sockaddr(dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
-       return sock->ops->bind(sock, (struct sockaddr *)dlm_local_addr[0],
+       make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
+       return sock->ops->bind(sock, (struct sockaddr *)&dlm_local_addr[0],
                               addr_len);
 }
 
@@ -1909,8 +1854,6 @@ static const struct dlm_proto_ops dlm_tcp_ops = {
        .listen_validate = dlm_tcp_listen_validate,
        .listen_sockopts = dlm_tcp_listen_sockopts,
        .listen_bind = dlm_tcp_listen_bind,
-       .shutdown_action = dlm_tcp_shutdown,
-       .eof_condition = tcp_eof_condition,
 };
 
 static int dlm_sctp_bind(struct socket *sock)
@@ -1931,13 +1874,7 @@ static int dlm_sctp_connect(struct connection *con, struct socket *sock,
        sock_set_sndtimeo(sock->sk, 5);
        ret = sock->ops->connect(sock, addr, addr_len, 0);
        sock_set_sndtimeo(sock->sk, 0);
-       if (ret < 0)
-               return ret;
-
-       if (!test_and_set_bit(CF_CONNECTED, &con->flags))
-               log_print("connected to node %d", con->nodeid);
-
-       return 0;
+       return ret;
 }
 
 static int dlm_sctp_listen_validate(void)
@@ -1977,11 +1914,7 @@ static const struct dlm_proto_ops dlm_sctp_ops = {
 
 int dlm_lowcomms_start(void)
 {
-       int error = -EINVAL;
-       int i;
-
-       for (i = 0; i < CONN_HASH_SIZE; i++)
-               INIT_HLIST_HEAD(&connection_hash[i]);
+       int error;
 
        init_local();
        if (!dlm_local_count) {
@@ -1990,13 +1923,9 @@ int dlm_lowcomms_start(void)
                goto fail;
        }
 
-       INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
-
        error = work_start();
        if (error)
-               goto fail_local;
-
-       dlm_allow_conn = 1;
+               goto fail;
 
        /* Start listening */
        switch (dlm_config.ci_protocol) {
@@ -2022,25 +1951,38 @@ int dlm_lowcomms_start(void)
 fail_listen:
        dlm_proto_ops = NULL;
 fail_proto_ops:
-       dlm_allow_conn = 0;
-       dlm_close_sock(&listen_con.sock);
        work_stop();
-fail_local:
-       deinit_local();
 fail:
        return error;
 }
 
+void dlm_lowcomms_init(void)
+{
+       int i;
+
+       for (i = 0; i < CONN_HASH_SIZE; i++)
+               INIT_HLIST_HEAD(&connection_hash[i]);
+
+       INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
+}
+
 void dlm_lowcomms_exit(void)
 {
-       struct dlm_node_addr *na, *safe;
+       struct connection *con;
+       int i, idx;
 
-       spin_lock(&dlm_node_addrs_spin);
-       list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
-               list_del(&na->list);
-               while (na->addr_count--)
-                       kfree(na->addr[na->addr_count]);
-               kfree(na);
+       idx = srcu_read_lock(&connections_srcu);
+       for (i = 0; i < CONN_HASH_SIZE; i++) {
+               hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
+                       spin_lock(&connections_lock);
+                       hlist_del_rcu(&con->list);
+                       spin_unlock(&connections_lock);
+
+                       if (con->othercon)
+                               call_srcu(&connections_srcu, &con->othercon->rcu,
+                                         connection_release);
+                       call_srcu(&connections_srcu, &con->rcu, connection_release);
+               }
        }
-       spin_unlock(&dlm_node_addrs_spin);
+       srcu_read_unlock(&connections_srcu, idx);
 }
This page took 0.09521 seconds and 4 git commands to generate.