]> Git Repo - linux.git/blob - net/tipc/socket.c
inet: get rid of central tcp/dccp listener timer
[linux.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2015, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <linux/rhashtable.h>
38 #include <linux/jhash.h>
39 #include "core.h"
40 #include "name_table.h"
41 #include "node.h"
42 #include "link.h"
43 #include "name_distr.h"
44 #include "socket.h"
45
46 #define SS_LISTENING            -1      /* socket is listening */
47 #define SS_READY                -2      /* socket is connectionless */
48
49 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
50 #define CONN_PROBING_INTERVAL   msecs_to_jiffies(3600000)  /* [ms] => 1 h */
51 #define TIPC_FWD_MSG            1
52 #define TIPC_CONN_OK            0
53 #define TIPC_CONN_PROBING       1
54 #define TIPC_MAX_PORT           0xffffffff
55 #define TIPC_MIN_PORT           1
56
57 /**
58  * struct tipc_sock - TIPC socket structure
59  * @sk: socket - interacts with 'port' and with user via the socket API
60  * @connected: non-zero if port is currently connected to a peer port
61  * @conn_type: TIPC type used when connection was established
62  * @conn_instance: TIPC instance used when connection was established
63  * @published: non-zero if port has one or more associated names
64  * @max_pkt: maximum packet size "hint" used when building messages sent by port
65  * @portid: unique port identity in TIPC socket hash table
66  * @phdr: preformatted message header used when sending messages
67  * @port_list: adjacent ports in TIPC's global list of ports
68  * @publications: list of publications for port
69  * @pub_count: total # of publications port has made during its lifetime
70  * @probing_state:
71  * @probing_intv:
72  * @conn_timeout: the time we can wait for an unresponded setup request
73  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
74  * @link_cong: non-zero if owner must sleep because of link congestion
75  * @sent_unacked: # messages sent by socket, and not yet acked by peer
76  * @rcv_unacked: # messages read by user, but not yet acked back to peer
77  * @remote: 'connected' peer for dgram/rdm
78  * @node: hash table node
79  * @rcu: rcu struct for tipc_sock
80  */
81 struct tipc_sock {
82         struct sock sk;
83         int connected;
84         u32 conn_type;
85         u32 conn_instance;
86         int published;
87         u32 max_pkt;
88         u32 portid;
89         struct tipc_msg phdr;
90         struct list_head sock_list;
91         struct list_head publications;
92         u32 pub_count;
93         u32 probing_state;
94         unsigned long probing_intv;
95         uint conn_timeout;
96         atomic_t dupl_rcvcnt;
97         bool link_cong;
98         uint sent_unacked;
99         uint rcv_unacked;
100         struct sockaddr_tipc remote;
101         struct rhash_head node;
102         struct rcu_head rcu;
103 };
104
105 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
106 static void tipc_data_ready(struct sock *sk);
107 static void tipc_write_space(struct sock *sk);
108 static int tipc_release(struct socket *sock);
109 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
110 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
111 static void tipc_sk_timeout(unsigned long data);
112 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
113                            struct tipc_name_seq const *seq);
114 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
115                             struct tipc_name_seq const *seq);
116 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
117 static int tipc_sk_insert(struct tipc_sock *tsk);
118 static void tipc_sk_remove(struct tipc_sock *tsk);
119 static int __tipc_send_stream(struct socket *sock, struct msghdr *m,
120                               size_t dsz);
121 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
122
123 static const struct proto_ops packet_ops;
124 static const struct proto_ops stream_ops;
125 static const struct proto_ops msg_ops;
126 static struct proto tipc_proto;
127
128 static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
129         [TIPC_NLA_SOCK_UNSPEC]          = { .type = NLA_UNSPEC },
130         [TIPC_NLA_SOCK_ADDR]            = { .type = NLA_U32 },
131         [TIPC_NLA_SOCK_REF]             = { .type = NLA_U32 },
132         [TIPC_NLA_SOCK_CON]             = { .type = NLA_NESTED },
133         [TIPC_NLA_SOCK_HAS_PUBL]        = { .type = NLA_FLAG }
134 };
135
136 /*
137  * Revised TIPC socket locking policy:
138  *
139  * Most socket operations take the standard socket lock when they start
140  * and hold it until they finish (or until they need to sleep).  Acquiring
141  * this lock grants the owner exclusive access to the fields of the socket
142  * data structures, with the exception of the backlog queue.  A few socket
143  * operations can be done without taking the socket lock because they only
144  * read socket information that never changes during the life of the socket.
145  *
146  * Socket operations may acquire the lock for the associated TIPC port if they
147  * need to perform an operation on the port.  If any routine needs to acquire
148  * both the socket lock and the port lock it must take the socket lock first
149  * to avoid the risk of deadlock.
150  *
151  * The dispatcher handling incoming messages cannot grab the socket lock in
152  * the standard fashion, since invoked it runs at the BH level and cannot block.
153  * Instead, it checks to see if the socket lock is currently owned by someone,
154  * and either handles the message itself or adds it to the socket's backlog
155  * queue; in the latter case the queued message is processed once the process
156  * owning the socket lock releases it.
157  *
158  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
159  * the problem of a blocked socket operation preventing any other operations
160  * from occurring.  However, applications must be careful if they have
161  * multiple threads trying to send (or receive) on the same socket, as these
162  * operations might interfere with each other.  For example, doing a connect
163  * and a receive at the same time might allow the receive to consume the
164  * ACK message meant for the connect.  While additional work could be done
165  * to try and overcome this, it doesn't seem to be worthwhile at the present.
166  *
167  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
168  * that another operation that must be performed in a non-blocking manner is
169  * not delayed for very long because the lock has already been taken.
170  *
171  * NOTE: This code assumes that certain fields of a port/socket pair are
172  * constant over its lifetime; such fields can be examined without taking
173  * the socket lock and/or port lock, and do not need to be re-read even
174  * after resuming processing after waiting.  These fields include:
175  *   - socket type
176  *   - pointer to socket sk structure (aka tipc_sock structure)
177  *   - pointer to port structure
178  *   - port reference
179  */
180
181 static u32 tsk_own_node(struct tipc_sock *tsk)
182 {
183         return msg_prevnode(&tsk->phdr);
184 }
185
186 static u32 tsk_peer_node(struct tipc_sock *tsk)
187 {
188         return msg_destnode(&tsk->phdr);
189 }
190
191 static u32 tsk_peer_port(struct tipc_sock *tsk)
192 {
193         return msg_destport(&tsk->phdr);
194 }
195
196 static  bool tsk_unreliable(struct tipc_sock *tsk)
197 {
198         return msg_src_droppable(&tsk->phdr) != 0;
199 }
200
201 static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
202 {
203         msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
204 }
205
206 static bool tsk_unreturnable(struct tipc_sock *tsk)
207 {
208         return msg_dest_droppable(&tsk->phdr) != 0;
209 }
210
211 static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
212 {
213         msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
214 }
215
216 static int tsk_importance(struct tipc_sock *tsk)
217 {
218         return msg_importance(&tsk->phdr);
219 }
220
221 static int tsk_set_importance(struct tipc_sock *tsk, int imp)
222 {
223         if (imp > TIPC_CRITICAL_IMPORTANCE)
224                 return -EINVAL;
225         msg_set_importance(&tsk->phdr, (u32)imp);
226         return 0;
227 }
228
229 static struct tipc_sock *tipc_sk(const struct sock *sk)
230 {
231         return container_of(sk, struct tipc_sock, sk);
232 }
233
234 static int tsk_conn_cong(struct tipc_sock *tsk)
235 {
236         return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
237 }
238
239 /**
240  * tsk_advance_rx_queue - discard first buffer in socket receive queue
241  *
242  * Caller must hold socket lock
243  */
244 static void tsk_advance_rx_queue(struct sock *sk)
245 {
246         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
247 }
248
249 /**
250  * tsk_rej_rx_queue - reject all buffers in socket receive queue
251  *
252  * Caller must hold socket lock
253  */
254 static void tsk_rej_rx_queue(struct sock *sk)
255 {
256         struct sk_buff *skb;
257         u32 dnode;
258         u32 own_node = tsk_own_node(tipc_sk(sk));
259
260         while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
261                 if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
262                         tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);
263         }
264 }
265
266 /* tsk_peer_msg - verify if message was sent by connected port's peer
267  *
268  * Handles cases where the node's network address has changed from
269  * the default of <0.0.0> to its configured setting.
270  */
271 static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
272 {
273         struct tipc_net *tn = net_generic(sock_net(&tsk->sk), tipc_net_id);
274         u32 peer_port = tsk_peer_port(tsk);
275         u32 orig_node;
276         u32 peer_node;
277
278         if (unlikely(!tsk->connected))
279                 return false;
280
281         if (unlikely(msg_origport(msg) != peer_port))
282                 return false;
283
284         orig_node = msg_orignode(msg);
285         peer_node = tsk_peer_node(tsk);
286
287         if (likely(orig_node == peer_node))
288                 return true;
289
290         if (!orig_node && (peer_node == tn->own_addr))
291                 return true;
292
293         if (!peer_node && (orig_node == tn->own_addr))
294                 return true;
295
296         return false;
297 }
298
299 /**
300  * tipc_sk_create - create a TIPC socket
301  * @net: network namespace (must be default network)
302  * @sock: pre-allocated socket structure
303  * @protocol: protocol indicator (must be 0)
304  * @kern: caused by kernel or by userspace?
305  *
306  * This routine creates additional data structures used by the TIPC socket,
307  * initializes them, and links them together.
308  *
309  * Returns 0 on success, errno otherwise
310  */
311 static int tipc_sk_create(struct net *net, struct socket *sock,
312                           int protocol, int kern)
313 {
314         struct tipc_net *tn;
315         const struct proto_ops *ops;
316         socket_state state;
317         struct sock *sk;
318         struct tipc_sock *tsk;
319         struct tipc_msg *msg;
320
321         /* Validate arguments */
322         if (unlikely(protocol != 0))
323                 return -EPROTONOSUPPORT;
324
325         switch (sock->type) {
326         case SOCK_STREAM:
327                 ops = &stream_ops;
328                 state = SS_UNCONNECTED;
329                 break;
330         case SOCK_SEQPACKET:
331                 ops = &packet_ops;
332                 state = SS_UNCONNECTED;
333                 break;
334         case SOCK_DGRAM:
335         case SOCK_RDM:
336                 ops = &msg_ops;
337                 state = SS_READY;
338                 break;
339         default:
340                 return -EPROTOTYPE;
341         }
342
343         /* Allocate socket's protocol area */
344         sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
345         if (sk == NULL)
346                 return -ENOMEM;
347
348         tsk = tipc_sk(sk);
349         tsk->max_pkt = MAX_PKT_DEFAULT;
350         INIT_LIST_HEAD(&tsk->publications);
351         msg = &tsk->phdr;
352         tn = net_generic(sock_net(sk), tipc_net_id);
353         tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
354                       NAMED_H_SIZE, 0);
355
356         /* Finish initializing socket data structures */
357         sock->ops = ops;
358         sock->state = state;
359         sock_init_data(sock, sk);
360         if (tipc_sk_insert(tsk)) {
361                 pr_warn("Socket create failed; port numbrer exhausted\n");
362                 return -EINVAL;
363         }
364         msg_set_origport(msg, tsk->portid);
365         setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
366         sk->sk_backlog_rcv = tipc_backlog_rcv;
367         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
368         sk->sk_data_ready = tipc_data_ready;
369         sk->sk_write_space = tipc_write_space;
370         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
371         tsk->sent_unacked = 0;
372         atomic_set(&tsk->dupl_rcvcnt, 0);
373
374         if (sock->state == SS_READY) {
375                 tsk_set_unreturnable(tsk, true);
376                 if (sock->type == SOCK_DGRAM)
377                         tsk_set_unreliable(tsk, true);
378         }
379         return 0;
380 }
381
382 static void tipc_sk_callback(struct rcu_head *head)
383 {
384         struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
385
386         sock_put(&tsk->sk);
387 }
388
389 /**
390  * tipc_release - destroy a TIPC socket
391  * @sock: socket to destroy
392  *
393  * This routine cleans up any messages that are still queued on the socket.
394  * For DGRAM and RDM socket types, all queued messages are rejected.
395  * For SEQPACKET and STREAM socket types, the first message is rejected
396  * and any others are discarded.  (If the first message on a STREAM socket
397  * is partially-read, it is discarded and the next one is rejected instead.)
398  *
399  * NOTE: Rejected messages are not necessarily returned to the sender!  They
400  * are returned or discarded according to the "destination droppable" setting
401  * specified for the message by the sender.
402  *
403  * Returns 0 on success, errno otherwise
404  */
405 static int tipc_release(struct socket *sock)
406 {
407         struct sock *sk = sock->sk;
408         struct net *net;
409         struct tipc_sock *tsk;
410         struct sk_buff *skb;
411         u32 dnode, probing_state;
412
413         /*
414          * Exit if socket isn't fully initialized (occurs when a failed accept()
415          * releases a pre-allocated child socket that was never used)
416          */
417         if (sk == NULL)
418                 return 0;
419
420         net = sock_net(sk);
421         tsk = tipc_sk(sk);
422         lock_sock(sk);
423
424         /*
425          * Reject all unreceived messages, except on an active connection
426          * (which disconnects locally & sends a 'FIN+' to peer)
427          */
428         dnode = tsk_peer_node(tsk);
429         while (sock->state != SS_DISCONNECTING) {
430                 skb = __skb_dequeue(&sk->sk_receive_queue);
431                 if (skb == NULL)
432                         break;
433                 if (TIPC_SKB_CB(skb)->handle != NULL)
434                         kfree_skb(skb);
435                 else {
436                         if ((sock->state == SS_CONNECTING) ||
437                             (sock->state == SS_CONNECTED)) {
438                                 sock->state = SS_DISCONNECTING;
439                                 tsk->connected = 0;
440                                 tipc_node_remove_conn(net, dnode, tsk->portid);
441                         }
442                         if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
443                                              TIPC_ERR_NO_PORT))
444                                 tipc_link_xmit_skb(net, skb, dnode, 0);
445                 }
446         }
447
448         tipc_sk_withdraw(tsk, 0, NULL);
449         probing_state = tsk->probing_state;
450         if (del_timer_sync(&sk->sk_timer) &&
451             probing_state != TIPC_CONN_PROBING)
452                 sock_put(sk);
453         tipc_sk_remove(tsk);
454         if (tsk->connected) {
455                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
456                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
457                                       tsk_own_node(tsk), tsk_peer_port(tsk),
458                                       tsk->portid, TIPC_ERR_NO_PORT);
459                 if (skb)
460                         tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
461                 tipc_node_remove_conn(net, dnode, tsk->portid);
462         }
463
464         /* Discard any remaining (connection-based) messages in receive queue */
465         __skb_queue_purge(&sk->sk_receive_queue);
466
467         /* Reject any messages that accumulated in backlog queue */
468         sock->state = SS_DISCONNECTING;
469         release_sock(sk);
470
471         call_rcu(&tsk->rcu, tipc_sk_callback);
472         sock->sk = NULL;
473
474         return 0;
475 }
476
477 /**
478  * tipc_bind - associate or disassocate TIPC name(s) with a socket
479  * @sock: socket structure
480  * @uaddr: socket address describing name(s) and desired operation
481  * @uaddr_len: size of socket address data structure
482  *
483  * Name and name sequence binding is indicated using a positive scope value;
484  * a negative scope value unbinds the specified name.  Specifying no name
485  * (i.e. a socket address length of 0) unbinds all names from the socket.
486  *
487  * Returns 0 on success, errno otherwise
488  *
489  * NOTE: This routine doesn't need to take the socket lock since it doesn't
490  *       access any non-constant socket information.
491  */
492 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
493                      int uaddr_len)
494 {
495         struct sock *sk = sock->sk;
496         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
497         struct tipc_sock *tsk = tipc_sk(sk);
498         int res = -EINVAL;
499
500         lock_sock(sk);
501         if (unlikely(!uaddr_len)) {
502                 res = tipc_sk_withdraw(tsk, 0, NULL);
503                 goto exit;
504         }
505
506         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
507                 res = -EINVAL;
508                 goto exit;
509         }
510         if (addr->family != AF_TIPC) {
511                 res = -EAFNOSUPPORT;
512                 goto exit;
513         }
514
515         if (addr->addrtype == TIPC_ADDR_NAME)
516                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
517         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
518                 res = -EAFNOSUPPORT;
519                 goto exit;
520         }
521
522         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
523             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
524             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
525                 res = -EACCES;
526                 goto exit;
527         }
528
529         res = (addr->scope > 0) ?
530                 tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
531                 tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
532 exit:
533         release_sock(sk);
534         return res;
535 }
536
537 /**
538  * tipc_getname - get port ID of socket or peer socket
539  * @sock: socket structure
540  * @uaddr: area for returned socket address
541  * @uaddr_len: area for returned length of socket address
542  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
543  *
544  * Returns 0 on success, errno otherwise
545  *
546  * NOTE: This routine doesn't need to take the socket lock since it only
547  *       accesses socket information that is unchanging (or which changes in
548  *       a completely predictable manner).
549  */
550 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
551                         int *uaddr_len, int peer)
552 {
553         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
554         struct tipc_sock *tsk = tipc_sk(sock->sk);
555         struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
556
557         memset(addr, 0, sizeof(*addr));
558         if (peer) {
559                 if ((sock->state != SS_CONNECTED) &&
560                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
561                         return -ENOTCONN;
562                 addr->addr.id.ref = tsk_peer_port(tsk);
563                 addr->addr.id.node = tsk_peer_node(tsk);
564         } else {
565                 addr->addr.id.ref = tsk->portid;
566                 addr->addr.id.node = tn->own_addr;
567         }
568
569         *uaddr_len = sizeof(*addr);
570         addr->addrtype = TIPC_ADDR_ID;
571         addr->family = AF_TIPC;
572         addr->scope = 0;
573         addr->addr.name.domain = 0;
574
575         return 0;
576 }
577
578 /**
579  * tipc_poll - read and possibly block on pollmask
580  * @file: file structure associated with the socket
581  * @sock: socket for which to calculate the poll bits
582  * @wait: ???
583  *
584  * Returns pollmask value
585  *
586  * COMMENTARY:
587  * It appears that the usual socket locking mechanisms are not useful here
588  * since the pollmask info is potentially out-of-date the moment this routine
589  * exits.  TCP and other protocols seem to rely on higher level poll routines
590  * to handle any preventable race conditions, so TIPC will do the same ...
591  *
592  * TIPC sets the returned events as follows:
593  *
594  * socket state         flags set
595  * ------------         ---------
596  * unconnected          no read flags
597  *                      POLLOUT if port is not congested
598  *
599  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
600  *                      no write flags
601  *
602  * connected            POLLIN/POLLRDNORM if data in rx queue
603  *                      POLLOUT if port is not congested
604  *
605  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
606  *                      no write flags
607  *
608  * listening            POLLIN if SYN in rx queue
609  *                      no write flags
610  *
611  * ready                POLLIN/POLLRDNORM if data in rx queue
612  * [connectionless]     POLLOUT (since port cannot be congested)
613  *
614  * IMPORTANT: The fact that a read or write operation is indicated does NOT
615  * imply that the operation will succeed, merely that it should be performed
616  * and will not block.
617  */
618 static unsigned int tipc_poll(struct file *file, struct socket *sock,
619                               poll_table *wait)
620 {
621         struct sock *sk = sock->sk;
622         struct tipc_sock *tsk = tipc_sk(sk);
623         u32 mask = 0;
624
625         sock_poll_wait(file, sk_sleep(sk), wait);
626
627         switch ((int)sock->state) {
628         case SS_UNCONNECTED:
629                 if (!tsk->link_cong)
630                         mask |= POLLOUT;
631                 break;
632         case SS_READY:
633         case SS_CONNECTED:
634                 if (!tsk->link_cong && !tsk_conn_cong(tsk))
635                         mask |= POLLOUT;
636                 /* fall thru' */
637         case SS_CONNECTING:
638         case SS_LISTENING:
639                 if (!skb_queue_empty(&sk->sk_receive_queue))
640                         mask |= (POLLIN | POLLRDNORM);
641                 break;
642         case SS_DISCONNECTING:
643                 mask = (POLLIN | POLLRDNORM | POLLHUP);
644                 break;
645         }
646
647         return mask;
648 }
649
650 /**
651  * tipc_sendmcast - send multicast message
652  * @sock: socket structure
653  * @seq: destination address
654  * @msg: message to send
655  * @dsz: total length of message data
656  * @timeo: timeout to wait for wakeup
657  *
658  * Called from function tipc_sendmsg(), which has done all sanity checks
659  * Returns the number of bytes sent on success, or errno
660  */
661 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
662                           struct msghdr *msg, size_t dsz, long timeo)
663 {
664         struct sock *sk = sock->sk;
665         struct tipc_sock *tsk = tipc_sk(sk);
666         struct net *net = sock_net(sk);
667         struct tipc_msg *mhdr = &tsk->phdr;
668         struct sk_buff_head *pktchain = &sk->sk_write_queue;
669         struct iov_iter save = msg->msg_iter;
670         uint mtu;
671         int rc;
672
673         msg_set_type(mhdr, TIPC_MCAST_MSG);
674         msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
675         msg_set_destport(mhdr, 0);
676         msg_set_destnode(mhdr, 0);
677         msg_set_nametype(mhdr, seq->type);
678         msg_set_namelower(mhdr, seq->lower);
679         msg_set_nameupper(mhdr, seq->upper);
680         msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
681
682 new_mtu:
683         mtu = tipc_bclink_get_mtu();
684         rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain);
685         if (unlikely(rc < 0))
686                 return rc;
687
688         do {
689                 rc = tipc_bclink_xmit(net, pktchain);
690                 if (likely(rc >= 0)) {
691                         rc = dsz;
692                         break;
693                 }
694                 if (rc == -EMSGSIZE) {
695                         msg->msg_iter = save;
696                         goto new_mtu;
697                 }
698                 if (rc != -ELINKCONG)
699                         break;
700                 tipc_sk(sk)->link_cong = 1;
701                 rc = tipc_wait_for_sndmsg(sock, &timeo);
702                 if (rc)
703                         __skb_queue_purge(pktchain);
704         } while (!rc);
705         return rc;
706 }
707
708 /**
709  * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
710  * @arrvq: queue with arriving messages, to be cloned after destination lookup
711  * @inputq: queue with cloned messages, delivered to socket after dest lookup
712  *
713  * Multi-threaded: parallel calls with reference to same queues may occur
714  */
715 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
716                        struct sk_buff_head *inputq)
717 {
718         struct tipc_msg *msg;
719         struct tipc_plist dports;
720         u32 portid;
721         u32 scope = TIPC_CLUSTER_SCOPE;
722         struct sk_buff_head tmpq;
723         uint hsz;
724         struct sk_buff *skb, *_skb;
725
726         __skb_queue_head_init(&tmpq);
727         tipc_plist_init(&dports);
728
729         skb = tipc_skb_peek(arrvq, &inputq->lock);
730         for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
731                 msg = buf_msg(skb);
732                 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
733
734                 if (in_own_node(net, msg_orignode(msg)))
735                         scope = TIPC_NODE_SCOPE;
736
737                 /* Create destination port list and message clones: */
738                 tipc_nametbl_mc_translate(net,
739                                           msg_nametype(msg), msg_namelower(msg),
740                                           msg_nameupper(msg), scope, &dports);
741                 portid = tipc_plist_pop(&dports);
742                 for (; portid; portid = tipc_plist_pop(&dports)) {
743                         _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
744                         if (_skb) {
745                                 msg_set_destport(buf_msg(_skb), portid);
746                                 __skb_queue_tail(&tmpq, _skb);
747                                 continue;
748                         }
749                         pr_warn("Failed to clone mcast rcv buffer\n");
750                 }
751                 /* Append to inputq if not already done by other thread */
752                 spin_lock_bh(&inputq->lock);
753                 if (skb_peek(arrvq) == skb) {
754                         skb_queue_splice_tail_init(&tmpq, inputq);
755                         kfree_skb(__skb_dequeue(arrvq));
756                 }
757                 spin_unlock_bh(&inputq->lock);
758                 __skb_queue_purge(&tmpq);
759                 kfree_skb(skb);
760         }
761         tipc_sk_rcv(net, inputq);
762 }
763
764 /**
765  * tipc_sk_proto_rcv - receive a connection mng protocol message
766  * @tsk: receiving socket
767  * @skb: pointer to message buffer. Set to NULL if buffer is consumed.
768  */
769 static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb)
770 {
771         struct tipc_msg *msg = buf_msg(*skb);
772         int conn_cong;
773         u32 dnode;
774         u32 own_node = tsk_own_node(tsk);
775         /* Ignore if connection cannot be validated: */
776         if (!tsk_peer_msg(tsk, msg))
777                 goto exit;
778
779         tsk->probing_state = TIPC_CONN_OK;
780
781         if (msg_type(msg) == CONN_ACK) {
782                 conn_cong = tsk_conn_cong(tsk);
783                 tsk->sent_unacked -= msg_msgcnt(msg);
784                 if (conn_cong)
785                         tsk->sk.sk_write_space(&tsk->sk);
786         } else if (msg_type(msg) == CONN_PROBE) {
787                 if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) {
788                         msg_set_type(msg, CONN_PROBE_REPLY);
789                         return;
790                 }
791         }
792         /* Do nothing if msg_type() == CONN_PROBE_REPLY */
793 exit:
794         kfree_skb(*skb);
795         *skb = NULL;
796 }
797
798 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
799 {
800         struct sock *sk = sock->sk;
801         struct tipc_sock *tsk = tipc_sk(sk);
802         DEFINE_WAIT(wait);
803         int done;
804
805         do {
806                 int err = sock_error(sk);
807                 if (err)
808                         return err;
809                 if (sock->state == SS_DISCONNECTING)
810                         return -EPIPE;
811                 if (!*timeo_p)
812                         return -EAGAIN;
813                 if (signal_pending(current))
814                         return sock_intr_errno(*timeo_p);
815
816                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
817                 done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
818                 finish_wait(sk_sleep(sk), &wait);
819         } while (!done);
820         return 0;
821 }
822
823 /**
824  * tipc_sendmsg - send message in connectionless manner
825  * @sock: socket structure
826  * @m: message to send
827  * @dsz: amount of user data to be sent
828  *
829  * Message must have an destination specified explicitly.
830  * Used for SOCK_RDM and SOCK_DGRAM messages,
831  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
832  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
833  *
834  * Returns the number of bytes sent on success, or errno otherwise
835  */
836 static int tipc_sendmsg(struct socket *sock,
837                         struct msghdr *m, size_t dsz)
838 {
839         struct sock *sk = sock->sk;
840         int ret;
841
842         lock_sock(sk);
843         ret = __tipc_sendmsg(sock, m, dsz);
844         release_sock(sk);
845
846         return ret;
847 }
848
849 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
850 {
851         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
852         struct sock *sk = sock->sk;
853         struct tipc_sock *tsk = tipc_sk(sk);
854         struct net *net = sock_net(sk);
855         struct tipc_msg *mhdr = &tsk->phdr;
856         u32 dnode, dport;
857         struct sk_buff_head *pktchain = &sk->sk_write_queue;
858         struct sk_buff *skb;
859         struct tipc_name_seq *seq;
860         struct iov_iter save;
861         u32 mtu;
862         long timeo;
863         int rc;
864
865         if (dsz > TIPC_MAX_USER_MSG_SIZE)
866                 return -EMSGSIZE;
867         if (unlikely(!dest)) {
868                 if (tsk->connected && sock->state == SS_READY)
869                         dest = &tsk->remote;
870                 else
871                         return -EDESTADDRREQ;
872         } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
873                    dest->family != AF_TIPC) {
874                 return -EINVAL;
875         }
876         if (unlikely(sock->state != SS_READY)) {
877                 if (sock->state == SS_LISTENING)
878                         return -EPIPE;
879                 if (sock->state != SS_UNCONNECTED)
880                         return -EISCONN;
881                 if (tsk->published)
882                         return -EOPNOTSUPP;
883                 if (dest->addrtype == TIPC_ADDR_NAME) {
884                         tsk->conn_type = dest->addr.name.name.type;
885                         tsk->conn_instance = dest->addr.name.name.instance;
886                 }
887         }
888         seq = &dest->addr.nameseq;
889         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
890
891         if (dest->addrtype == TIPC_ADDR_MCAST) {
892                 return tipc_sendmcast(sock, seq, m, dsz, timeo);
893         } else if (dest->addrtype == TIPC_ADDR_NAME) {
894                 u32 type = dest->addr.name.name.type;
895                 u32 inst = dest->addr.name.name.instance;
896                 u32 domain = dest->addr.name.domain;
897
898                 dnode = domain;
899                 msg_set_type(mhdr, TIPC_NAMED_MSG);
900                 msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
901                 msg_set_nametype(mhdr, type);
902                 msg_set_nameinst(mhdr, inst);
903                 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
904                 dport = tipc_nametbl_translate(net, type, inst, &dnode);
905                 msg_set_destnode(mhdr, dnode);
906                 msg_set_destport(mhdr, dport);
907                 if (unlikely(!dport && !dnode))
908                         return -EHOSTUNREACH;
909         } else if (dest->addrtype == TIPC_ADDR_ID) {
910                 dnode = dest->addr.id.node;
911                 msg_set_type(mhdr, TIPC_DIRECT_MSG);
912                 msg_set_lookup_scope(mhdr, 0);
913                 msg_set_destnode(mhdr, dnode);
914                 msg_set_destport(mhdr, dest->addr.id.ref);
915                 msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
916         }
917
918         save = m->msg_iter;
919 new_mtu:
920         mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
921         rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, pktchain);
922         if (rc < 0)
923                 return rc;
924
925         do {
926                 skb = skb_peek(pktchain);
927                 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
928                 rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
929                 if (likely(rc >= 0)) {
930                         if (sock->state != SS_READY)
931                                 sock->state = SS_CONNECTING;
932                         rc = dsz;
933                         break;
934                 }
935                 if (rc == -EMSGSIZE) {
936                         m->msg_iter = save;
937                         goto new_mtu;
938                 }
939                 if (rc != -ELINKCONG)
940                         break;
941                 tsk->link_cong = 1;
942                 rc = tipc_wait_for_sndmsg(sock, &timeo);
943                 if (rc)
944                         __skb_queue_purge(pktchain);
945         } while (!rc);
946
947         return rc;
948 }
949
950 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
951 {
952         struct sock *sk = sock->sk;
953         struct tipc_sock *tsk = tipc_sk(sk);
954         DEFINE_WAIT(wait);
955         int done;
956
957         do {
958                 int err = sock_error(sk);
959                 if (err)
960                         return err;
961                 if (sock->state == SS_DISCONNECTING)
962                         return -EPIPE;
963                 else if (sock->state != SS_CONNECTED)
964                         return -ENOTCONN;
965                 if (!*timeo_p)
966                         return -EAGAIN;
967                 if (signal_pending(current))
968                         return sock_intr_errno(*timeo_p);
969
970                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
971                 done = sk_wait_event(sk, timeo_p,
972                                      (!tsk->link_cong &&
973                                       !tsk_conn_cong(tsk)) ||
974                                      !tsk->connected);
975                 finish_wait(sk_sleep(sk), &wait);
976         } while (!done);
977         return 0;
978 }
979
980 /**
981  * tipc_send_stream - send stream-oriented data
982  * @sock: socket structure
983  * @m: data to send
984  * @dsz: total length of data to be transmitted
985  *
986  * Used for SOCK_STREAM data.
987  *
988  * Returns the number of bytes sent on success (or partial success),
989  * or errno if no data sent
990  */
991 static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
992 {
993         struct sock *sk = sock->sk;
994         int ret;
995
996         lock_sock(sk);
997         ret = __tipc_send_stream(sock, m, dsz);
998         release_sock(sk);
999
1000         return ret;
1001 }
1002
1003 static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
1004 {
1005         struct sock *sk = sock->sk;
1006         struct net *net = sock_net(sk);
1007         struct tipc_sock *tsk = tipc_sk(sk);
1008         struct tipc_msg *mhdr = &tsk->phdr;
1009         struct sk_buff_head *pktchain = &sk->sk_write_queue;
1010         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1011         u32 portid = tsk->portid;
1012         int rc = -EINVAL;
1013         long timeo;
1014         u32 dnode;
1015         uint mtu, send, sent = 0;
1016         struct iov_iter save;
1017
1018         /* Handle implied connection establishment */
1019         if (unlikely(dest)) {
1020                 rc = __tipc_sendmsg(sock, m, dsz);
1021                 if (dsz && (dsz == rc))
1022                         tsk->sent_unacked = 1;
1023                 return rc;
1024         }
1025         if (dsz > (uint)INT_MAX)
1026                 return -EMSGSIZE;
1027
1028         if (unlikely(sock->state != SS_CONNECTED)) {
1029                 if (sock->state == SS_DISCONNECTING)
1030                         return -EPIPE;
1031                 else
1032                         return -ENOTCONN;
1033         }
1034
1035         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1036         dnode = tsk_peer_node(tsk);
1037
1038 next:
1039         save = m->msg_iter;
1040         mtu = tsk->max_pkt;
1041         send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1042         rc = tipc_msg_build(mhdr, m, sent, send, mtu, pktchain);
1043         if (unlikely(rc < 0))
1044                 return rc;
1045         do {
1046                 if (likely(!tsk_conn_cong(tsk))) {
1047                         rc = tipc_link_xmit(net, pktchain, dnode, portid);
1048                         if (likely(!rc)) {
1049                                 tsk->sent_unacked++;
1050                                 sent += send;
1051                                 if (sent == dsz)
1052                                         break;
1053                                 goto next;
1054                         }
1055                         if (rc == -EMSGSIZE) {
1056                                 tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1057                                                                  portid);
1058                                 m->msg_iter = save;
1059                                 goto next;
1060                         }
1061                         if (rc != -ELINKCONG)
1062                                 break;
1063                         tsk->link_cong = 1;
1064                 }
1065                 rc = tipc_wait_for_sndpkt(sock, &timeo);
1066                 if (rc)
1067                         __skb_queue_purge(pktchain);
1068         } while (!rc);
1069
1070         return sent ? sent : rc;
1071 }
1072
1073 /**
1074  * tipc_send_packet - send a connection-oriented message
1075  * @sock: socket structure
1076  * @m: message to send
1077  * @dsz: length of data to be transmitted
1078  *
1079  * Used for SOCK_SEQPACKET messages.
1080  *
1081  * Returns the number of bytes sent on success, or errno otherwise
1082  */
1083 static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1084 {
1085         if (dsz > TIPC_MAX_USER_MSG_SIZE)
1086                 return -EMSGSIZE;
1087
1088         return tipc_send_stream(sock, m, dsz);
1089 }
1090
1091 /* tipc_sk_finish_conn - complete the setup of a connection
1092  */
1093 static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1094                                 u32 peer_node)
1095 {
1096         struct sock *sk = &tsk->sk;
1097         struct net *net = sock_net(sk);
1098         struct tipc_msg *msg = &tsk->phdr;
1099
1100         msg_set_destnode(msg, peer_node);
1101         msg_set_destport(msg, peer_port);
1102         msg_set_type(msg, TIPC_CONN_MSG);
1103         msg_set_lookup_scope(msg, 0);
1104         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1105
1106         tsk->probing_intv = CONN_PROBING_INTERVAL;
1107         tsk->probing_state = TIPC_CONN_OK;
1108         tsk->connected = 1;
1109         sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
1110         tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1111         tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
1112 }
1113
1114 /**
1115  * set_orig_addr - capture sender's address for received message
1116  * @m: descriptor for message info
1117  * @msg: received message header
1118  *
1119  * Note: Address is not captured if not requested by receiver.
1120  */
1121 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
1122 {
1123         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
1124
1125         if (addr) {
1126                 addr->family = AF_TIPC;
1127                 addr->addrtype = TIPC_ADDR_ID;
1128                 memset(&addr->addr, 0, sizeof(addr->addr));
1129                 addr->addr.id.ref = msg_origport(msg);
1130                 addr->addr.id.node = msg_orignode(msg);
1131                 addr->addr.name.domain = 0;     /* could leave uninitialized */
1132                 addr->scope = 0;                /* could leave uninitialized */
1133                 m->msg_namelen = sizeof(struct sockaddr_tipc);
1134         }
1135 }
1136
1137 /**
1138  * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1139  * @m: descriptor for message info
1140  * @msg: received message header
1141  * @tsk: TIPC port associated with message
1142  *
1143  * Note: Ancillary data is not captured if not requested by receiver.
1144  *
1145  * Returns 0 if successful, otherwise errno
1146  */
1147 static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1148                                  struct tipc_sock *tsk)
1149 {
1150         u32 anc_data[3];
1151         u32 err;
1152         u32 dest_type;
1153         int has_name;
1154         int res;
1155
1156         if (likely(m->msg_controllen == 0))
1157                 return 0;
1158
1159         /* Optionally capture errored message object(s) */
1160         err = msg ? msg_errcode(msg) : 0;
1161         if (unlikely(err)) {
1162                 anc_data[0] = err;
1163                 anc_data[1] = msg_data_sz(msg);
1164                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1165                 if (res)
1166                         return res;
1167                 if (anc_data[1]) {
1168                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1169                                        msg_data(msg));
1170                         if (res)
1171                                 return res;
1172                 }
1173         }
1174
1175         /* Optionally capture message destination object */
1176         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1177         switch (dest_type) {
1178         case TIPC_NAMED_MSG:
1179                 has_name = 1;
1180                 anc_data[0] = msg_nametype(msg);
1181                 anc_data[1] = msg_namelower(msg);
1182                 anc_data[2] = msg_namelower(msg);
1183                 break;
1184         case TIPC_MCAST_MSG:
1185                 has_name = 1;
1186                 anc_data[0] = msg_nametype(msg);
1187                 anc_data[1] = msg_namelower(msg);
1188                 anc_data[2] = msg_nameupper(msg);
1189                 break;
1190         case TIPC_CONN_MSG:
1191                 has_name = (tsk->conn_type != 0);
1192                 anc_data[0] = tsk->conn_type;
1193                 anc_data[1] = tsk->conn_instance;
1194                 anc_data[2] = tsk->conn_instance;
1195                 break;
1196         default:
1197                 has_name = 0;
1198         }
1199         if (has_name) {
1200                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1201                 if (res)
1202                         return res;
1203         }
1204
1205         return 0;
1206 }
1207
1208 static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1209 {
1210         struct net *net = sock_net(&tsk->sk);
1211         struct sk_buff *skb = NULL;
1212         struct tipc_msg *msg;
1213         u32 peer_port = tsk_peer_port(tsk);
1214         u32 dnode = tsk_peer_node(tsk);
1215
1216         if (!tsk->connected)
1217                 return;
1218         skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1219                               dnode, tsk_own_node(tsk), peer_port,
1220                               tsk->portid, TIPC_OK);
1221         if (!skb)
1222                 return;
1223         msg = buf_msg(skb);
1224         msg_set_msgcnt(msg, ack);
1225         tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1226 }
1227
1228 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1229 {
1230         struct sock *sk = sock->sk;
1231         DEFINE_WAIT(wait);
1232         long timeo = *timeop;
1233         int err;
1234
1235         for (;;) {
1236                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1237                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1238                         if (sock->state == SS_DISCONNECTING) {
1239                                 err = -ENOTCONN;
1240                                 break;
1241                         }
1242                         release_sock(sk);
1243                         timeo = schedule_timeout(timeo);
1244                         lock_sock(sk);
1245                 }
1246                 err = 0;
1247                 if (!skb_queue_empty(&sk->sk_receive_queue))
1248                         break;
1249                 err = -EAGAIN;
1250                 if (!timeo)
1251                         break;
1252                 err = sock_intr_errno(timeo);
1253                 if (signal_pending(current))
1254                         break;
1255         }
1256         finish_wait(sk_sleep(sk), &wait);
1257         *timeop = timeo;
1258         return err;
1259 }
1260
1261 /**
1262  * tipc_recvmsg - receive packet-oriented message
1263  * @m: descriptor for message info
1264  * @buf_len: total size of user buffer area
1265  * @flags: receive flags
1266  *
1267  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1268  * If the complete message doesn't fit in user area, truncate it.
1269  *
1270  * Returns size of returned message data, errno otherwise
1271  */
1272 static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
1273                         int flags)
1274 {
1275         struct sock *sk = sock->sk;
1276         struct tipc_sock *tsk = tipc_sk(sk);
1277         struct sk_buff *buf;
1278         struct tipc_msg *msg;
1279         long timeo;
1280         unsigned int sz;
1281         u32 err;
1282         int res;
1283
1284         /* Catch invalid receive requests */
1285         if (unlikely(!buf_len))
1286                 return -EINVAL;
1287
1288         lock_sock(sk);
1289
1290         if (unlikely(sock->state == SS_UNCONNECTED)) {
1291                 res = -ENOTCONN;
1292                 goto exit;
1293         }
1294
1295         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1296 restart:
1297
1298         /* Look for a message in receive queue; wait if necessary */
1299         res = tipc_wait_for_rcvmsg(sock, &timeo);
1300         if (res)
1301                 goto exit;
1302
1303         /* Look at first message in receive queue */
1304         buf = skb_peek(&sk->sk_receive_queue);
1305         msg = buf_msg(buf);
1306         sz = msg_data_sz(msg);
1307         err = msg_errcode(msg);
1308
1309         /* Discard an empty non-errored message & try again */
1310         if ((!sz) && (!err)) {
1311                 tsk_advance_rx_queue(sk);
1312                 goto restart;
1313         }
1314
1315         /* Capture sender's address (optional) */
1316         set_orig_addr(m, msg);
1317
1318         /* Capture ancillary data (optional) */
1319         res = tipc_sk_anc_data_recv(m, msg, tsk);
1320         if (res)
1321                 goto exit;
1322
1323         /* Capture message data (if valid) & compute return value (always) */
1324         if (!err) {
1325                 if (unlikely(buf_len < sz)) {
1326                         sz = buf_len;
1327                         m->msg_flags |= MSG_TRUNC;
1328                 }
1329                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
1330                 if (res)
1331                         goto exit;
1332                 res = sz;
1333         } else {
1334                 if ((sock->state == SS_READY) ||
1335                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1336                         res = 0;
1337                 else
1338                         res = -ECONNRESET;
1339         }
1340
1341         /* Consume received message (optional) */
1342         if (likely(!(flags & MSG_PEEK))) {
1343                 if ((sock->state != SS_READY) &&
1344                     (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1345                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1346                         tsk->rcv_unacked = 0;
1347                 }
1348                 tsk_advance_rx_queue(sk);
1349         }
1350 exit:
1351         release_sock(sk);
1352         return res;
1353 }
1354
1355 /**
1356  * tipc_recv_stream - receive stream-oriented data
1357  * @m: descriptor for message info
1358  * @buf_len: total size of user buffer area
1359  * @flags: receive flags
1360  *
1361  * Used for SOCK_STREAM messages only.  If not enough data is available
1362  * will optionally wait for more; never truncates data.
1363  *
1364  * Returns size of returned message data, errno otherwise
1365  */
1366 static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
1367                             size_t buf_len, int flags)
1368 {
1369         struct sock *sk = sock->sk;
1370         struct tipc_sock *tsk = tipc_sk(sk);
1371         struct sk_buff *buf;
1372         struct tipc_msg *msg;
1373         long timeo;
1374         unsigned int sz;
1375         int sz_to_copy, target, needed;
1376         int sz_copied = 0;
1377         u32 err;
1378         int res = 0;
1379
1380         /* Catch invalid receive attempts */
1381         if (unlikely(!buf_len))
1382                 return -EINVAL;
1383
1384         lock_sock(sk);
1385
1386         if (unlikely(sock->state == SS_UNCONNECTED)) {
1387                 res = -ENOTCONN;
1388                 goto exit;
1389         }
1390
1391         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1392         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1393
1394 restart:
1395         /* Look for a message in receive queue; wait if necessary */
1396         res = tipc_wait_for_rcvmsg(sock, &timeo);
1397         if (res)
1398                 goto exit;
1399
1400         /* Look at first message in receive queue */
1401         buf = skb_peek(&sk->sk_receive_queue);
1402         msg = buf_msg(buf);
1403         sz = msg_data_sz(msg);
1404         err = msg_errcode(msg);
1405
1406         /* Discard an empty non-errored message & try again */
1407         if ((!sz) && (!err)) {
1408                 tsk_advance_rx_queue(sk);
1409                 goto restart;
1410         }
1411
1412         /* Optionally capture sender's address & ancillary data of first msg */
1413         if (sz_copied == 0) {
1414                 set_orig_addr(m, msg);
1415                 res = tipc_sk_anc_data_recv(m, msg, tsk);
1416                 if (res)
1417                         goto exit;
1418         }
1419
1420         /* Capture message data (if valid) & compute return value (always) */
1421         if (!err) {
1422                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1423
1424                 sz -= offset;
1425                 needed = (buf_len - sz_copied);
1426                 sz_to_copy = (sz <= needed) ? sz : needed;
1427
1428                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
1429                                             m, sz_to_copy);
1430                 if (res)
1431                         goto exit;
1432
1433                 sz_copied += sz_to_copy;
1434
1435                 if (sz_to_copy < sz) {
1436                         if (!(flags & MSG_PEEK))
1437                                 TIPC_SKB_CB(buf)->handle =
1438                                 (void *)(unsigned long)(offset + sz_to_copy);
1439                         goto exit;
1440                 }
1441         } else {
1442                 if (sz_copied != 0)
1443                         goto exit; /* can't add error msg to valid data */
1444
1445                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1446                         res = 0;
1447                 else
1448                         res = -ECONNRESET;
1449         }
1450
1451         /* Consume received message (optional) */
1452         if (likely(!(flags & MSG_PEEK))) {
1453                 if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1454                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1455                         tsk->rcv_unacked = 0;
1456                 }
1457                 tsk_advance_rx_queue(sk);
1458         }
1459
1460         /* Loop around if more data is required */
1461         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1462             (!skb_queue_empty(&sk->sk_receive_queue) ||
1463             (sz_copied < target)) &&    /* and more is ready or required */
1464             (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1465             (!err))                     /* and haven't reached a FIN */
1466                 goto restart;
1467
1468 exit:
1469         release_sock(sk);
1470         return sz_copied ? sz_copied : res;
1471 }
1472
1473 /**
1474  * tipc_write_space - wake up thread if port congestion is released
1475  * @sk: socket
1476  */
1477 static void tipc_write_space(struct sock *sk)
1478 {
1479         struct socket_wq *wq;
1480
1481         rcu_read_lock();
1482         wq = rcu_dereference(sk->sk_wq);
1483         if (wq_has_sleeper(wq))
1484                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1485                                                 POLLWRNORM | POLLWRBAND);
1486         rcu_read_unlock();
1487 }
1488
1489 /**
1490  * tipc_data_ready - wake up threads to indicate messages have been received
1491  * @sk: socket
1492  * @len: the length of messages
1493  */
1494 static void tipc_data_ready(struct sock *sk)
1495 {
1496         struct socket_wq *wq;
1497
1498         rcu_read_lock();
1499         wq = rcu_dereference(sk->sk_wq);
1500         if (wq_has_sleeper(wq))
1501                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1502                                                 POLLRDNORM | POLLRDBAND);
1503         rcu_read_unlock();
1504 }
1505
1506 /**
1507  * filter_connect - Handle all incoming messages for a connection-based socket
1508  * @tsk: TIPC socket
1509  * @skb: pointer to message buffer. Set to NULL if buffer is consumed
1510  *
1511  * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
1512  */
1513 static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb)
1514 {
1515         struct sock *sk = &tsk->sk;
1516         struct net *net = sock_net(sk);
1517         struct socket *sock = sk->sk_socket;
1518         struct tipc_msg *msg = buf_msg(*skb);
1519         int retval = -TIPC_ERR_NO_PORT;
1520
1521         if (msg_mcast(msg))
1522                 return retval;
1523
1524         switch ((int)sock->state) {
1525         case SS_CONNECTED:
1526                 /* Accept only connection-based messages sent by peer */
1527                 if (tsk_peer_msg(tsk, msg)) {
1528                         if (unlikely(msg_errcode(msg))) {
1529                                 sock->state = SS_DISCONNECTING;
1530                                 tsk->connected = 0;
1531                                 /* let timer expire on it's own */
1532                                 tipc_node_remove_conn(net, tsk_peer_node(tsk),
1533                                                       tsk->portid);
1534                         }
1535                         retval = TIPC_OK;
1536                 }
1537                 break;
1538         case SS_CONNECTING:
1539                 /* Accept only ACK or NACK message */
1540
1541                 if (unlikely(!msg_connected(msg)))
1542                         break;
1543
1544                 if (unlikely(msg_errcode(msg))) {
1545                         sock->state = SS_DISCONNECTING;
1546                         sk->sk_err = ECONNREFUSED;
1547                         retval = TIPC_OK;
1548                         break;
1549                 }
1550
1551                 if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) {
1552                         sock->state = SS_DISCONNECTING;
1553                         sk->sk_err = EINVAL;
1554                         retval = TIPC_OK;
1555                         break;
1556                 }
1557
1558                 tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg));
1559                 msg_set_importance(&tsk->phdr, msg_importance(msg));
1560                 sock->state = SS_CONNECTED;
1561
1562                 /* If an incoming message is an 'ACK-', it should be
1563                  * discarded here because it doesn't contain useful
1564                  * data. In addition, we should try to wake up
1565                  * connect() routine if sleeping.
1566                  */
1567                 if (msg_data_sz(msg) == 0) {
1568                         kfree_skb(*skb);
1569                         *skb = NULL;
1570                         if (waitqueue_active(sk_sleep(sk)))
1571                                 wake_up_interruptible(sk_sleep(sk));
1572                 }
1573                 retval = TIPC_OK;
1574                 break;
1575         case SS_LISTENING:
1576         case SS_UNCONNECTED:
1577                 /* Accept only SYN message */
1578                 if (!msg_connected(msg) && !(msg_errcode(msg)))
1579                         retval = TIPC_OK;
1580                 break;
1581         case SS_DISCONNECTING:
1582                 break;
1583         default:
1584                 pr_err("Unknown socket state %u\n", sock->state);
1585         }
1586         return retval;
1587 }
1588
1589 /**
1590  * rcvbuf_limit - get proper overload limit of socket receive queue
1591  * @sk: socket
1592  * @buf: message
1593  *
1594  * For all connection oriented messages, irrespective of importance,
1595  * the default overload value (i.e. 67MB) is set as limit.
1596  *
1597  * For all connectionless messages, by default new queue limits are
1598  * as belows:
1599  *
1600  * TIPC_LOW_IMPORTANCE       (4 MB)
1601  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1602  * TIPC_HIGH_IMPORTANCE      (16 MB)
1603  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1604  *
1605  * Returns overload limit according to corresponding message importance
1606  */
1607 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1608 {
1609         struct tipc_msg *msg = buf_msg(buf);
1610
1611         if (msg_connected(msg))
1612                 return sysctl_tipc_rmem[2];
1613
1614         return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1615                 msg_importance(msg);
1616 }
1617
1618 /**
1619  * filter_rcv - validate incoming message
1620  * @sk: socket
1621  * @skb: pointer to message. Set to NULL if buffer is consumed.
1622  *
1623  * Enqueues message on receive queue if acceptable; optionally handles
1624  * disconnect indication for a connected socket.
1625  *
1626  * Called with socket lock already taken
1627  *
1628  * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected
1629  */
1630 static int filter_rcv(struct sock *sk, struct sk_buff **skb)
1631 {
1632         struct socket *sock = sk->sk_socket;
1633         struct tipc_sock *tsk = tipc_sk(sk);
1634         struct tipc_msg *msg = buf_msg(*skb);
1635         unsigned int limit = rcvbuf_limit(sk, *skb);
1636         int rc = TIPC_OK;
1637
1638         if (unlikely(msg_user(msg) == CONN_MANAGER)) {
1639                 tipc_sk_proto_rcv(tsk, skb);
1640                 return TIPC_OK;
1641         }
1642
1643         if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
1644                 kfree_skb(*skb);
1645                 tsk->link_cong = 0;
1646                 sk->sk_write_space(sk);
1647                 *skb = NULL;
1648                 return TIPC_OK;
1649         }
1650
1651         /* Reject message if it is wrong sort of message for socket */
1652         if (msg_type(msg) > TIPC_DIRECT_MSG)
1653                 return -TIPC_ERR_NO_PORT;
1654
1655         if (sock->state == SS_READY) {
1656                 if (msg_connected(msg))
1657                         return -TIPC_ERR_NO_PORT;
1658         } else {
1659                 rc = filter_connect(tsk, skb);
1660                 if (rc != TIPC_OK || !*skb)
1661                         return rc;
1662         }
1663
1664         /* Reject message if there isn't room to queue it */
1665         if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit)
1666                 return -TIPC_ERR_OVERLOAD;
1667
1668         /* Enqueue message */
1669         TIPC_SKB_CB(*skb)->handle = NULL;
1670         __skb_queue_tail(&sk->sk_receive_queue, *skb);
1671         skb_set_owner_r(*skb, sk);
1672
1673         sk->sk_data_ready(sk);
1674         *skb = NULL;
1675         return TIPC_OK;
1676 }
1677
1678 /**
1679  * tipc_backlog_rcv - handle incoming message from backlog queue
1680  * @sk: socket
1681  * @skb: message
1682  *
1683  * Caller must hold socket lock
1684  *
1685  * Returns 0
1686  */
1687 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1688 {
1689         int err;
1690         atomic_t *dcnt;
1691         u32 dnode;
1692         struct tipc_sock *tsk = tipc_sk(sk);
1693         struct net *net = sock_net(sk);
1694         uint truesize = skb->truesize;
1695
1696         err = filter_rcv(sk, &skb);
1697         if (likely(!skb)) {
1698                 dcnt = &tsk->dupl_rcvcnt;
1699                 if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1700                         atomic_add(truesize, dcnt);
1701                 return 0;
1702         }
1703         if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
1704                 tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
1705         return 0;
1706 }
1707
1708 /**
1709  * tipc_sk_enqueue - extract all buffers with destination 'dport' from
1710  *                   inputq and try adding them to socket or backlog queue
1711  * @inputq: list of incoming buffers with potentially different destinations
1712  * @sk: socket where the buffers should be enqueued
1713  * @dport: port number for the socket
1714  * @_skb: returned buffer to be forwarded or rejected, if applicable
1715  *
1716  * Caller must hold socket lock
1717  *
1718  * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
1719  * or -TIPC_ERR_NO_PORT
1720  */
1721 static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1722                            u32 dport, struct sk_buff **_skb)
1723 {
1724         unsigned int lim;
1725         atomic_t *dcnt;
1726         int err;
1727         struct sk_buff *skb;
1728         unsigned long time_limit = jiffies + 2;
1729
1730         while (skb_queue_len(inputq)) {
1731                 if (unlikely(time_after_eq(jiffies, time_limit)))
1732                         return TIPC_OK;
1733                 skb = tipc_skb_dequeue(inputq, dport);
1734                 if (unlikely(!skb))
1735                         return TIPC_OK;
1736                 if (!sock_owned_by_user(sk)) {
1737                         err = filter_rcv(sk, &skb);
1738                         if (likely(!skb))
1739                                 continue;
1740                         *_skb = skb;
1741                         return err;
1742                 }
1743                 dcnt = &tipc_sk(sk)->dupl_rcvcnt;
1744                 if (sk->sk_backlog.len)
1745                         atomic_set(dcnt, 0);
1746                 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
1747                 if (likely(!sk_add_backlog(sk, skb, lim)))
1748                         continue;
1749                 *_skb = skb;
1750                 return -TIPC_ERR_OVERLOAD;
1751         }
1752         return TIPC_OK;
1753 }
1754
1755 /**
1756  * tipc_sk_rcv - handle a chain of incoming buffers
1757  * @inputq: buffer list containing the buffers
1758  * Consumes all buffers in list until inputq is empty
1759  * Note: may be called in multiple threads referring to the same queue
1760  * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
1761  * Only node local calls check the return value, sending single-buffer queues
1762  */
1763 int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1764 {
1765         u32 dnode, dport = 0;
1766         int err = -TIPC_ERR_NO_PORT;
1767         struct sk_buff *skb;
1768         struct tipc_sock *tsk;
1769         struct tipc_net *tn;
1770         struct sock *sk;
1771
1772         while (skb_queue_len(inputq)) {
1773                 skb = NULL;
1774                 dport = tipc_skb_peek_port(inputq, dport);
1775                 tsk = tipc_sk_lookup(net, dport);
1776                 if (likely(tsk)) {
1777                         sk = &tsk->sk;
1778                         if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
1779                                 err = tipc_sk_enqueue(inputq, sk, dport, &skb);
1780                                 spin_unlock_bh(&sk->sk_lock.slock);
1781                                 dport = 0;
1782                         }
1783                         sock_put(sk);
1784                 } else {
1785                         skb = tipc_skb_dequeue(inputq, dport);
1786                 }
1787                 if (likely(!skb))
1788                         continue;
1789                 if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
1790                         goto xmit;
1791                 if (!err) {
1792                         dnode = msg_destnode(buf_msg(skb));
1793                         goto xmit;
1794                 }
1795                 tn = net_generic(net, tipc_net_id);
1796                 if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
1797                         continue;
1798 xmit:
1799                 tipc_link_xmit_skb(net, skb, dnode, dport);
1800         }
1801         return err ? -EHOSTUNREACH : 0;
1802 }
1803
1804 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1805 {
1806         struct sock *sk = sock->sk;
1807         DEFINE_WAIT(wait);
1808         int done;
1809
1810         do {
1811                 int err = sock_error(sk);
1812                 if (err)
1813                         return err;
1814                 if (!*timeo_p)
1815                         return -ETIMEDOUT;
1816                 if (signal_pending(current))
1817                         return sock_intr_errno(*timeo_p);
1818
1819                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1820                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1821                 finish_wait(sk_sleep(sk), &wait);
1822         } while (!done);
1823         return 0;
1824 }
1825
1826 /**
1827  * tipc_connect - establish a connection to another TIPC port
1828  * @sock: socket structure
1829  * @dest: socket address for destination port
1830  * @destlen: size of socket address data structure
1831  * @flags: file-related flags associated with socket
1832  *
1833  * Returns 0 on success, errno otherwise
1834  */
1835 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1836                         int destlen, int flags)
1837 {
1838         struct sock *sk = sock->sk;
1839         struct tipc_sock *tsk = tipc_sk(sk);
1840         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1841         struct msghdr m = {NULL,};
1842         long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
1843         socket_state previous;
1844         int res = 0;
1845
1846         lock_sock(sk);
1847
1848         /* DGRAM/RDM connect(), just save the destaddr */
1849         if (sock->state == SS_READY) {
1850                 if (dst->family == AF_UNSPEC) {
1851                         memset(&tsk->remote, 0, sizeof(struct sockaddr_tipc));
1852                         tsk->connected = 0;
1853                 } else {
1854                         memcpy(&tsk->remote, dest, destlen);
1855                         tsk->connected = 1;
1856                 }
1857                 goto exit;
1858         }
1859
1860         /*
1861          * Reject connection attempt using multicast address
1862          *
1863          * Note: send_msg() validates the rest of the address fields,
1864          *       so there's no need to do it here
1865          */
1866         if (dst->addrtype == TIPC_ADDR_MCAST) {
1867                 res = -EINVAL;
1868                 goto exit;
1869         }
1870
1871         previous = sock->state;
1872         switch (sock->state) {
1873         case SS_UNCONNECTED:
1874                 /* Send a 'SYN-' to destination */
1875                 m.msg_name = dest;
1876                 m.msg_namelen = destlen;
1877
1878                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1879                  * indicate send_msg() is never blocked.
1880                  */
1881                 if (!timeout)
1882                         m.msg_flags = MSG_DONTWAIT;
1883
1884                 res = __tipc_sendmsg(sock, &m, 0);
1885                 if ((res < 0) && (res != -EWOULDBLOCK))
1886                         goto exit;
1887
1888                 /* Just entered SS_CONNECTING state; the only
1889                  * difference is that return value in non-blocking
1890                  * case is EINPROGRESS, rather than EALREADY.
1891                  */
1892                 res = -EINPROGRESS;
1893         case SS_CONNECTING:
1894                 if (previous == SS_CONNECTING)
1895                         res = -EALREADY;
1896                 if (!timeout)
1897                         goto exit;
1898                 timeout = msecs_to_jiffies(timeout);
1899                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1900                 res = tipc_wait_for_connect(sock, &timeout);
1901                 break;
1902         case SS_CONNECTED:
1903                 res = -EISCONN;
1904                 break;
1905         default:
1906                 res = -EINVAL;
1907                 break;
1908         }
1909 exit:
1910         release_sock(sk);
1911         return res;
1912 }
1913
1914 /**
1915  * tipc_listen - allow socket to listen for incoming connections
1916  * @sock: socket structure
1917  * @len: (unused)
1918  *
1919  * Returns 0 on success, errno otherwise
1920  */
1921 static int tipc_listen(struct socket *sock, int len)
1922 {
1923         struct sock *sk = sock->sk;
1924         int res;
1925
1926         lock_sock(sk);
1927
1928         if (sock->state != SS_UNCONNECTED)
1929                 res = -EINVAL;
1930         else {
1931                 sock->state = SS_LISTENING;
1932                 res = 0;
1933         }
1934
1935         release_sock(sk);
1936         return res;
1937 }
1938
1939 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1940 {
1941         struct sock *sk = sock->sk;
1942         DEFINE_WAIT(wait);
1943         int err;
1944
1945         /* True wake-one mechanism for incoming connections: only
1946          * one process gets woken up, not the 'whole herd'.
1947          * Since we do not 'race & poll' for established sockets
1948          * anymore, the common case will execute the loop only once.
1949         */
1950         for (;;) {
1951                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1952                                           TASK_INTERRUPTIBLE);
1953                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1954                         release_sock(sk);
1955                         timeo = schedule_timeout(timeo);
1956                         lock_sock(sk);
1957                 }
1958                 err = 0;
1959                 if (!skb_queue_empty(&sk->sk_receive_queue))
1960                         break;
1961                 err = -EINVAL;
1962                 if (sock->state != SS_LISTENING)
1963                         break;
1964                 err = -EAGAIN;
1965                 if (!timeo)
1966                         break;
1967                 err = sock_intr_errno(timeo);
1968                 if (signal_pending(current))
1969                         break;
1970         }
1971         finish_wait(sk_sleep(sk), &wait);
1972         return err;
1973 }
1974
1975 /**
1976  * tipc_accept - wait for connection request
1977  * @sock: listening socket
1978  * @newsock: new socket that is to be connected
1979  * @flags: file-related flags associated with socket
1980  *
1981  * Returns 0 on success, errno otherwise
1982  */
1983 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1984 {
1985         struct sock *new_sk, *sk = sock->sk;
1986         struct sk_buff *buf;
1987         struct tipc_sock *new_tsock;
1988         struct tipc_msg *msg;
1989         long timeo;
1990         int res;
1991
1992         lock_sock(sk);
1993
1994         if (sock->state != SS_LISTENING) {
1995                 res = -EINVAL;
1996                 goto exit;
1997         }
1998         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
1999         res = tipc_wait_for_accept(sock, timeo);
2000         if (res)
2001                 goto exit;
2002
2003         buf = skb_peek(&sk->sk_receive_queue);
2004
2005         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
2006         if (res)
2007                 goto exit;
2008
2009         new_sk = new_sock->sk;
2010         new_tsock = tipc_sk(new_sk);
2011         msg = buf_msg(buf);
2012
2013         /* we lock on new_sk; but lockdep sees the lock on sk */
2014         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2015
2016         /*
2017          * Reject any stray messages received by new socket
2018          * before the socket lock was taken (very, very unlikely)
2019          */
2020         tsk_rej_rx_queue(new_sk);
2021
2022         /* Connect new socket to it's peer */
2023         tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2024         new_sock->state = SS_CONNECTED;
2025
2026         tsk_set_importance(new_tsock, msg_importance(msg));
2027         if (msg_named(msg)) {
2028                 new_tsock->conn_type = msg_nametype(msg);
2029                 new_tsock->conn_instance = msg_nameinst(msg);
2030         }
2031
2032         /*
2033          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2034          * Respond to 'SYN+' by queuing it on new socket.
2035          */
2036         if (!msg_data_sz(msg)) {
2037                 struct msghdr m = {NULL,};
2038
2039                 tsk_advance_rx_queue(sk);
2040                 __tipc_send_stream(new_sock, &m, 0);
2041         } else {
2042                 __skb_dequeue(&sk->sk_receive_queue);
2043                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
2044                 skb_set_owner_r(buf, new_sk);
2045         }
2046         release_sock(new_sk);
2047 exit:
2048         release_sock(sk);
2049         return res;
2050 }
2051
2052 /**
2053  * tipc_shutdown - shutdown socket connection
2054  * @sock: socket structure
2055  * @how: direction to close (must be SHUT_RDWR)
2056  *
2057  * Terminates connection (if necessary), then purges socket's receive queue.
2058  *
2059  * Returns 0 on success, errno otherwise
2060  */
2061 static int tipc_shutdown(struct socket *sock, int how)
2062 {
2063         struct sock *sk = sock->sk;
2064         struct net *net = sock_net(sk);
2065         struct tipc_sock *tsk = tipc_sk(sk);
2066         struct sk_buff *skb;
2067         u32 dnode;
2068         int res;
2069
2070         if (how != SHUT_RDWR)
2071                 return -EINVAL;
2072
2073         lock_sock(sk);
2074
2075         switch (sock->state) {
2076         case SS_CONNECTING:
2077         case SS_CONNECTED:
2078
2079 restart:
2080                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
2081                 skb = __skb_dequeue(&sk->sk_receive_queue);
2082                 if (skb) {
2083                         if (TIPC_SKB_CB(skb)->handle != NULL) {
2084                                 kfree_skb(skb);
2085                                 goto restart;
2086                         }
2087                         if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
2088                                              TIPC_CONN_SHUTDOWN))
2089                                 tipc_link_xmit_skb(net, skb, dnode,
2090                                                    tsk->portid);
2091                 } else {
2092                         dnode = tsk_peer_node(tsk);
2093
2094                         skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2095                                               TIPC_CONN_MSG, SHORT_H_SIZE,
2096                                               0, dnode, tsk_own_node(tsk),
2097                                               tsk_peer_port(tsk),
2098                                               tsk->portid, TIPC_CONN_SHUTDOWN);
2099                         tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
2100                 }
2101                 tsk->connected = 0;
2102                 sock->state = SS_DISCONNECTING;
2103                 tipc_node_remove_conn(net, dnode, tsk->portid);
2104                 /* fall through */
2105
2106         case SS_DISCONNECTING:
2107
2108                 /* Discard any unreceived messages */
2109                 __skb_queue_purge(&sk->sk_receive_queue);
2110
2111                 /* Wake up anyone sleeping in poll */
2112                 sk->sk_state_change(sk);
2113                 res = 0;
2114                 break;
2115
2116         default:
2117                 res = -ENOTCONN;
2118         }
2119
2120         release_sock(sk);
2121         return res;
2122 }
2123
2124 static void tipc_sk_timeout(unsigned long data)
2125 {
2126         struct tipc_sock *tsk = (struct tipc_sock *)data;
2127         struct sock *sk = &tsk->sk;
2128         struct sk_buff *skb = NULL;
2129         u32 peer_port, peer_node;
2130         u32 own_node = tsk_own_node(tsk);
2131
2132         bh_lock_sock(sk);
2133         if (!tsk->connected) {
2134                 bh_unlock_sock(sk);
2135                 goto exit;
2136         }
2137         peer_port = tsk_peer_port(tsk);
2138         peer_node = tsk_peer_node(tsk);
2139
2140         if (tsk->probing_state == TIPC_CONN_PROBING) {
2141                 /* Previous probe not answered -> self abort */
2142                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2143                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0,
2144                                       own_node, peer_node, tsk->portid,
2145                                       peer_port, TIPC_ERR_NO_PORT);
2146         } else {
2147                 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
2148                                       INT_H_SIZE, 0, peer_node, own_node,
2149                                       peer_port, tsk->portid, TIPC_OK);
2150                 tsk->probing_state = TIPC_CONN_PROBING;
2151                 sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
2152         }
2153         bh_unlock_sock(sk);
2154         if (skb)
2155                 tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
2156 exit:
2157         sock_put(sk);
2158 }
2159
2160 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2161                            struct tipc_name_seq const *seq)
2162 {
2163         struct net *net = sock_net(&tsk->sk);
2164         struct publication *publ;
2165         u32 key;
2166
2167         if (tsk->connected)
2168                 return -EINVAL;
2169         key = tsk->portid + tsk->pub_count + 1;
2170         if (key == tsk->portid)
2171                 return -EADDRINUSE;
2172
2173         publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2174                                     scope, tsk->portid, key);
2175         if (unlikely(!publ))
2176                 return -EINVAL;
2177
2178         list_add(&publ->pport_list, &tsk->publications);
2179         tsk->pub_count++;
2180         tsk->published = 1;
2181         return 0;
2182 }
2183
2184 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2185                             struct tipc_name_seq const *seq)
2186 {
2187         struct net *net = sock_net(&tsk->sk);
2188         struct publication *publ;
2189         struct publication *safe;
2190         int rc = -EINVAL;
2191
2192         list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
2193                 if (seq) {
2194                         if (publ->scope != scope)
2195                                 continue;
2196                         if (publ->type != seq->type)
2197                                 continue;
2198                         if (publ->lower != seq->lower)
2199                                 continue;
2200                         if (publ->upper != seq->upper)
2201                                 break;
2202                         tipc_nametbl_withdraw(net, publ->type, publ->lower,
2203                                               publ->ref, publ->key);
2204                         rc = 0;
2205                         break;
2206                 }
2207                 tipc_nametbl_withdraw(net, publ->type, publ->lower,
2208                                       publ->ref, publ->key);
2209                 rc = 0;
2210         }
2211         if (list_empty(&tsk->publications))
2212                 tsk->published = 0;
2213         return rc;
2214 }
2215
2216 /* tipc_sk_reinit: set non-zero address in all existing sockets
2217  *                 when we go from standalone to network mode.
2218  */
2219 void tipc_sk_reinit(struct net *net)
2220 {
2221         struct tipc_net *tn = net_generic(net, tipc_net_id);
2222         const struct bucket_table *tbl;
2223         struct rhash_head *pos;
2224         struct tipc_sock *tsk;
2225         struct tipc_msg *msg;
2226         int i;
2227
2228         rcu_read_lock();
2229         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2230         for (i = 0; i < tbl->size; i++) {
2231                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2232                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2233                         msg = &tsk->phdr;
2234                         msg_set_prevnode(msg, tn->own_addr);
2235                         msg_set_orignode(msg, tn->own_addr);
2236                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2237                 }
2238         }
2239         rcu_read_unlock();
2240 }
2241
2242 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2243 {
2244         struct tipc_net *tn = net_generic(net, tipc_net_id);
2245         struct tipc_sock *tsk;
2246
2247         rcu_read_lock();
2248         tsk = rhashtable_lookup(&tn->sk_rht, &portid);
2249         if (tsk)
2250                 sock_hold(&tsk->sk);
2251         rcu_read_unlock();
2252
2253         return tsk;
2254 }
2255
2256 static int tipc_sk_insert(struct tipc_sock *tsk)
2257 {
2258         struct sock *sk = &tsk->sk;
2259         struct net *net = sock_net(sk);
2260         struct tipc_net *tn = net_generic(net, tipc_net_id);
2261         u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2262         u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2263
2264         while (remaining--) {
2265                 portid++;
2266                 if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2267                         portid = TIPC_MIN_PORT;
2268                 tsk->portid = portid;
2269                 sock_hold(&tsk->sk);
2270                 if (rhashtable_lookup_insert(&tn->sk_rht, &tsk->node))
2271                         return 0;
2272                 sock_put(&tsk->sk);
2273         }
2274
2275         return -1;
2276 }
2277
2278 static void tipc_sk_remove(struct tipc_sock *tsk)
2279 {
2280         struct sock *sk = &tsk->sk;
2281         struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
2282
2283         if (rhashtable_remove(&tn->sk_rht, &tsk->node)) {
2284                 WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
2285                 __sock_put(sk);
2286         }
2287 }
2288
2289 int tipc_sk_rht_init(struct net *net)
2290 {
2291         struct tipc_net *tn = net_generic(net, tipc_net_id);
2292         struct rhashtable_params rht_params = {
2293                 .nelem_hint = 192,
2294                 .head_offset = offsetof(struct tipc_sock, node),
2295                 .key_offset = offsetof(struct tipc_sock, portid),
2296                 .key_len = sizeof(u32), /* portid */
2297                 .hashfn = jhash,
2298                 .max_size = 1048576,
2299                 .min_size = 256,
2300         };
2301
2302         return rhashtable_init(&tn->sk_rht, &rht_params);
2303 }
2304
2305 void tipc_sk_rht_destroy(struct net *net)
2306 {
2307         struct tipc_net *tn = net_generic(net, tipc_net_id);
2308
2309         /* Wait for socket readers to complete */
2310         synchronize_net();
2311
2312         rhashtable_destroy(&tn->sk_rht);
2313 }
2314
2315 /**
2316  * tipc_setsockopt - set socket option
2317  * @sock: socket structure
2318  * @lvl: option level
2319  * @opt: option identifier
2320  * @ov: pointer to new option value
2321  * @ol: length of option value
2322  *
2323  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2324  * (to ease compatibility).
2325  *
2326  * Returns 0 on success, errno otherwise
2327  */
2328 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2329                            char __user *ov, unsigned int ol)
2330 {
2331         struct sock *sk = sock->sk;
2332         struct tipc_sock *tsk = tipc_sk(sk);
2333         u32 value;
2334         int res;
2335
2336         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2337                 return 0;
2338         if (lvl != SOL_TIPC)
2339                 return -ENOPROTOOPT;
2340         if (ol < sizeof(value))
2341                 return -EINVAL;
2342         res = get_user(value, (u32 __user *)ov);
2343         if (res)
2344                 return res;
2345
2346         lock_sock(sk);
2347
2348         switch (opt) {
2349         case TIPC_IMPORTANCE:
2350                 res = tsk_set_importance(tsk, value);
2351                 break;
2352         case TIPC_SRC_DROPPABLE:
2353                 if (sock->type != SOCK_STREAM)
2354                         tsk_set_unreliable(tsk, value);
2355                 else
2356                         res = -ENOPROTOOPT;
2357                 break;
2358         case TIPC_DEST_DROPPABLE:
2359                 tsk_set_unreturnable(tsk, value);
2360                 break;
2361         case TIPC_CONN_TIMEOUT:
2362                 tipc_sk(sk)->conn_timeout = value;
2363                 /* no need to set "res", since already 0 at this point */
2364                 break;
2365         default:
2366                 res = -EINVAL;
2367         }
2368
2369         release_sock(sk);
2370
2371         return res;
2372 }
2373
2374 /**
2375  * tipc_getsockopt - get socket option
2376  * @sock: socket structure
2377  * @lvl: option level
2378  * @opt: option identifier
2379  * @ov: receptacle for option value
2380  * @ol: receptacle for length of option value
2381  *
2382  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2383  * (to ease compatibility).
2384  *
2385  * Returns 0 on success, errno otherwise
2386  */
2387 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2388                            char __user *ov, int __user *ol)
2389 {
2390         struct sock *sk = sock->sk;
2391         struct tipc_sock *tsk = tipc_sk(sk);
2392         int len;
2393         u32 value;
2394         int res;
2395
2396         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2397                 return put_user(0, ol);
2398         if (lvl != SOL_TIPC)
2399                 return -ENOPROTOOPT;
2400         res = get_user(len, ol);
2401         if (res)
2402                 return res;
2403
2404         lock_sock(sk);
2405
2406         switch (opt) {
2407         case TIPC_IMPORTANCE:
2408                 value = tsk_importance(tsk);
2409                 break;
2410         case TIPC_SRC_DROPPABLE:
2411                 value = tsk_unreliable(tsk);
2412                 break;
2413         case TIPC_DEST_DROPPABLE:
2414                 value = tsk_unreturnable(tsk);
2415                 break;
2416         case TIPC_CONN_TIMEOUT:
2417                 value = tsk->conn_timeout;
2418                 /* no need to set "res", since already 0 at this point */
2419                 break;
2420         case TIPC_NODE_RECVQ_DEPTH:
2421                 value = 0; /* was tipc_queue_size, now obsolete */
2422                 break;
2423         case TIPC_SOCK_RECVQ_DEPTH:
2424                 value = skb_queue_len(&sk->sk_receive_queue);
2425                 break;
2426         default:
2427                 res = -EINVAL;
2428         }
2429
2430         release_sock(sk);
2431
2432         if (res)
2433                 return res;     /* "get" failed */
2434
2435         if (len < sizeof(value))
2436                 return -EINVAL;
2437
2438         if (copy_to_user(ov, &value, sizeof(value)))
2439                 return -EFAULT;
2440
2441         return put_user(sizeof(value), ol);
2442 }
2443
2444 static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
2445 {
2446         struct sock *sk = sock->sk;
2447         struct tipc_sioc_ln_req lnr;
2448         void __user *argp = (void __user *)arg;
2449
2450         switch (cmd) {
2451         case SIOCGETLINKNAME:
2452                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
2453                         return -EFAULT;
2454                 if (!tipc_node_get_linkname(sock_net(sk),
2455                                             lnr.bearer_id & 0xffff, lnr.peer,
2456                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
2457                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
2458                                 return -EFAULT;
2459                         return 0;
2460                 }
2461                 return -EADDRNOTAVAIL;
2462         default:
2463                 return -ENOIOCTLCMD;
2464         }
2465 }
2466
2467 /* Protocol switches for the various types of TIPC sockets */
2468
2469 static const struct proto_ops msg_ops = {
2470         .owner          = THIS_MODULE,
2471         .family         = AF_TIPC,
2472         .release        = tipc_release,
2473         .bind           = tipc_bind,
2474         .connect        = tipc_connect,
2475         .socketpair     = sock_no_socketpair,
2476         .accept         = sock_no_accept,
2477         .getname        = tipc_getname,
2478         .poll           = tipc_poll,
2479         .ioctl          = tipc_ioctl,
2480         .listen         = sock_no_listen,
2481         .shutdown       = tipc_shutdown,
2482         .setsockopt     = tipc_setsockopt,
2483         .getsockopt     = tipc_getsockopt,
2484         .sendmsg        = tipc_sendmsg,
2485         .recvmsg        = tipc_recvmsg,
2486         .mmap           = sock_no_mmap,
2487         .sendpage       = sock_no_sendpage
2488 };
2489
2490 static const struct proto_ops packet_ops = {
2491         .owner          = THIS_MODULE,
2492         .family         = AF_TIPC,
2493         .release        = tipc_release,
2494         .bind           = tipc_bind,
2495         .connect        = tipc_connect,
2496         .socketpair     = sock_no_socketpair,
2497         .accept         = tipc_accept,
2498         .getname        = tipc_getname,
2499         .poll           = tipc_poll,
2500         .ioctl          = tipc_ioctl,
2501         .listen         = tipc_listen,
2502         .shutdown       = tipc_shutdown,
2503         .setsockopt     = tipc_setsockopt,
2504         .getsockopt     = tipc_getsockopt,
2505         .sendmsg        = tipc_send_packet,
2506         .recvmsg        = tipc_recvmsg,
2507         .mmap           = sock_no_mmap,
2508         .sendpage       = sock_no_sendpage
2509 };
2510
2511 static const struct proto_ops stream_ops = {
2512         .owner          = THIS_MODULE,
2513         .family         = AF_TIPC,
2514         .release        = tipc_release,
2515         .bind           = tipc_bind,
2516         .connect        = tipc_connect,
2517         .socketpair     = sock_no_socketpair,
2518         .accept         = tipc_accept,
2519         .getname        = tipc_getname,
2520         .poll           = tipc_poll,
2521         .ioctl          = tipc_ioctl,
2522         .listen         = tipc_listen,
2523         .shutdown       = tipc_shutdown,
2524         .setsockopt     = tipc_setsockopt,
2525         .getsockopt     = tipc_getsockopt,
2526         .sendmsg        = tipc_send_stream,
2527         .recvmsg        = tipc_recv_stream,
2528         .mmap           = sock_no_mmap,
2529         .sendpage       = sock_no_sendpage
2530 };
2531
2532 static const struct net_proto_family tipc_family_ops = {
2533         .owner          = THIS_MODULE,
2534         .family         = AF_TIPC,
2535         .create         = tipc_sk_create
2536 };
2537
2538 static struct proto tipc_proto = {
2539         .name           = "TIPC",
2540         .owner          = THIS_MODULE,
2541         .obj_size       = sizeof(struct tipc_sock),
2542         .sysctl_rmem    = sysctl_tipc_rmem
2543 };
2544
2545 /**
2546  * tipc_socket_init - initialize TIPC socket interface
2547  *
2548  * Returns 0 on success, errno otherwise
2549  */
2550 int tipc_socket_init(void)
2551 {
2552         int res;
2553
2554         res = proto_register(&tipc_proto, 1);
2555         if (res) {
2556                 pr_err("Failed to register TIPC protocol type\n");
2557                 goto out;
2558         }
2559
2560         res = sock_register(&tipc_family_ops);
2561         if (res) {
2562                 pr_err("Failed to register TIPC socket type\n");
2563                 proto_unregister(&tipc_proto);
2564                 goto out;
2565         }
2566  out:
2567         return res;
2568 }
2569
2570 /**
2571  * tipc_socket_stop - stop TIPC socket interface
2572  */
2573 void tipc_socket_stop(void)
2574 {
2575         sock_unregister(tipc_family_ops.family);
2576         proto_unregister(&tipc_proto);
2577 }
2578
2579 /* Caller should hold socket lock for the passed tipc socket. */
2580 static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
2581 {
2582         u32 peer_node;
2583         u32 peer_port;
2584         struct nlattr *nest;
2585
2586         peer_node = tsk_peer_node(tsk);
2587         peer_port = tsk_peer_port(tsk);
2588
2589         nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
2590
2591         if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
2592                 goto msg_full;
2593         if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
2594                 goto msg_full;
2595
2596         if (tsk->conn_type != 0) {
2597                 if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
2598                         goto msg_full;
2599                 if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
2600                         goto msg_full;
2601                 if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
2602                         goto msg_full;
2603         }
2604         nla_nest_end(skb, nest);
2605
2606         return 0;
2607
2608 msg_full:
2609         nla_nest_cancel(skb, nest);
2610
2611         return -EMSGSIZE;
2612 }
2613
2614 /* Caller should hold socket lock for the passed tipc socket. */
2615 static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2616                             struct tipc_sock *tsk)
2617 {
2618         int err;
2619         void *hdr;
2620         struct nlattr *attrs;
2621         struct net *net = sock_net(skb->sk);
2622         struct tipc_net *tn = net_generic(net, tipc_net_id);
2623
2624         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2625                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2626         if (!hdr)
2627                 goto msg_cancel;
2628
2629         attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2630         if (!attrs)
2631                 goto genlmsg_cancel;
2632         if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
2633                 goto attr_msg_cancel;
2634         if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
2635                 goto attr_msg_cancel;
2636
2637         if (tsk->connected) {
2638                 err = __tipc_nl_add_sk_con(skb, tsk);
2639                 if (err)
2640                         goto attr_msg_cancel;
2641         } else if (!list_empty(&tsk->publications)) {
2642                 if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
2643                         goto attr_msg_cancel;
2644         }
2645         nla_nest_end(skb, attrs);
2646         genlmsg_end(skb, hdr);
2647
2648         return 0;
2649
2650 attr_msg_cancel:
2651         nla_nest_cancel(skb, attrs);
2652 genlmsg_cancel:
2653         genlmsg_cancel(skb, hdr);
2654 msg_cancel:
2655         return -EMSGSIZE;
2656 }
2657
2658 int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2659 {
2660         int err;
2661         struct tipc_sock *tsk;
2662         const struct bucket_table *tbl;
2663         struct rhash_head *pos;
2664         struct net *net = sock_net(skb->sk);
2665         struct tipc_net *tn = net_generic(net, tipc_net_id);
2666         u32 tbl_id = cb->args[0];
2667         u32 prev_portid = cb->args[1];
2668
2669         rcu_read_lock();
2670         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2671         for (; tbl_id < tbl->size; tbl_id++) {
2672                 rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
2673                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2674                         if (prev_portid && prev_portid != tsk->portid) {
2675                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2676                                 continue;
2677                         }
2678
2679                         err = __tipc_nl_add_sk(skb, cb, tsk);
2680                         if (err) {
2681                                 prev_portid = tsk->portid;
2682                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2683                                 goto out;
2684                         }
2685                         prev_portid = 0;
2686                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2687                 }
2688         }
2689 out:
2690         rcu_read_unlock();
2691         cb->args[0] = tbl_id;
2692         cb->args[1] = prev_portid;
2693
2694         return skb->len;
2695 }
2696
2697 /* Caller should hold socket lock for the passed tipc socket. */
2698 static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2699                                  struct netlink_callback *cb,
2700                                  struct publication *publ)
2701 {
2702         void *hdr;
2703         struct nlattr *attrs;
2704
2705         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2706                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2707         if (!hdr)
2708                 goto msg_cancel;
2709
2710         attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
2711         if (!attrs)
2712                 goto genlmsg_cancel;
2713
2714         if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
2715                 goto attr_msg_cancel;
2716         if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
2717                 goto attr_msg_cancel;
2718         if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
2719                 goto attr_msg_cancel;
2720         if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
2721                 goto attr_msg_cancel;
2722
2723         nla_nest_end(skb, attrs);
2724         genlmsg_end(skb, hdr);
2725
2726         return 0;
2727
2728 attr_msg_cancel:
2729         nla_nest_cancel(skb, attrs);
2730 genlmsg_cancel:
2731         genlmsg_cancel(skb, hdr);
2732 msg_cancel:
2733         return -EMSGSIZE;
2734 }
2735
2736 /* Caller should hold socket lock for the passed tipc socket. */
2737 static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2738                                   struct netlink_callback *cb,
2739                                   struct tipc_sock *tsk, u32 *last_publ)
2740 {
2741         int err;
2742         struct publication *p;
2743
2744         if (*last_publ) {
2745                 list_for_each_entry(p, &tsk->publications, pport_list) {
2746                         if (p->key == *last_publ)
2747                                 break;
2748                 }
2749                 if (p->key != *last_publ) {
2750                         /* We never set seq or call nl_dump_check_consistent()
2751                          * this means that setting prev_seq here will cause the
2752                          * consistence check to fail in the netlink callback
2753                          * handler. Resulting in the last NLMSG_DONE message
2754                          * having the NLM_F_DUMP_INTR flag set.
2755                          */
2756                         cb->prev_seq = 1;
2757                         *last_publ = 0;
2758                         return -EPIPE;
2759                 }
2760         } else {
2761                 p = list_first_entry(&tsk->publications, struct publication,
2762                                      pport_list);
2763         }
2764
2765         list_for_each_entry_from(p, &tsk->publications, pport_list) {
2766                 err = __tipc_nl_add_sk_publ(skb, cb, p);
2767                 if (err) {
2768                         *last_publ = p->key;
2769                         return err;
2770                 }
2771         }
2772         *last_publ = 0;
2773
2774         return 0;
2775 }
2776
2777 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2778 {
2779         int err;
2780         u32 tsk_portid = cb->args[0];
2781         u32 last_publ = cb->args[1];
2782         u32 done = cb->args[2];
2783         struct net *net = sock_net(skb->sk);
2784         struct tipc_sock *tsk;
2785
2786         if (!tsk_portid) {
2787                 struct nlattr **attrs;
2788                 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2789
2790                 err = tipc_nlmsg_parse(cb->nlh, &attrs);
2791                 if (err)
2792                         return err;
2793
2794                 err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
2795                                        attrs[TIPC_NLA_SOCK],
2796                                        tipc_nl_sock_policy);
2797                 if (err)
2798                         return err;
2799
2800                 if (!sock[TIPC_NLA_SOCK_REF])
2801                         return -EINVAL;
2802
2803                 tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2804         }
2805
2806         if (done)
2807                 return 0;
2808
2809         tsk = tipc_sk_lookup(net, tsk_portid);
2810         if (!tsk)
2811                 return -EINVAL;
2812
2813         lock_sock(&tsk->sk);
2814         err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
2815         if (!err)
2816                 done = 1;
2817         release_sock(&tsk->sk);
2818         sock_put(&tsk->sk);
2819
2820         cb->args[0] = tsk_portid;
2821         cb->args[1] = last_publ;
2822         cb->args[2] = done;
2823
2824         return skb->len;
2825 }
This page took 0.191864 seconds and 4 git commands to generate.