Lines Matching +full:multi +full:- +full:cluster
1 // SPDX-License-Identifier: GPL-2.0-only
5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
15 * This is the "low-level" comms layer.
18 * from other nodes in the cluster.
20 * Cluster nodes are referred to by their nodeids. nodeids are
21 * simply 32 bit numbers to the locking module - if they need to
22 * be expanded for the cluster infrastructure then that is its
25 * whatever it needs for inter-node communication.
29 * up to the mid-level comms layer (which understands the
40 * cluster-wide mechanism as it must be the same on all nodes of the cluster
114 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
121 #define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
122 #define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
214 assert_spin_locked(&con->writequeue_lock); in lowcomms_queue_swork()
216 if (!test_bit(CF_IO_STOP, &con->flags) && in lowcomms_queue_swork()
217 !test_bit(CF_APP_LIMITED, &con->flags) && in lowcomms_queue_swork()
218 !test_and_set_bit(CF_SEND_PENDING, &con->flags)) in lowcomms_queue_swork()
219 queue_work(io_workqueue, &con->swork); in lowcomms_queue_swork()
225 WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk)); in lowcomms_queue_rwork()
228 if (!test_bit(CF_IO_STOP, &con->flags) && in lowcomms_queue_rwork()
229 !test_and_set_bit(CF_RECV_PENDING, &con->flags)) in lowcomms_queue_rwork()
230 queue_work(io_workqueue, &con->rwork); in lowcomms_queue_rwork()
237 INIT_LIST_HEAD(&entry->msgs); in writequeue_entry_ctor()
256 e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry, in con_next_wq()
261 if (!e || e->users || e->len == 0) in con_next_wq()
272 if (con->nodeid == nodeid) in __find_con()
281 con->nodeid = nodeid; in dlm_con_init()
282 init_rwsem(&con->sock_lock); in dlm_con_init()
283 INIT_LIST_HEAD(&con->writequeue); in dlm_con_init()
284 spin_lock_init(&con->writequeue_lock); in dlm_con_init()
285 INIT_WORK(&con->swork, process_send_sockets); in dlm_con_init()
286 INIT_WORK(&con->rwork, process_recv_sockets); in dlm_con_init()
287 spin_lock_init(&con->addrs_lock); in dlm_con_init()
288 init_waitqueue_head(&con->shutdown_wait); in dlm_con_init()
325 hlist_add_head_rcu(&con->list, &connection_hash[r]); in nodeid2con()
334 switch (x->ss_family) { in addr_compare()
338 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) in addr_compare()
340 if (sinx->sin_port != siny->sin_port) in addr_compare()
347 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) in addr_compare()
349 if (sinx->sin6_port != siny->sin6_port) in addr_compare()
368 return -1; in nodeid_to_addr()
374 return -ENOENT; in nodeid_to_addr()
377 spin_lock(&con->addrs_lock); in nodeid_to_addr()
378 if (!con->addr_count) { in nodeid_to_addr()
379 spin_unlock(&con->addrs_lock); in nodeid_to_addr()
381 return -ENOENT; in nodeid_to_addr()
384 memcpy(&sas, &con->addr[con->curr_addr_index], in nodeid_to_addr()
388 con->curr_addr_index++; in nodeid_to_addr()
389 if (con->curr_addr_index == con->addr_count) in nodeid_to_addr()
390 con->curr_addr_index = 0; in nodeid_to_addr()
393 *mark = con->mark; in nodeid_to_addr()
394 spin_unlock(&con->addrs_lock); in nodeid_to_addr()
407 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; in nodeid_to_addr()
411 ret6->sin6_addr = in6->sin6_addr; in nodeid_to_addr()
427 WARN_ON_ONCE(!con->addr_count); in addr_to_nodeid()
429 spin_lock(&con->addrs_lock); in addr_to_nodeid()
430 for (addr_i = 0; addr_i < con->addr_count; addr_i++) { in addr_to_nodeid()
431 if (addr_compare(&con->addr[addr_i], addr)) { in addr_to_nodeid()
432 *nodeid = con->nodeid; in addr_to_nodeid()
433 *mark = con->mark; in addr_to_nodeid()
434 spin_unlock(&con->addrs_lock); in addr_to_nodeid()
439 spin_unlock(&con->addrs_lock); in addr_to_nodeid()
444 return -ENOENT; in addr_to_nodeid()
452 for (i = 0; i < con->addr_count; i++) { in dlm_lowcomms_con_has_addr()
453 if (addr_compare(&con->addr[i], addr)) in dlm_lowcomms_con_has_addr()
470 return -ENOMEM; in dlm_lowcomms_addr()
473 spin_lock(&con->addrs_lock); in dlm_lowcomms_addr()
474 if (!con->addr_count) { in dlm_lowcomms_addr()
475 memcpy(&con->addr[0], addr, sizeof(*addr)); in dlm_lowcomms_addr()
476 con->addr_count = 1; in dlm_lowcomms_addr()
477 con->mark = dlm_config.ci_mark; in dlm_lowcomms_addr()
478 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
485 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
487 return -EEXIST; in dlm_lowcomms_addr()
490 if (con->addr_count >= DLM_MAX_ADDR_COUNT) { in dlm_lowcomms_addr()
491 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
493 return -ENOSPC; in dlm_lowcomms_addr()
496 memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr)); in dlm_lowcomms_addr()
498 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
509 set_bit(CF_RECV_INTR, &con->flags); in lowcomms_data_ready()
517 clear_bit(SOCK_NOSPACE, &con->sock->flags); in lowcomms_write_space()
519 spin_lock_bh(&con->writequeue_lock); in lowcomms_write_space()
520 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { in lowcomms_write_space()
521 con->sock->sk->sk_write_pending--; in lowcomms_write_space()
522 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); in lowcomms_write_space()
526 spin_unlock_bh(&con->writequeue_lock); in lowcomms_write_space()
534 if (sk->sk_shutdown == RCV_SHUTDOWN) in lowcomms_state_change()
554 return -ENOENT; in dlm_lowcomms_connect_node()
557 down_read(&con->sock_lock); in dlm_lowcomms_connect_node()
558 if (!con->sock) { in dlm_lowcomms_connect_node()
559 spin_lock_bh(&con->writequeue_lock); in dlm_lowcomms_connect_node()
561 spin_unlock_bh(&con->writequeue_lock); in dlm_lowcomms_connect_node()
563 up_read(&con->sock_lock); in dlm_lowcomms_connect_node()
579 return -ENOENT; in dlm_lowcomms_nodes_set_mark()
582 spin_lock(&con->addrs_lock); in dlm_lowcomms_nodes_set_mark()
583 con->mark = mark; in dlm_lowcomms_nodes_set_mark()
584 spin_unlock(&con->addrs_lock); in dlm_lowcomms_nodes_set_mark()
595 switch (sk->sk_family) { in lowcomms_error_report()
600 con->nodeid, &inet->inet_daddr, in lowcomms_error_report()
601 ntohs(inet->inet_dport), sk->sk_err, in lowcomms_error_report()
602 READ_ONCE(sk->sk_err_soft)); in lowcomms_error_report()
609 con->nodeid, &sk->sk_v6_daddr, in lowcomms_error_report()
610 ntohs(inet->inet_dport), sk->sk_err, in lowcomms_error_report()
611 READ_ONCE(sk->sk_err_soft)); in lowcomms_error_report()
618 sk->sk_family, sk->sk_err, in lowcomms_error_report()
619 READ_ONCE(sk->sk_err_soft)); in lowcomms_error_report()
623 dlm_midcomms_unack_msg_resend(con->nodeid); in lowcomms_error_report()
634 sk->sk_user_data = NULL; in restore_callbacks()
635 sk->sk_data_ready = listen_sock.sk_data_ready; in restore_callbacks()
636 sk->sk_state_change = listen_sock.sk_state_change; in restore_callbacks()
637 sk->sk_write_space = listen_sock.sk_write_space; in restore_callbacks()
638 sk->sk_error_report = listen_sock.sk_error_report; in restore_callbacks()
644 struct sock *sk = sock->sk; in add_sock()
647 con->sock = sock; in add_sock()
649 sk->sk_user_data = con; in add_sock()
650 sk->sk_data_ready = lowcomms_data_ready; in add_sock()
651 sk->sk_write_space = lowcomms_write_space; in add_sock()
653 sk->sk_state_change = lowcomms_state_change; in add_sock()
654 sk->sk_allocation = GFP_NOFS; in add_sock()
655 sk->sk_use_task_frag = false; in add_sock()
656 sk->sk_error_report = lowcomms_error_report; in add_sock()
665 saddr->ss_family = dlm_local_addr[0].ss_family; in make_sockaddr()
666 if (saddr->ss_family == AF_INET) { in make_sockaddr()
668 in4_addr->sin_port = cpu_to_be16(port); in make_sockaddr()
670 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); in make_sockaddr()
673 in6_addr->sin6_port = cpu_to_be16(port); in make_sockaddr()
676 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); in make_sockaddr()
684 __free_page(e->page); in dlm_page_release()
692 kref_put(&msg->entry->ref, dlm_page_release); in dlm_msg_release()
700 list_for_each_entry_safe(msg, tmp, &e->msgs, list) { in free_entry()
701 if (msg->orig_msg) { in free_entry()
702 msg->orig_msg->retransmit = false; in free_entry()
703 kref_put(&msg->orig_msg->ref, dlm_msg_release); in free_entry()
706 list_del(&msg->list); in free_entry()
707 kref_put(&msg->ref, dlm_msg_release); in free_entry()
710 list_del(&e->list); in free_entry()
711 kref_put(&e->ref, dlm_page_release); in free_entry()
716 lock_sock((*sock)->sk); in dlm_close_sock()
717 restore_callbacks((*sock)->sk); in dlm_close_sock()
718 release_sock((*sock)->sk); in dlm_close_sock()
726 if (con->othercon) in allow_connection_io()
727 clear_bit(CF_IO_STOP, &con->othercon->flags); in allow_connection_io()
728 clear_bit(CF_IO_STOP, &con->flags); in allow_connection_io()
733 if (con->othercon) in stop_connection_io()
734 stop_connection_io(con->othercon); in stop_connection_io()
736 spin_lock_bh(&con->writequeue_lock); in stop_connection_io()
737 set_bit(CF_IO_STOP, &con->flags); in stop_connection_io()
738 spin_unlock_bh(&con->writequeue_lock); in stop_connection_io()
740 down_write(&con->sock_lock); in stop_connection_io()
741 if (con->sock) { in stop_connection_io()
742 lock_sock(con->sock->sk); in stop_connection_io()
743 restore_callbacks(con->sock->sk); in stop_connection_io()
744 release_sock(con->sock->sk); in stop_connection_io()
746 up_write(&con->sock_lock); in stop_connection_io()
748 cancel_work_sync(&con->swork); in stop_connection_io()
749 cancel_work_sync(&con->rwork); in stop_connection_io()
757 if (con->othercon && and_other) in close_connection()
758 close_connection(con->othercon, false); in close_connection()
760 down_write(&con->sock_lock); in close_connection()
761 if (!con->sock) { in close_connection()
762 up_write(&con->sock_lock); in close_connection()
766 dlm_close_sock(&con->sock); in close_connection()
779 spin_lock_bh(&con->writequeue_lock); in close_connection()
780 if (!list_empty(&con->writequeue)) { in close_connection()
781 e = list_first_entry(&con->writequeue, struct writequeue_entry, in close_connection()
783 if (e->dirty) in close_connection()
786 spin_unlock_bh(&con->writequeue_lock); in close_connection()
788 con->rx_leftover = 0; in close_connection()
789 con->retries = 0; in close_connection()
790 clear_bit(CF_APP_LIMITED, &con->flags); in close_connection()
791 clear_bit(CF_RECV_PENDING, &con->flags); in close_connection()
792 clear_bit(CF_SEND_PENDING, &con->flags); in close_connection()
793 up_write(&con->sock_lock); in close_connection()
800 if (con->othercon && and_other) in shutdown_connection()
801 shutdown_connection(con->othercon, false); in shutdown_connection()
804 down_read(&con->sock_lock); in shutdown_connection()
806 if (!con->sock) { in shutdown_connection()
807 up_read(&con->sock_lock); in shutdown_connection()
811 ret = kernel_sock_shutdown(con->sock, SHUT_WR); in shutdown_connection()
812 up_read(&con->sock_lock); in shutdown_connection()
818 ret = wait_event_timeout(con->shutdown_wait, !con->sock, in shutdown_connection()
842 pentry->buf = kmalloc(buflen, GFP_NOFS); in new_processqueue_entry()
843 if (!pentry->buf) { in new_processqueue_entry()
848 pentry->nodeid = nodeid; in new_processqueue_entry()
854 kfree(pentry->buf); in free_processqueue_entry()
877 list_del(&pentry->list); in process_dlm_messages()
881 dlm_process_incoming_buffer(pentry->nodeid, pentry->buf, in process_dlm_messages()
882 pentry->buflen); in process_dlm_messages()
894 list_del(&pentry->list); in process_dlm_messages()
907 pentry = new_processqueue_entry(con->nodeid, buflen); in receive_from_sock()
911 memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover); in receive_from_sock()
916 iov.iov_base = pentry->buf + con->rx_leftover; in receive_from_sock()
917 iov.iov_len = buflen - con->rx_leftover; in receive_from_sock()
921 clear_bit(CF_RECV_INTR, &con->flags); in receive_from_sock()
923 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, in receive_from_sock()
925 trace_dlm_recv(con->nodeid, ret); in receive_from_sock()
926 if (ret == -EAGAIN) { in receive_from_sock()
927 lock_sock(con->sock->sk); in receive_from_sock()
928 if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) { in receive_from_sock()
929 release_sock(con->sock->sk); in receive_from_sock()
933 clear_bit(CF_RECV_PENDING, &con->flags); in receive_from_sock()
934 release_sock(con->sock->sk); in receive_from_sock()
947 buflen_real = ret + con->rx_leftover; in receive_from_sock()
948 ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf, in receive_from_sock()
955 pentry->buflen = ret; in receive_from_sock()
961 con->rx_leftover = buflen_real - ret; in receive_from_sock()
962 memmove(con->rx_leftover_buf, pentry->buf + ret, in receive_from_sock()
963 con->rx_leftover); in receive_from_sock()
966 list_add_tail(&pentry->list, &processqueue); in receive_from_sock()
986 if (result == -EAGAIN) in accept_from_sock()
993 len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2); in accept_from_sock()
995 result = -ECONNABORTED; in accept_from_sock()
1006 log_print("connect from non cluster IPv4 node %pI4", in accept_from_sock()
1007 &sin->sin_addr); in accept_from_sock()
1014 log_print("connect from non cluster IPv6 node %pI6c", in accept_from_sock()
1015 &sin6->sin6_addr); in accept_from_sock()
1020 log_print("invalid family from non cluster node"); in accept_from_sock()
1025 return -1; in accept_from_sock()
1039 result = -ENOENT; in accept_from_sock()
1043 sock_set_mark(newsock->sk, mark); in accept_from_sock()
1045 down_write(&newcon->sock_lock); in accept_from_sock()
1046 if (newcon->sock) { in accept_from_sock()
1047 struct connection *othercon = newcon->othercon; in accept_from_sock()
1053 up_write(&newcon->sock_lock); in accept_from_sock()
1055 result = -ENOMEM; in accept_from_sock()
1060 lockdep_set_subclass(&othercon->sock_lock, 1); in accept_from_sock()
1061 newcon->othercon = othercon; in accept_from_sock()
1062 set_bit(CF_IS_OTHERCON, &othercon->flags); in accept_from_sock()
1068 down_write(&othercon->sock_lock); in accept_from_sock()
1072 lock_sock(othercon->sock->sk); in accept_from_sock()
1074 release_sock(othercon->sock->sk); in accept_from_sock()
1075 up_write(&othercon->sock_lock); in accept_from_sock()
1084 lock_sock(newcon->sock->sk); in accept_from_sock()
1086 release_sock(newcon->sock->sk); in accept_from_sock()
1088 up_write(&newcon->sock_lock); in accept_from_sock()
1101 * writequeue_entry_complete - try to delete and free write queue entry
1109 e->offset += completed; in writequeue_entry_complete()
1110 e->len -= completed; in writequeue_entry_complete()
1112 e->dirty = true; in writequeue_entry_complete()
1114 if (e->len == 0 && e->users == 0) in writequeue_entry_complete()
1119 * sctp_bind_addrs - bind a SCTP socket to all our addresses
1134 result = sock_bind_add(sock->sk, addr, addr_len); in sctp_bind_addrs()
1168 entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO); in new_writequeue_entry()
1169 if (!entry->page) { in new_writequeue_entry()
1174 entry->offset = 0; in new_writequeue_entry()
1175 entry->len = 0; in new_writequeue_entry()
1176 entry->end = 0; in new_writequeue_entry()
1177 entry->dirty = false; in new_writequeue_entry()
1178 entry->con = con; in new_writequeue_entry()
1179 entry->users = 1; in new_writequeue_entry()
1180 kref_init(&entry->ref); in new_writequeue_entry()
1190 spin_lock_bh(&con->writequeue_lock); in new_wq_entry()
1191 if (!list_empty(&con->writequeue)) { in new_wq_entry()
1192 e = list_last_entry(&con->writequeue, struct writequeue_entry, list); in new_wq_entry()
1194 kref_get(&e->ref); in new_wq_entry()
1196 *ppc = page_address(e->page) + e->end; in new_wq_entry()
1200 e->end += len; in new_wq_entry()
1201 e->users++; in new_wq_entry()
1210 kref_get(&e->ref); in new_wq_entry()
1211 *ppc = page_address(e->page); in new_wq_entry()
1212 e->end += len; in new_wq_entry()
1216 list_add_tail(&e->list, &con->writequeue); in new_wq_entry()
1219 spin_unlock_bh(&con->writequeue_lock); in new_wq_entry()
1235 kref_init(&msg->ref); in dlm_lowcomms_new_msg_con()
1243 msg->retransmit = false; in dlm_lowcomms_new_msg_con()
1244 msg->orig_msg = NULL; in dlm_lowcomms_new_msg_con()
1245 msg->ppc = *ppc; in dlm_lowcomms_new_msg_con()
1246 msg->len = len; in dlm_lowcomms_new_msg_con()
1247 msg->entry = e; in dlm_lowcomms_new_msg_con()
1286 kref_get(&msg->ref); in dlm_lowcomms_new_msg()
1288 msg->idx = idx; in dlm_lowcomms_new_msg()
1295 struct writequeue_entry *e = msg->entry; in _dlm_lowcomms_commit_msg()
1296 struct connection *con = e->con; in _dlm_lowcomms_commit_msg()
1299 spin_lock_bh(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1300 kref_get(&msg->ref); in _dlm_lowcomms_commit_msg()
1301 list_add(&msg->list, &e->msgs); in _dlm_lowcomms_commit_msg()
1303 users = --e->users; in _dlm_lowcomms_commit_msg()
1307 e->len = DLM_WQ_LENGTH_BYTES(e); in _dlm_lowcomms_commit_msg()
1312 spin_unlock_bh(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1323 srcu_read_unlock(&connections_srcu, msg->idx); in dlm_lowcomms_commit_msg()
1325 kref_put(&msg->ref, dlm_msg_release); in dlm_lowcomms_commit_msg()
1331 kref_put(&msg->ref, dlm_msg_release); in dlm_lowcomms_put_msg()
1340 if (msg->retransmit) in dlm_lowcomms_resend_msg()
1343 msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, in dlm_lowcomms_resend_msg()
1346 return -ENOMEM; in dlm_lowcomms_resend_msg()
1348 msg->retransmit = true; in dlm_lowcomms_resend_msg()
1349 kref_get(&msg->ref); in dlm_lowcomms_resend_msg()
1350 msg_resend->orig_msg = msg; in dlm_lowcomms_resend_msg()
1352 memcpy(ppc, msg->ppc, msg->len); in dlm_lowcomms_resend_msg()
1369 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1372 clear_bit(CF_SEND_PENDING, &con->flags); in send_to_sock()
1373 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1377 len = e->len; in send_to_sock()
1378 offset = e->offset; in send_to_sock()
1379 WARN_ON_ONCE(len == 0 && e->users == 0); in send_to_sock()
1380 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1382 bvec_set_page(&bvec, e->page, len, offset); in send_to_sock()
1384 ret = sock_sendmsg(con->sock, &msg); in send_to_sock()
1385 trace_dlm_send(con->nodeid, ret); in send_to_sock()
1386 if (ret == -EAGAIN || ret == 0) { in send_to_sock()
1387 lock_sock(con->sock->sk); in send_to_sock()
1388 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1389 if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && in send_to_sock()
1390 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { in send_to_sock()
1394 set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags); in send_to_sock()
1395 con->sock->sk->sk_write_pending++; in send_to_sock()
1397 clear_bit(CF_SEND_PENDING, &con->flags); in send_to_sock()
1398 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1399 release_sock(con->sock->sk); in send_to_sock()
1404 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1405 release_sock(con->sock->sk); in send_to_sock()
1412 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1414 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1423 spin_lock_bh(&con->writequeue_lock); in clean_one_writequeue()
1424 list_for_each_entry_safe(e, safe, &con->writequeue, list) { in clean_one_writequeue()
1427 spin_unlock_bh(&con->writequeue_lock); in clean_one_writequeue()
1434 WARN_ON_ONCE(!list_empty(&con->writequeue)); in connection_release()
1435 WARN_ON_ONCE(con->sock); in connection_release()
1440 left the cluster */
1452 return -ENOENT; in dlm_lowcomms_close()
1460 hlist_del_rcu(&con->list); in dlm_lowcomms_close()
1464 call_srcu(&connections_srcu, &con->rcu, connection_release); in dlm_lowcomms_close()
1465 if (con->othercon) { in dlm_lowcomms_close()
1466 clean_one_writequeue(con->othercon); in dlm_lowcomms_close()
1467 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); in dlm_lowcomms_close()
1486 down_read(&con->sock_lock); in process_recv_sockets()
1487 if (!con->sock) { in process_recv_sockets()
1488 up_read(&con->sock_lock); in process_recv_sockets()
1496 up_read(&con->sock_lock); in process_recv_sockets()
1504 wake_up(&con->shutdown_wait); in process_recv_sockets()
1509 queue_work(io_workqueue, &con->rwork); in process_recv_sockets()
1514 if (test_bit(CF_IS_OTHERCON, &con->flags)) { in process_recv_sockets()
1517 spin_lock_bh(&con->writequeue_lock); in process_recv_sockets()
1519 spin_unlock_bh(&con->writequeue_lock); in process_recv_sockets()
1557 result = nodeid_to_addr(con->nodeid, &addr, NULL, in dlm_connect()
1558 dlm_proto_ops->try_new_addr, &mark); in dlm_connect()
1560 log_print("no address for nodeid %d", con->nodeid); in dlm_connect()
1566 SOCK_STREAM, dlm_proto_ops->proto, &sock); in dlm_connect()
1570 sock_set_mark(sock->sk, mark); in dlm_connect()
1571 dlm_proto_ops->sockopts(sock); in dlm_connect()
1573 result = dlm_proto_ops->bind(sock); in dlm_connect()
1581 log_print_ratelimited("connecting to %d", con->nodeid); in dlm_connect()
1583 result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr, in dlm_connect()
1586 case -EINPROGRESS: in dlm_connect()
1593 dlm_close_sock(&con->sock); in dlm_connect()
1607 WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags)); in process_send_sockets()
1609 down_read(&con->sock_lock); in process_send_sockets()
1610 if (!con->sock) { in process_send_sockets()
1611 up_read(&con->sock_lock); in process_send_sockets()
1612 down_write(&con->sock_lock); in process_send_sockets()
1613 if (!con->sock) { in process_send_sockets()
1618 case -EINPROGRESS: in process_send_sockets()
1627 up_write(&con->sock_lock); in process_send_sockets()
1629 con->nodeid, con->retries++, ret); in process_send_sockets()
1632 * future we should send a event to cluster in process_send_sockets()
1636 queue_work(io_workqueue, &con->swork); in process_send_sockets()
1640 downgrade_write(&con->sock_lock); in process_send_sockets()
1646 up_read(&con->sock_lock); in process_send_sockets()
1655 queue_work(io_workqueue, &con->swork); in process_send_sockets()
1662 spin_lock_bh(&con->writequeue_lock); in process_send_sockets()
1664 spin_unlock_bh(&con->writequeue_lock); in process_send_sockets()
1692 return -ENOMEM; in work_start()
1704 return -ENOMEM; in work_start()
1716 lock_sock(listen_con.sock->sk); in dlm_lowcomms_shutdown()
1717 listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready; in dlm_lowcomms_shutdown()
1718 release_sock(listen_con.sock->sk); in dlm_lowcomms_shutdown()
1732 if (con->othercon) in dlm_lowcomms_shutdown()
1733 clean_one_writequeue(con->othercon); in dlm_lowcomms_shutdown()
1752 dlm_proto_ops->name); in dlm_listen_for_all()
1754 result = dlm_proto_ops->listen_validate(); in dlm_listen_for_all()
1759 SOCK_STREAM, dlm_proto_ops->proto, &sock); in dlm_listen_for_all()
1765 sock_set_mark(sock->sk, dlm_config.ci_mark); in dlm_listen_for_all()
1766 dlm_proto_ops->listen_sockopts(sock); in dlm_listen_for_all()
1768 result = dlm_proto_ops->listen_bind(sock); in dlm_listen_for_all()
1772 lock_sock(sock->sk); in dlm_listen_for_all()
1773 listen_sock.sk_data_ready = sock->sk->sk_data_ready; in dlm_listen_for_all()
1774 listen_sock.sk_write_space = sock->sk->sk_write_space; in dlm_listen_for_all()
1775 listen_sock.sk_error_report = sock->sk->sk_error_report; in dlm_listen_for_all()
1776 listen_sock.sk_state_change = sock->sk->sk_state_change; in dlm_listen_for_all()
1780 sock->sk->sk_allocation = GFP_NOFS; in dlm_listen_for_all()
1781 sock->sk->sk_use_task_frag = false; in dlm_listen_for_all()
1782 sock->sk->sk_data_ready = lowcomms_listen_data_ready; in dlm_listen_for_all()
1783 release_sock(sock->sk); in dlm_listen_for_all()
1785 result = sock->ops->listen(sock, 128); in dlm_listen_for_all()
1803 /* Bind to our cluster-known address connecting to avoid in dlm_tcp_bind()
1827 /* We don't support multi-homed hosts */ in dlm_tcp_listen_validate()
1829 log_print("Detect multi-homed hosts but use only the first IP address."); in dlm_tcp_listen_validate()
1830 log_print("Try SCTP, if you want to enable multi-link."); in dlm_tcp_listen_validate()
1839 tcp_sock_set_nodelay(sock->sk); in dlm_tcp_sockopts()
1845 sock_set_reuseaddr(sock->sk); in dlm_tcp_listen_sockopts()
1884 sock_set_sndtimeo(sock->sk, 5); in dlm_sctp_connect()
1886 sock_set_sndtimeo(sock->sk, 0); in dlm_sctp_connect()
1894 return -EOPNOTSUPP; in dlm_sctp_listen_validate()
1909 sctp_sock_set_nodelay(sock->sk); in dlm_sctp_sockopts()
1910 sock_set_rcvbuf(sock->sk, NEEDED_RMEM); in dlm_sctp_sockopts()
1931 error = -ENOTCONN; in dlm_lowcomms_start()
1951 error = -EINVAL; in dlm_lowcomms_start()
1988 hlist_del_rcu(&con->list); in dlm_lowcomms_exit()
1991 if (con->othercon) in dlm_lowcomms_exit()
1992 call_srcu(&connections_srcu, &con->othercon->rcu, in dlm_lowcomms_exit()
1994 call_srcu(&connections_srcu, &con->rcu, connection_release); in dlm_lowcomms_exit()