struct connection *con;
int idx;
- if (nodeid == dlm_our_nodeid())
- return 0;
-
idx = srcu_read_lock(&connections_srcu);
con = nodeid2con(nodeid, 0);
if (WARN_ON_ONCE(!con)) {
if (con->othercon)
stop_connection_io(con->othercon);
+ spin_lock_bh(&con->writequeue_lock);
+ set_bit(CF_IO_STOP, &con->flags);
+ spin_unlock_bh(&con->writequeue_lock);
+
down_write(&con->sock_lock);
if (con->sock) {
lock_sock(con->sock->sk);
restore_callbacks(con->sock->sk);
-
- spin_lock_bh(&con->writequeue_lock);
- set_bit(CF_IO_STOP, &con->flags);
- spin_unlock_bh(&con->writequeue_lock);
release_sock(con->sock->sk);
- } else {
- spin_lock_bh(&con->writequeue_lock);
- set_bit(CF_IO_STOP, &con->flags);
- spin_unlock_bh(&con->writequeue_lock);
}
up_write(&con->sock_lock);
struct list_head list;
};
- static void add_processed_node(int nodeid, struct list_head *processed_nodes)
- {
- struct dlm_processed_nodes *n;
-
- list_for_each_entry(n, processed_nodes, list) {
- /* we already remembered this node */
- if (n->nodeid == nodeid)
- return;
- }
-
- /* if it's fails in worst case we simple don't send an ack back.
- * We try it next time.
- */
- n = kmalloc(sizeof(*n), GFP_NOFS);
- if (!n)
- return;
-
- n->nodeid = nodeid;
- list_add(&n->list, processed_nodes);
- }
-
static void process_dlm_messages(struct work_struct *work)
{
- struct dlm_processed_nodes *n, *n_tmp;
struct processqueue_entry *pentry;
LIST_HEAD(processed_nodes);
pentry = list_first_entry_or_null(&processqueue,
struct processqueue_entry, list);
if (WARN_ON_ONCE(!pentry)) {
+ process_dlm_messages_pending = false;
spin_unlock(&processqueue_lock);
return;
}
for (;;) {
dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
pentry->buflen);
- add_processed_node(pentry->nodeid, &processed_nodes);
free_processqueue_entry(pentry);
spin_lock(&processqueue_lock);
list_del(&pentry->list);
spin_unlock(&processqueue_lock);
}
-
- /* send ack back after we processed couple of messages */
- list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
- list_del(&n->list);
- dlm_midcomms_receive_done(n->nodeid);
- kfree(n);
- }
}
/* Data received from remote end */
/* Send a message */
static int send_to_sock(struct connection *con)
{
- const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
struct writequeue_entry *e;
+ struct bio_vec bvec;
+ struct msghdr msg = {
+ .msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL,
+ };
int len, offset, ret;
spin_lock_bh(&con->writequeue_lock);
WARN_ON_ONCE(len == 0 && e->users == 0);
spin_unlock_bh(&con->writequeue_lock);
- ret = kernel_sendpage(con->sock, e->page, offset, len,
- msg_flags);
+ bvec_set_page(&bvec, e->page, len, offset);
+ iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len);
+ ret = sock_sendmsg(con->sock, &msg);
trace_dlm_send(con->nodeid, ret);
if (ret == -EAGAIN || ret == 0) {
lock_sock(con->sock->sk);
call_srcu(&connections_srcu, &con->rcu, connection_release);
if (con->othercon) {
clean_one_writequeue(con->othercon);
- if (con->othercon)
- call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
+ call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
}
srcu_read_unlock(&connections_srcu, idx);